# **General Imports**

In [None]:
import heapq
import math
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from timeit import timeit
from pathlib import Path
import pickle
import pandas as pd
import numpy as np
from google.cloud import storage
from contextlib import closing
from flask import Flask, request, jsonify

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# Make sure NLTK's stopword corpus is available locally (used below to build
# `english_stopwords`); the cell output shows it is skipped when already up to date.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

# **Spark Initialization**

In [None]:
# These packages are already installed in the testing environment, so ignore the
# time (~1 minute) this cell takes to install them.
!pip install -q pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
!pip install -q graphframes
import os
# Point Spark at the Java 8 runtime installed by apt above.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
graphframes_jar = 'https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.2-s_2.12/graphframes-0.8.2-spark3.2-s_2.12.jar'
# NOTE(review): hard-codes the Python 3.7 site-packages path of the runtime this
# notebook was written on — confirm it matches where pyspark is installed.
spark_jars = '/usr/local/lib/python3.7/dist-packages/pyspark/jars'
# Download the GraphFrames jar into pyspark's jars dir (-N: only if newer on the server).
!wget -N -P $spark_jars $graphframes_jar

openjdk-8-jdk-headless is already the newest version (8u352-ga-1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 21 not upgraded.
--2023-01-13 11:37:49--  https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.2-s_2.12/graphframes-0.8.2-spark3.2-s_2.12.jar
Resolving repos.spark-packages.org (repos.spark-packages.org)... 54.230.18.120, 54.230.18.122, 54.230.18.61, ...
Connecting to repos.spark-packages.org (repos.spark-packages.org)|54.230.18.120|:443... connected.
HTTP request sent, awaiting response... 304 Not Modified
File ‘/usr/local/lib/python3.7/dist-packages/pyspark/jars/graphframes-0.8.2-spark3.2-s_2.12.jar’ not modified on server. Omitting download.



In [None]:
import pyspark
# from pyspark.sql import *
# from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from graphframes import *
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

In [None]:
# Initializing spark context:
# create a SparkContext (UI on port 4050, GraphFrames package requested via
# spark.jars.packages) and a SparkSession on top of it.
conf = SparkConf().set("spark.ui.port", "4050")
conf.set("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
sc = pyspark.SparkContext(conf=conf)
# Register the GraphFrames jar that the install cell downloaded into `spark_jars`;
# presumably so the Python side of GraphFrames is importable on the workers — confirm.
sc.addPyFile(str(Path(spark_jars) / Path(graphframes_jar).name))
# getOrCreate() picks up the SparkContext `sc` that is already running.
spark = SparkSession.builder.getOrCreate()


# **Building the InvertedIndex**

## **MultiFileWriter + MultiFileReader Classes**

In [None]:
BLOCK_SIZE = 1999998

class MultiFileWriter:
  """ Sequential binary writer to multiple files of up to BLOCK_SIZE each.

  Output files are `{base_dir}/{name}_000.bin`, `{name}_001.bin`, ...; the next
  file is opened only when the current one reaches BLOCK_SIZE bytes.
  Use with `contextlib.closing(...)` or directly as a context manager.
  """
  def __init__(self, base_dir, name):
    self._base_dir = Path(base_dir)
    self._base_dir.mkdir(parents=True, exist_ok=True)
    self._name = name
    # Lazily-evaluated, unbounded sequence of output files.
    self._file_gen = (open(self._base_dir / f'{name}_{i:03}.bin', 'wb')
                      for i in itertools.count())
    self._f = next(self._file_gen)

  def write(self, b):
    """ Append the bytes `b`, spilling into new files as needed.

    Returns:
      list of (file_name, offset) pairs, one per file that received a chunk of
      `b`, in write order. Writing empty bytes returns an empty list.
    """
    locs = []
    while len(b) > 0:
      pos = self._f.tell()
      remaining = BLOCK_SIZE - pos
      # If the current file is full, close it and continue in the next one.
      if remaining == 0:
        self._f.close()
        self._f = next(self._file_gen)
        pos, remaining = 0, BLOCK_SIZE
      self._f.write(b[:remaining])
      locs.append((self._f.name, pos))
      b = b[remaining:]
    return locs

  def close(self):
    self._f.close()

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    self.close()
    return False


class MultiFileReader:
  """ Sequential binary reader of multiple files of up to BLOCK_SIZE each.

  File handles are opened lazily and cached until `close()`.
  Use with `contextlib.closing(...)` or directly as a context manager.
  """
  def __init__(self):
    self._open_files = {}

  def read(self, locs, n_bytes):
    """ Read `n_bytes` bytes starting at the (file_name, offset) pairs in `locs`.

    Each location is read up to the end of its BLOCK_SIZE file before moving
    on to the next one. Reading stops as soon as `n_bytes` bytes were
    collected, so reading a prefix of a long posting list does not open the
    files holding the rest of it.
    """
    chunks = []
    for f_name, offset in locs:
      if n_bytes <= 0:
        break
      f = self._open_files.get(f_name)
      if f is None:
        f = self._open_files[f_name] = open(f_name, 'rb')
      f.seek(offset)
      n_read = min(n_bytes, BLOCK_SIZE - offset)
      chunks.append(f.read(n_read))
      n_bytes -= n_read
    return b''.join(chunks)

  def close(self):
    for f in self._open_files.values():
      f.close()
    # Forget the closed handles so a later read() reopens files instead of
    # failing on a closed file object.
    self._open_files.clear()

  def __enter__(self):
    # Previously only __exit__ existed, so `with MultiFileReader()` raised.
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    self.close()
    return False

## **InvertedIndex Class**

### Class

In [None]:
TUPLE_SIZE = 6       # We're going to pack the doc_id and tf values in this
                     # many bytes.
TF_MASK = 2 ** 16 - 1 # Masking the 16 low bits of an integer

class InvertedIndex:
  """ Inverted index whose posting lists are stored on disk.

  Each posting is packed into TUPLE_SIZE bytes, big-endian: the high 4 bytes
  hold the doc_id and the low 2 bytes hold the term frequency (capped at
  TF_MASK). Posting lists are written with MultiFileWriter and located through
  `posting_locs`; `write_index` pickles only the global statistics.
  """
  def __init__(self):
    """ Initializes an empty inverted index (no documents are added here). """
    # stores document frequency per term
    self.df = Counter()
    # stores total frequency per term
    self.term_total = Counter()
    # stores posting list per term while building the index (internally),
    # otherwise too big to store in memory. Not pickled (see __getstate__).
    self._posting_list = defaultdict(list)
    # mapping a term to posting file locations, which is a list of
    # (file_name, offset) pairs. Since posting lists are big we are going to
    # write them to disk and just save their location in this list. We are
    # using the MultiFileWriter helper class to write fixed-size files and store
    # for each term/posting list its list of locations. The offset represents
    # the number of bytes from the beginning of the file where the posting list
    # starts.
    self.posting_locs = defaultdict(list)
    self.num_of_docs = 0
    # doc_id -> document length; used for tf normalisation and the average
    # document length (AVGDL) needed by BM25.
    self.DL = defaultdict()
    # doc_id -> norm of the document vector (cosine-similarity denominator).
    self.doc_id_to_norm = defaultdict()
    # doc_id -> title, used when formatting search results.
    self.doc_id_to_title = defaultdict()
    self.avg_dl = 0

  def write_index(self, base_dir, name):
    """ Write the in-memory index to disk. Results in the file:
        (1) `name`.pkl containing the global term stats (e.g. df).
    Posting lists are written separately (see write_a_posting_list).
    """
    self._write_globals(base_dir, name)

  def _write_globals(self, base_dir, name):
    with open(Path(base_dir) / f'{name}.pkl', 'wb') as f:
      pickle.dump(self, f)

  def __getstate__(self):
    """ Modify how the object is pickled by removing the internal posting lists
        from the object's state dictionary.

    pop() instead of del: an index that was itself loaded from a pickle has no
    `_posting_list`, and must still be writable again.
    """
    state = self.__dict__.copy()
    state.pop('_posting_list', None)
    return state

  @staticmethod
  def _decode_posting_list(b, n_postings):
    """ Decode the first `n_postings` packed postings of `b` into (doc_id, tf) pairs. """
    posting_list = []
    for i in range(n_postings):
      start = i * TUPLE_SIZE
      doc_id = int.from_bytes(b[start:start + 4], 'big')
      tf = int.from_bytes(b[start + 4:start + TUPLE_SIZE], 'big')
      posting_list.append((doc_id, tf))
    return posting_list

  def _num_postings_to_read(self, w, is_read_all):
    """ All postings of `w`, or at most K_TOP_ELEMENTS of them. """
    return self.df[w] if is_read_all else min(K_TOP_ELEMENTS, self.df[w])

  def posting_lists_iter(self):
    """ A generator that reads one posting list from disk and yields
        a (word:str, [(doc_id:int, tf:int), ...]) tuple.
    """
    with closing(MultiFileReader()) as reader:
      for w, locs in self.posting_locs.items():
        b = reader.read(locs, self.df[w] * TUPLE_SIZE)
        yield w, self._decode_posting_list(b, self.df[w])

  @staticmethod
  def read_index(base_dir, name):
    # NOTE: pickle.load can execute arbitrary code — only load trusted index files.
    with open(Path(base_dir) / f'{name}.pkl', 'rb') as f:
      return pickle.load(f)

  @staticmethod
  def delete_index(base_dir, name):
    path_globals = Path(base_dir) / f'{name}.pkl'
    path_globals.unlink()
    for p in Path(base_dir).rglob(f'{name}_*.bin'):
      p.unlink()

  @staticmethod
  def write_a_posting_list(b_w_pl, base_dir):
    ''' Takes a (bucket_id, [(w0, posting_list_0), (w1, posting_list_1), ...])
    and writes it out to disk as files named {bucket_id}_XXX.bin under
    `base_dir`. Returns a posting locations dictionary that maps each
    word to the list of files and offsets that contain its posting list.
    Parameters:
    -----------
      b_w_pl: tuple
        Containing a bucket id and all (word, posting list) pairs in that bucket
        (bucket_id, [(w0, posting_list_0), (w1, posting_list_1), ...])
      base_dir: str or Path
        Directory the .bin files are written to.
    Return:
      posting_locs: dict
        Posting locations for each of the words written out in this bucket.
    '''
    posting_locs = defaultdict(list)
    bucket, list_w_pl = b_w_pl

    with closing(MultiFileWriter(base_dir, bucket)) as writer:
      for w, pl in list_w_pl:
        # Convert to bytes. tf only has 16 bits: clamp it instead of masking,
        # since masking wraps e.g. 65536 around to 0.
        b = b''.join([(doc_id << 16 | min(tf, TF_MASK)).to_bytes(TUPLE_SIZE, 'big')
                      for doc_id, tf in pl])
        # write to file(s) and save the file locations to the index
        posting_locs[w].extend(writer.write(b))
    return posting_locs

  def read_posting_list(self, w, is_read_all):
    """ Read the posting list of `w` as [(doc_id, tf), ...].

    Reads everything when `is_read_all` is True, otherwise only the first
    K_TOP_ELEMENTS postings. Unknown words return [] without touching disk and,
    unlike indexing the defaultdict, without inserting `w` into posting_locs.
    """
    locs = self.posting_locs.get(w)
    if not locs:
      return []
    n_postings = self._num_postings_to_read(w, is_read_all)
    with closing(MultiFileReader()) as reader:
      b = reader.read(locs, n_postings * TUPLE_SIZE)
    return self._decode_posting_list(b, n_postings)

  def read_posting_list_to_dict(self, w, is_read_all):
    """ Same as read_posting_list, but returned as a {doc_id: tf} dict. """
    return dict(self.read_posting_list(w, is_read_all))

### Cosine-Similarity methods in InvertedIndex class


In [None]:
class InvertedIndex(InvertedIndex):
  def generate_tfidf_scores_vectors(self, query_to_search, is_all_results):
    """ Build aligned tf-idf vectors for the query and its candidate documents.

    Parameters:
    -----------
    query_to_search: list of tokens (str). This list will be preprocessed in advance (e.g., lower case, filtering stopwords, etc.).
                     Example: 'Hello, I love information retrieval' --->  ['hello','love','information','retrieval']
    is_all_results:  if True read each term's whole posting list, otherwise only
                     its first K_TOP_ELEMENTS postings (see read_posting_list).

    Returns:
    -----------
    docs_result: {key: doc_id, value: list of per-query-term tf-idf scores of that doc}
    query_result: list of per-query-term tf-idf scores of the query itself
    Position i of every list refers to the same unique query term; terms that
    are missing from the index keep a score of 0.
    """
    epsilon = .0000001
    docs_result = {}
    query_counter = Counter(query_to_search)
    num_of_tokens_in_query = len(query_to_search)
    unique_query_tokens = set(query_to_search)
    num_of_unique_tokens_in_query = len(unique_query_tokens)
    query_result = [0] * num_of_unique_tokens_in_query
    for token_index, token in enumerate(unique_query_tokens):
      if token not in self.df:  # avoid terms that do not appear in the index.
        continue
      # term frequency divided by the length of the query
      query_tf = query_counter[token] / num_of_tokens_in_query
      index_idf = math.log(self.num_of_docs / (self.df[token] + epsilon), 10)  # smoothing
      for doc_id, freq in self.read_posting_list(token, is_all_results):
        if doc_id not in docs_result:
          docs_result[doc_id] = [0] * num_of_unique_tokens_in_query
        docs_result[doc_id][token_index] = freq / self.DL[doc_id] * index_idf
      query_result[token_index] = query_tf * index_idf
    return docs_result, query_result

  def calculate_cossim_by_weights_of_tfidf(self, query_to_search, num_of_results, is_all_results):
    """ Return the top `num_of_results` (doc_id, cosine score) pairs, best first.

    Ties are broken by the smaller doc_id. Returns [] when the query vector is
    all zeros; documents whose stored norm is 0 are skipped, since their cosine
    similarity is undefined.
    """
    doc_id_to_scores_list_dict, query_tfidf_scores_list = self.generate_tfidf_scores_vectors(query_to_search, is_all_results)
    query_magnitude = math.sqrt(sum(x ** 2 for x in query_tfidf_scores_list))
    if query_magnitude == 0:
      return []
    candidates = []
    for doc_id, scores_list in doc_id_to_scores_list_dict.items():
      doc_magnitude = self.doc_id_to_norm[doc_id]
      if not doc_magnitude:
        continue
      dot_product = sum(x * y for x, y in zip(scores_list, query_tfidf_scores_list))
      cossim_score = dot_product / (query_magnitude * doc_magnitude)
      # Negate so nsmallest yields the highest scores, ties by smaller doc_id.
      candidates.append((-cossim_score, doc_id))
    sorted_top_num_of_docs = heapq.nsmallest(num_of_results, candidates)
    return [(doc_id, -neg_score) for neg_score, doc_id in sorted_top_num_of_docs]


### BM25

In [None]:
class InvertedIndex(InvertedIndex):
  def generate_doc_id_to_BM25_score_dict_new(self, query_to_search, b, k1, k3, is_all_results):
    """ Accumulate a BM25-style score for every document that contains a query term.

    Parameters:
    -----------
    query_to_search: list of preprocessed query tokens (duplicates are ignored).
    b, k1: BM25 length-normalisation and tf-saturation parameters.
    k3: currently unused (query-term saturation is disabled); kept for API compatibility.
    is_all_results: if True read whole posting lists, otherwise only the first
                    K_TOP_ELEMENTS postings per term.

    Returns:
    -----------
    dict {doc_id: summed BM25 score over the query terms the doc contains}
    """
    epsilon = 0.000001
    doc_id_to_BM25_score_dict = {}
    for token in set(query_to_search):
      # Terms missing from the index contribute nothing; skip them without
      # reading from disk.
      if token not in self.df:
        continue
      idf = math.log((self.num_of_docs + 1) / (self.df[token] + epsilon), 10)
      for doc_id, freq in self.read_posting_list_to_dict(token, is_all_results).items():
        doc_len = self.DL[doc_id]
        # NOTE(review): self.avg_dl is 0 right after __init__; it must be set
        # before scoring or this divides by zero.
        B = 1 - b + b * (doc_len / self.avg_dl)
        # NOTE(review): textbook BM25 uses the raw term count here. Dividing by
        # the doc length normalises length twice (B already does that) — kept
        # as is to preserve the current rankings.
        tf_doc = freq / doc_len
        term_score = ((k1 + 1) * tf_doc) / (B * k1 + tf_doc) * idf
        doc_id_to_BM25_score_dict[doc_id] = doc_id_to_BM25_score_dict.get(doc_id, 0) + term_score
    return doc_id_to_BM25_score_dict

  def calculate_BM25_scores(self, query_to_search, b, k1, k3, num_of_results, is_all_results):
    """ Return the top `num_of_results` (doc_id, BM25 score) pairs, best first.

    Ties are broken by the smaller doc_id.
    """
    doc_id_to_BM25_scores_dict = self.generate_doc_id_to_BM25_score_dict_new(query_to_search, b, k1, k3, is_all_results)
    # Negate so nsmallest yields the highest scores.
    negated_scores = [(-score, doc_id) for doc_id, score in doc_id_to_BM25_scores_dict.items()]
    sorted_top_num_of_docs = heapq.nsmallest(num_of_results, negated_scores)
    return [(doc_id, -neg_score) for neg_score, doc_id in sorted_top_num_of_docs]


### Boolean function methods in InvertedIndex class

In [None]:
class InvertedIndex(InvertedIndex):
  def rank_by_boolean_scores(self, query_to_search, is_all_results):
    """ Rank documents by how many distinct query terms they contain.

    Parameters:
    -----------
    query_to_search: list of tokens (str). This list will be preprocessed in advance (e.g., lower case, filtering stopwords, etc.).
                     Example: 'Hello, I love information retrieval' --->  ['hello','love','information','retrieval']
    is_all_results:  if True read whole posting lists, otherwise only the first
                     K_TOP_ELEMENTS postings per term.

    Returns:
    -----------
    list of (doc_id, number_of_matching_query_terms), highest count first.
    Ties are ordered by smaller doc_id, so the ranking does not depend on set
    iteration order (which varies between runs for str keys).
    """
    docs_to_boolean_score = Counter()
    for token in set(query_to_search):
      if token not in self.df:  # avoid terms that do not appear in the index.
        continue
      for doc_id, _tf in self.read_posting_list(token, is_all_results):
        docs_to_boolean_score[doc_id] += 1
    return sorted(docs_to_boolean_score.items(),
                  key=lambda doc_id_to_score: (-doc_id_to_score[1], doc_id_to_score[0]))

### Searching methods

In [None]:
class InvertedIndex(InvertedIndex):
  def _process_query(self, query):
    """ Tokenize a raw query string with `process_text` (no stemming). """
    return process_text(0, query)[1]

  def _search_boolean(self, query):
    """ Boolean (term-count) ranking over the full posting lists. """
    return self.rank_by_boolean_scores(self._process_query(query), is_all_results=True)

  def search_body(self, query_to_search_string):
    """ Top TOP_N_FOR_SEARCH_BODY docs by tf-idf cosine similarity, as (doc_id, title). """
    return [(doc_id, title)
            for doc_id, title, _score in self.search_cossim(query_to_search_string, TOP_N_FOR_SEARCH_BODY)]

  def search_anchor(self, query):
    """ All docs whose anchor text matches query terms, ranked by matched-term count. """
    return self._search_boolean(query)

  def search_title(self, query):
    """ All docs whose title matches query terms, ranked by matched-term count. """
    return self._search_boolean(query)

  def search_cossim(self, query_to_search_string, num_of_results):
    """ Top `num_of_results` docs by tf-idf cosine similarity, as (doc_id, title, score). """
    query_tokens = self._process_query(query_to_search_string)
    top_docs = self.calculate_cossim_by_weights_of_tfidf(query_tokens, num_of_results, is_all_results=False)
    return [(doc_id, self.doc_id_to_title[doc_id], score) for doc_id, score in top_docs]

  def search_BM25(self, query_to_search_string, b, k1, k3, num_of_results):
    """ Top `num_of_results` docs by BM25, as (doc_id, title, score). """
    query_tokens = self._process_query(query_to_search_string)
    top_docs = self.calculate_BM25_scores(query_tokens, b, k1, k3, num_of_results, is_all_results=False)
    return [(doc_id, self.doc_id_to_title[doc_id], score) for doc_id, score in top_docs]


## **Loading the CSV into a Spark DataFrame**

In [None]:
def import_dataframe_from_csv(csv_path):
  '''
  Read a CSV file with pandas and convert it into a Spark DataFrame.

  parameter: csv_path - path of the CSV file (for example "/home/dataproc/test.csv")
  return: Spark DataFrame with the CSV's columns (the extract_* helpers expect
          "id", "title", "text" and "anchor_text")
  '''
  relevant_docs_pdf = pd.read_csv(csv_path)
  spark_df = spark.createDataFrame(relevant_docs_pdf)
  return spark_df

## **Global variables and Constants**

In [None]:
english_stopwords = frozenset(stopwords.words('english'))
# Extra high-frequency words of this corpus (apparently Wikipedia boilerplate such
# as "references" / "external links") treated as stopwords too.
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]
all_stopwords = english_stopwords.union(corpus_stopwords)

# Token pattern: a word/#/@ character followed by 2-24 word characters, each
# optionally preceded by an apostrophe or hyphen (tokens are >= 3 chars long).
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

# NOTE(review): not referenced in the visible cells; presumably the minimum df a
# term needs to be kept in the index — confirm.
MINIMUM_DF = 10

# Number of results body search should return.
TOP_N_FOR_SEARCH_BODY = 100

# Pickled doc_id -> PageRank / page-view dictionaries (per the file names).
PAGE_RANKS_PATH = '/content/doc_id_to_pagerank_dict.pkl'

PAGE_VIEWS_PATH = '/content/doc_id_to_pageview_dict.pkl'

# Loaded eagerly when this cell runs (the whole CSV is read into pandas first).
docs_data_frame = import_dataframe_from_csv('/content/nirs_relevant_documents_from_gcp.csv')

# BM25 length-normalisation parameter. NOTE(review): a one-letter global is easy
# to shadow; the BM25 methods take `b` as an explicit argument instead.
b = 0.75

# Precomputed mean document length of the body index, i.e.
# mean(relevant_body_inverted_index.DL.values()).
# NOTE(review): the active BM25 code reads self.avg_dl, not this constant.
AVG_DL = 1758.6576955424728

# Max postings read per query term when a search does not request all results
# (see read_posting_list).
K_TOP_ELEMENTS = 400

## **Text Processing**

In [None]:
def extract_id_title(dataframe):
  '''Return the `.rdd` of `dataframe` projected onto its "id" and "title" columns.'''
  id_title_df = dataframe.select("id", "title")
  return id_title_df.rdd

def extract_id_body(dataframe):
  '''Return the `.rdd` of `dataframe` projected onto "id", "text" (the body) and "title".'''
  id_body_df = dataframe.select("id", "text", "title")
  return id_body_df.rdd

def extract_id_anchor_text(dataframe):
  '''
  Turn the "anchor_text" column into an RDD of distinct (target_doc_id, word) pairs.

  Each cell is scanned for `id=<digits>, text=<quoted string>` entries
  (presumably the string form of a list of pyspark `Row(id=..., text=...)` —
  confirm against the CSV). The quoted text may use single OR double quotes,
  because Python's repr() switches to double quotes when the text contains an
  apostrophe (e.g. "Pope's"). Backslash escapes inside the quotes are skipped
  over but not decoded. Non-string cells (None/NaN) yield no anchors.

  Returns:
    RDD of (int doc_id, str word): lower-cased RE_WORD matches, duplicates removed.
  '''
  anchor_pattern = re.compile(
      r"""\bid=(\d+), text=(?:'((?:[^'\\]|\\.)*)'|"((?:[^"\\]|\\.)*)")""")

  def parse_anchors(row):
    cell = row[0]
    if not isinstance(cell, str):
      return []
    # Exactly one of the two quote groups matched; the other one is ''.
    return [(int(doc_id), single_quoted or double_quoted)
            for doc_id, single_quoted, double_quoted in anchor_pattern.findall(cell)]

  anchors_rdd = dataframe.select("anchor_text").rdd
  doc_id_to_text = anchors_rdd.flatMap(parse_anchors).distinct()
  doc_id_to_words = doc_id_to_text.flatMap(
      lambda doc_id_and_text: [(doc_id_and_text[0], match.group())
                               for match in RE_WORD.finditer(doc_id_and_text[1].lower())])
  return doc_id_to_words.distinct()

def extract_rdd_from_data_frame(dataframe, title=False, body=False, anchor=False):
  """Pick the extractor for the requested field.

  `title` wins over `body`; when neither is set the anchor extractor is used
  (the `anchor` flag itself is not consulted).
  """
  if not title and not body:
    return extract_id_anchor_text(dataframe)
  extractor = extract_id_title if title else extract_id_body
  return extractor(dataframe)

In [None]:
stemmer = PorterStemmer()
def process_text(id, text, use_stemming=False):
    """
    Tokenize `text`, drop stopwords and optionally stem the remaining tokens.

    Stopwords are filtered on the raw (lower-cased) token before stemming,
    because `all_stopwords` holds unstemmed words: checking only the stemmed
    form lets e.g. "history" through as "histori". Stemmed tokens are checked
    again so anything the old stemmed-only check removed is still removed.

    Parameters:
    -----------
    id: document id, returned unchanged.
    text: string, the text to tokenize.
    use_stemming: if True, apply the Porter stemmer to every kept token.

    Returns:
    -----------
    tuple of (id, list of tokens) in the order they appear in `text`.
    """
    tokens = [match.group() for match in RE_WORD.finditer(text.lower())]
    tokens = [token for token in tokens if token not in all_stopwords]
    if use_stemming:
        stemmed_tokens = (stemmer.stem(token) for token in tokens)
        tokens = [token for token in stemmed_tokens if token not in all_stopwords]
    return id, tokens

## **Functions for creating the full index**

In [None]:
NUM_BUCKETS = 124
def token2bucket_id(token):
  """Map `token` to a bucket number in [0, NUM_BUCKETS) using its `_hash` hex digest."""
  digest_as_int = int(_hash(token), base=16)
  return digest_as_int % NUM_BUCKETS

In [None]:
def word_count(id, list_of_tokens):
  ''' Build the posting-list entries contributed by one document.

  Counts how often each token occurs in `list_of_tokens` (tf). No stopword
  filtering happens here; tokens are expected to be pre-processed already.
  Parameters:
  -----------
    id: int
      Document id
    list_of_tokens: list of str
      Tokens of that document
  Returns:
  --------
    List of tuples
      (token, (doc_id, tf)) pairs in first-occurrence order,
      for example: [("anarchism", (12, 5)), ...]
  '''
  term_frequencies = Counter(list_of_tokens)
  postings = []
  for token, tf in term_frequencies.items():
    postings.append((token, (id, tf)))
  return postings

In [None]:
class InvertedIndex(InvertedIndex):
  # Re-opens the InvertedIndex class defined earlier in the notebook to add a method.
  def reduce_word_counts(self, token_to_unsorted_pl_tuple):
    ''' Order one token's posting list by normalized tf-idf.
    Parameters:
    -----------
      token_to_unsorted_pl_tuple: tuple
        (token, iterable of (wiki_id, tf) pairs). Reads self.num_of_docs,
        self.df[token] and self.DL[wiki_id].
    Returns:
    --------
      tuple
        (token, list of (wiki_id, tf)) sorted by ascending
        tf / DL * log10(num_of_docs / df); ties keep their input order.
    '''
    token = token_to_unsorted_pl_tuple[0]
    postings = token_to_unsorted_pl_tuple[1]
    idf = math.log((self.num_of_docs / self.df[token]), 10)

    def tfidf(posting):
      doc_id, freq = posting
      return (freq / self.DL[doc_id]) * idf

    # NOTE(review): ascending order puts the lowest-scoring documents first; if
    # any reader truncates posting lists to their first entries, this probably
    # wants reverse=True.
    ordered = sorted(((doc_id, freq) for doc_id, freq in postings), key=tfidf)
    return (token, ordered)

In [None]:
def calculate_df(postings):
  ''' Compute the document frequency of every token.
  Parameters:
  -----------
    postings: RDD
      (token, posting_list) pairs, one posting per document.
  Returns:
  --------
    RDD
      (token, df) pairs, where df is the posting-list length.
  '''
  return postings.mapValues(len)

In [None]:
def partition_postings_and_write(postings, base_dir):
  ''' Group posting lists into hash buckets and write each bucket to disk.

  Each (token, posting_list) pair is keyed by `token2bucket_id(token)`, the
  pairs are grouped per bucket, and every bucket is written via
  `InvertedIndex.write_a_posting_list(bucket_pair, base_dir)`.
  Parameters:
  -----------
    postings: RDD
      (token, posting_list) pairs.
    base_dir: str
      Output location forwarded to `write_a_posting_list`.
  Returns:
  --------
    RDD
      One value per bucket: whatever `write_a_posting_list` returns, which the
      caller merges as a dict of token -> list of posting locations.
  '''
  def write_bucket(bucket_and_postings):
    return InvertedIndex.write_a_posting_list(bucket_and_postings, base_dir)

  keyed_by_bucket = postings.map(lambda token_and_pl: (token2bucket_id(token_and_pl[0]), token_and_pl))
  return keyed_by_bucket.groupByKey().map(write_bucket)

## **Putting it all together**

In [None]:
def calculate_norm_of_tokens_tfidf_in_doc(tokens_list, num_of_docs, df):
  """Euclidean norm of one document's tf-idf vector.

  tf is the token count divided by len(tokens_list); idf is
  log10(num_of_docs / df[token]). Tokens absent from `df` (e.g. removed by
  the minimum-df filter) contribute nothing, and an empty document has norm 0.0.
  """
  doc_length = len(tokens_list)
  tf_counts = Counter(tokens_list)
  total = 0
  for tok in set(tokens_list):
    if tok not in df:
      continue
    weight = tf_counts[tok] / doc_length * math.log(num_of_docs / df[tok], 10)
    total += weight ** 2
  return math.sqrt(total)

In [None]:
def create_inverted_index(dataframe, inverted_index_name, title=False, body=False, anchor=False, use_stemming=False, filter_by_minimum_df=False):
  ''' Build an InvertedIndex over one field of `dataframe` and write it out.

  The indexed field is the title (title=True), else the body (body=True),
  else the anchor text, mirroring `extract_rdd_from_data_frame`.
  Posting lists go under './<inverted_index_name>'; global statistics are
  written with `inverted.write_index('.', inverted_index_name)`.

  Parameters:
  -----------
    dataframe: Spark DataFrame with the columns the chosen extractor selects.
    inverted_index_name: str, index name / posting-list directory.
    title, body, anchor: bool, which field to index (see above).
    use_stemming: bool, forwarded to `process_text`.
    filter_by_minimum_df: bool, keep only tokens with df > MINIMUM_DF.
  Returns:
  --------
    InvertedIndex with DL, avg_dl, df, num_of_docs, posting_locs and
    doc_id_to_norm set, plus doc_id_to_title for title/body indexes.
  '''
  id_to_content_rdd = extract_rdd_from_data_frame(dataframe, title, body, anchor)
  if not (title or body):
    # The anchor extractor yields one (doc_id, word) row per distinct word, not
    # one row per document. Collapse to one text per document so DL,
    # num_of_docs and the norms are per document. Otherwise dict(collect())
    # keeps an arbitrary row's length as DL (possibly 0, which makes
    # reduce_word_counts divide by zero) and num_of_docs counts pairs.
    id_to_content_rdd = id_to_content_rdd.groupByKey().mapValues(lambda words: ' '.join(words))

  # Several actions below reuse the tokenized documents; cache them once.
  processed_id_to_content_rdd = id_to_content_rdd.map(
      lambda id_content_tuple: process_text(id_content_tuple[0], id_content_tuple[1], use_stemming)).cache()

  # Create inverted index instance
  inverted = InvertedIndex()

  # Document length = number of tokens kept by process_text.
  DL_rdd = processed_id_to_content_rdd.map(lambda id_content_tuple: (id_content_tuple[0], len(id_content_tuple[1])))
  inverted.DL = dict(DL_rdd.collect())
  inverted.avg_dl = DL_rdd.map(lambda doc_id_to_dl: doc_id_to_dl[1]).mean()

  # token -> iterable of (doc_id, tf)
  word_counts = processed_id_to_content_rdd.flatMap(lambda x: word_count(x[0], x[1]))
  postings = word_counts.groupByKey()

  if filter_by_minimum_df:
    postings = postings.filter(lambda x: len(x[1]) > MINIMUM_DF)

  # Token -> df dictionary and corpus size, both needed for tf-idf below.
  w2df = calculate_df(postings)
  inverted.df = w2df.collectAsMap()
  inverted.num_of_docs = processed_id_to_content_rdd.count()

  # Order each posting list by tf-idf (see InvertedIndex.reduce_word_counts).
  postings = postings.map(lambda token_to_pl_tuple: inverted.reduce_word_counts(token_to_pl_tuple))

  # partition posting lists and write out
  posting_locs_list = partition_postings_and_write(postings, './' + inverted_index_name).collect()

  # merge the per-bucket posting locations into a single dict
  super_posting_locs = defaultdict(list)
  for posting_loc in posting_locs_list:
    for k, v in posting_loc.items():
      super_posting_locs[k].extend(v)
  inverted.posting_locs = super_posting_locs

  # doc id -> tf-idf vector norm, used by cosine similarity
  doc_id_to_norm_rdd = processed_id_to_content_rdd.map(lambda id_content_tuple: (id_content_tuple[0], calculate_norm_of_tokens_tfidf_in_doc(id_content_tuple[1], inverted.num_of_docs, inverted.df)))
  inverted.doc_id_to_norm = dict(doc_id_to_norm_rdd.collect())
  processed_id_to_content_rdd.unpersist()

  if title or body:
    # Title rows are (id, title); body rows are (id, text, title).
    title_column = 1 if title else 2
    id_to_title_rdd = id_to_content_rdd.map(lambda row: (row[0], row[title_column]))
    inverted.doc_id_to_title = dict(id_to_title_rdd.collect())

  # write the global stats out
  inverted.write_index('.', inverted_index_name)

  return inverted

# **Testing**

## **Creating indexes**

In [None]:
relevant_title_inverted_index = create_inverted_index(dataframe=docs_data_frame, inverted_index_name='relevant_title_index', title=True)

In [None]:
relevant_body_inverted_index = create_inverted_index(dataframe=docs_data_frame, inverted_index_name='relevant_body_index', body=True)

In [None]:
relevant_anchor_inverted_index = create_inverted_index(dataframe=docs_data_frame, inverted_index_name='relevant_anchor_index', anchor=True)

Py4JJavaError: ignored

## **Testing search_body**

In [None]:
# Cosine-similarity body search; TOP_N_FOR_SEARCH_BODY is 100.
search_body_query_string = 'air jordan'
search_body_result = relevant_body_inverted_index.search_cossim(
    search_body_query_string, TOP_N_FOR_SEARCH_BODY)
print(search_body_result)

[(1394509, 'Air Jordan', 0.8169141417766355), (1371219, 'Michael Jordan (disambiguation)', 0.617379499216125), (20455, 'Michael Jordan', 0.5667623326130194), (58209447, 'Air Jordan (airline)', 0.5573413013217093), (23353937, 'Michael Jordan: An American Hero', 0.5404345069176116), (28155315, 'Air Arabia Jordan', 0.4818663985661909), (3097723, 'James R. Jordan Sr.', 0.4377720569970151), (13365219, "Jordan Spiz'ike", 0.3886234691017801), (4253801, 'Jumpman (logo)', 0.3880206065496625), (50066979, 'Crying Jordan', 0.3598641218977917), (2310146, 'Michael B. Jordan', 0.32305011258296407), (7851893, 'Amman Civil Airport', 0.27486618335989477), (265033, 'Space Jam', 0.2582825737501759), (51546226, 'Mike Jordan (cornerback)', 0.2107362607973597), (62741501, 'The Last Dance (miniseries)', 0.19903474357353976), (20657443, 'Jordanian cuisine', 0.18091273147952283), (26457880, 'Air India', 0.17860361592994173), (3647739, 'Jordans', 0.17483248361646767), (3890370, 'Michael-Hakim Jordan', 0.14679765

In [None]:
# NOTE(review): repeats the previous cell verbatim (same query and output).
search_body_query_string = 'air jordan'
search_body_result = relevant_body_inverted_index.search_cossim(
    search_body_query_string, TOP_N_FOR_SEARCH_BODY)
print(search_body_result)

[(1394509, 'Air Jordan', 0.8169141417766355), (1371219, 'Michael Jordan (disambiguation)', 0.617379499216125), (20455, 'Michael Jordan', 0.5667623326130194), (58209447, 'Air Jordan (airline)', 0.5573413013217093), (23353937, 'Michael Jordan: An American Hero', 0.5404345069176116), (28155315, 'Air Arabia Jordan', 0.4818663985661909), (3097723, 'James R. Jordan Sr.', 0.4377720569970151), (13365219, "Jordan Spiz'ike", 0.3886234691017801), (4253801, 'Jumpman (logo)', 0.3880206065496625), (50066979, 'Crying Jordan', 0.3598641218977917), (2310146, 'Michael B. Jordan', 0.32305011258296407), (7851893, 'Amman Civil Airport', 0.27486618335989477), (265033, 'Space Jam', 0.2582825737501759), (51546226, 'Mike Jordan (cornerback)', 0.2107362607973597), (62741501, 'The Last Dance (miniseries)', 0.19903474357353976), (20657443, 'Jordanian cuisine', 0.18091273147952283), (26457880, 'Air India', 0.17860361592994173), (3647739, 'Jordans', 0.17483248361646767), (3890370, 'Michael-Hakim Jordan', 0.14679765

## **Testing search_title**

In [None]:
# Title search. Judging by the output, each result is (doc_id, score) where the
# score looks like the number of query words found in the title — confirm in
# InvertedIndex.search_title.
search_title_query = 'fifa world cup'
search_title_result = relevant_title_inverted_index.search_title(search_title_query)
print(search_title_result)

[(11370, 3), (16383, 3), (59707, 3), (168079, 3), (244862, 3), (656933, 3), (1166428, 3), (1248592, 3), (1853149, 3), (2150801, 3), (2996777, 3), (3482503, 3), (3556431, 3), (4723188, 3), (4743361, 3), (8734046, 3), (8821389, 3), (10822574, 3), (13327177, 3), (16966712, 3), (17742072, 3), (19537336, 3), (26814387, 3), (27007503, 3), (27226732, 3), (29868391, 3), (32352129, 3), (32516422, 3), (36581929, 3), (39302261, 3), (41648358, 3), (42931572, 3), (45271353, 3), (55490096, 3), (57918704, 3), (57918711, 3), (60410401, 3), (60637832, 3), (61269058, 3), (61715824, 3), (62528055, 3), (64467696, 3), (64999764, 3), (64999924, 3), (66040084, 3), (66040086, 3), (67608822, 3), (33727, 2), (183628, 2), (8258172, 2), (16842834, 2), (22230053, 2), (39812824, 2), (51765484, 2), (57240806, 2), (22888933, 1), (29384326, 1), (36827305, 1), (42072639, 1), (63946361, 1)]


In [None]:
# NOTE(review): same query as the previous cell, but the printed order of
# equal-score documents differs between the two runs, so search_title's tie
# order is not deterministic; consider breaking ties by doc_id.
search_title_query = 'fifa world cup'
search_title_result = relevant_title_inverted_index.search_title(search_title_query)
print(search_title_result)

[(39302261, 3), (64999924, 3), (60410401, 3), (60637832, 3), (61715824, 3), (62528055, 3), (66040084, 3), (66040086, 3), (64999764, 3), (27226732, 3), (32516422, 3), (57918704, 3), (57918711, 3), (67608822, 3), (2150801, 3), (26814387, 3), (27007503, 3), (8821389, 3), (10822574, 3), (19537336, 3), (29868391, 3), (42931572, 3), (45271353, 3), (64467696, 3), (1166428, 3), (1853149, 3), (4743361, 3), (8734046, 3), (13327177, 3), (16966712, 3), (17742072, 3), (16383, 3), (32352129, 3), (36581929, 3), (41648358, 3), (55490096, 3), (61269058, 3), (59707, 3), (168079, 3), (244862, 3), (656933, 3), (1248592, 3), (2996777, 3), (3482503, 3), (3556431, 3), (4723188, 3), (11370, 3), (57240806, 2), (8258172, 2), (51765484, 2), (183628, 2), (22230053, 2), (39812824, 2), (16842834, 2), (33727, 2), (36827305, 1), (22888933, 1), (29384326, 1), (42072639, 1), (63946361, 1)]


## **Testing search_anchor**

In [None]:
# Anchor-text search; results are (doc_id, score) pairs as printed below.
search_anchor_query = 'air jordan'
search_anchor_result = relevant_anchor_inverted_index.search_anchor(search_anchor_query)
print(search_anchor_result)

[(1394509, 2), (6612, 1), (16556, 1), (19898, 1), (20966, 1), (23146, 1), (23304, 1), (24621, 1), (25679, 1), (26328, 1), (30680, 1), (32090, 1), (37428, 1), (38173, 1), (38957, 1), (39623, 1), (46252, 1), (48563, 1), (52141, 1), (54306, 1), (58422, 1), (63325, 1), (77549, 1), (92313, 1), (107530, 1), (108346, 1), (108646, 1), (140958, 1), (144248, 1), (145623, 1), (148475, 1), (162873, 1), (164036, 1), (185235, 1), (198539, 1), (202594, 1), (202898, 1), (208463, 1), (211583, 1), (246007, 1), (250917, 1), (285974, 1), (289691, 1), (319357, 1), (342775, 1), (357156, 1), (365352, 1), (371568, 1), (377403, 1), (388176, 1), (403355, 1), (418138, 1), (420881, 1), (431130, 1), (431814, 1), (439998, 1), (443038, 1), (483279, 1), (497604, 1), (516142, 1), (551777, 1), (577762, 1), (578183, 1), (583415, 1), (585103, 1), (586049, 1), (613334, 1), (628433, 1), (661186, 1), (680175, 1), (736703, 1), (750263, 1), (754849, 1), (779466, 1), (791422, 1), (898017, 1), (959699, 1), (1021470, 1), (103104

## **Testing BM25 on body**

In [None]:
# BM25 body search; parameters are (query, b, k1, k3, top_n).
search_body_query_string = 'Marijuana'
b, k1, k3 = 0.75, 1.25, 0
search_body_result = relevant_body_inverted_index.search_BM25(
    search_body_query_string, b, k1, k3, TOP_N_FOR_SEARCH_BODY)
print(search_body_result)

[(19357, 'Marijuana (disambiguation)', 0.7229397096202594), (28985374, 'Medical cannabis card', 0.3799334984837808), (1227367, 'British Columbia Marijuana Party', 0.28216643778233313), (53836251, 'List of cannabis rights organizations', 0.28113152365015565), (19920359, 'Marijuana (word)', 0.21291740779565077), (52209782, 'Cannabis in Hawaii', 0.205314940994269), (383537, 'Marijuana Party (Canada)', 0.19983619331503555), (52356241, 'Cannabis in Thailand', 0.15547708592186352), (48920848, 'Cannabis in Alaska', 0.10831512131944637), (28572685, 'Cannabis in Malawi', 0.09581352613689148), (184488, 'National Organization for the Reform of Marijuana Laws', 0.09269725432961426), (44975261, 'Medical Marijuana, Inc.', 0.08495610888774935), (52184272, 'Cannabis in Minnesota', 0.07526340131299288), (20566488, 'Cannabis in the United States', 0.0625486674680209), (53871120, 'List of names for cannabis', 0.051890922831448416), (48640150, 'Minors and the legality of cannabis', 0.05133338915300779), (

In [None]:
# Same BM25 query in lower case: the output matches the previous cell up to
# float rounding, showing the query is case-insensitive.
search_body_query_string = 'marijuana'
b, k1, k3 = 0.75, 1.25, 0
search_body_result = relevant_body_inverted_index.search_BM25(
    search_body_query_string, b, k1, k3, TOP_N_FOR_SEARCH_BODY)
print(search_body_result)

[(19357, 'Marijuana (disambiguation)', 0.7229397096202594), (28985374, 'Medical cannabis card', 0.37993349848378083), (1227367, 'British Columbia Marijuana Party', 0.28216643778233325), (53836251, 'List of cannabis rights organizations', 0.28113152365015565), (19920359, 'Marijuana (word)', 0.21291740779565085), (52209782, 'Cannabis in Hawaii', 0.20531494099426903), (383537, 'Marijuana Party (Canada)', 0.1998361933150356), (52356241, 'Cannabis in Thailand', 0.15547708592186352), (48920848, 'Cannabis in Alaska', 0.1083151213194464), (28572685, 'Cannabis in Malawi', 0.0958135261368915), (184488, 'National Organization for the Reform of Marijuana Laws', 0.09269725432961429), (44975261, 'Medical Marijuana, Inc.', 0.08495610888774936), (52184272, 'Cannabis in Minnesota', 0.07526340131299289), (20566488, 'Cannabis in the United States', 0.06254866746802093), (53871120, 'List of names for cannabis', 0.05189092283144842), (48640150, 'Minors and the legality of cannabis', 0.0513333891530078), (5

# **Page Rank & Page View**

In [None]:
# with open(PAGE_RANKS_PATH, 'rb') as f:
#   doc_id_to_pagerank_dict = pickle.loads(f.read())

In [None]:
def get_pagerank():
    ''' Returns PageRank values for a list of provided wiki article IDs.

        Test this by issuing a POST request to a URL like:
          http://YOUR_SERVER_DOMAIN/get_pagerank
        with a json payload of the list of article ids. In python do:
          import requests
          requests.post('http://YOUR_SERVER_DOMAIN/get_pagerank', json=[1,5,8])
        As before YOUR_SERVER_DOMAIN is something like XXXX-XX-XX-XX-XX.ngrok.io
        if you're using ngrok on Colab or your external IP on GCP.
    Returns:
    --------
        list of floats:
          list of PageRank scores that correspond to the provided article IDs.
          IDs missing from `doc_id_to_pagerank_dict` get 0.0 instead of
          raising KeyError; a missing or empty payload yields [].
    '''
    # get_json() can return None (e.g. no JSON body, depending on the Flask
    # version); treat that like an empty list instead of failing on len(None).
    wiki_ids = request.get_json() or []
    # BEGIN SOLUTION
    res = [doc_id_to_pagerank_dict.get(wiki_id, 0.0) for wiki_id in wiki_ids]
    # END SOLUTION
    return jsonify(res)

In [None]:
# with open(PAGE_VIEWS_PATH, 'rb') as f:
#   doc_id_to_pageview_dict = pickle.loads(f.read())

In [None]:
def get_pageview():
    ''' Returns the number of page views that each of the provided wiki articles
        had in August 2021.

        Test this by issuing a POST request to a URL like:
          http://YOUR_SERVER_DOMAIN/get_pageview
        with a json payload of the list of article ids. In python do:
          import requests
          requests.post('http://YOUR_SERVER_DOMAIN/get_pageview', json=[1,5,8])
        As before YOUR_SERVER_DOMAIN is something like XXXX-XX-XX-XX-XX.ngrok.io
        if you're using ngrok on Colab or your external IP on GCP.
    Returns:
    --------
        list of ints:
          list of page view numbers from August 2021 that correspond to the
          provided list of article IDs. IDs missing from
          `doc_id_to_pageview_dict` get 0 instead of raising KeyError; a
          missing or empty payload yields [].
    '''
    # get_json() can return None (e.g. no JSON body, depending on the Flask
    # version); treat that like an empty list instead of failing on len(None).
    wiki_ids = request.get_json() or []
    # BEGIN SOLUTION
    res = [doc_id_to_pageview_dict.get(wiki_id, 0) for wiki_id in wiki_ids]
    # END SOLUTION
    return jsonify(res)

# **Combining retrieval models (title, body, anchor and PageRank)**

In [None]:
def search_over_body(query_to_search, cosine=False, BM25=False):
  """Score `query_to_search` against the body index.

  `cosine` takes precedence over `BM25`; returns None when neither is set.
  """
  if not (cosine or BM25):
    return None
  if cosine:
    return relevant_body_inverted_index.calculate_cossim_by_weights_of_tfidf(query_to_search)
  return relevant_body_inverted_index.generate_doc_id_to_BM25_score_rdd(query_to_search)

In [None]:
def search_over_title(query_to_search, cosine=False, BM25=False, boolean=False):
  """Score `query_to_search` against the title index.

  The first truthy flag in the order cosine, BM25, boolean selects the
  scoring method; returns None when no flag is set.
  """
  method_name_by_flag = (
      (cosine, 'calculate_cossim_by_weights_of_tfidf'),
      (BM25, 'generate_doc_id_to_BM25_score_rdd'),
      (boolean, 'rank_by_boolean_scores'),
  )
  for flag, method_name in method_name_by_flag:
    if flag:
      return getattr(relevant_title_inverted_index, method_name)(query_to_search)
  return None
  

In [None]:
def search_over_anchor(query_to_search, cosine=False, BM25=False, boolean=False):
  """Score `query_to_search` against the anchor-text index.

  Same contract as `search_over_title`: the first truthy flag in the order
  cosine, BM25, boolean selects the method; returns None when none is set.
  """
  if cosine:
    return relevant_anchor_inverted_index.calculate_cossim_by_weights_of_tfidf(query_to_search)
  if BM25:
    return relevant_anchor_inverted_index.generate_doc_id_to_BM25_score_rdd(query_to_search)
  if boolean:
    return relevant_anchor_inverted_index.rank_by_boolean_scores(query_to_search)
  return None

In [None]:
def search(query_to_search, func_title, w_title, func_body ,w_body, func_anchor, w_anchor, w_pagerank):
  ''' Combine title, body and anchor scores (plus PageRank) for one query.

  Parameters:
  -----------
    query_to_search: str
    func_title, func_body, func_anchor: tuple of bools
      Unpacked into search_over_title / search_over_body / search_over_anchor
      to select the scoring method, e.g. (False, True) = BM25 for the body.
    w_title, w_body, w_anchor: float
      Weights of the per-field scores.
    w_pagerank: float
      Weight of the PageRank value; the field mix gets (1 - w_pagerank).
  Returns:
  --------
    RDD of (doc_id, combined_score), unsorted; rank it with e.g.
    takeOrdered(k, key=lambda x: -x[1]).
  Raises:
  -------
    ValueError if no field has a scoring method selected.
  '''
  # NOTE(review): assumes each search_over_* returns an RDD of (doc_id, score)
  # pairs, or None when no method flag is set — confirm against InvertedIndex.
  field_results = (
      (search_over_title(query_to_search, *func_title), 0),
      (search_over_body(query_to_search, *func_body), 1),
      (search_over_anchor(query_to_search, *func_anchor), 2),
  )

  def as_field_vector(position):
    # Place a single field's score into a (title, body, anchor) triple.
    def to_vector(score):
      vector = [0.0, 0.0, 0.0]
      vector[position] = score
      return tuple(vector)
    return to_vector

  field_vectors_rdd = None
  for scores_rdd, position in field_results:
    if scores_rdd is None:
      continue
    vectors = scores_rdd.mapValues(as_field_vector(position))
    field_vectors_rdd = vectors if field_vectors_rdd is None else field_vectors_rdd.union(vectors)
  if field_vectors_rdd is None:
    raise ValueError('no scoring method selected for title, body or anchor')

  # A plain union leaves one row per (doc, field); merge into one triple per
  # doc so a document missing from a field simply scores 0 there.
  combined_rdd = field_vectors_rdd.reduceByKey(lambda a, b: tuple(x + y for x, y in zip(a, b)))
  weighted_rdd = combined_rdd.mapValues(lambda s: w_title * s[0] + w_body * s[1] + w_anchor * s[2])
  # get_pagerank() is a Flask handler (no arguments), so read the dict directly.
  pagerank = doc_id_to_pagerank_dict
  return weighted_rdd.map(lambda doc_and_score: (doc_and_score[0], (w_pagerank * pagerank.get(doc_and_score[0], 0)) + ((1 - w_pagerank) * doc_and_score[1])))
  

In [None]:
def generate_weights_and_search(query_to_search, offset=0.1,
                                func_title=(False, False, True),
                                func_body=(False, True),
                                func_anchor=(False, False, True)):
  ''' Grid-search the field/PageRank weights for one query.

  Every combination of (body, title, anchor, pagerank) weights on a grid of
  step `offset` that sums to 1 is passed to `search`.
  Parameters:
  -----------
    query_to_search: str
    offset: float
      Grid step; 1 / offset is rounded to the nearest whole number of steps.
    func_title, func_body, func_anchor: tuple of bools
      Scoring-method flags forwarded to `search` (defaults: boolean title,
      BM25 body, boolean anchor — NOTE(review): pick what the experiment needs).
  Returns:
  --------
    dict mapping (w_body, w_title, w_anchor, w_pagerank) -> search result.
  Raises:
  -------
    ValueError if offset is not in (0, 1].
  '''
  if not 0 < offset <= 1:
    raise ValueError('offset must be in (0, 1]')
  steps = int(round(1 / offset))
  results_dict = {}
  for w_b in range(steps + 1):
    for w_t in range(steps + 1 - w_b):
      for w_a in range(steps + 1 - w_b - w_t):
        # The PageRank share is whatever is left so the weights sum to 1.
        w_pr = steps - w_b - w_t - w_a
        weights = (w_b / steps, w_t / steps, w_a / steps, w_pr / steps)
        body_w, title_w, anchor_w, pagerank_w = weights
        results_dict[weights] = search(query_to_search, func_title, title_w, func_body, body_w, func_anchor, anchor_w, pagerank_w)
  return results_dict

# **Evaluation: Mean Average Precision (MAP@40)**

In [None]:
# Ground truth for evaluation: query string -> list of relevant wiki doc ids
# (20 queries), consumed by calculate_map_40_over_nirs_first_20_queries.
nirs_queries_to_results_first_20 = {"best marvel movie": [60283633, 61073786, 5676692, 56289553, 60774345, 27306717, 61592102, 42163310, 60952488, 36439749, 48530084, 10589717, 29129051, 59892, 612052, 44254295, 878659, 54653881, 51430647, 66111204, 22114132, 55935213, 41677925, 17296107, 61651800, 9110929, 67055, 37497391, 60744481, 65464184, 41974496, 60616450, 60463979, 65967176, 57069491, 46208997, 22144990, 62372638, 1074657, 44240443, 33463661, 41974555, 43603241, 33700618, 5027882, 66423851, 60754840],\
                                    "How do kids come to world?": [15474, 1357127, 636806, 43033258, 6271835, 56480301, 23133297, 615418, 73165, 24470328, 1833777, 1380383, 79449, 4827661, 387703, 18863597, 36827305, 494299, 194687, 5591344, 48490547, 634139, 42072639, 44311171, 29384326, 1908019, 296627, 11263877, 101942, 2045465, 56921904, 128987, 22888933, 1072968, 25490788, 83449, 884998, 1151454, 30640885, 35072597, 2535885, 30861, 51046955, 13603, 3060346, 88380, 19698110, 72214, 6236554, 46105],\
                                    "Information retrieval": [1897206, 10179411, 25130414, 5818361, 1185840, 20948989, 48317971, 509628, 494528, 11486091, 50716473, 24963841, 296950, 35804330, 261193, 15271, 39000674, 19988623, 38156944, 36794719, 731640, 14109784, 10328235, 25935906, 16635934, 33407925, 743971, 3781784, 14343887, 57312392, 24997830, 442684, 7872152, 14473878, 25959000, 9511414],\
                                    "LinkedIn": [3591502, 55679006, 970755, 36070366, 63641225, 41726116, 51562019, 35549457, 21179478, 62976368, 27769500, 57147095, 31403505, 22291643, 50191962],\
                                    "How to make coffee?": [4506407, 321546, 37249793, 17668101, 26731675, 6887661, 1566948, 5612891, 211895, 68117784, 4604645, 47660, 3757402, 273707, 8866584, 5964683, 49099835, 28890200, 53151326, 300805, 1623162, 3775558, 273700, 667037, 5212064, 6826364, 63534797, 54459918, 604727, 30860428, 2461806, 3639440, 2929216, 12343966, 408360, 63520964, 838057, 6332026, 19619306, 215424, 482824, 38579961, 8728856, 2165666, 3785715, 366244, 1646753, 31824340],\
                                    "Ritalin": [649100, 8802530, 6428730, 608718, 13594085, 66391, 25164479, 24754461, 22611786, 964614, 7432624, 5721484, 57068567, 1333695, 4387617, 463961, 23891416, 56961277, 47956615, 4726434, 52780757, 50762105, 40542151, 1186041, 10671710, 7594242, 57762, 2580091, 159284, 2495940, 6281833, 45690249, 1546447, 32325617, 205878, 1790029, 5497377],\
                                    "How to make wine at home?": [373172, 3602925, 20790067, 223834, 15468138, 3398365, 61014433, 19600890, 927688, 146918, 22216378, 1417287, 13824744, 57098, 3276784, 466664, 41337483, 1031040, 36029170, 29324283, 26924822, 31505523, 13532634, 4378282, 1045027, 1455948, 14825456, 485220, 37468361, 1041458, 8177057, 2866516, 31704630, 21991369, 4554556, 713636, 8608425, 20810258, 22777652, 1039412, 32961, 8778890, 683094, 19561784, 6032951, 10998, 5222577, 7414829, 20185928, 8318345],\
                                    "Most expensive city in the world": [33508970, 3602421, 94167, 24724090, 30057, 220886, 31453, 19058, 31326350, 32706, 645042, 3928523, 18402, 34374079, 522934, 13476079, 2376810, 36511, 172538, 15218891, 390875, 22309, 12521, 65708464, 1664254, 35368654, 19004, 309890, 27862, 27318, 45470, 10992, 53446, 19261, 19189, 3848717, 11947794, 49749249, 7780, 14900757, 9299090, 26976, 49728, 63946361, 302201],\
                                    "India": [141896, 14745, 24452, 265059, 14597, 13890, 42737, 2377, 1186115, 6825785, 26457880, 1472206, 17359901, 37756, 53707, 315776, 4208015, 295335, 14598, 1996872, 764545, 1108803, 3574003, 678583, 7564733, 37534, 2198463, 720414, 6622547, 1683930, 231623, 17774253, 14533, 19189, 275047, 20611562, 43281, 17719886, 10710364, 5864614, 3315459, 14580, 47905, 3799826, 553883, 375986, 408215],\
                                    "how to make money fast?": [67987778, 12789839, 5624681, 44379765, 400777, 47720307, 45332, 1531043, 48732, 7322279, 51895777, 65228, 60739751, 21175589, 846772, 9833167, 22226313, 63809606, 35666788, 1527716, 4416646, 23830729, 264058, 32595633, 1335238, 12020461, 1793651, 1370831, 63121, 2913859, 42994, 4090453, 17418777, 5145001, 43250171, 8957449, 43030666, 473309, 624998, 7555986, 22156522, 13681, 29681566, 17362858, 19390, 407288, 1276547, 2763667],\
                                    "Netflix": [65595607, 34075129, 50602056, 65741484, 32670973, 61972257, 66174045, 47048067, 49016960, 63732884, 175537, 56312051, 65073808, 59629338, 54671372, 56312054, 50276542, 57041239, 66422422, 67450679, 66299065, 9399111, 50137861, 40030145],\
                                    "Apple computer": [254496, 50865995, 5285468, 5653238, 3356874, 345676, 2275, 4478297, 2593693, 3608414, 18640, 248101, 15183570, 20647724, 1159939, 17826747, 619983, 856, 46728817, 2116, 1492625, 77118, 32327247, 15357987, 400593, 17997437, 1005263, 345354, 2020710, 660310, 1344, 19006979, 15295713, 2786155, 2117, 21694, 233780, 5078775, 73262, 21347643, 27848, 548115],\
                                    "The Simpsons": [19293758, 1424178, 74813, 1625137, 34519668, 4939408, 11028525, 49387265, 4939471, 292279, 60534017, 9306179, 33350134, 4939519, 1466966, 4939306, 4939444, 140332, 4939501, 29838, 5451605, 19266557, 3038969, 14040227, 4939334, 188572, 10765975, 22423628, 4776930],\
                                    "World cup": [32516422, 42931572, 2996777, 33727, 183628, 60637832, 8821389, 16842834, 22230053, 1166428, 29868391, 64467696, 4743361, 13327177, 61269058, 26814387, 62528055, 10822574, 3482503, 36581929, 8258172, 16966712, 39302261, 244862, 67608822, 1853149, 39812824, 55490096, 2150801, 8734046, 32352129, 16383, 59707, 19537336, 3556431, 17742072, 11370, 656933, 168079, 41648358, 4723188, 1248592],\
                                    "How to lose weight?": [400199, 1151047, 791546, 67730903, 27300359, 84252, 26639763, 8581665, 1148926, 64543917, 6319249, 2029766, 56885915, 11665493, 1958879, 28396636, 56435, 2883760, 31429041, 32051848, 277790, 11884255, 49051658, 1017976, 42528947, 1149933, 65004286, 4748844, 44442017, 35281209, 40925771, 30687447, 11249433, 45280337, 17659030, 8460, 3549164, 727293, 28541957, 12523816, 33825347, 18168862, 9972157, 410007, 27148738],\
                                    "Java": [1179384, 17521476, 5516020, 5863400, 15628, 4093054, 135063, 663788, 9845, 1455590, 3901428, 731735, 1079500, 24920873, 11125049, 7955681, 38321273, 456722, 15881, 16389, 26257672, 43284, 651278, 127604, 43826, 314356, 53078721, 611589, 1131136, 230828, 417018, 42870, 69336, 4718446, 1414212, 7811267, 42871, 40659966, 13593, 1326984, 453584, 320443, 30120784, 7771171, 269441, 4294832],\
                                    "Air Jordan": [3647739, 3890370, 6722408, 105344, 18998781, 1371219, 60601430, 7851893, 28155315, 1394509, 4253801, 36916362, 265033, 23353937, 13365219, 20455, 3097723, 50066979, 51546226, 2310146, 67838974, 9998569, 62741501, 58209447],\
                                    "how to deal with depression?": [2721889, 13190302, 63499429, 16360289, 39218436, 33310173, 2367697, 57688, 20529621, 4041101, 49233423, 2685269, 840273, 25258288, 43600438, 60611538, 19283335, 18550003, 33255495, 19356, 60457349, 2891701, 66811, 34753948, 43875835, 42730418, 717119, 1295947, 18176448, 2353519, 1879108, 14325087, 3440273, 175357, 16407460, 3762294, 4531, 19064282, 52316, 8389, 255475, 341658, 20448627, 22481627, 21211994, 5144613, 30846934, 1500618, 234796],\
                                    "How do you make gold": [323246, 5580137, 1686492, 1385632, 23290471, 6890967, 15739, 39740796, 62929, 1020809, 251087, 6109962, 6996576, 402244, 2015573, 20063724, 1230653, 180211, 7133952, 23324, 12240, 1291393, 3519942, 12095348, 44712684, 27119, 886856, 18300514, 25918508, 37412, 2526649, 39639653, 390698, 1356272, 10865561, 1386629, 5024105, 3706246, 67110306, 2732267, 15457257, 56226, 19074264, 63280480, 1581831, 45756, 2927992, 27345986, 152176],\
                                    "Marijuana": [60920, 52227830, 22707918, 4512923, 68188835, 28985374, 31188467, 52184272, 52209782, 27202445, 20481920, 1481886, 19920359, 2331004, 19357, 44975261, 145891, 28572685, 20566488, 37646421, 383537, 20866399, 53836251, 150113, 53871120, 19760623, 3045683, 8596369, 1227367, 168917, 14942276, 48640150, 52342272, 52356241, 56078060, 38310, 175440, 53897655, 52228042, 52183794, 11164587, 168915, 48920848, 47227709, 23154203, 184488]}

In [None]:
def calculate_map_40_over_nirs_first_20_queries(title=False, body=False, anchor=False, cossim=False, BM25=False,
                                               queries_to_results=None, inverted_index=None, k=40):
  """Compute MAP@k of one index / scoring combination over the benchmark queries.

  The searched index is chosen by the flags: `title` -> relevant_title_inverted_index,
  else `body` -> relevant_body_inverted_index, else relevant_anchor_inverted_index
  (anchor is the fallback, so `anchor=True` is not actually required).
  `cossim=True` ranks with `search_cossim`; otherwise `search_BM25` is used, so the
  `BM25` flag is accepted only for readability at the call site.

  Per query, average precision is the mean of precision@rank taken at every rank
  where a relevant document appears in the top-k results. It is normalised by the
  number of relevant documents *retrieved*, not by the length of the relevant list.
  A query with no relevant document in the top-k contributes 0 instead of raising
  ZeroDivisionError.

  Args:
    title, body, anchor: select which global inverted index to search.
    cossim: rank with cosine similarity instead of BM25.
    BM25: unused; BM25 is the default whenever `cossim` is False.
    queries_to_results: mapping query -> iterable of relevant doc ids. Defaults to
      the global `nirs_queries_to_results_first_20`.
    inverted_index: explicit index object (must provide `search_cossim` /
      `search_BM25`). When given, it overrides the title/body/anchor flags.
    k: ranking cutoff passed to the search call (40, as in the function name).

  Returns:
    The mean of the per-query average precisions over all queries in the mapping,
    or 0.0 when the mapping is empty. Each per-query AP is also printed.
  """
  if queries_to_results is None:
    queries_to_results = nirs_queries_to_results_first_20
  if not queries_to_results:
    return 0.0

  if inverted_index is None:
    if title:
      inverted_index = relevant_title_inverted_index
    elif body:
      inverted_index = relevant_body_inverted_index
    else:
      inverted_index = relevant_anchor_inverted_index

  sum_of_average_precisions = 0
  for query, results_list in queries_to_results.items():
    if cossim:
      top_results = inverted_index.search_cossim(query, k)
    else:
      # NOTE(review): 0.75 / 1.25 / 0 look like BM25 tuning constants (b, k1, ...);
      # confirm their meaning against search_BM25's signature.
      top_results = inverted_index.search_BM25(query, 0.75, 1.25, 0, k)

    # Set membership keeps each relevance check O(1) instead of scanning the list.
    relevant_ids = set(results_list)
    num_of_hits = 0
    sum_of_precisions = 0
    # Each result is unpacked as a (doc_id, title, score) triple; ranks are 1-based.
    for rank, (doc_id, _doc_title, _doc_score) in enumerate(top_results, start=1):
      if doc_id in relevant_ids:
        num_of_hits += 1
        sum_of_precisions += num_of_hits / rank
    # A query with no relevant hit in the top-k scores 0 rather than crashing.
    average_precision = sum_of_precisions / num_of_hits if num_of_hits else 0.0
    print('Average Precision for the query ',query , average_precision)
    sum_of_average_precisions += average_precision
  # Average over the real number of queries, not a hard-coded 20.
  return sum_of_average_precisions / len(queries_to_results)

In [None]:
# Body index ranked with BM25: print a blank separator line, then the overall MAP@40.
body_BM25_map_40_result = calculate_map_40_over_nirs_first_20_queries(BM25=True, body=True)
print('', body_BM25_map_40_result, sep='\n')

Average Precision for the query  best marvel movie 0.6901217150676671
Average Precision for the query  How do kids come to world? 0.6001606978879704
Average Precision for the query  Information retrieval 0.9858512594764861
Average Precision for the query  LinkedIn 1.0
Average Precision for the query  How to make coffee? 1.0
Average Precision for the query  Ritalin 0.9847558201901625
Average Precision for the query  How to make wine at home? 0.9531142810529131
Average Precision for the query  Most expensive city in the world 0.3939060200774049
Average Precision for the query  India 0.7474784648030819
Average Precision for the query  how to make money fast? 0.9464357751028148
Average Precision for the query  Netflix 0.7517846715607228
Average Precision for the query  Apple computer 0.8712914157239983
Average Precision for the query  The Simpsons 0.9522760754595276
Average Precision for the query  World cup 0.655255453245081
Average Precision for the query  How to lose weight? 0.971272134

In [None]:
# Body index ranked with BM25: print a blank separator line, then the overall MAP@40.
# NOTE(review): this cell repeats an earlier identical BM25 cell; one of them can likely be dropped.
body_BM25_map_40_result = calculate_map_40_over_nirs_first_20_queries(BM25=True, body=True)
print('', body_BM25_map_40_result, sep='\n')

Average Precision for the query  best marvel movie 0.6901217150676671
Average Precision for the query  How do kids come to world? 0.6001606978879704
Average Precision for the query  Information retrieval 0.9858512594764861
Average Precision for the query  LinkedIn 1.0
Average Precision for the query  How to make coffee? 1.0
Average Precision for the query  Ritalin 0.9847558201901625
Average Precision for the query  How to make wine at home? 0.9531142810529131
Average Precision for the query  Most expensive city in the world 0.3939060200774049
Average Precision for the query  India 0.7474784648030819
Average Precision for the query  how to make money fast? 0.9464357751028148
Average Precision for the query  Netflix 0.7517846715607228
Average Precision for the query  Apple computer 0.8712914157239983
Average Precision for the query  The Simpsons 0.9522760754595276
Average Precision for the query  World cup 0.655255453245081
Average Precision for the query  How to lose weight? 0.971272134

In [None]:
# Body index ranked with cosine similarity: print a blank separator line, then the overall MAP@40.
body_cossim_map_40_result = calculate_map_40_over_nirs_first_20_queries(cossim=True, body=True)
print('', body_cossim_map_40_result, sep='\n')

Average Precision for the query  best marvel movie 0.9677265206651526
Average Precision for the query  How do kids come to world? 0.8300897606109894
Average Precision for the query  Information retrieval 0.9927625751155161
Average Precision for the query  LinkedIn 1.0
Average Precision for the query  How to make coffee? 1.0
Average Precision for the query  Ritalin 0.9922829421909688
Average Precision for the query  How to make wine at home? 1.0
Average Precision for the query  Most expensive city in the world 0.6202262278270324
Average Precision for the query  India 0.9993589743589744
Average Precision for the query  how to make money fast? 0.9793869243773587
Average Precision for the query  Netflix 0.8708298974906481
Average Precision for the query  Apple computer 1.0
Average Precision for the query  The Simpsons 0.9942449282271306
Average Precision for the query  World cup 0.9054325595965992
Average Precision for the query  How to lose weight? 0.9979214778383753
Average Precision for

In [None]:
# Body index ranked with cosine similarity: print a blank separator line, then the overall MAP@40.
# NOTE(review): this cell repeats an earlier identical cosine cell; one of them can likely be dropped.
body_cossim_map_40_result = calculate_map_40_over_nirs_first_20_queries(cossim=True, body=True)
print('', body_cossim_map_40_result, sep='\n')