<a href="https://colab.research.google.com/github/Naama133/WIKI_IR_ENGINE/blob/master/inverted_index_to_gcp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In this notebook we build our inverted indices by reading the preprocessed Wikipedia dump from a GCP bucket.
The indices are saved to our GCP bucket storage and are used by our search engine.

We will create 3 instances of inverted index: one for the title, one for the body and one for the anchor text.

### Setup - General imports:

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
import numpy as np
import math

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

### Installing, importing, and initializing PySpark

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

### Copy wiki data

In [None]:
# Read every parquet file under the preprocessed-wiki bucket path into one Spark DataFrame.
# NOTE(review): `spark` is not created in the visible cells - presumably the session is
# provided by the cluster kernel; confirm before running elsewhere.
full_path = "gs://wikidata_preprocessed/*"
parquetFile = spark.read.parquet(full_path)

                                                                                

In [None]:
# Switch to /home/dataproc and check that the inverted_index_gcp.py module file is present there.
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [None]:
# Register our python module with the SparkContext, then put SparkFiles' root directory
# first on sys.path so that `inverted_index_gcp` can be imported from there.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [None]:
from inverted_index_gcp import InvertedIndex

In [None]:
# Name of the GCS bucket used below when writing posting lists, listing blobs and uploading the index pickles.
bucket_name = "project_ir_test"
# Storage client used below to list and open blobs in that bucket.
client = storage.Client()

Create the stopword set and the tokenization regex

In [None]:
# Fetch the NLTK stopword corpus and build the English stopword set.
nltk.download('stopwords')
english_stopwords = frozenset(stopwords.words('english'))
# Extra words treated as stopwords on top of the NLTK list.
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]

# Union of both lists; this is the set `tokenize` filters against.
all_stopwords = english_stopwords.union(corpus_stopwords)
# A token starts with '#', '@' or a word character, followed by 2 to 24 further word
# characters, each optionally preceded by an apostrophe or a hyphen.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

### **Functions shared by all 3 indices:**

In [None]:
# Number of buckets the tokens are spread over when posting lists are written.
NUM_BUCKETS = 124
def token2bucket_id(token):
  '''
  Map a token to a bucket id in the range [0, NUM_BUCKETS), using the
  hexadecimal digest returned by _hash interpreted as an integer.
  '''
  hex_digest = _hash(token)
  digest_value = int(hex_digest, 16)
  return digest_value % NUM_BUCKETS

def tokenize(text, removeStopword):
  '''
  Split the lower-cased text into the tokens matched by RE_WORD.

  Parameters:
  -----------
    text: str: The text to tokenize
    removeStopword: bool: When truthy, tokens found in all_stopwords are dropped
  Returns:
  --------
    A list of tokens, in order of appearance (duplicates kept).
  '''
  lowered = text.lower()
  kept = []
  for match in RE_WORD.finditer(lowered):
    word = match.group()
    if removeStopword and word in all_stopwords:
      continue
    kept.append(word)
  return kept
    
def word_count(text, id, removeStopword):
  '''
  Count the frequency of each word in the given text (tf),
  and choose if we want to remove stopwords by using the removeStopword boolean argument.

  Parameters:
  -----------
    text: str: Text/title/anchor text of one document
    id: int: Document id
    removeStopword: bool: Passed through to tokenize
  Returns:
  --------
    A list of (token, (doc_id, tf)) pairs, one per distinct token,
    ordered by the token's first appearance in the text.
  '''
  # Counter keeps its keys in first-seen order, so it already holds each distinct
  # token exactly once; no separate list-membership de-duplication (O(n^2)) is needed.
  token_counter = Counter(tokenize(text, removeStopword))
  return [(token, (id, tf)) for token, tf in token_counter.items()]


def doc_to_term_counter(text, removeStopword):
  '''
  Build the term-frequency Counter of one document.

  Parameters:
  -----------
    text: str: Text of the document
    removeStopword: bool: Passed through to tokenize
  Returns:
  --------
    A Counter mapping each token to its number of occurrences.
  '''
  return Counter(tokenize(text, removeStopword))
  

def reduce_word_counts(unsorted_pl):
  '''
  Order a posting list by document id.
  (Operates on the values of the pairs returned by word_count)

  Parameters:
  -----------
    unsorted_pl: An iterable of (wiki_id, tf) tuples
  Returns:
  --------
    A new list with the same tuples, sorted by their first element (wiki_id).
    Tuples with equal wiki_id keep their relative order.
  '''
  posting_list = list(unsorted_pl)
  posting_list.sort(key=itemgetter(0))
  return posting_list

def calculate_df(postings):
  ''' Takes a posting list RDD and calculates the df for each token.

  Parameters:
  -----------
    postings: an RDD where each element is a (token, posting_list) pair,
              with one pair per token.
  Returns:
  --------
    An RDD where each element is a (token, df) pair; df is the length of
    the token's posting list.
  '''
  # Each token already has exactly one posting list, so its length can be taken
  # directly; regrouping by key first would only add a shuffle.
  return postings.mapValues(len)

def partition_postings_and_write(postings, storage_path):
  '''
  Groups the posting lists into buckets (by token2bucket_id of the token) and
  hands each bucket to InvertedIndex.write_a_posting_list together with the
  global bucket_name and the given storage_path.

  Parameters:
  -----------
    postings: an RDD where each item is a (w, posting_list) pair.
    storage_path: where to write in storage
  Returns:
  --------
    An RDD with one item per bucket: the value returned by
    InvertedIndex.write_a_posting_list (described by the original notes as a
    posting locations dictionary for that bucket).
  '''
  def _key_by_bucket(pair):
    # (w, posting_list) -> (bucket_id, (w, posting_list))
    token, posting_list = pair[0], pair[1]
    return (token2bucket_id(token), (token, posting_list))

  def _write_bucket(bucket_group):
    return InvertedIndex.write_a_posting_list(bucket_group, bucket_name, storage_path)

  bucketed = postings.map(_key_by_bucket)
  grouped = bucketed.groupByKey()
  return grouped.map(_write_bucket)

def calculate_DL(text, id, removeStopword):
  '''
  Compute the length of one document, measured in tokens.

  Parameters:
  -----------
    text: str: Text of the document
    id: int: Document id
    removeStopword: bool: Passed through to tokenize
  Returns:
  --------
    A (id, doc_len) tuple, where doc_len is the number of tokens returned by tokenize.
  '''
  doc_len = len(tokenize(text, removeStopword))
  return (id, doc_len)

In [None]:
def createInvertedIndexInstance(super_posting_locs, w2df_dict, DL_dic, term_total, doc_id_to_norm, avgDl, pathName, bucket, needUpdateDocTitleDic):
  '''
  Create inverted index instance & write it to GCP function
  '''
  inverted_index = InvertedIndex()

  # Adding the posting locations dictionary to the inverted index
  inverted_index.posting_locs = super_posting_locs

  # Add the token - df dictionary to the inverted index
  inverted_index.df = w2df_dict

  # Added DL_body dict
  inverted_index.DL = DL_dic

  # Added term_total dict
  inverted_index.term_total = term_total

  inverted_index.total_vec_size = len(inverted_index.term_total)

  inverted_index.doc_id_to_norm = doc_id_to_norm

  inverted_index.avgDl = avgDl # avg documents length in the index

  if needUpdateDocTitleDic:
    inverted_index.doc_id_to_title = united_title_corpus.collectAsMap()

  # write the global stats out
  inverted_index.write_index('.', pathName)

  # upload to gs
  index_src = pathName+".pkl"
  index_dst = f'gs://{bucket}/postings_gcp/{pathName}/{index_src}'
  !gsutil cp $index_src $index_dst


From this point on, we build 3 inverted indices: one for the body, one for the title, and one for the anchor text.

### **Document body inverted index**

In [None]:
# RDD of (text, id) rows - note the column order: x[0] is the text and x[1] is the doc id.
united_body_corpus = parquetFile.select("text","id").rdd



In [None]:
# (token, (doc_id, tf)) pairs for every document body, stopwords removed.
word_counts_body = united_body_corpus.flatMap(lambda x: word_count(x[0], x[1],True))

# One posting list per token, sorted by doc id.
postings_body = word_counts_body.groupByKey().mapValues(reduce_word_counts)

# Keep only tokens whose posting list has more than 50 entries.
postings_filtered_body = postings_body.filter(lambda x: len(x[1])>50)

# token -> df, for the filtered tokens only.
w2df_dict_body = calculate_df(postings_filtered_body).collectAsMap()

# (doc_id, number of tokens) for every document.
DL_body_rdd = united_body_corpus.map(lambda x: calculate_DL(x[0], x[1], True))

# Average document length.
avgDl_body = DL_body_rdd.map(lambda x: x[1]).mean()

DL_body = DL_body_rdd.collectAsMap()

# token -> total number of occurrences across all documents (computed before the >50 filter).
term_total_body = (word_counts_body.map(lambda x: (x[0], x[1][1]))).reduceByKey(lambda x, y: x + y).collectAsMap()

# Number of documents.
N = len(DL_body)

# (doc_id, Counter of the document's tokens).
doc_id_to_token_counter_body = united_body_corpus.map(lambda x: (x[1], doc_to_term_counter(x[0], True)))

# doc_id -> L2 norm of the vector of (tf / doc length) * log10(N / df) over the document's distinct tokens.
# Tokens missing from w2df_dict_body (filtered out above) get df = N, i.e. a weight of log10(1) = 0.
doc_id_to_norm_body = doc_id_to_token_counter_body.map(lambda x: (x[0], np.linalg.norm([(x[1][term] / DL_body[x[0]]) * math.log(N / w2df_dict_body.get(term, N), 10) for term in x[1]]))).collectAsMap()

# partition posting lists and write out; collect() triggers the writes, the result itself is discarded
_ = partition_postings_and_write(postings_filtered_body, "index_body").collect()

                                                                                

In [None]:
# Merge the posting-location dictionaries stored under the body index prefix into one super-set.
super_posting_locs_body = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp/index_body'):
    # Only blobs whose name ends with "pickle" are loaded; anything else under the prefix is skipped.
    if blob.name.endswith("pickle"):
        with blob.open("rb") as f:
            posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs_body[k].extend(v)

In [None]:
# Create the body inverted index instance and write it to GCP.
# The last argument is False: the doc id -> title dictionary is not attached to this index.
createInvertedIndexInstance(super_posting_locs_body, w2df_dict_body, DL_body, term_total_body, doc_id_to_norm_body, avgDl_body ,'index_body', bucket_name,False)

Copying file://bodyIndex.pkl [Content-Type=application/octet-stream]...
/ [1 files][  2.8 MiB/  2.8 MiB]                                                
Operation completed over 1 objects/2.8 MiB.                                      


### **Document title inverted index**

In [None]:
# RDD of (id, title) rows - here x[0] is the doc id and x[1] is the title.
# createInvertedIndexInstance also reads this variable when needUpdateDocTitleDic is True.
united_title_corpus = parquetFile.select("id","title").rdd



In [None]:
# Title index: stopwords are kept (removeStopword=False) in every step of this cell.

# (token, (doc_id, tf)) pairs for every title.
word_counts_title = united_title_corpus.flatMap(lambda x: word_count(x[1], x[0],False))

# One posting list per token, sorted by doc id.
postings_title = word_counts_title.groupByKey().mapValues(reduce_word_counts)

# token -> df.
w2df_dict_title = calculate_df(postings_title).collectAsMap()

# (doc_id, number of tokens in the title).
DL_title_rdd = united_title_corpus.map(lambda x: calculate_DL(x[1], x[0], False))

# Average title length.
avgDl_title = DL_title_rdd.map(lambda x: x[1]).mean()

DL_title = DL_title_rdd.collectAsMap()

# token -> total number of occurrences across all titles.
term_total_title = (word_counts_title.map(lambda x: (x[0], x[1][1]))).reduceByKey(lambda x, y: x + y).collectAsMap()

# Number of documents.
N = len(DL_title)

# (doc_id, Counter of the title's tokens).
# The counter must be built with the same tokenization as the postings, df and DL above
# (stopwords kept); otherwise the norms ignore terms that are actually indexed.
doc_id_to_token_counter_title = united_title_corpus.map(lambda x: (x[0], doc_to_term_counter(x[1], False)))

# doc_id -> L2 norm of the vector of (tf / title length) * log10(N / df) over the title's distinct tokens.
doc_id_to_norm_title = doc_id_to_token_counter_title.map(lambda x: (x[0], np.linalg.norm([(x[1][term] / DL_title[x[0]]) * math.log(N / w2df_dict_title.get(term, N), 10) for term in x[1]]))).collectAsMap()

# partition posting lists and write out; collect() triggers the writes, the result itself is discarded
_ = partition_postings_and_write(postings_title, "index_title").collect()

                                                                                

In [None]:
# Merge the posting-location dictionaries stored under the title index prefix into one super-set.
super_posting_locs_title = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp/index_title'):
    # Only blobs whose name ends with "pickle" are loaded; anything else under the prefix is skipped.
    if blob.name.endswith("pickle"):
        with blob.open("rb") as f:
            posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs_title[k].extend(v)

In [None]:
# Create the title inverted index instance and write it to GCP.
# The last argument is True: this index also gets the doc id -> title dictionary,
# collected from united_title_corpus inside createInvertedIndexInstance.
createInvertedIndexInstance(super_posting_locs_title, w2df_dict_title, DL_title, term_total_title, doc_id_to_norm_title, avgDl_title, 'index_title', bucket_name, True)

Copying file://bodyIndex.pkl [Content-Type=application/octet-stream]...
/ [1 files][  2.8 MiB/  2.8 MiB]                                                
Operation completed over 1 objects/2.8 MiB.                                      


### **Document anchor text inverted index**

In [None]:
# RDD of (id, anchor_text) rows.
general_docs_anchor_text = parquetFile.select("id","anchor_text").rdd 

# Pointed documents RDD: flatten every row's anchor_text entries, group them by key and
# join the grouped values with single spaces, giving (key, joined text) pairs.
# NOTE(review): presumably each anchor_text entry is a (target doc id, anchor text) pair,
# so the key is the id of the pointed-to document - confirm against the parquet schema.
united_anchor_text_corpus = general_docs_anchor_text.flatMap(lambda x :x[1]).groupByKey().mapValues(list).map(lambda x : (x[0]," ".join([y for y in x[1]])))



In [None]:
# (token, (doc_id, tf)) pairs for every document's joined anchor text, stopwords removed.
word_counts_anchor_text = united_anchor_text_corpus.flatMap(lambda x: word_count(str(x[1]), x[0],True))

# One posting list per token, sorted by doc id.
postings_anchor_text = word_counts_anchor_text.groupByKey().mapValues(reduce_word_counts)

# token -> df.
w2df_anchor_text = calculate_df(postings_anchor_text).collectAsMap()

# (doc_id, number of tokens in the joined anchor text).
DL_anchor_text_rdd = united_anchor_text_corpus.map(lambda x: calculate_DL(str(x[1]), x[0], True))

# Average anchor-text length.
avgDl_anchor_text = DL_anchor_text_rdd.map(lambda x: x[1]).mean()

DL_anchor_text = DL_anchor_text_rdd.collectAsMap()

# token -> total number of occurrences across all anchor texts.
term_total_anchor_text = (word_counts_anchor_text.map(lambda x: (x[0], x[1][1]))).reduceByKey(lambda x, y: x + y).collectAsMap()

# Number of documents that have an anchor-text entry.
N = len(DL_anchor_text)

# (doc_id, Counter of the anchor text's tokens).
doc_id_to_token_counter_anchor_text = united_anchor_text_corpus.map(lambda x: (x[0], doc_to_term_counter(x[1], True)))

# Drop documents whose anchor text has no tokens; they get no entry in the norm dictionary below.
remove_doc_len_0 = doc_id_to_token_counter_anchor_text.filter(lambda x: DL_anchor_text[x[0]] != 0)

# doc_id -> L2 norm of the vector of (tf / anchor-text length) * log10(N / df) over the distinct tokens.
doc_id_to_norm_anchor_text = remove_doc_len_0.map(lambda x: (x[0], np.linalg.norm([(x[1][term] / DL_anchor_text.get(x[0],1)) * math.log(N / w2df_anchor_text.get(term, N), 10) for term in x[1]]))).collectAsMap()

# partition posting lists and write out; collect() triggers the writes, the result itself is discarded
_ = partition_postings_and_write(postings_anchor_text, "index_anchor_text").collect()

                                                                                

In [None]:
# Merge the posting-location dictionaries stored under the anchor-text index prefix into one super-set.
super_posting_locs_title_anchor_text = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp/index_anchor_text'):
    # Only blobs whose name ends with "pickle" are loaded; anything else under the prefix is skipped.
    if blob.name.endswith("pickle"):
        with blob.open("rb") as f:
            posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs_title_anchor_text[k].extend(v)

In [None]:
# Create the anchor-text inverted index instance and write it to GCP.
# The last argument is False: the doc id -> title dictionary is not attached to this index.
createInvertedIndexInstance(super_posting_locs_title_anchor_text, w2df_anchor_text, DL_anchor_text, term_total_anchor_text, doc_id_to_norm_anchor_text, avgDl_anchor_text, 'index_anchor_text', bucket_name, False)

Copying file://bodyIndex.pkl [Content-Type=application/octet-stream]...
/ [1 files][  2.8 MiB/  2.8 MiB]                                                
Operation completed over 1 objects/2.8 MiB.                                      
