# **Inverted Index**

## **Imports and Setup**

In [None]:
# List the Dataproc cluster(s) in us-central1 and install the Python dependencies used below.
# NOTE(review): `!pip` may install into a different interpreter than the kernel (`%pip` targets the kernel);
# graphframes is not version-pinned.
!gcloud dataproc clusters list --region us-central1
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes
!pip install nltk==3.7

NAME        PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
ir-cluster  GCE       4                                             RUNNING  us-central1-a
[0m

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from timeit import timeit
from pathlib import Path
import pickle
import numpy as np
from google.cloud import storage
import math
import hashlib
import builtins

def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# Fetch NLTK's stopword corpus (needed for `stopwords.words('english')` below); reports up-to-date if already present.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
# Confirm the GraphFrames jar is present in Spark's jar directory.
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Mar  6 07:30 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
# Display the active SparkSession. NOTE(review): `spark` (and `sc`, used below) are not created in this
# notebook — presumably injected by the Dataproc Jupyter kernel; confirm before running elsewhere.
spark

In [None]:
# Define the bucket name and collect the GCS paths of the corpus parquet files.
bucket_name = '318940913'
full_path = f"gs://{bucket_name}/"

# Objects in the bucket that are NOT corpus data: every posting-list folder this
# notebook writes (including the non-stemmed, bigram and anchor indexes) plus other
# helper objects. They must be skipped, otherwise re-running the notebook hands
# .bin / .pickle / .pkl files to spark.read.parquet.
EXCLUDED_PREFIXES = (
    'Postings_title/',
    'Postings_title_without_stem/',
    'Postings_body/',
    'Postings_body_Ngrams/',
    'Postings_anchor/',
    'Dict_folder/',
)
EXCLUDED_NAMES = {'graphframes.sh'}

client = storage.Client()
paths = [
    full_path + b.name
    for b in client.list_blobs(bucket_name)
    if not b.name.startswith(EXCLUDED_PREFIXES) and b.name not in EXCLUDED_NAMES
]

## **Inverted Index Building**

In [None]:
# Read all corpus parquet files listed in `paths` into a single Spark DataFrame.
parquetFile = spark.read.parquet(*paths)

                                                                                

In [None]:
# Move (quietly) to /home/dataproc and check that inverted_index_gcp.py exists there,
# since the next cell ships that file to the cluster.
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [None]:
# Add inverted_index_gcp.py to the Spark context's distributed files, and put the
# SparkFiles root directory on the driver's sys.path so the module can be imported below.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [None]:
from inverted_index_gcp import *

In [None]:
# Project the corpus into RDDs of Rows, one per index to build:
#   doc_text_pairs   -> (text, id)
#   doc_title_pairs  -> (title, id)
#   doc_anchor_pairs -> (anchor_text,) only; no id column is selected. The anchor
#     index cell uses element 0 of each anchor_text entry as the document id
#     (presumably the id of the linked page — confirm against the parquet schema).
doc_text_pairs = parquetFile.select("text", "id").rdd
doc_title_pairs = parquetFile.select("title", "id").rdd
doc_anchor_pairs = parquetFile.select("anchor_text").rdd



## **Helper Functions**

In [None]:
# Stopwords = NLTK's English list plus frequent, low-information words of this corpus.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links",
                    "may", "first", "see", "history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became"]

# frozenset.union accepts any iterable, so the result is again a frozenset.
all_stopwords = english_stopwords.union(corpus_stopwords)
# A token: one of '#', '@' or a word character, followed by 2-24 word characters,
# each optionally preceded by a single apostrophe or hyphen (so at least 3 chars long).
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

# One stemmer shared by every tokenize() call; PorterStemmer.stem keeps no per-call
# state, so there is no need to build a new instance for every document.
_porter_stemmer = PorterStemmer()

def tokenize(query):
  """Tokenize text, lower-case it, drop stopwords and Porter-stem the rest.

  Parameters:
      query (str): Raw text (a query, title, body or anchor text).
  Returns:
      list of str: Stemmed tokens in order of appearance. Stopwords are checked
      against the lower-cased token *before* stemming.
  """
  return [_porter_stemmer.stem(token.group())
          for token in RE_WORD.finditer(query.lower())
          if token.group() not in all_stopwords]

def tokenize_without_stem(query):
    """Split text into lower-cased word tokens and drop stopwords (no stemming).

    Args:
    - query (str): Raw text to tokenize.
    Returns:
    - list of str: The tokens that are not in `all_stopwords`, in order of appearance.
    """
    kept = []
    for match in RE_WORD.finditer(query.lower()):
        word = match.group()
        if word not in all_stopwords:
            kept.append(word)
    return kept



NUM_BUCKETS = 124

def token2bucket_id(token):
  """
    Assign a token to one of NUM_BUCKETS posting-list buckets.
    The bucket comes from the token's `_hash` digest read as a hex integer, so a
    given token always maps to the same bucket, whichever worker computes it.
    Parameters:
        token (str): The token to place.
    Returns:
        int: A bucket id in range(NUM_BUCKETS).
  """
  digest_value = int(_hash(token), base=16)
  return digest_value % NUM_BUCKETS

def anchor_to_text(item):
    """Flatten one row's anchor list into plain 2-tuples.

    ``item[0]`` is the row's anchor_text field, a sequence of pair-like entries.
    Each entry becomes ``(entry[0], entry[1])``. Downstream, element 0 is used
    as the document id and element 1 as the anchor text.
    """
    return [(ref[0], ref[1]) for ref in item[0]]

def word_count(text, id):
  """
    Build term-frequency pairs for one document using the stemming tokenizer.
    Stopword removal and stemming happen inside `tokenize`.
    Parameters:
        text (str): Document text.
        id: Document id attached to every pair.
    Returns:
        list: One (token, (id, term_frequency)) tuple per distinct token,
        in order of first appearance.
  """
  term_freqs = Counter(tokenize(text))
  return [(term, (id, tf)) for term, tf in term_freqs.items()]

def word_count_without_stem(text, id):
  """
    Build term-frequency pairs for one document without stemming.
    Stopword removal happens inside `tokenize_without_stem`.
    Parameters:
        text (str): Document text.
        id: Document id attached to every pair.
    Returns:
        list: One (token, (id, term_frequency)) tuple per distinct token,
        in order of first appearance.
  """
  pairs = []
  for term, tf in Counter(tokenize_without_stem(text)).items():
    pairs.append((term, (id, tf)))
  return pairs

def word_count_ngrams(text, id):
    """Count bigram (adjacent-token pair) frequencies for one document.

    Tokens are produced by `tokenize`: the text is lower-cased, stopwords are
    removed and the remaining tokens are Porter-stemmed. Each adjacent pair of
    tokens is then joined with a single space.

    Parameters:
        text (str): Document text.
        id: Document id attached to every pair.
    Returns:
        list: One (bigram, (id, term_frequency)) tuple per distinct bigram,
        in order of first appearance. Texts with fewer than two tokens give [].
    """
    # Reuse tokenize(): it performs exactly the extraction + stopword filter +
    # stemming needed here, and it does not depend on a module-level `stemmer`
    # (none exists) or on nltk's `ngrams` (never imported in this notebook).
    tokens = tokenize(text)
    bigrams = [f"{first} {second}" for first, second in zip(tokens, tokens[1:])]
    return [(bigram, (id, tf)) for bigram, tf in Counter(bigrams).items()]

def reduce_word_counts(unsorted_pl):
  """
    Turn an iterable of postings into a new sorted list.
    Postings are (doc_id, tf) tuples, so tuple ordering sorts by doc_id first
    (tf only breaks ties).
    Parameters:
        unsorted_pl (iterable): Postings in arbitrary order (e.g. a groupByKey value).
    Returns:
        list: A new list holding the postings in ascending order.
  """
  ordered = list(unsorted_pl)
  ordered.sort()
  return ordered

def calculate_df(postings):
  """
    Map every (token, posting_list) pair to (token, document_frequency).
    Each document contributes at most one posting per token (word_count emits one
    pair per distinct token), so the DF is just the posting list's length.
    Parameters:
        postings (RDD): (token, posting_list) pairs.
    Returns:
        RDD: (token, df) pairs.
  """
  def _term_and_df(entry):
    return (entry[0], len(entry[1]))

  return postings.map(_term_and_df)

def partition_postings_and_write(postings,folder_name,bucket_name):
  """
    Group posting lists into buckets (via token2bucket_id) and write each bucket out.
    Parameters:
        postings (RDD): (token, posting_list) pairs.
        folder_name (str): Folder inside the bucket that receives the files.
        bucket_name (str): Name of the storage bucket.
    Returns:
        RDD: One element per bucket, i.e. the value returned by
        InvertedIndex.write_a_posting_list for that bucket. The writes happen
        lazily, only when an action (e.g. collect) is run on this RDD.
  """
  def _with_bucket(entry):
    return (token2bucket_id(entry[0]), entry)

  def _write_bucket(bucket):
    return InvertedIndex.write_a_posting_list(bucket, folder_name, bucket_name)

  bucketed = postings.map(_with_bucket)
  return bucketed.groupByKey().map(_write_bucket)

## **Inverted Index for the Title**:


In [None]:
def Create_inverted_index_title(data):
  """
    Build the stemmed title inverted index and store it.

    Posting lists are written under the bucket folder "Postings_title", and the
    index object is written to the current directory as "Title_Inverted_Index".

    Parameters:
        data (RDD): Rows of (title_text, doc_id), i.e. parquetFile.select("title", "id").

    Returns:
        InvertedIndex: Index whose `df` maps term -> document frequency and whose
        `posting_locs` maps term -> posting-list locations read back from the
        per-bucket "*.pickle" files.

    Uses the module-level `bucket_name` and `client`.
  """
  folder_name = "Postings_title"
  # (token, (doc_id, tf)) for every distinct stemmed token of every title
  word_counts = data.flatMap(lambda x: word_count(x[0], x[1]))
  # token -> posting list sorted by doc_id
  postings = word_counts.groupByKey().mapValues(reduce_word_counts)
  w2df_dict_t = calculate_df(postings).collectAsMap()
  # Write the posting lists; their locations are read back from the pickles below.
  _ = partition_postings_and_write(postings, folder_name, bucket_name).collect()
  # The trailing "/" matters: the bare prefix "Postings_title" also matches
  # "Postings_title_without_stem/...", which would merge the non-stemmed
  # index's locations into this one.
  super_posting_locs = defaultdict(list)
  for blob in client.list_blobs(bucket_name, prefix=f"{folder_name}/"):
    if not blob.name.endswith("pickle"):
      continue
    with blob.open("rb") as f:
      posting_locs = pickle.load(f)
      for k, v in posting_locs.items():
        super_posting_locs[k].extend(v)
  inverted_index_title = InvertedIndex()
  inverted_index_title.posting_locs = super_posting_locs
  inverted_index_title.df = w2df_dict_t
  inverted_index_title.write_index('.', "Title_Inverted_Index")
  return inverted_index_title

In [None]:
# Build the stemmed title index. This writes the posting lists to the bucket and the index file locally.
inverted_index_title=Create_inverted_index_title(doc_title_pairs)
print("Building the index for title completed successfully")

In [None]:
# Upload the locally written title index next to its posting lists in the bucket.
# Index file written to the working directory by the build cell above
index_src = "Title_Inverted_Index.pkl"
# Destination object inside the title posting-lists folder
index_dst = f'gs://{bucket_name}/Postings_title/{index_src}'
# Copy the file to GCS ($index_src / $index_dst are expanded from the Python variables)
!gsutil cp $index_src $index_dst
# Show the uploaded object's size and timestamp to confirm the copy
!gsutil ls -lh $index_dst

## **Inverted Index for the Title Without Stemming**:


In [None]:
def Create_inverted_index_title_without_stem(data):
  """
    Build the title inverted index with stopword removal but WITHOUT stemming, and store it.

    Posting lists are written under the bucket folder "Postings_title_without_stem",
    and the index object is written to the current directory as
    "Title_Without_Stem_Inverted_Index".

    Parameters:
        data (RDD): Rows of (title_text, doc_id), i.e. parquetFile.select("title", "id").

    Returns:
        InvertedIndex: Index whose `df` maps term -> document frequency and whose
        `posting_locs` maps term -> posting-list locations read back from the
        per-bucket "*.pickle" files.

    Uses the module-level `bucket_name` and `client`.
  """
  folder_name = "Postings_title_without_stem"
  # (token, (doc_id, tf)) for every distinct unstemmed token of every title
  word_counts = data.flatMap(lambda x: word_count_without_stem(x[0], x[1]))
  # token -> posting list sorted by doc_id
  postings = word_counts.groupByKey().mapValues(reduce_word_counts)
  w2df_dict_t = calculate_df(postings).collectAsMap()
  # Write the posting lists; their locations are read back from the pickles below.
  _ = partition_postings_and_write(postings, folder_name, bucket_name).collect()
  # List only this folder's objects (trailing "/"), so that no other folder
  # sharing the name prefix can leak locations into this index.
  super_posting_locs = defaultdict(list)
  for blob in client.list_blobs(bucket_name, prefix=f"{folder_name}/"):
    if not blob.name.endswith("pickle"):
      continue
    with blob.open("rb") as f:
      posting_locs = pickle.load(f)
      for k, v in posting_locs.items():
        super_posting_locs[k].extend(v)
  inverted_index_title = InvertedIndex()
  inverted_index_title.posting_locs = super_posting_locs
  inverted_index_title.df = w2df_dict_t
  inverted_index_title.write_index('.', "Title_Without_Stem_Inverted_Index")
  return inverted_index_title

In [None]:
# Build the non-stemmed title index (the function name was previously misspelled, raising NameError)
inverted_index_title_without_stem=Create_inverted_index_title_without_stem(doc_title_pairs)
print("Building the index for title without stemming completed successfully")

In [None]:
# Upload the locally written non-stemmed title index next to its posting lists in the bucket.
# Index file written to the working directory by the build cell above
index_src = "Title_Without_Stem_Inverted_Index.pkl"
# Destination object inside the non-stemmed title posting-lists folder
index_dst = f'gs://{bucket_name}/Postings_title_without_stem/{index_src}'
# Copy the file to GCS ($index_src / $index_dst are expanded from the Python variables)
!gsutil cp $index_src $index_dst
# Show the uploaded object's size and timestamp to confirm the copy
!gsutil ls -lh $index_dst

## **Inverted Index for the Body**:


In [None]:
def Create_inverted_index_body(data, min_df=50):
  """
    Build the stemmed body-text inverted index and store it.

    Posting lists are written under the bucket folder "Postings_body", and the
    index object is written to the current directory as "Body_Inverted_Index".
    Each posting is a (doc_id, term_frequency) pair. No BM25 (or other) scores
    are computed here; presumably ranking is done at query time from these counts.

    Parameters:
        data (RDD): Rows of (body_text, doc_id), i.e. parquetFile.select("text", "id").
        min_df (int): Terms whose posting list has min_df or fewer documents are
            dropped. Defaults to 50.

    Returns:
        InvertedIndex: Index whose `df` maps each kept term -> document frequency
        and whose `posting_locs` maps term -> posting-list locations.

    Uses the module-level `bucket_name` and `client`.
  """
  folder_name = "Postings_body"
  # (token, (doc_id, tf)) for every distinct stemmed token of every document
  word_counts = data.flatMap(lambda x: word_count(x[0], x[1]))
  # token -> posting list sorted by doc_id
  postings_text = word_counts.groupByKey().mapValues(reduce_word_counts)
  postings_filtered = postings_text.filter(lambda x: len(x[1]) > min_df)
  w2df_dict = calculate_df(postings_filtered).collectAsMap()
  # Write the posting lists; their locations are read back from the pickles below.
  _ = partition_postings_and_write(postings_filtered, folder_name, bucket_name).collect()
  # The trailing "/" matters: the bare prefix "Postings_body" also matches
  # "Postings_body_Ngrams/...", whose bigram locations would otherwise be
  # merged into this index.
  super_posting_locs = defaultdict(list)
  for blob in client.list_blobs(bucket_name, prefix=f"{folder_name}/"):
    if not blob.name.endswith("pickle"):
      continue
    with blob.open("rb") as f:
      posting_locs = pickle.load(f)
      for k, v in posting_locs.items():
        super_posting_locs[k].extend(v)
  inverted_index_body = InvertedIndex()
  inverted_index_body.posting_locs = super_posting_locs
  inverted_index_body.df = w2df_dict
  inverted_index_body.write_index('.', "Body_Inverted_Index")
  return inverted_index_body

In [None]:
# Build the stemmed inverted index over document bodies (term frequencies only; no scores are computed here)
inverted_index_body=Create_inverted_index_body(doc_text_pairs)
print("Building the index for body completed successfully")

In [None]:
# Upload the locally written body index next to its posting lists in the bucket.
# Index file written to the working directory by the build cell above
index_src = "Body_Inverted_Index.pkl"
# Destination object inside the body posting-lists folder
index_dst = f'gs://{bucket_name}/Postings_body/{index_src}'
# Copy the file to GCS ($index_src / $index_dst are expanded from the Python variables)
!gsutil cp $index_src $index_dst
# Show the uploaded object's size and timestamp to confirm the copy
!gsutil ls -lh $index_dst


## **Inverted Index for the Body with N-Grams (Bigrams)**:


In [None]:
def Create_inverted_index_body_ngrams(data, min_df=50):
  """
    Build a bigram inverted index over document bodies and store it.

    Terms are stemmed bigrams such as "new york" (see word_count_ngrams). Posting
    lists are written under the bucket folder "Postings_body_Ngrams", and the index
    object is written to the current directory as "Body_Inverted_Index_Ngrams".

    Parameters:
        data (RDD): Rows of (body_text, doc_id), i.e. parquetFile.select("text", "id").
        min_df (int): Bigrams whose posting list has min_df or fewer documents are
            dropped. Defaults to 50.

    Returns:
        InvertedIndex: Index whose `df` maps each kept bigram -> document frequency
        and whose `posting_locs` maps bigram -> posting-list locations.

    Uses the module-level `bucket_name` and `client`.
  """
  folder_name = "Postings_body_Ngrams"
  # (bigram, (doc_id, tf)) for every distinct bigram of every document
  word_counts = data.flatMap(lambda x: word_count_ngrams(x[0], x[1]))
  # bigram -> posting list sorted by doc_id
  postings_text = word_counts.groupByKey().mapValues(reduce_word_counts)
  postings_filtered = postings_text.filter(lambda x: len(x[1]) > min_df)
  w2df_dict = calculate_df(postings_filtered).collectAsMap()
  # Write the posting lists; their locations are read back from the pickles below.
  _ = partition_postings_and_write(postings_filtered, folder_name, bucket_name).collect()
  # List only this folder's objects (trailing "/"), so that no other folder
  # sharing the name prefix can leak locations into this index.
  super_posting_locs = defaultdict(list)
  for blob in client.list_blobs(bucket_name, prefix=f"{folder_name}/"):
    if not blob.name.endswith("pickle"):
      continue
    with blob.open("rb") as f:
      posting_locs = pickle.load(f)
      for k, v in posting_locs.items():
        super_posting_locs[k].extend(v)
  inverted_index_body_ngrams = InvertedIndex()
  inverted_index_body_ngrams.posting_locs = super_posting_locs
  inverted_index_body_ngrams.df = w2df_dict
  inverted_index_body_ngrams.write_index('.', "Body_Inverted_Index_Ngrams")
  return inverted_index_body_ngrams

In [None]:
# Build the bigram inverted index over document bodies
inverted_index_body_ngrams=Create_inverted_index_body_ngrams(doc_text_pairs)
print("Building the index for body with ngrams completed successfully")

In [None]:
# Upload the locally written bigram body index next to its posting lists in the bucket.
# Index file written to the working directory by the build cell above
index_src = "Body_Inverted_Index_Ngrams.pkl"
# Destination object inside the bigram posting-lists folder
index_dst = f'gs://{bucket_name}/Postings_body_Ngrams/{index_src}'
# Copy the file to GCS ($index_src / $index_dst are expanded from the Python variables)
!gsutil cp $index_src $index_dst
# Show the uploaded object's size and timestamp to confirm the copy
!gsutil ls -lh $index_dst

## **Inverted Index for the Anchor Text**:


In [None]:
def Create_inverted_index_anchor(data, min_df=50):
  """
    Build the stemmed anchor-text inverted index and store it.

    Posting lists are written under the bucket folder "Postings_anchor", and the
    index object is written to the current directory as "Anchor_Inverted_Index".

    Parameters:
        data (RDD): Pairs of (anchor_text, doc_id), where anchor_text is all the
            anchor strings collected for doc_id, joined by spaces.
        min_df (int): Terms whose posting list has min_df or fewer documents are
            dropped. Defaults to 50.

    Returns:
        InvertedIndex: Index whose `df` maps each kept term -> document frequency
        and whose `posting_locs` maps term -> posting-list locations.

    Uses the module-level `bucket_name` and `client`.
  """
  folder_name = "Postings_anchor"
  # (token, (doc_id, tf)) for every distinct stemmed token of every document's anchors
  word_counts = data.flatMap(lambda x: word_count(x[0], x[1]))
  # token -> posting list sorted by doc_id
  postings_text = word_counts.groupByKey().mapValues(reduce_word_counts)
  postings_filtered = postings_text.filter(lambda x: len(x[1]) > min_df)
  w2df_dict = calculate_df(postings_filtered).collectAsMap()
  # Write the posting lists; their locations are read back from the pickles below.
  _ = partition_postings_and_write(postings_filtered, folder_name, bucket_name).collect()
  # List only this folder's objects (trailing "/"), so that no other folder
  # sharing the name prefix can leak locations into this index.
  super_posting_locs = defaultdict(list)
  for blob in client.list_blobs(bucket_name, prefix=f"{folder_name}/"):
    if not blob.name.endswith("pickle"):
      continue
    with blob.open("rb") as f:
      posting_locs = pickle.load(f)
      for k, v in posting_locs.items():
        super_posting_locs[k].extend(v)
  inverted_index_anchor = InvertedIndex()
  inverted_index_anchor.posting_locs = super_posting_locs
  inverted_index_anchor.df = w2df_dict
  inverted_index_anchor.write_index('.', "Anchor_Inverted_Index")
  return inverted_index_anchor

In [None]:
# Build the anchor-text index.
# Each row's anchor_text is flattened into (doc_id, anchor_text) tuples. Element 0 is
# what word_count later receives as the document id (presumably the id of the linked
# page — confirm against the parquet schema).
anchor_text_pairs = (
    doc_anchor_pairs
    .flatMap(anchor_to_text)  # Flatten each row's anchors into (doc_id, anchor_text) tuples
    .distinct()               # Drop duplicate (doc_id, anchor_text) tuples
    .groupByKey()             # Group the anchor texts by doc_id
    .mapValues(list)          # Materialize each group's iterable as a list
)

# Join all anchor texts of a document into one string, giving (text, doc_id) pairs
anchor_text = (anchor_text_pairs.map(lambda x: (" ".join(x[1]), x[0])))  # (joined anchor text, doc_id)
inverted_index_anchor=Create_inverted_index_anchor(anchor_text)
print("Building the index for anchor completed successfully")

In [None]:
# Upload the locally written anchor index next to its posting lists in the bucket.
# Index file written to the working directory by the build cell above
index_src = "Anchor_Inverted_Index.pkl"
# Destination object inside the anchor posting-lists folder
index_dst = f'gs://{bucket_name}/Postings_anchor/{index_src}'
# Copy the file to GCS ($index_src / $index_dst are expanded from the Python variables)
!gsutil cp $index_src $index_dst
# Show the uploaded object's size and timestamp to confirm the copy
!gsutil ls -lh $index_dst