# Build Title Index (No Stemming)

**Important: DO NOT CLEAR THE OUTPUT OF THIS NOTEBOOK AFTER EXECUTION!!!**

This notebook builds an inverted index for Wikipedia article **titles** WITHOUT stemming.

This index is used for the `search_title` endpoint which:
- Returns ALL search results containing query words in titles
- Orders by NUMBER OF DISTINCT QUERY WORDS in the title
- Does NOT use stemming
- Uses the staff-provided tokenizer to tokenize and remove stopwords

In [None]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

In [None]:
# Verify cluster is running
!gcloud dataproc clusters list --region us-central1

## Imports & Setup

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
from google.cloud import storage

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')

In [None]:
# Verify graphframes jar is available
!ls -l /usr/lib/spark/jars/graph*

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *
spark

In [None]:
# Bucket name - CHANGE THIS TO YOUR BUCKET
bucket_name = 'db204905756'
full_path = f"gs://{bucket_name}/"

# List every object in the bucket and keep, as full gs:// URIs, those whose
# object name contains the substring "parquet".
client = storage.Client()
blobs = client.list_blobs(bucket_name)
paths = [full_path + blob.name for blob in blobs if "parquet" in blob.name]

print(f"Found {len(paths)} parquet files")

## Load Data

In [None]:
# Read all parquet files discovered above into a single Spark DataFrame.
parquetFile = spark.read.parquet(*paths)
# Keep only the two columns this index needs; each RDD element is a
# Row(title, id), so downstream code reads x[0] as the title and x[1] as the id.
doc_title_pairs = parquetFile.select("title", "id").rdd

In [None]:
# Count number of wiki pages - should be more than 6M
print(f"Total documents: {parquetFile.count()}")

## Create Modified Inverted Index Module (for title_nostem folder)

We need to modify the inverted index module to save postings to a different folder (`postings_title_nostem/`) instead of the default `postings_gcp/`.

In [None]:
%%writefile /home/dataproc/inverted_index_title_nostem.py
import pyspark
import sys
from collections import Counter, OrderedDict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
from time import time
from pathlib import Path
import pickle
from google.cloud import storage
from collections import defaultdict
from contextlib import closing

# Block size for posting files
# Maximum number of bytes MultiFileWriter puts into a single .bin file before
# it closes that file and opens the next one.
# NOTE(review): 1999998 is a multiple of TUPLE_SIZE (6, defined below), so
# presumably it was chosen so that a packed posting never spans two files — confirm.
BLOCK_SIZE = 1999998

# Folder name for this index's postings
# Used as the object-name prefix ("<folder>/...") for every upload made by this module.
POSTINGS_FOLDER = "postings_title_nostem"

class MultiFileWriter:
    """ Sequential binary writer to multiple files of up to BLOCK_SIZE each.

    Files are named ``{name}_{i:03}.bin`` inside ``base_dir``. Whenever the
    current file reaches BLOCK_SIZE bytes it is closed and uploaded to the
    bucket under ``POSTINGS_FOLDER/``. The last (usually partial) file is
    uploaded only when the caller invokes ``upload_to_gcp()`` explicitly.
    """
    def __init__(self, base_dir, name, bucket_name):
        """ Opens the first output file and connects to the target bucket.
        Parameters:
        -----------
          base_dir: directory in which the local .bin files are created
          name: prefix of the generated file names
          bucket_name: name of the Google Cloud Storage bucket to upload to
        """
        self._base_dir = Path(base_dir)
        self._name = name
        # Lazy generator: the i-th file is created only when it is requested.
        self._file_gen = (open(self._base_dir / f'{name}_{i:03}.bin', 'wb')
                          for i in itertools.count())
        self._f = next(self._file_gen)
        # Connecting to google storage bucket.
        self.client = storage.Client()
        self.bucket = self.client.bucket(bucket_name)

    def write(self, b):
        """ Appends the bytes `b`, rolling over to a new file when one fills up.

        Returns a list of (file_name, offset) tuples, one per file that
        received a piece of `b`, in the order the pieces were written.
        """
        locs = []
        while len(b) > 0:
            pos = self._f.tell()
            remaining = BLOCK_SIZE - pos
            # if the current file is full, close and open a new one.
            if remaining == 0:
                self._f.close()
                self.upload_to_gcp()
                self._f = next(self._file_gen)
                pos, remaining = 0, BLOCK_SIZE
            self._f.write(b[:remaining])
            locs.append((self._f.name, pos))
            b = b[remaining:]
        return locs

    def close(self):
        """ Closes the current file. Does NOT upload it. """
        self._f.close()

    def upload_to_gcp(self):
        '''
            The function saves the posting files into the right bucket in google storage.
        '''
        # The upload re-reads the file from disk by name. If the handle is
        # still open, bytes sitting in Python's write buffer are not on disk
        # yet and would be silently missing from the uploaded object.
        if not self._f.closed:
            self._f.flush()
        file_name = self._f.name
        blob = self.bucket.blob(f"{POSTINGS_FOLDER}/{file_name}")
        blob.upload_from_filename(file_name)
        

class MultiFileReader:
    """ Sequential binary reader of multiple files of up to BLOCK_SIZE each.

    File handles are opened lazily on first use and cached until `close()`.
    Usable as a context manager (`with MultiFileReader() as reader:`) or via
    `contextlib.closing`.
    """
    def __init__(self):
        # file name -> open binary file handle
        self._open_files = {}

    def read(self, locs, n_bytes):
        """ Reads `n_bytes` bytes spread over the given locations.
        Parameters:
        -----------
          locs: iterable of (file_name, offset) tuples, in reading order
          n_bytes: total number of bytes to read across all locations
        Returns:
        -----------
          bytes: the concatenation of the pieces read from each location
        """
        b = []
        for f_name, offset in locs:
            if f_name not in self._open_files:
                self._open_files[f_name] = open(f_name, 'rb')
            f = self._open_files[f_name]
            f.seek(offset)
            # Never read past the end of a block; the rest is in the next location.
            n_read = min(n_bytes, BLOCK_SIZE - offset)
            b.append(f.read(n_read))
            n_bytes -= n_read
        return b''.join(b)

    def close(self):
        """ Closes every cached file handle and forgets it. """
        for f in self._open_files.values():
            f.close()
        # Drop the closed handles so a later read() reopens the files
        # instead of failing on a closed handle.
        self._open_files.clear()

    def __enter__(self):
        # Without __enter__ the existing __exit__ was unusable: a `with`
        # statement requires both methods.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False


# On-disk layout of one posting: TUPLE_SIZE big-endian bytes holding
# (doc_id << 16) | tf, which the reader splits into a 4-byte doc_id followed
# by a 2-byte tf.
TUPLE_SIZE = 6       # We're going to pack the doc_id and tf values in this many bytes.
TF_MASK = 2 ** 16 - 1 # Masking the 16 low bits of an integer (tf is stored as tf & TF_MASK)


class InvertedIndex:
    """ Inverted index whose posting lists live in binary files.

    The pickled object keeps only the small global structures (`df`,
    `term_total`, `posting_locs`); the in-memory posting lists built by
    `add_doc` are dropped when pickling (see `__getstate__`).

    Each posting is stored as TUPLE_SIZE big-endian bytes encoding
    (doc_id << 16) | (tf & TF_MASK).
    """
    def __init__(self, docs=None):
        """ Initializes the inverted index and add documents to it (if provided).
        Parameters:
        -----------
          docs: dict mapping doc_id to list of tokens (optional)
        """
        # stores document frequency per term
        self.df = Counter()
        # stores total frequency per term
        self.term_total = Counter()
        # stores posting list per term while building the index (internally),
        # otherwise too big to store in memory.
        self._posting_list = defaultdict(list)
        # mapping a term to posting file locations
        self.posting_locs = defaultdict(list)

        # `docs` defaults to None rather than a shared mutable {} default.
        for doc_id, tokens in (docs or {}).items():
            self.add_doc(doc_id, tokens)

    def add_doc(self, doc_id, tokens):
        """ Adds a document to the index with a given `doc_id` and tokens. """
        w2cnt = Counter(tokens)
        self.term_total.update(w2cnt)
        for w, cnt in w2cnt.items():
            self.df[w] = self.df.get(w, 0) + 1
            self._posting_list[w].append((doc_id, cnt))

    def write_index(self, base_dir, name):
        """ Write the in-memory index to disk as `{base_dir}/{name}.pkl`. """
        self._write_globals(base_dir, name)

    def _write_globals(self, base_dir, name):
        """ Pickles this object (without posting lists) to `{base_dir}/{name}.pkl`. """
        with open(Path(base_dir) / f'{name}.pkl', 'wb') as f:
            pickle.dump(self, f)

    def __getstate__(self):
        """ Modify how the object is pickled by removing the internal posting lists. """
        state = self.__dict__.copy()
        del state['_posting_list']
        return state

    def posting_lists_iter(self):
        """ A generator that reads one posting list from disk and yields
            a (word:str, [(doc_id:int, tf:int), ...]) tuple.
        """
        with closing(MultiFileReader()) as reader:
            for w, locs in self.posting_locs.items():
                # `locs` is the full list of (file_name, offset) pairs for the
                # term; the reader iterates over those pairs, so it must get
                # the whole list (passing locs[0] handed it a single pair and
                # made the reader's tuple unpacking fail).
                b = reader.read(locs, self.df[w] * TUPLE_SIZE)
                posting_list = []
                for i in range(self.df[w]):
                    doc_id = int.from_bytes(b[i*TUPLE_SIZE:i*TUPLE_SIZE+4], 'big')
                    tf = int.from_bytes(b[i*TUPLE_SIZE+4:(i+1)*TUPLE_SIZE], 'big')
                    posting_list.append((doc_id, tf))
                yield w, posting_list

    @staticmethod
    def read_index(base_dir, name):
        """ Loads an index previously written by `write_index`.

        NOTE: uses pickle.load, which can execute arbitrary code; only load
        index files you produced yourself.
        """
        with open(Path(base_dir) / f'{name}.pkl', 'rb') as f:
            return pickle.load(f)

    @staticmethod
    def delete_index(base_dir, name):
        """ Deletes `{name}.pkl` and every `{name}_*.bin` file under `base_dir`. """
        path_globals = Path(base_dir) / f'{name}.pkl'
        path_globals.unlink()
        for p in Path(base_dir).rglob(f'{name}_*.bin'):
            p.unlink()

    @staticmethod
    def write_a_posting_list(b_w_pl, bucket_name):
        """ Writes all posting lists of one bucket to disk and uploads them.
        Parameters:
        -----------
          b_w_pl: tuple (bucket_id, iterable of (word, posting_list)) where a
                  posting list is a list of (doc_id, tf) tuples
          bucket_name: name of the Google Cloud Storage bucket to upload to
        Returns:
        -----------
          the bucket_id that was written
        """
        posting_locs = defaultdict(list)
        bucket_id, list_w_pl = b_w_pl

        with closing(MultiFileWriter(".", bucket_id, bucket_name)) as writer:
            for w, pl in list_w_pl:
                # convert to bytes
                b = b''.join([(doc_id << 16 | (tf & TF_MASK)).to_bytes(TUPLE_SIZE, 'big')
                              for doc_id, tf in pl])
                # write to file(s)
                locs = writer.write(b)
                # save file locations to index
                posting_locs[w].extend(locs)
            # Close (and thereby flush) the last file BEFORE uploading it;
            # uploading an open file can miss bytes still held in the write
            # buffer. closing() calls close() again on exit, which is a no-op.
            writer.close()
            writer.upload_to_gcp()
            InvertedIndex._upload_posting_locs(bucket_id, posting_locs, bucket_name)
        return bucket_id

    @staticmethod
    def _upload_posting_locs(bucket_id, posting_locs, bucket_name):
        """ Pickles `posting_locs` locally and uploads it to
            `{POSTINGS_FOLDER}/{bucket_id}_posting_locs.pickle` in the bucket.
        """
        with open(f"{bucket_id}_posting_locs.pickle", "wb") as f:
            pickle.dump(posting_locs, f)
        client = storage.Client()
        bucket = client.bucket(bucket_name)
        blob_posting_locs = bucket.blob(f"{POSTINGS_FOLDER}/{bucket_id}_posting_locs.pickle")
        blob_posting_locs.upload_from_filename(f"{bucket_id}_posting_locs.pickle")

In [None]:
# Verify the file was created
!ls -l /home/dataproc/inverted_index_title_nostem.py

In [None]:
# Add the modified module to the cluster
# sc.addFile distributes the file with the Spark job so the functions that run
# on executors can import it as well.
# NOTE(review): `sc` is not defined in this notebook; presumably the Dataproc
# kernel provides it — confirm.
sc.addFile("/home/dataproc/inverted_index_title_nostem.py")
# Make the directory that holds files added through addFile importable on the driver.
sys.path.insert(0, SparkFiles.getRootDirectory())
from inverted_index_title_nostem import InvertedIndex

## Define Tokenization (NO STEMMING)

Following the assignment requirements:
- Use the staff-provided tokenizer regex
- Remove stopwords
- **DO NOT use stemming**

In [None]:
# Stopwords setup
english_stopwords = frozenset(stopwords.words('english'))
# Corpus-specific stopwords added on top of NLTK's English list.
# NOTE(review): 'also"' and 'extrenal' look like typos of 'also' / 'external'
# (both of which are also in the list). 'also"' can never match a token because
# RE_WORD does not accept a double quote. They are left untouched here so the
# tokenization stays identical to whatever the query side uses — confirm before
# cleaning them up.
corpus_stopwords = ['category', 'references', 'also"', 'links', 'extrenal',
                 'first', 'see', 'new', 'two', 'list', 'may', 'one', 'district',
                 'including', 'became', 'however', 'com', 'many', 'began',
                 'make', 'made', 'part', 'would', 'people', 'second', 'also',
                 'following', 'history', 'thumb', 'external']

all_stopwords = english_stopwords.union(corpus_stopwords)

# Staff-provided tokenizer regex
# A token is one leading '#', '@' or word character followed by 2 to 24 more
# word characters, each optionally preceded by an apostrophe or hyphen; tokens
# therefore contain between 3 and 25 word characters.
RE_WORD = re.compile(r"""[\#\@\w](['\'\-]?\w){2,24}""", re.UNICODE)

# Number of buckets for partitioning
# Terms are assigned to a bucket by hash (see token2bucket_id); each bucket's
# posting lists are written together.
NUM_BUCKETS = 124

def token2bucket_id(token):
    """Map a token to a bucket id in [0, NUM_BUCKETS) using its `_hash` hex digest."""
    digest_as_int = int(_hash(token), 16)
    return digest_as_int % NUM_BUCKETS

In [None]:
def tokenize_no_stem(text):
    """
    Tokenize text WITHOUT stemming.
    - Uses staff-provided regex tokenizer (RE_WORD)
    - Converts to lowercase
    - Removes stopwords (all_stopwords)
    - Does NOT apply stemming

    Parameters:
    -----------
      text: the string to tokenize; None or an empty string yields no tokens
    Returns:
    -----------
      list of lowercase tokens, in order of appearance, duplicates kept
    """
    # A missing title would otherwise raise AttributeError on .lower() and
    # abort the whole Spark job; treat it as a document with no tokens.
    if not text:
        return []
    tokens = (match.group() for match in RE_WORD.finditer(text.lower()))
    # Filter out stopwords but DO NOT stem
    return [token for token in tokens if token not in all_stopwords]


def word_count_no_stem(text, doc_id):
    """
    Build the per-document postings for one title, without stemming.

    The title is tokenized with `tokenize_no_stem` and each distinct token is
    paired with this document's id and the token's number of occurrences.
    Returns a list of (token, (doc_id, count)) tuples.
    """
    postings = []
    for term, term_freq in Counter(tokenize_no_stem(text)).items():
        postings.append((term, (doc_id, term_freq)))
    return postings


def reduce_word_counts(unsorted_pl):
    """Return the postings as a new list ordered by their first element (doc_id)."""
    ordered_pl = list(unsorted_pl)
    ordered_pl.sort(key=lambda posting: posting[0])
    return ordered_pl


def calculate_df(postings):
    """Map each (term, posting_list) pair to (term, length of the posting list)."""
    def _term_and_df(term_with_pl):
        return (term_with_pl[0], len(term_with_pl[1]))

    return postings.map(_term_and_df)


def partition_postings_and_write(postings, base_dir):
    """
    Group posting lists into hash buckets and write each bucket out.

    Every (term, posting_list) pair is keyed by `token2bucket_id(term)`, the
    pairs are grouped per bucket, and each group is handed to
    `InvertedIndex.write_a_posting_list` together with the notebook-level
    `bucket_name`. The result is an RDD with one element per bucket: whatever
    `write_a_posting_list` returns for it.

    `base_dir` is accepted but not used by this function.
    """
    def _tag_with_bucket(term_with_pl):
        return (token2bucket_id(term_with_pl[0]), term_with_pl)

    def _write_one_bucket(bucket_with_pairs):
        bucket_id, grouped_pairs = bucket_with_pairs
        materialized = list(grouped_pairs)
        return InvertedIndex.write_a_posting_list((bucket_id, materialized), bucket_name)

    grouped_by_bucket = postings.map(_tag_with_bucket).groupByKey()
    return grouped_by_bucket.map(_write_one_bucket)

## Build the Index

In [None]:
# Time the index creation
# start_time is also read by the final timing cell at the end of the notebook.
start_time = time()

# Word counts map - using NO STEMMING
# doc_title_pairs has format: Row(title='...', id=...)
# Emits one (token, (doc_id, count)) pair per distinct token of each title.
word_counts = doc_title_pairs.flatMap(lambda x: word_count_no_stem(x[0], x[1]))

# Group by word and create posting lists
# Result: (token, [(doc_id, count), ...]) with each list sorted by doc_id.
# NOTE(review): `postings` is not cached and is consumed by two separate
# actions (collectAsMap here, collect in the write cell), so Spark presumably
# recomputes it for the second one — consider persisting if that is too slow.
postings = word_counts.groupByKey().mapValues(reduce_word_counts)

# Calculate document frequencies
# collectAsMap brings the whole term -> df mapping to the driver as a dict.
w2df = calculate_df(postings)
w2df_dict = w2df.collectAsMap()

print(f"Number of unique terms: {len(w2df_dict)}")
print(f"Time for word counting and DF calculation: {time() - start_time:.2f} seconds")

In [None]:
# Partition posting lists and write to GCS
write_start = time()
# collect() is what triggers the job; the collected values are the bucket ids
# returned by the writer and are not needed afterwards.
# The "title_nostem" argument is not used by partition_postings_and_write;
# the upload folder comes from POSTINGS_FOLDER in the module.
_ = partition_postings_and_write(postings, "title_nostem").collect()
print(f"Time for writing postings: {time() - write_start:.2f} seconds")

In [None]:
# Collect all posting list locations into one dictionary
super_posting_locs = defaultdict(list)

# The trailing "/" matters: list_blobs matches on raw name prefix, so without
# it objects under sibling prefixes such as "postings_title_nostem_old/" would
# be merged into this index as well.
for blob in client.list_blobs(bucket_name, prefix='postings_title_nostem/'):
    # Only the per-bucket "<bucket_id>_posting_locs.pickle" files are needed;
    # the .bin posting files are skipped.
    if not blob.name.endswith("pickle"):
        continue
    # NOTE: pickle.load can execute arbitrary code; this is acceptable only
    # because these files were written by this notebook's own Spark job.
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
        super_posting_locs[k].extend(v)

print(f"Collected posting locations for {len(super_posting_locs)} terms")

## Save the Index

In [None]:
# Create inverted index instance
inverted = InvertedIndex()

# Add posting locations and document frequencies
# Both attributes are replaced wholesale with the structures built above;
# note that df becomes the plain dict returned by collectAsMap.
inverted.posting_locs = super_posting_locs
inverted.df = w2df_dict

# Write the index locally
# Produces ./index.pkl (write_index appends the ".pkl" suffix to the name).
inverted.write_index('.', 'index')

# Upload to GCS - to a NEW directory 'title_nostem'
index_src = "index.pkl"
index_dst = f'gs://{bucket_name}/title_nostem/{index_src}'
!gsutil cp $index_src $index_dst

In [None]:
# Verify the upload
!gsutil ls -lh $index_dst

## Save Document Lengths (for potential TF-IDF use)

In [None]:
def len_tokens_no_stem(doc_id, text):
    """
    Pair a document id with the number of tokens `tokenize_no_stem` produces
    for its title (stopwords removed, no stemming, duplicates counted).
    """
    n_tokens = len(tokenize_no_stem(text))
    return doc_id, n_tokens

# Calculate document lengths
# Note: doc_title_pairs has Row(title, id) format
# so x[1] is the doc id and x[0] is the title text.
title_len_pairs = doc_title_pairs.map(lambda x: len_tokens_no_stem(x[1], x[0]))
# Brings the full doc_id -> token count mapping to the driver as a dict.
title_doc_lengths_dict = title_len_pairs.collectAsMap()

print(f"Calculated lengths for {len(title_doc_lengths_dict)} documents")

In [None]:
# Save document lengths
# Written to the local working directory first; the gsutil command below copies it to GCS.
with open('title_doc_lengths.pickle', 'wb') as f:
    pickle.dump(title_doc_lengths_dict, f)

# Source (local file) and destination (same file name under title_nostem/ in the bucket).
src = "title_doc_lengths.pickle"
dest = f'gs://{bucket_name}/title_nostem/{src}'
!gsutil cp $src $dest

In [None]:
# Verify
!gsutil ls -lh gs://{bucket_name}/title_nostem/

## Summary

This notebook created the following files in GCS:

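1. **`gs://{bucket_name}/title_nostem/index.pkl`** - The pickled `InvertedIndex` object containing:
   - `posting_locs`: Dictionary mapping terms to their posting list file locations
   - `df`: Document frequency dictionary
   - `term_total`: Total term frequency counter (left empty by this notebook, which never calls `add_doc`)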

2. **`gs://{bucket_name}/title_nostem/title_doc_lengths.pickle`** - Document lengths dictionary

3. **`gs://{bucket_name}/postings_title_nostem/`** - Posting list binary files (`*.bin`) plus one `*_posting_locs.pickle` location map per bucket (terms are hashed into up to 124 buckets)

### Key Differences from Stemmed Index:
- **NO stemming** applied to tokens
- Stored in separate directory (`title_nostem/` instead of `title_stemmed/`)
- Posting files in `postings_title_nostem/` instead of `postings_gcp/`

### Usage in search_title:
When using this index for the `search_title` endpoint:
1. Tokenize query using `tokenize_no_stem()` (same function)
2. Look up each query term in the index
3. For each document, count the number of DISTINCT query terms that appear
4. Return ALL matching documents, ordered by distinct query term count (descending)

In [None]:
# Final timing
print(f"Total index creation time: {time() - start_time:.2f} seconds")
print(f"Total unique terms: {len(w2df_dict)}")
print(f"Total documents: {len(title_doc_lengths_dict)}")