## Title Inverted Index Setup


# Importing from google storage & setup inverted indacies
***Important*** DO NOT CLEAR THE OUTPUT OF THIS NOTEBOOK AFTER EXECUTION!!!
* don't forget to upload here "title_inverted_index_gcp.py"

In [1]:
# if the following command generates an error, you probably didn't enable 
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster
!gcloud dataproc clusters list --region us-central1

NAME          PLATFORM  WORKER_COUNT  PREEMPTIBLE_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-b3d0  GCE       4                                       RUNNING  us-central1-a


In [2]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [3]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
import math
import sklearn
from google.cloud import storage

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [4]:
# if nothing prints here you forgot to include the initialization script when starting the cluster
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Jan 14 07:46 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *
spark

When creating the bucket we importing from the bucket "wikidata20210801_preprocessed" which holds the dump files relevant for our setup for the curpus and the building of the inverted indices

In [6]:
bucket_name = 'bucket_ir_engine' 
full_path = f"gs://{bucket_name}/"
paths=[]
client = storage.Client()
blobs = client.list_blobs(bucket_name)
for b in blobs:
    if 'multistream' in b.name:
        paths.append(full_path+b.name)
        

In [7]:
# print(paths)

***GCP setup is complete!***
If you got here without any errors you guys are wonderfull

# *Building the Indexes*

**Here, we read the entire corpus to an rdd, directly from Google Storage Bucket and use your code from Colab to construct an inverted index.
In order to take entire corpus we will write "parquetFile = spark.read.parquet(*paths)", but first we will check on only one multistream file stored in 
varaible "path"**

In [8]:
parquetFile = spark.read.parquet(*paths)
doc_title_pairs = parquetFile.select("title", "id").rdd
doc_text_pairs = parquetFile.select("text","id").rdd
doc_anchor_pairs = parquetFile.select("anchor_text","id").rdd
doc2title = parquetFile.select("id", "title").rdd

                                                                                

In [9]:
# saving RDD in the form of dict to retrive titles to their doc id
doc2title_dict = doc2title.collectAsMap()
with open('/./home/dataproc/doc2title_dict.pkl', 'wb') as file:
    pickle.dump(doc2title_dict, file)

                                                                                

In [10]:
print(list(doc2title_dict.items())[:2])

[(4045403, 'Foster Air Force Base'), (4045413, 'Torino Palavela')]


* Counting the number of pages to make sure we on the right track (entire corpus should be more then 6M pages)
* Checking how "doc_title_pairs" look like
* Checking how "doc_text_pairs" look like
* Checking how "doc_anchor_pairs" look like

In [11]:
corpus_size = parquetFile.count()
print(f"corpus size: {corpus_size}")
# print("our RDDs at the begining:")
# doc_title_pairs.take(5)
# doc_text_pairs.take(5)
# doc_anchor_pairs.take(5)



corpus size: 6348910


                                                                                

importing *_inverted_index_gcp modules

In [12]:
# if nothing prints here you forgot to upload the file storage_backend.py to the home dir
%cd -q /home/dataproc
!ls title_inverted_index_gcp.py
!ls body_inverted_index_gcp.py
!ls anchors_inverted_index_gcp.py
!ls inverted_index_gcp.py

title_inverted_index_gcp.py
body_inverted_index_gcp.py
anchors_inverted_index_gcp.py
inverted_index_gcp.py


In [13]:
# adding our python modules to the cluster
sc.addFile("/home/dataproc/title_inverted_index_gcp.py")
sc.addFile("/home/dataproc/body_inverted_index_gcp.py")
sc.addFile("/home/dataproc/anchors_inverted_index_gcp.py")
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0, SparkFiles.getRootDirectory())

In [14]:
from title_inverted_index_gcp import InvertedIndex as Title_Inverted_Index
from body_inverted_index_gcp import InvertedIndex as Body_Inverted_Index
from anchors_inverted_index_gcp import InvertedIndex as Anchor_Inverted_Index
from inverted_index_gcp import InvertedIndex 

# Tokenization & More

In [15]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)


NUM_BUCKETS = 124
def token2bucket_id(token):
    return int(_hash(token),16) % NUM_BUCKETS

#tf score
def word_count(text, _id):
    """
    calculating the term freq, not including stopwords
    Returns:
      List of tuples: [(token, (doc Id, tf)), ...]
    """
    #tokens = tokenize(text)
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())if token not in all_stopwords]
    tok_counter = Counter()
    tok_counter.update(tokens)
    return [(token[0],(_id, token[1])) for token in tok_counter.items()]

# sorting according to doc id
def reduce_word_counts(unsorted_pl):
    unsorted_pl = list(unsorted_pl)
    return sorted(unsorted_pl, key = lambda tf: tf[0])

def calculate_df(postings):
    return postings.map(lambda x: (x[0], len(x[1])))

def partition_postings_and_write_title(postings):
    postings = postings.groupBy(lambda x: token2bucket_id(x[0]))
    postings = postings.map(lambda x: Title_Inverted_Index.write_a_posting_list(x, bucket_name))
    return postings

def partition_postings_and_write_body(postings):
    postings = postings.groupBy(lambda x: token2bucket_id(x[0]))
    postings = postings.map(lambda x: Body_Inverted_Index.write_a_posting_list(x, bucket_name))
    return postings

def partition_postings_and_write_anchor(postings):
    postings = postings.groupBy(lambda x: token2bucket_id(x[0]))
    postings = postings.map(lambda x: Anchor_Inverted_Index.write_a_posting_list(x, bucket_name))
    return postings

def calculate_DocumentLengthXX(doc_id, text):
    Length_Counter = 0
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())if token not in all_stopwords]
    if(len(text) == 1):
        return((doc_id,1))
    for token in tokens:
        Length_Counter +=1
    return((doc_id,Length_Counter))  

def Calculate_DocumentLength_AnchorXX(text,doc_id):
    words_lst = []
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())if token not in all_stopwords]
    for token in tokens:
        words_lst.append(token)
    return((doc_id,len(words_lst))) 

def calculate_TermTotal(postings):
    def Term_Total_Count(postingList):
        TermTotalSum= 0
        for doc_id,tf in postingList:
            TermTotalSum+=tf
        return TermTotalSum   
    return postings.mapValues(Term_Total_Count)
    

# *** Let's get the TITLE Index Started! ***

In [16]:
# time the title index creation time
title_start = time()

In [17]:
# title word count map
word_counts_title = doc_title_pairs.flatMap(lambda x: word_count(x[0], x[1]))
#postings
title_postings = word_counts_title.groupByKey().mapValues(reduce_word_counts)

-> check how the title_postings filterd look like next to word_counts_title:

In [18]:
# print("At the begining:")
# word_counts_title.take(10)
# print("After sorting:")
# title_postings.take(10)

In [19]:
#calculating df
w2df_title = calculate_df(title_postings)
w2df_title_dict = w2df_title.collectAsMap()


# #calculating DL
DL_Array_Title = doc_title_pairs.map(lambda x:calculate_DocumentLengthXX(x[1],x[0]))
DL_Dict_Title = DL_Array_Title.collectAsMap()


#calculating TermTotal
w2termstotal_Title = calculate_TermTotal(title_postings)
dict_term_total_title = w2termstotal_Title.collectAsMap()


# partition posting lists and write out
posting_locs_Title = partition_postings_and_write_title(title_postings).collect()

                                                                                

-> check how the dictionaries (for the title index) that we created look like 

*** creating Title Inverted Index ***
* posting_locs = posting locations dictionary
* df = dictionary on the form {term: doc freq(how many docs it's in)}
* DL = dictionary in form of {doc_id: doc_length}
* term_total = dictionary in form of {term: total freq in corpus}

In [20]:
super_posting_locs_title = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix=f'postings_gcp/Title_data'):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)       
        for k, v in posting_locs.items():
            super_posting_locs_title[k].extend(v)

In [21]:
# Create inverted index instance
inverted_title = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_title.posting_locs = super_posting_locs_title
# Add the token - df dictionary to the inverted index
inverted_title.df = w2df_title_dict
#adding DL
inverted_title.DL = DL_Dict_Title
#adding Term Total
inverted_title.term_total = dict_term_total_title


# write the global stats out
inverted_title.write_index('.', 'Title_index')


#uploading to BUCKET
index_src = "Title_index.pkl"
index_dst = f'gs://{bucket_name}/Indices/{index_src}'
!gsutil cp $index_src $index_dst

Copying file://Title_index.pkl [Content-Type=application/octet-stream]...
/ [1 files][132.9 MiB/132.9 MiB]                                                
Operation completed over 1 objects/132.9 MiB.                                    


In [22]:
title_const_time = time() - title_start
print(f"Title Inverted Index been created in {title_const_time} seconds")

Title Inverted Index been created in 84.63939571380615 seconds


In [23]:
# print(doc_title_pairs.take(10))

In [24]:
# # title doc frequency per term
# print(f"w2df_title_dict {list(islice(w2df_title_dict.items(),5))}")
# # docs lengths
# print(list(islice(DL_Dict_Title.items(), 5)))
# # total freq for term in corpus titles
# print(list(islice(dict_term_total_title.items(), 5)))

# * Let's get the BODY Index Started! *

In [25]:
# time the body index creation time
body_start = time()

In [26]:
# body word count map
word_counts_body = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
#postings
body_postings = word_counts_body.groupByKey().mapValues(reduce_word_counts)
# filter out rare words < 50
body_postings_filtered = body_postings.filter(lambda x: len(x[1])>50)

-> check how the body_postings filterd look like next to word_counts_body:

In [27]:
# print("At the begining:")
# word_counts_body.take(10)
# print("After sorting:")
# body_postings.take(10)
# body_postings_filtered.take(2)

In [None]:
#calculating df
w2df_body = calculate_df(body_postings_filtered)
w2df_body_dict = w2df_body.collectAsMap()

#calculating DL (IMPORTANT)
DL_Array_Body = doc_text_pairs.map(lambda x:calculate_DocumentLengthXX(x[1],x[0]))
DL_Dict_Body = DL_Array_Body.collectAsMap()
corpus_size = len(DL_Dict_Body)
DL_sum = 0
for  doc_id in DL_Dict_Body:
    DL_sum += DL_Dict_Body[doc_id]
avg_DL = DL_sum/corpus_size

#calculating TermTotal (IMPORTANT)
w2termstotal_body = calculate_TermTotal(body_postings_filtered)
dict_term_total_body = w2termstotal_body.collectAsMap()


                                                                                

In [None]:
#calculating idf
idf_dict = {}
for term, df_score in w2df_body_dict.items():
    idf_dict[term] = math.log10((corpus_size + 1)/w2df_body_dict[term])

# partition posting lists and write out
posting_locs_Body = partition_postings_and_write_body(body_postings_filtered).collect() #DK - body_postings_filtered instead of body_postings

                                                                                

-> check how to w2df dictionary for the title index looks like

In [None]:
# print(doc2title_dict.items())[:10]

*creating body Inverted Index*

In [None]:
super_posting_locs_body = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix=f'postings_gcp/Body_data'):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)       
        for k, v in posting_locs.items():
            super_posting_locs_body[k].extend(v)

In [None]:
# Create inverted index instance
inverted_body = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_body.posting_locs = super_posting_locs_body
# Add the token - df dictionary to the inverted index
inverted_body.df = w2df_body_dict
#adding DL
inverted_body.DL = DL_Dict_Body
#adding Term Total
inverted_body.term_total = dict_term_total_body

# idf dict
inverted_body.idf_dict = idf_dict
#Avg_DL
inverted_body.avg_DL = avg_DL
inverted_title.avg_DL = avg_DL
# corpus size
inverted_body.corpus_size = corpus_size
inverted_title.corpus_size = corpus_size

# write the global stats out
inverted_body.write_index('.', 'Body_index')


#uploading to BUCKET
index_src = "Body_index.pkl"
index_dst = f'gs://{bucket_name}/Indices/{index_src}'
!gsutil cp $index_src $index_dst

Copying file://Body_index.pkl [Content-Type=application/octet-stream]...
- [1 files][ 76.2 MiB/ 76.2 MiB]                                                
Operation completed over 1 objects/76.2 MiB.                                     


In [None]:
body_const_time = time() - body_start
print(f"Body Inverted Index been created in {body_const_time} seconds")

Body Inverted Index been created in 2935.8650636672974 seconds


# * Let's get the ANCHOR Index Started! *

In [None]:
# time the anchor index creation time
anchor_start = time()

In [None]:
# Flat
flat_anchor_map = doc_anchor_pairs.flatMap(lambda x: x[0])
distict_group_anchor = flat_anchor_map.distinct().groupByKey().mapValues(lambda x: " ".join(x))

# anchor word count map
word_counts_anchor = distict_group_anchor.flatMap(lambda x: word_count(x[1], x[0]))
#postings
anchor_postings = word_counts_anchor.groupByKey().mapValues(reduce_word_counts)

-> check how the anchor_postings filterd look like next to word_counts_title:

In [None]:
# print("At the begining:")
# word_counts_anchor.take(10)
# print("After sorting:")
# anchor_postings.take(10)

In [None]:
#calculating df
w2df_anchor = calculate_df(anchor_postings)
w2df_anchor_dict = w2df_anchor.collectAsMap()

#calculating DL
DL_Array_Anchor = distict_group_anchor.map(lambda x:Calculate_DocumentLength_AnchorXX(x[1],x[0]))
DL_Dict_Anchor = DL_Array_Anchor.collectAsMap()

#calculating TermTotal
w2termstotal_anchor = calculate_TermTotal(anchor_postings)
dict_term_total_anchor = w2termstotal_anchor.collectAsMap()

# partition posting lists and write out
posting_locs_anchor = partition_postings_and_write_anchor(anchor_postings).collect()

                                                                                

-> check how to w2df dictionary for the anchor index looks like

In [None]:
# print(list(islice(w2df_anchor_dict.items(), 5)))

*creating anchor Inverted Index*

In [None]:
super_posting_locs_anchor = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix=f'postings_gcp/Anchor_data'):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)       
        for k, v in posting_locs.items():
            super_posting_locs_anchor[k].extend(v)

In [None]:
# Create inverted index instance
inverted_anchor = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_anchor.posting_locs = super_posting_locs_anchor
# Add the token - df dictionary to the inverted index
inverted_anchor.df = w2df_anchor_dict
#adding DL
inverted_anchor.DL = DL_Dict_Anchor
#adding Term Total
inverted_anchor.term_total = dict_term_total_anchor

# write the global stats out
inverted_anchor.write_index('.', 'Anchor_index')

#uploading to BUCKET
index_src = "Anchor_index.pkl"
index_dst = f'gs://{bucket_name}/Indices/{index_src}'
!gsutil cp $index_src $index_dst

Copying file://Anchor_index.pkl [Content-Type=application/octet-stream]...
==> NOTE: You are uploading one or more large file(s), which would run          
significantly faster if you enable parallel composite uploads. This
feature can be enabled by editing the
"parallel_composite_upload_threshold" value in your .boto
configuration file. However, note that if you do this large files will
be uploaded as `composite objects
<https://cloud.google.com/storage/docs/composite-objects>`_,which
means that any user who downloads such objects will need to have a
compiled crcmod installed (see "gsutil help crcmod"). This is because
without a compiled crcmod, computing checksums on composite objects is
so slow that gsutil disables downloads of composite objects.

| [1 files][166.6 MiB/166.6 MiB]                                                
Operation completed over 1 objects/166.6 MiB.                                    


In [None]:
anchor_const_time = time() - anchor_start
print(f"Anchor Inverted Index been created in {anchor_const_time} seconds")

Anchor Inverted Index been created in 1291.4913432598114 seconds


In [None]:
## the cell below is to check term_total dictionary:
##[('jurisdictions', 1210), ('concord', 416), ('crome', 7), ('adobes', 2), ('plasters', 15), ('slumber', 25), ('mylar', 20), ('grey-green', 1), ('spillover', 36), ('selber', 2), ('literary-critical', 2), ('vukmir', 1), ('ep-3e', 3), ('homocysteine', 10), ('earthy', 62), ('siena', 283), ('moonshot', 3), ('bruises', 42), ('candelabrum', 8), ('eclogae', 7)]

In [None]:
# print(inverted_body.term_total['jurisdictions'])

In [None]:
# print(w2df_body_dict["jurisdictions"])

In [47]:
print(f'title df: {list((inverted_title.df).items())[:2]}')
print(f'title term_total: {list((inverted_title.term_total).items())[:2]}')
print(f'title DL: {list((inverted_title.DL).items())[:2]}')
print(f'title avg_DL: {inverted_title.avg_DL}')
print(f'title corpus_size: {inverted_title.corpus_size}')

title df: [('1970', 2419), ('bar', 1566)]
title term_total: [('bar', 1574), ('intent', 72)]
title DL: [(4045403, 4), (4045413, 2)]
title avg_DL: 431.1623765339247
title corpus_size: 6348910
