# Wikipedia Index Builder
This notebook processes 60 Parquet files from GCS and builds the inverted index for search.

In [None]:
# Install required packages
!pip install -q google-cloud-storage==1.43.0
!pip install -q pyarrow pandas

: 

In [None]:
# Download NLTK stopwords
# The 'stopwords' corpus fetched here is read later in the notebook through
# nltk.corpus.stopwords when the tokenizer's stopword set is built.
import nltk
nltk.download('stopwords')

In [None]:
# Authenticate for GCS access
# Runs Colab's user-authentication flow. The storage.Client() created later in
# the notebook presumably picks up these credentials — TODO confirm.
from google.colab import auth
auth.authenticate_user()

In [None]:
# Upload inverted_index_gcp.py to Colab
# Make sure to update PROJECT_ID in inverted_index_gcp.py before uploading
import os

from google.colab import files

# Print the instruction BEFORE opening the upload widget: files.upload() blocks
# until the widget is dismissed, so a prompt printed afterwards arrives too late.
print("Please upload inverted_index_gcp.py")
uploaded = files.upload()

# The import cell below needs this module on disk. Warn right away if it was
# neither uploaded now nor already present from an earlier run.
if 'inverted_index_gcp.py' not in uploaded and not os.path.exists('inverted_index_gcp.py'):
    print("WARNING: inverted_index_gcp.py was not uploaded and is not in the working directory")

In [None]:
import pandas as pd
import pickle
import re
from collections import Counter, defaultdict
from google.cloud import storage
from nltk.corpus import stopwords
from inverted_index_gcp import InvertedIndex, MultiFileWriter
from pathlib import Path
import numpy as np
from contextlib import closing
import time

# Configuration
BUCKET_NAME = '208894444'      # GCS bucket that is listed for parquet files and receives the uploads
NUM_PARQUET_FILES = 60         # upper bound on how many parquet files the indexing loop processes
OUTPUT_DIR = './index_output'  # local directory for the generated index artifacts

# Create output directory.
# Done in pure Python (no shell call), so it does not depend on a `mkdir`
# binary and is a no-op when the directory already exists.
Path(OUTPUT_DIR).mkdir(parents=True, exist_ok=True)

In [None]:
# Tokenization setup (must match search_frontend.py)
# Token pattern: one leading '#', '@' or word character, followed by 2 to 24
# word characters, each of which may be preceded by a single ', / or -.
RE_WORD = re.compile(r"""[\#\@\w](['\/\-]?\w){2,24}""", re.UNICODE)

# English stopword list shipped with NLTK, frozen for fast membership tests.
english_stopwords = frozenset(stopwords.words('english'))

# Additional words that are filtered out on top of the NLTK list.
corpus_stopwords = [
    "category", "references", "also", "external", "links",
    "may", "first", "see", "history", "people", "one", "two",
    "part", "thumb", "including", "second", "following",
    "many", "however", "would", "became",
]

# Combined, immutable set consulted by tokenize().
all_stopwords = english_stopwords | frozenset(corpus_stopwords)

def tokenize(text):
    """Split ``text`` into lower-cased tokens, dropping stopwords.

    Args:
        text: The string to tokenize. Any falsy value (``None``, ``""``)
            yields an empty list.

    Returns:
        A list of lower-cased RE_WORD matches, in order of appearance, that
        are not members of ``all_stopwords``.
    """
    if not text:
        return []

    kept = []
    for match in RE_WORD.finditer(text):
        word = match.group().lower()
        # Stopwords are compared after lower-casing.
        if word not in all_stopwords:
            kept.append(word)
    return kept

# Test tokenization
# Smoke test: prints the tokens produced for a sample sentence so the result
# can be checked by eye (nothing is asserted).
print("Test tokenization:", tokenize("The quick brown fox jumps over the lazy dog"))

In [None]:
# Connect to GCS and get a handle on the configured bucket
client = storage.Client()
bucket = client.bucket(BUCKET_NAME)

# Fetch the full bucket listing once, then keep only the names ending in .parquet
blobs = list(bucket.list_blobs())
blob_names = (entry.name for entry in blobs)
parquet_files = [name for name in blob_names if name.endswith('.parquet')]

# Report how many inputs were found and show a small sample of their names
parquet_count = len(parquet_files)
print(f"Found {parquet_count} parquet files in bucket {BUCKET_NAME}")
print("First few files:", parquet_files[:5])

In [None]:
# Main indexing loop
# Downloads each parquet file, tokenizes every document's text and feeds the
# tokens into the inverted index while collecting per-document metadata.
print("Starting indexing process...")
print("=" * 50)


def _as_text(value):
    """Return ``value`` as a string, mapping missing values (None/NaN/NA) to ''.

    Plain ``str()`` would turn a missing cell into the literal text 'None' or
    'nan', which would then be tokenized and indexed as if it were content.
    """
    if value is None or value is pd.NA:
        return ''
    if isinstance(value, float) and pd.isna(value):
        return ''
    return str(value)


# Initialize data structures
index_body = InvertedIndex()
doc_lengths = {}  # wiki_id -> document length
titles = {}       # wiki_id -> title
pagerank = {}     # wiki_id -> pagerank score (placeholder)

total_docs = 0
total_tokens = 0
failed_files = []  # parquet files whose processing raised an exception
start_time = time.time()

# Slice once so the progress counter reflects the number of files that will
# actually be processed, even when the bucket holds fewer than NUM_PARQUET_FILES.
files_to_process = parquet_files[:NUM_PARQUET_FILES]
num_files = len(files_to_process)

# Process each Parquet file
for i, parquet_file in enumerate(files_to_process, 1):
    print(f"\n[{i}/{num_files}] Processing: {parquet_file}")

    parquet_path = Path('/tmp') / parquet_file.replace("/", "_")
    try:
        # Download and read parquet file; always remove the local copy so the
        # temp directory does not accumulate every downloaded file.
        blob = bucket.blob(parquet_file)
        try:
            blob.download_to_filename(str(parquet_path))
            df = pd.read_parquet(parquet_path)
        finally:
            if parquet_path.exists():
                parquet_path.unlink()

        print(f"  Loaded {len(df)} documents")

        # Resolve the source columns once per file instead of once per row.
        # Id falls back from 'id' to 'wiki_id' to the frame's index; a missing
        # title/text column is treated as empty strings.
        if 'id' in df.columns:
            raw_ids = df['id']
        elif 'wiki_id' in df.columns:
            raw_ids = df['wiki_id']
        else:
            raw_ids = df.index
        raw_titles = df['title'] if 'title' in df.columns else [''] * len(df)
        raw_texts = df['text'] if 'text' in df.columns else [''] * len(df)

        # Process each document
        for raw_id, raw_title, raw_text in zip(raw_ids, raw_titles, raw_texts):
            wiki_id = int(raw_id)
            tokens = tokenize(_as_text(raw_text))

            # Documents without any indexable token are skipped entirely.
            if not tokens:
                continue

            index_body.add_doc(wiki_id, tokens)
            doc_lengths[wiki_id] = len(tokens)
            titles[wiki_id] = _as_text(raw_title)
            pagerank[wiki_id] = 1.0  # Placeholder PageRank (uniform distribution)

            total_docs += 1
            total_tokens += len(tokens)

        print(f"  Total docs so far: {total_docs:,}")

    except Exception as e:
        # Best effort: keep going with the remaining files, but remember the
        # failure so it is reported in the final summary. Documents added
        # before the error stay in the index.
        failed_files.append(parquet_file)
        print(f"  ERROR processing {parquet_file}: {e}")
        continue

elapsed = time.time() - start_time
print("\n" + "=" * 50)
print("Indexing complete!")
print(f"Total documents: {total_docs:,}")
print(f"Total tokens: {total_tokens:,}")
print(f"Unique terms: {len(index_body.df):,}")
print(f"Time elapsed: {elapsed:.2f} seconds")
if failed_files:
    print(f"WARNING: {len(failed_files)} file(s) failed and are missing or only partially indexed: {failed_files}")

In [None]:
# Calculate BM25 statistics
# N and avg_dl are derived from the counters accumulated by the indexing loop;
# doc_lengths is stored as-is so per-document lengths are available at query time.
N = total_docs
avg_dl = total_tokens / total_docs if total_docs > 0 else 0

bm25_data = {
    'doc_lengths': doc_lengths,
    'avg_dl': avg_dl,
    'N': N
}

print("BM25 Statistics:")
print(f"  N (total docs): {N:,}")
print(f"  Average doc length: {avg_dl:.2f} tokens")
# default=0 keeps this report from raising ValueError when nothing was indexed,
# consistent with the zero-document guard on avg_dl above.
print(f"  Min doc length: {min(doc_lengths.values(), default=0)}")
print(f"  Max doc length: {max(doc_lengths.values(), default=0)}")

In [None]:
# Write posting lists to binary files
print("\nWriting posting lists to binary files...")

# Group posting lists into buckets for parallel writing
# NOTE(review): islice is imported here but is not referenced anywhere in the
# visible notebook cells.
from itertools import islice

def grouper(iterable, n, fillvalue=None):
    """Collect data into fixed-length chunks.

    Args:
        iterable: Any iterable of items.
        n: Chunk length.
        fillvalue: Value used to pad the final chunk when the number of items
            is not a multiple of ``n``.

    Returns:
        An iterator of ``n``-tuples. The last tuple is padded with
        ``fillvalue`` instead of being dropped, so no input item is lost.
    """
    from itertools import zip_longest

    # n references to the SAME iterator: each output tuple pulls n consecutive items.
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

# Snapshot every (term, postings) pair currently held by the in-memory index
posting_lists = [(term, postings) for term, postings in index_body._posting_list.items()]
print(f"Total posting lists to write: {len(posting_lists):,}")

# Hand everything to the static writer as a single bucket with id 0
bucket_data = (0, posting_lists)
InvertedIndex.write_a_posting_list(bucket_data, OUTPUT_DIR, BUCKET_NAME)

# Read back the locations file for bucket 0 and attach it to the index.
# NOTE(review): assumes the writer leaves 0_posting_locs.pickle in the local
# OUTPUT_DIR — confirm against inverted_index_gcp.py.
posting_locs_path = f'{OUTPUT_DIR}/0_posting_locs.pickle'
with open(posting_locs_path, 'rb') as locs_file:
    index_body.posting_locs = pickle.load(locs_file)

print(f"Posting lists written successfully!")

In [None]:
# Save index_body.pkl
# write_index is given the local output directory, the base name 'index_body'
# and the bucket name; presumably it pickles the index and stores it in the
# bucket — confirm in inverted_index_gcp.py.
print("\nSaving index_body.pkl...")
index_body.write_index(OUTPUT_DIR, 'index_body', BUCKET_NAME)
print("✓ index_body.pkl saved")

In [None]:
# Persist the BM25 statistics locally, then copy the file to the bucket
print("\nSaving bm25_data.pkl...")
bm25_path = f'{OUTPUT_DIR}/bm25_data.pkl'
with open(bm25_path, 'wb') as out_file:
    pickle.dump(bm25_data, out_file)

# Upload to GCS under the object name 'bm25_data.pkl'
bucket.blob('bm25_data.pkl').upload_from_filename(bm25_path)
print("✓ bm25_data.pkl saved and uploaded")

In [None]:
# Persist the wiki_id -> title mapping locally, then copy the file to the bucket
print("\nSaving titles.pkl...")
titles_path = f'{OUTPUT_DIR}/titles.pkl'
with open(titles_path, 'wb') as out_file:
    pickle.dump(titles, out_file)

# Upload to GCS under the object name 'titles.pkl'
bucket.blob('titles.pkl').upload_from_filename(titles_path)
title_count = len(titles)
print(f"✓ titles.pkl saved and uploaded ({title_count:,} entries)")

In [None]:
# Persist the placeholder PageRank mapping locally, then copy the file to the bucket
print("\nSaving pagerank.pkl (placeholder)...")
pagerank_path = f'{OUTPUT_DIR}/pagerank.pkl'
with open(pagerank_path, 'wb') as out_file:
    pickle.dump(pagerank, out_file)

# Upload to GCS under the object name 'pagerank.pkl'
bucket.blob('pagerank.pkl').upload_from_filename(pagerank_path)
pagerank_count = len(pagerank)
print(f"✓ pagerank.pkl saved and uploaded ({pagerank_count:,} entries)")

In [None]:
# Final report: list the artifact names and the headline corpus statistics
divider = "=" * 50
print("\n" + divider)
print("INDEX BUILDING COMPLETE!")
print(divider)

print("\nFiles saved to GCS bucket:", BUCKET_NAME)
for artifact_line in (
    "  ✓ index_body.pkl",
    "  ✓ index_body_000.bin (and other .bin files)",
    "  ✓ bm25_data.pkl",
    "  ✓ titles.pkl",
    "  ✓ pagerank.pkl",
):
    print(artifact_line)

print("\nStatistics:")
term_count = len(index_body.df)
print(f"  Total documents: {N:,}")
print(f"  Unique terms: {term_count:,}")
print(f"  Average doc length: {avg_dl:.2f} tokens")
print("\nYou can now use these files in search_frontend.py!")

In [None]:
# Validation: Test loading the index
# Reloads the saved index through InvertedIndex.read_index and prints its
# term count plus one sample (term, document-frequency) entry.
print("\nValidation: Testing index loading...")
test_index = InvertedIndex.read_index(OUTPUT_DIR, 'index_body', BUCKET_NAME)
print("✓ Index loaded successfully")
print(f"  Terms in index: {len(test_index.df):,}")

# Peek at a single entry without copying the whole vocabulary into a list;
# an empty index is reported instead of raising IndexError.
sample = next(iter(test_index.df.items()), None)
if sample is None:
    print("  WARNING: index contains no terms")
else:
    sample_term, sample_df = sample
    print(f"  Sample term: {sample_term}")
    print(f"  Sample df: {sample_df}")