In [6]:
import pyspark
import sys
from collections import Counter, OrderedDict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
from time import time
from pathlib import Path
import pickle
from google.cloud import storage
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
import math

# Cloud Storage bucket that the cells below upload their pickles and index
# files to.
BUCKET_NAME = 'ir-wiki-hadar' 

import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords

# Tokens removed during tokenization: NLTK's English stopword list combined
# with the corpus-specific words listed here.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]
all_stopwords = english_stopwords.union(corpus_stopwords)
# A token starts with '#', '@' or a word character and continues with 2 to 24
# further word characters, each optionally preceded by an apostrophe or hyphen.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

# Add the index module file to the Spark context, then put the SparkFiles root
# directory first on sys.path so InvertedIndex can be imported in this kernel.
# NOTE(review): `sc` is not created in the visible cells — presumably it is
# supplied by the notebook environment; confirm.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())
from inverted_index_gcp import InvertedIndex

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
26/01/08 15:42:42 WARN org.apache.spark.SparkContext: The path /home/dataproc/inverted_index_gcp.py has been added already. Overwriting of added paths is not supported in the current version.


In [7]:
# Glob matching the preprocessed parquet files stored in the bucket.
full_path = "gs://ir-wiki-hadar/multistream*_preprocessed.parquet"

# `parquetFile` is the source DataFrame reused by the later cells.
parquetFile = spark.read.parquet(full_path)


# Collect an {id: title} dictionary onto the driver.
id_title_pairs = parquetFile.select("id", "title").rdd
id_to_title_dict = id_title_pairs.collectAsMap()

# Pickle the dictionary to a local file before uploading it.
with open('id_to_title.pkl', 'wb') as f:
    pickle.dump(id_to_title_dict, f)
    
# `bucket` is reused by the later cells that upload doc_norms.pkl and
# page_rank.pkl.
client = storage.Client()
bucket = client.bucket(BUCKET_NAME)
blob = bucket.blob('id_to_title.pkl')
blob.upload_from_filename('id_to_title.pkl')

print("Saved id_to_title.pkl to bucket!")

                                                                                

Saved id_to_title.pkl to bucket!


In [8]:
import math

def calculate_norm(text):
    """Return the square root of the number of non-stopword tokens in ``text``.

    The text is lower-cased, tokenized with ``RE_WORD``, and every token found
    in ``all_stopwords`` is ignored. Repeated tokens are counted each time
    they occur.

    NOTE(review): this is sqrt(token count), not the Euclidean norm of the
    term-frequency vector (sqrt of the sum of squared tfs) — confirm that this
    is the normalization the ranking code expects.
    """
    kept = 0
    for match in RE_WORD.finditer(text.lower()):
        if match.group() not in all_stopwords:
            kept += 1
    return math.sqrt(kept)

# Compute one (id, norm) pair per document on the executors, then collect the
# pairs into a dictionary on the driver.
doc_norms_rdd = parquetFile.select("id", "text").rdd.map(lambda x: (x.id, calculate_norm(x.text)))
doc_norms_dict = doc_norms_rdd.collectAsMap()

# Pickle the dictionary to a local file before uploading it.
with open('doc_norms.pkl', 'wb') as f:
    pickle.dump(doc_norms_dict, f)

# Relies on the `bucket` handle created in the id_to_title cell.
blob = bucket.blob('doc_norms.pkl')
blob.upload_from_filename('doc_norms.pkl')

print("Saved doc_norms.pkl to bucket!")

                                                                                

Saved doc_norms.pkl to bucket!


In [9]:
# Read every file under the bucket's pr/ prefix as CSV.
pr_csv_path = "gs://ir-wiki-hadar/pr/*" 
pr_df = spark.read.csv(pr_csv_path)

# Convert the first column to int and the second to float, then collect the
# pairs into a dictionary on the driver.
# NOTE(review): presumably the columns are (page id, PageRank score) — confirm
# against the job that produced the CSV files.
pr_rdd = pr_df.rdd.map(lambda x: (int(x[0]), float(x[1])))
pr_dict = pr_rdd.collectAsMap()

# Pickle the dictionary to a local file before uploading it.
with open('page_rank.pkl', 'wb') as f:
    pickle.dump(pr_dict, f)

# Relies on the `bucket` handle created in the id_to_title cell.
blob = bucket.blob('page_rank.pkl')
blob.upload_from_filename('page_rank.pkl')

print("Saved page_rank.pkl to bucket!")

                                                                                

Saved page_rank.pkl to bucket!


In [12]:
import os
from contextlib import closing
import itertools
from pathlib import Path
import pickle
from collections import defaultdict
import hashlib

# Maximum number of bytes MultiFileWriter puts into a single .bin part file.
# The value equals 333333 * TUPLE_SIZE, i.e. a whole number of postings.
BLOCK_SIZE = 1999998
# Number of bytes each (doc_id, tf) posting occupies once encoded by
# write_a_posting_list.
TUPLE_SIZE = 6
# Selects the low 16 bits of an encoded posting, which carry the term frequency.
TF_MASK = 2 ** 16 - 1

def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

def get_bucket(bucket_name):
    """Return a bucket handle for ``bucket_name`` from a newly created storage client."""
    gcs_client = storage.Client()
    return gcs_client.bucket(bucket_name)

class MultiFileWriter:
    """Sequential writer that spreads a byte stream over fixed-size part files.

    Part files are named ``{name}_{part:03}.bin`` and are created in the
    current working directory; each holds at most ``BLOCK_SIZE`` bytes.
    When a bucket name is given, every finished part is uploaded to that
    bucket under ``base_dir`` and the local copy is then deleted. Without a
    bucket the part files are kept on local disk, since they are the only
    copy of the data.
    """

    def __init__(self, base_dir, name, bucket_name=None):
        """Open the first part file.

        Args:
            base_dir: Prefix joined with each part file name to form the blob
                name (and the location string returned by ``write``).
            name: Stem used for the part file names.
            bucket_name: Bucket to upload finished parts to, or ``None`` to
                keep the parts locally.
        """
        self._base_dir = Path(base_dir)
        self._name = name
        self._bucket = None if bucket_name is None else get_bucket(bucket_name)
        self._file_gen = itertools.count()
        self._f = None
        self._current_part = -1
        self._next_file()

    def _next_file(self):
        """Finish the current part file (if any) and open the next one."""
        if self._f:
            self._f.close()
            # _current_blob_name still refers to the part being finished here;
            # it is only advanced further down.
            self._upload_file(self._f.name)

        self._current_part = next(self._file_gen)
        fname = f'{self._name}_{self._current_part:03}.bin'
        self._f = open(fname, 'wb')
        self._current_blob_name = str(self._base_dir / fname)

    def _upload_file(self, local_filename):
        """Upload a finished part to the bucket and drop the local copy.

        Does nothing when no bucket is configured: in that case the local
        file is the only copy of the data and must not be removed.
        """
        if not self._bucket:
            return
        blob = self._bucket.blob(self._current_blob_name)
        blob.upload_from_filename(local_filename)
        if os.path.exists(local_filename):
            os.remove(local_filename)

    def write(self, b):
        """Append ``b``, rolling over to a new part file whenever one fills up.

        Returns:
            A list of ``(blob_name, offset)`` tuples, one per part file that
            received a piece of ``b``, in write order.
        """
        locs = []
        while len(b) > 0:
            pos = self._f.tell()
            remaining = BLOCK_SIZE - pos
            if remaining == 0:
                self._next_file()
                pos, remaining = 0, BLOCK_SIZE
            self._f.write(b[:remaining])
            locs.append((self._current_blob_name, pos))
            b = b[remaining:]
        return locs

    def close(self):
        """Finish the last part file. Calling ``close`` again is a no-op."""
        if self._f:
            self._f.close()
            self._upload_file(self._f.name)
            # Forget the handle so a repeated close() does not try to upload
            # a file that has already been uploaded and removed.
            self._f = None

def write_a_posting_list(b_w_pl, base_dir, bucket_name=None):
    """Write all posting lists of one bucket to binary part files.

    Each ``(doc_id, tf)`` posting is packed into ``TUPLE_SIZE`` big-endian
    bytes: ``doc_id`` shifted left by 16 bits, with the term frequency in the
    low 16 bits. Term frequencies larger than ``TF_MASK`` are stored as
    ``TF_MASK`` (saturated) rather than wrapping around, so a very frequent
    term is never recorded with a small or zero frequency.

    A dictionary mapping each token to the list of ``(blob_name, offset)``
    locations of its postings is pickled to
    ``{bucket_id}_posting_locs.pickle``. When ``bucket_name`` is given the
    pickle is uploaded under ``base_dir`` and the local copy removed;
    otherwise it stays in the current working directory.

    Args:
        b_w_pl: Pair ``(bucket_id, list_w_pl)`` where ``list_w_pl`` is an
            iterable of ``(token, posting_list)`` pairs.
        base_dir: Prefix for the blob names of the part files and the pickle.
        bucket_name: Destination bucket, or ``None`` to skip uploading.

    Returns:
        The ``bucket_id`` taken from ``b_w_pl``.

    Raises:
        OverflowError: If a ``doc_id`` does not fit in the bytes left after
            the 16-bit term frequency.
    """
    posting_locs = defaultdict(list)
    bucket_id, list_w_pl = b_w_pl

    with closing(MultiFileWriter(base_dir, bucket_id, bucket_name)) as writer:
        for w, pl in list_w_pl:
            # Clamp first so tf >= 2**16 saturates; the mask keeps the value
            # inside its 16-bit field for any other out-of-range input.
            b = b''.join([
                (doc_id << 16 | (min(tf, TF_MASK) & TF_MASK)).to_bytes(TUPLE_SIZE, 'big')
                for doc_id, tf in pl
            ])
            locs = writer.write(b)
            posting_locs[w].extend(locs)

    path = str(Path(base_dir) / f'{bucket_id}_posting_locs.pickle')
    local_pickle_path = f'{bucket_id}_posting_locs.pickle'
    with open(local_pickle_path, 'wb') as f:
        pickle.dump(posting_locs, f)

    if bucket_name:
        bucket = get_bucket(bucket_name)
        blob = bucket.blob(path)
        blob.upload_from_filename(local_pickle_path)
        if os.path.exists(local_pickle_path):
            os.remove(local_pickle_path)

    return bucket_id

# Number of buckets that tokens are distributed over.
NUM_BUCKETS = 124
def token2bucket_id(token):
    """Map ``token`` to a bucket number in ``range(NUM_BUCKETS)``.

    The hexadecimal digest returned by ``_hash`` is read as an integer and
    reduced modulo ``NUM_BUCKETS``.
    """
    digest_as_int = int(_hash(token), 16)
    return digest_as_int % NUM_BUCKETS

def word_count_title(doc_id, text):
    """Count the non-stopword tokens of ``text`` for one document.

    The text is lower-cased and tokenized with ``RE_WORD``; tokens found in
    ``all_stopwords`` are skipped.

    Returns:
        A list of ``(token, (doc_id, tf))`` tuples, one per distinct token,
        in order of first occurrence.
    """
    term_freqs = Counter(
        match.group()
        for match in RE_WORD.finditer(text.lower())
        if match.group() not in all_stopwords
    )
    return [(term, (doc_id, freq)) for term, freq in term_freqs.items()]

def reduce_word_counts(unsorted_pl):
    """Return the postings as a new list ordered by their first element.

    The sort is stable, so postings sharing the same first element keep
    their incoming relative order.
    """
    ordered = list(unsorted_pl)
    ordered.sort(key=itemgetter(0))
    return ordered

def calculate_df(postings):
    """Replace each value of ``postings`` with its length.

    ``postings`` must provide ``mapValues``; the result of that call is
    returned unchanged. With ``(token, posting_list)`` pairs this yields
    ``(token, number_of_postings)`` pairs.
    """
    return postings.mapValues(len)

def partition_postings_and_write(postings, base_name):
    """Group posting lists into hash buckets and write each bucket out.

    Every ``(token, posting_list)`` pair is keyed by ``token2bucket_id`` of
    its token, the pairs are grouped per bucket, and each group is passed to
    ``write_a_posting_list`` with ``base_name`` as the base directory and
    ``BUCKET_NAME`` as the destination bucket.

    Returns:
        The mapped collection of bucket ids returned by
        ``write_a_posting_list``. Nothing is written until an action is run
        on it.
    """
    def _persist(group):
        bucket_id, members = group
        return write_a_posting_list((bucket_id, list(members)), base_name, BUCKET_NAME)

    keyed = postings.map(lambda pair: (token2bucket_id(pair[0]), (pair[0], pair[1])))
    return keyed.groupByKey().map(_persist)

print("Starting Title Indexing (Patched Version)...")
# Tokenize every title into (token, (doc_id, tf)) pairs, then gather the pairs
# of each token into a posting list sorted by doc_id.
title_counts = parquetFile.select("id", "title").rdd.flatMap(lambda x: word_count_title(x.id, x.title))
title_postings = title_counts.groupByKey().mapValues(reduce_word_counts)

# Document frequency per token, collected onto the driver.
w2df_title = calculate_df(title_postings)
w2df_title_dict = w2df_title.collectAsMap()

# collect() is the action that triggers writing; the returned bucket ids are
# not needed afterwards.
_ = partition_postings_and_write(title_postings, "title_index").collect()

# Only `df` is set on the index object here; the posting locations are stored
# in the per-bucket *_posting_locs.pickle files written by the step above.
index_title = InvertedIndex()
index_title.df = w2df_title_dict
# NOTE(review): presumably write_index produces ./index_title.pkl, which is
# the file copied on the next line — confirm against inverted_index_gcp.py.
index_title.write_index('.', 'index_title')

!gsutil cp index_title.pkl gs://{BUCKET_NAME}/title_index/index.pkl

print("Title Index Done!")

Starting Title Indexing (Patched Version)...


                                                                                

Copying file://index_title.pkl [Content-Type=application/octet-stream]...
/ [1 files][ 23.0 MiB/ 23.0 MiB]                                                
Operation completed over 1 objects/23.0 MiB.                                     
Title Index Done!


In [13]:

print("Starting Anchor Indexing...")

# Emit one (link id, link text) pair per anchor entry, skipping entries whose
# id or text is missing. The pairs are keyed by the link's own id; the id of
# the row the anchor came from is not used.
anchor_docs = parquetFile.select("id", "anchor_text").rdd.flatMap(lambda x: 
    [(link['id'], link['text']) for link in x.anchor_text if link['id'] is not None and link['text'] is not None]
)

# Join all anchor texts that share an id into one space-separated string.
anchor_text_grouped = anchor_docs.reduceByKey(lambda x,y: x + " " + y)


# Tokenize the joined text per id, then build posting lists sorted by that id.
anchor_counts = anchor_text_grouped.flatMap(lambda x: word_count_title(x[0], x[1]))
anchor_postings = anchor_counts.groupByKey().mapValues(reduce_word_counts)

# collect() is the action that triggers writing; the returned bucket ids are
# not needed afterwards.
_ = partition_postings_and_write(anchor_postings, "anchor_index").collect()

# NOTE(review): anchor_postings is not persisted, so this second action
# presumably recomputes the whole lineage — consider caching it; confirm.
w2df_anchor = calculate_df(anchor_postings)
w2df_anchor_dict = w2df_anchor.collectAsMap()

# Only `df` is set on the index object here; the posting locations are stored
# in the per-bucket *_posting_locs.pickle files written above.
index_anchor = InvertedIndex()
index_anchor.df = w2df_anchor_dict
# NOTE(review): presumably write_index produces ./index_anchor.pkl, which is
# the file copied on the next line — confirm against inverted_index_gcp.py.
index_anchor.write_index('.', 'index_anchor')

!gsutil cp index_anchor.pkl gs://{BUCKET_NAME}/anchor_index/index.pkl

print("Anchor Index Done!")

Starting Anchor Indexing...


                                                                                

Copying file://index_anchor.pkl [Content-Type=application/octet-stream]...
- [1 files][ 32.4 MiB/ 32.4 MiB]                                                
Operation completed over 1 objects/32.4 MiB.                                     
Anchor Index Done!


In [17]:
# List the objects in the bucket, with human-readable sizes, to check what was uploaded.
!gsutil ls -lh gs://ir-wiki-hadar/**

  1.91 MiB  2026-01-08T16:23:07Z  gs://ir-wiki-hadar/anchor_index/0_000.bin
  1.91 MiB  2026-01-08T16:23:07Z  gs://ir-wiki-hadar/anchor_index/0_001.bin
  1.91 MiB  2026-01-08T16:23:08Z  gs://ir-wiki-hadar/anchor_index/0_002.bin
  1.67 MiB  2026-01-08T16:23:08Z  gs://ir-wiki-hadar/anchor_index/0_003.bin
498.05 KiB  2026-01-08T16:23:08Z  gs://ir-wiki-hadar/anchor_index/0_posting_locs.pickle
  1.91 MiB  2026-01-08T16:23:31Z  gs://ir-wiki-hadar/anchor_index/100_000.bin
  1.91 MiB  2026-01-08T16:23:31Z  gs://ir-wiki-hadar/anchor_index/100_001.bin
  1.91 MiB  2026-01-08T16:23:31Z  gs://ir-wiki-hadar/anchor_index/100_002.bin
  1.91 MiB  2026-01-08T16:23:32Z  gs://ir-wiki-hadar/anchor_index/100_003.bin
  1.17 MiB  2026-01-08T16:23:32Z  gs://ir-wiki-hadar/anchor_index/100_004.bin
509.62 KiB  2026-01-08T16:23:32Z  gs://ir-wiki-hadar/anchor_index/100_posting_locs.pickle
  1.91 MiB  2026-01-08T16:23:31Z  gs://ir-wiki-hadar/anchor_index/101_000.bin
  1.91 MiB  2026-01-08T16:23:31Z  gs:/