***Important*** DO NOT CLEAR THE OUTPUT OF THIS NOTEBOOK AFTER EXECUTION!!!

In [3]:
# if the following command generates an error, you probably didn't enable 
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster
!gcloud dataproc clusters list --region us-central1

NAME          PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-e3dd  GCE       4                                             RUNNING  us-central1-a


# Imports & Setup

In [1]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes
!pip install -q gensim

[0m

In [49]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from gensim.parsing.porter import PorterStemmer
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
from math import log

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [4]:
# if nothing prints here you forgot to include the initialization script when starting the cluster
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Mar  9 12:06 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [6]:
spark

In [7]:
# Put your bucket name below and make sure you can access it without an error
bucket_name = 'bgu-ir-ass3-fab-stem' 
full_path = f"gs://{bucket_name}/"

client = storage.Client()

In [9]:
# gs:// paths of every corpus parquet file (blob names starting with 'multistream').
paths = [
    full_path + blob.name
    for blob in client.list_blobs(bucket_name)
    if blob.name.startswith('multistream')
]

***GCP setup is complete!*** If you got here without any errors you've earned 10 out of the 35 points of this part.

# Building an inverted index

Here, we read the entire corpus into an RDD directly from the Google Cloud Storage bucket and use your code from Colab to construct an inverted index.

In [10]:
parquetFile = spark.read.parquet(*paths)
doc_text_pairs = parquetFile.select("text", "id").rdd
doc_title_pairs = parquetFile.select("title", "id").rdd

                                                                                

We count the number of pages to make sure we are looking at the entire corpus; there should be more than 6M pages.

In [11]:
# Count number of wiki pages
N = parquetFile.count()

                                                                                

Let's import the inverted index module. Note that you need to use the staff-provided version called `inverted_index_gcp.py`, which contains helper functions for writing and reading the posting files, similar to the Colab version but with writing done to a Google Cloud Storage bucket.

In [12]:
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [13]:
# adding our python module to the cluster
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [14]:
from inverted_index_gcp import InvertedIndex

**YOUR TASK (10 POINTS)**: Use your implementation of `word_count`, `reduce_word_counts`, `calculate_df`, and `partition_postings_and_write` functions from Colab to build an inverted index for all of English Wikipedia in under 2 hours.

A few notes: 
1. The list of corpus stopwords below is a bit longer than in the Colab version, since we are working on the whole corpus and not just on one file.
2. You need to slightly modify your implementation of `partition_postings_and_write` because the signature of `InvertedIndex.write_a_posting_list` has changed and now includes an additional argument called `bucket_name` for the target bucket. See the module for more details.
3. You are not allowed to change any of the code not coming from Colab. 

In [15]:
# Porter stemmer shared by indexing (word_count) and query-time tokenization.
stemmer = PorterStemmer()

# Stopwords: NLTK's English list plus frequent Wikipedia boilerplate terms.
# Both lists hold unstemmed forms; filtering happens before stemming.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = [
    "category", "references", "also", "external", "links",
    "may", "first", "see", "history", "people", "one", "two",
    "part", "thumb", "including", "second", "following",
    "many", "however", "would", "became",
]
all_stopwords = english_stopwords.union(corpus_stopwords)

# A token is a leading '#', '@' or word char followed by 2-24 word chars,
# each optionally preceded by a single apostrophe or hyphen.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

In [16]:
NUM_BUCKETS = 124


def token2bucket_id(token):
    """Map `token` to a posting-file bucket id in [0, NUM_BUCKETS) via its `_hash` digest.

    NOTE: the text and title indexes share this bucketing; if this is ever
    run again with different needs, create separate functions for text and title.
    """
    digest_as_int = int(_hash(token), 16)
    return digest_as_int % NUM_BUCKETS

def word_count(text, id):
    ''' Count the frequency (tf) of each stemmed word in `text` and return the
    entries that will go into our posting lists.

    The text is lower-cased and tokenized with `RE_WORD`. Tokens found in
    `all_stopwords` are dropped *before* stemming, because the stopword lists
    hold unstemmed forms. The remaining tokens are stemmed with the Porter `stemmer`.
    Parameters:
    -----------
    text: str or None
      Text (or title) of one document. ``None`` or an empty string yields
      no entries.
    id: int
      Document id
    Returns:
    --------
    List of tuples
      A list of (stemmed_token, (doc_id, tf)) pairs
      for example: [("capit", (12, 5)), ...]
    '''
    # Parquet rows can carry a null text/title; treat it as an empty document
    # instead of failing the whole Spark job on `None.lower()`.
    if not text:
        return []
    raw_tokens = (match.group() for match in RE_WORD.finditer(text.lower()))
    stems = [stemmer.stem(tok) for tok in raw_tokens if tok not in all_stopwords]
    # Local name differs from the function name to avoid shadowing it.
    term_freqs = Counter(stems)
    return [(stem, (id, tf)) for stem, tf in term_freqs.items()]

def reduce_word_counts(unsorted_pl):
    ''' Sort a token's posting list by wiki_id.
    Parameters:
    -----------
    unsorted_pl: iterable of tuples
      (wiki_id, tf) tuples in arbitrary order (e.g. a groupByKey result).
    Returns:
    --------
    list of tuples
      A new list with the same tuples ordered by wiki_id (stable for ties).
    '''
    ordered_postings = list(unsorted_pl)
    ordered_postings.sort(key=lambda posting: posting[0])
    return ordered_postings

def partition_postings_and_write(postings, base_dir, target_bucket=None):
    ''' Partition the posting lists into buckets, write out all posting lists
    of each bucket to the Google Cloud Storage bucket, and return the posting
    locations for each bucket. Partitioning is done with `token2bucket_id`
    above. Writing uses `InvertedIndex.write_a_posting_list`, a static method
    implemented in inverted_index_gcp.py under the InvertedIndex class, which
    takes the target bucket name as an extra argument.
    Parameters:
    -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
    base_dir: str
      Directory/prefix the posting files are written under,
      e.g. './postings_gcp/text/'.
    target_bucket: str, optional
      Name of the GCS bucket to write to. Defaults to the notebook-level
      `bucket_name` when omitted, which preserves the original behavior.
    Returns:
    --------
    RDD
      An RDD where each item is a posting locations dictionary for a bucket. The
      posting locations maintain a list for each word of file locations and
      offsets its posting list was written to. See `write_a_posting_list` for
      more details.
    '''
    # Resolve the bucket on the driver so the closure shipped to executors
    # captures a plain string instead of a notebook global.
    dest_bucket = bucket_name if target_bucket is None else target_bucket
    keyed_by_bucket = postings.map(lambda w_pl: (token2bucket_id(w_pl[0]), w_pl))
    grouped = keyed_by_bucket.groupByKey().mapValues(list)
    return grouped.map(
        lambda b_w_pl: InvertedIndex.write_a_posting_list(b_w_pl, base_dir, dest_bucket))

def calculate_df(postings):
    ''' Compute the document frequency (df) of every token.

    A token's df is the length of its posting list, since the list holds one
    entry per document.
    Parameters:
    -----------
    postings: RDD
      An RDD where each element is a (token, posting_list) pair.
    Returns:
    --------
    RDD
      An RDD where each element is a (token, df) pair.
    '''
    def _posting_list_size(posting_list):
        return len(posting_list)

    return postings.mapValues(_posting_list_size)

# def calculate_tf(postings):
#     return postings.mapValues(lambda x: x[1]).reduceByKey(lambda a, b: a + b)

# def calculate_tf_per_doc(postings):
#     return postings.groupByKey().mapValues(dict)

def calculate_document_length(postings):
    ''' Sum term frequencies per document to get each document's length.
    Parameters:
    -----------
    postings: RDD
      Despite the name, an RDD of (token, (doc_id, tf)) word-count pairs,
      i.e. the flat-mapped output of `word_count`.
    Returns:
    --------
    RDD
      An RDD of (doc_id, length) pairs, where length is the total tf over the
      tokens counted for that document.
    '''
    def _to_doc_tf(entry):
        doc_tf = entry[1]
        return (doc_tf[0], doc_tf[1])

    return postings.map(_to_doc_tf).reduceByKey(lambda running, tf: running + tf)

In [17]:
# word counts map
# Each document becomes (stemmed_token, (doc_id, tf)) pairs via word_count.
word_counts_text = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
# Group by token; each posting list is sorted by doc_id.
postings_text = word_counts_text.groupByKey().mapValues(reduce_word_counts)
# filtering postings and calculate df
# Keep only tokens whose posting list covers more than 50 documents.
postings_text_filtered = postings_text.filter(lambda x: len(x[1])>50)
w2df_text = calculate_df(postings_text_filtered)
w2df_text_dict = w2df_text.collectAsMap()
# partition posting lists and write out
# The returned location dicts are discarded here; the per-bucket location
# pickles are read back from the GCS bucket in a later cell.
# NOTE(review): postings_text_filtered is not persisted, so this action and
# collectAsMap above each recompute it (from shuffle output if still
# available); consider .persist() if executor memory allows.
_ = partition_postings_and_write(postings_text_filtered, './postings_gcp/text/').collect()

                                                                                

In [18]:
# word counts map
# Same pipeline as for the body text, applied to article titles.
word_counts_title = doc_title_pairs.flatMap(lambda x: word_count(x[0], x[1]))
# Group by token; each posting list is sorted by doc_id.
postings_title = word_counts_title.groupByKey().mapValues(reduce_word_counts)
# filtering postings and calculate df
# Titles are short, so the df cutoff is lower: keep tokens in more than 10 titles.
postings_title_filtered = postings_title.filter(lambda x: len(x[1])>10)
w2df_title = calculate_df(postings_title_filtered)
w2df_title_dict = w2df_title.collectAsMap()
# partition posting lists and write out
# NOTE(review): postings_title_filtered is not persisted, so this action and
# collectAsMap above each recompute it; consider .persist().
_ = partition_postings_and_write(postings_title_filtered, './postings_gcp/title/').collect()

                                                                                

In [19]:
# collect all posting lists locations into one super-set
def collect_posting_locs(prefix):
    """Merge every posting-locations pickle stored under `prefix` in the bucket.

    Each pickle maps token -> list of (file, offset) locations for one bucket
    written by `partition_postings_and_write`; the lists for the same token
    are concatenated into a single defaultdict(list).
    """
    merged_locs = defaultdict(list)
    for blob in client.list_blobs(bucket_name, prefix=prefix):
        # Only the location pickles; skip the binary posting files themselves.
        if not blob.name.endswith("pickle"):
            continue
        # Pickles come from our own bucket, written by this notebook.
        with blob.open("rb") as f:
            for token, locs in pickle.load(f).items():
                merged_locs[token].extend(locs)
    return merged_locs


# Trailing '/' keeps each prefix from also matching sibling directories.
super_posting_text_locs = collect_posting_locs('postings_gcp/text/')
super_posting_title_locs = collect_posting_locs('postings_gcp/title/')

In [20]:
# =============================================
# takes a while so run only when you need it!!!
# =============================================

def _length_totals(doc_length_rdd):
    """Return (total length, number of docs) for a (doc_id, length) RDD.

    Computes both values in a single Spark job, where separate `.sum()` and
    `.count()` calls would launch two. `fold` with the neutral (0, 0) keeps
    integer totals exact and yields (0, 0) for an empty RDD.
    """
    return (doc_length_rdd
            .map(lambda doc_len: (doc_len[1], 1))
            .fold((0, 0), lambda a, b: (a[0] + b[0], a[1] + b[1])))


# calculate document lengths:
title_doc_length = calculate_document_length(word_counts_title)
text_doc_length = calculate_document_length(word_counts_text)

# title_length = title_doc_length.collectAsMap()
# document_length = text_doc_length.collectAsMap()

# Total length and total number of documents for titles and texts:
total_title_length, total_title_docs = _length_totals(title_doc_length)
total_text_length, total_text_docs = _length_totals(text_doc_length)

# Calculate avdl:
average_title_length = total_title_length / total_title_docs
average_text_length = total_text_length / total_text_docs

                                                                                

Putting it all together

In [None]:
def generate_graph(pages):
    ''' Compute the directed graph generated by wiki links.
    Parameters:
    -----------
    pages: RDD
      An RDD where each row consists of one Wikipedia article's 'id' and
      'anchor_text' (a sequence of anchors whose first field is the linked
      page id).
    Returns:
    --------
    edges: RDD
      An RDD where each row represents an edge in the directed graph created by
      the Wikipedia links. The first entry is the source page id and the
      second entry is the destination page id. No duplicates are present.
    vertices: RDD
      An RDD where each row is a 1-tuple holding a vertex (node) id of the
      directed graph. No duplicates are present.
    '''
    def _link_target(anchor):
        return anchor[0]

    edges = (
        pages
        .flatMapValues(lambda anchors: anchors)
        .mapValues(_link_target)
        .distinct()
    )
    vertices = (
        edges
        .flatMap(lambda edge: edge)
        .distinct()
        .map(lambda node_id: (node_id, ))
    )
    return edges, vertices

In [None]:
# pages_links = parquetFile.select ("id","anchor_text").rdd
# # construct the graph 
# edges, vertices = generate_graph(pages_links)
# # compute PageRank
# edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
# verticesDF = vertices.toDF(['id']).repartition(124, 'id')
# g = GraphFrame(verticesDF, edgesDF)
# pr_results = g.pageRank(resetProbability=0.15, maxIter=10)
# pr = pr_results.vertices.select("id", "pagerank")
# page_rank = {row['id']: row['pagerank'] for row in pr.collect()}

In [None]:
# # Paths
# # Using user page views (as opposed to spiders and automated traffic) for the
# # month of August 2021
# pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
# p = Path(pv_path)
# # pv_name = p.name
# # pv_temp = f'{p.stem}-4dedup.txt'
# pv_clean = f'{p.stem}.pkl'

In [None]:
# # Download the file (2.3GB)
# !wget -N $pv_path

In [None]:
# # Filter for English pages, and keep just two fields: article ID (3) and monthly
# # total number of page views (5). Then, remove lines with article id or page
# # view values that are not a sequence of digits.
# !bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp

In [None]:
# # Create a Counter (dictionary) that sums up the pages views for the same
# # article, resulting in a mapping from article id to total page views.
# wid2pv = Counter()
# with open(pv_temp, 'rt') as f:
#     for line in f:
#         parts = line.split(' ')
#         wid2pv.update({int(parts[0]): int(parts[1])})

In [None]:
# # write out the counter as binary file (pickle it)
# with open(pv_clean, 'wb') as f:
#     pickle.dump(wid2pv, f)
# # read in the counter
# with open('pageviews-202108-user.pkl', 'rb') as f:
#     wid2pv = pickle.loads(f.read())

In [21]:
# ======================================================
# not needed if you download the pickles from the bucket
# ======================================================
 
# Create inverted index instance
# Separate indexes for titles and body text; no PageRank is attached here.
inverted_title = InvertedIndex(page_rank=None)
inverted_text = InvertedIndex(page_rank=None)

# Adding the posting locations dictionary to the inverted index
inverted_title.posting_locs = super_posting_title_locs
inverted_text.posting_locs = super_posting_text_locs

# Add the token - df dictionary to the inverted index
inverted_title.df = w2df_title_dict
inverted_text.df = w2df_text_dict

# Both indexes use the full page count N as the corpus size.
inverted_title.corpus_size = N
inverted_text.corpus_size = N

# add these later!!
# NOTE(review): doc_len is not assigned in this cell, so the pickles written
# below carry whatever default InvertedIndex provides for it.
# inverted_title.doc_len = title_length
# inverted_text.doc_len = document_length

inverted_title.avdl = average_title_length
inverted_text.avdl = average_text_length

# NOTE(review): set_idf / set_idf_bm25 presumably derive from df and
# corpus_size assigned above — confirm in inverted_index_gcp.py.
inverted_title.set_idf()
inverted_title.set_idf_bm25()

inverted_text.set_idf()
inverted_text.set_idf_bm25()

# write the global stats out
# Written to the current directory; the upload cell copies them to the bucket.
inverted_title.write_index('.', 'index_title')
inverted_text.write_index('.', 'index_text')

In [25]:
#################
# ADD BACK LATER#
#################

# Upload the locally written index pickles to gs://<bucket_name>/pickles/.
# NOTE(review): the original banner said "only uncomment this if you updated
# the index properly", but these lines are live. Run this cell only after the
# write_index calls have produced fresh index_title.pkl / index_text.pkl in
# the current directory, or stale pickles will overwrite the bucket copies.
index_title_src = "index_title.pkl"
index_text_src = "index_text.pkl"
index_title_dst = f'gs://{bucket_name}/pickles/{index_title_src}'
index_text_dst = f'gs://{bucket_name}/pickles/{index_text_src}'
!gsutil cp $index_title_src $index_title_dst
!gsutil cp $index_text_src $index_text_dst

Copying file://index_title.pkl [Content-Type=application/octet-stream]...
/ [1 files][  5.9 MiB/  5.9 MiB]                                                
Operation completed over 1 objects/5.9 MiB.                                      
Copying file://index_text.pkl [Content-Type=application/octet-stream]...
/ [1 files][ 27.6 MiB/ 27.6 MiB]                                                
Operation completed over 1 objects/27.6 MiB.                                     


In [None]:
#################
# ADD BACK LATER#
#################

# !gsutil ls -lh $index_title_dst
# !gsutil ls -lh $index_text_dst

In [89]:
!gsutil cp gs://bgu-ir-ass3-fab/postings_gcp/title/inverted_title_v1.pkl /home/dataproc

Copying gs://bgu-ir-ass3-fab/postings_gcp/title/inverted_title_v1.pkl...
/ [1 files][314.9 MiB/314.9 MiB]                                                
Operation completed over 1 objects/314.9 MiB.                                    


In [94]:
!gsutil cp /home/dataproc/doc_len_title.pkl gs://bgu-ir-ass3-fab-stem/pickles/

Copying file:///home/dataproc/doc_len_title.pkl [Content-Type=application/octet-stream]...
/ [1 files][ 42.3 MiB/ 42.3 MiB]                                                
Operation completed over 1 objects/42.3 MiB.                                     


In [None]:
# # path = '/home/dataproc'
# for blob in client.list_blobs(bucket_name):
#     if blob.name.endswith('pageviews-202108-user.pkl'):
#         file_name = blob.name[blob.name.rfind('/')+1:]
#         blob.download_to_filename(file_name)

In [None]:
# with open('inverted_title_v1.pkl', 'rb') as file:
#     inverted_title = pickle.load(file)
    
# with open('inverted_text_v1.pkl', 'rb') as file:
#     inverted_text = pickle.load(file)

In [None]:
# text_doc_len = inverted_text.doc_len.copy()
# title_doc_len = inverted_title.doc_len.copy()

In [None]:
# k1 = 1.2
# k3 = 0
# b = 0.5


# # testing:

# def retrieve_posting_list_title(query_word: str, bucket_name: str):
#     return inverted_title.read_a_posting_list(base_dir='.', w=query_word, bucket_name=bucket_name)

# def retrieve_posting_list_text(query_word: str, bucket_name: str):
#     pl = inverted_text.read_a_posting_list(base_dir='.', w=query_word, bucket_name=bucket_name)
#     # if len(pl) > 1000:
#         # pl = sorted(pl, key=itemgetter(1))
#         # pl = sorted(pl[:1000], key=itemgetter(0))
#     return pl

# def calculate_bm25_per_tf(token, doc_id_tf, inverted):
#     doc_id, tf = doc_id_tf
#     B = 1 - b + b * inverted.doc_len.get(doc_id, inverted.avdl)
#     return ((k1 + 1) / (B * k1 + tf)) * ((inverted.corpus_size + 1) /inverted.idf_bm25[token])


# def reduce_by_key(postings_lists):
#     # print(postings_lists)
#     # print(postings_lists)
#     bad_indices = []
#     for index, pl in enumerate(postings_lists):
#         if len(pl) == 0:
#             bad_indices.append(index)
#     for bad_index in bad_indices[::-1]:
#         del postings_lists[bad_index]
#     gens = [(item for item in pl) for pl in postings_lists]
#     items = [next(gen) for gen in gens]

#     combined = []
#     while any(tf != 0 for _, tf in items) and len(postings_lists) > 0:
#         # index, (doc_id, _) = min(enumerate(items), key=lambda x: x[1][0])
        
#         min_index = None
#         min_value = float('inf')  # Initialize with a very large value

#         for i, (doc_id, _) in enumerate(items):
#             if doc_id < min_value:  # Check if we have a new minimum
#                 min_index = i
#                 min_value = doc_id

#         index, doc_id = min_index, min_value
#         for index, item in enumerate(items):
#             if item[0] == doc_id:
#                 try:
#                     items[index] = next(gens[index])
#                 except StopIteration:
#                     items[index] = (float('inf'), 0)
#     return combined

# def query(query: str, bucket_name: str):
#     print('begin')
#     tokens = [token.group() for token in RE_WORD.finditer(query.lower())]
#     tokens = [token for token in tokens if token not in all_stopwords]
#     print('pl')
#     token_pl_title = {token: retrieve_posting_list_title(token, bucket_name) for token in tokens}
#     token_pl_text = {token: retrieve_posting_list_text(token, bucket_name) for token in tokens}

#     print('bm25 title')
#     bm25_per_term_doc_title = []
#     for token in token_pl_title.keys():
#         curr_term = []
#         for doc_id, tf in token_pl_title[token]:
#             curr_term.append((doc_id, calculate_bm25_per_tf(token, (doc_id, tf), inverted_title)))
#         bm25_per_term_doc_title.append(curr_term)

#     print('bm25 text')
#     bm25_per_term_doc_text = []
#     for token in token_pl_title.keys():
#         curr_term = []
#         for doc_id, tf in token_pl_text[token]:
#             curr_term.append((doc_id, calculate_bm25_per_tf(token, (doc_id, tf), inverted_text)))
#         bm25_per_term_doc_text.append(curr_term)

#     print('reduce title')
#     # print(bm25_per_term_doc)
#     bm25_title = reduce_by_key(bm25_per_term_doc_title)
#     print('reduce text')
#     bm25_title = list(map(lambda x: (x[0], x[1]), bm25_title))
#     bm25_text = reduce_by_key(bm25_per_term_doc_text)
#     print('reduce together')
#     # bm25_text = list(map(lambda x: (x[0], x[1]), bm25_text))
#     bm25 = reduce_by_key([bm25_title, bm25_text])
#     bm25 = list(map(lambda x: (x[0], sqrt(inverted_text.pagerank_normalized.get(x[0],0.2) + 1) * (x[1] ** 3)), bm25))

#     return sorted(bm25, key=lambda x: x[1], reverse=True)

In [None]:
#################
# ADD BACK LATER#
#################

# def update_attribute(instance, attribute_name, new_data):
#     """
#     Updates the specified attribute of an object instance with new data.

#     Args:
#         instance: The object instance to modify.
#         attribute_name (str): The name of the attribute to update.
#         new_data: The new value to assign to the attribute.

#     Raises:
#         AttributeError: If the object does not have the specified attribute.
#     """

#     if hasattr(instance, attribute_name) and len(getattr(instance, attribute_name)) > 0:
#         raise AttributeError(f"Object already has '{attribute_name}'")

#     setattr(instance, attribute_name, new_data)

# def update_pickle(instance, filename, new_data):
#     for attr, data in new_data.items():
#         update_attribute(instance, attr, data)
#     instance._posting_list = []
#     instance.write_index('.', filename)
#     index_title_src = f'{filename}.pkl'
#     index_title_dst = f'gs://{bucket_name}/postings_gcp/title/{index_title_src}'
#     !gsutil cp $index_title_src $index_title_dst

# if False:
#     text_new_data = {
#     }

#     title_new_data = {
#     }
    
#     update_pickle(inverted_text, 'inverted_text_v1', text_new_data)
#     update_pickle(inverted_title, 'inverted_title_v1', title_new_data)

In [78]:
import search_backend as se
from importlib import reload
reload(se)

<module 'search_backend' from '/home/dataproc/search_backend.py'>

In [79]:
search = se.Search()

In [123]:
# Show the raw and stemmed query tokens, then run the query (result is displayed).
text = 'Capitalism'
tokens = [match.group() for match in RE_WORD.finditer(text.lower())]
print(tokens)
print([stemmer.stem(tok) for tok in tokens if tok not in all_stopwords])
search.search_query(text)

['capitalism']
['capit']
begin
pl
reduce together


[('5902', 1104.7302586959106),
 ('5416', 1078.9425688858114),
 ('257210', 966.2845587255763),
 ('1944', 941.6985231026414),
 ('255627', 906.7860887159975),
 ('33728', 902.1291634936983),
 ('412425', 892.1223383106166),
 ('58829', 886.6012795996546),
 ('14094649', 882.5025738645519),
 ('2371868', 868.197504425741),
 ('2384714', 865.7312475987383),
 ('4504070', 849.7613040183572),
 ('181337', 843.8233859650637),
 ('1168152', 842.9052865367054),
 ('67243440', 833.2585325829679),
 ('4967816', 828.6953617549281),
 ('33652271', 827.383036726791),
 ('41914', 826.4755028746036),
 ('1461479', 825.1591939822392),
 ('505878', 824.2450667606756),
 ('2988488', 823.4799844102241),
 ('1075971', 821.3561859897849),
 ('83496', 818.185982751052),
 ('275150', 816.7305814138087),
 ('29665', 816.0940768718632),
 ('181293', 815.0447797834568),
 ('1757203', 813.5820284674861),
 ('3326582', 810.9426173529515),
 ('11817227', 808.3633174601748),
 ('24627377', 807.4667432982505),
 ('413896', 807.0799107354933),


In [90]:
with open('inverted_title_v1.pkl', 'rb') as f:
    old_title_index = pickle.load(f)

In [41]:
# `old_text_index` is not defined by any other cell in this notebook (hidden
# kernel state), so a fresh kernel would raise NameError here. Load it from the
# text-index pickle when it is not already in memory.
# NOTE(review): assumes inverted_text_v1.pkl is in the working directory (it is
# only downloaded for the title index above) — confirm before re-running.
if 'old_text_index' not in globals():
    with open('inverted_text_v1.pkl', 'rb') as f:
        old_text_index = pickle.load(f)
pagerank = old_text_index.pagerank_normalized.copy()

In [42]:
pagerank[12]

0.2542812351747581

In [43]:
with open('pagerank_normalized.pkl', 'wb') as f:
    pickle.dump(pagerank, f)

['apollo', 'moon', 'landing', 'date']


['apollo', 'moon', 'land', 'date']

In [91]:
doc_len_title = old_title_index.doc_len.copy()

In [93]:
with open('doc_len_title.pkl', 'wb') as file:
    pickle.dump(doc_len_title, file)

In [109]:
search.page_views[24544]

4.864006484330584

In [108]:
search.pagerank[24544]

0.31983406101967243

In [111]:
from BM25 import BM25

In [None]:
bm25 = BM25()

In [120]:
search.retrieve_posting_list('photosynthesi', bucket_name, search.inverted_title)

[]