# project: index creation and search on small data - version 1

# Setup

In [50]:
import sys
from collections import Counter, OrderedDict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from timeit import timeit
from pathlib import Path
import pickle
import pandas as pd
import numpy as np
from google.cloud import storage

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')

from inverted_index_gcp import *

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


## Installing, importing, and initializing PySpark


The following code installs PySpark and its dependencies in Colab. In addition, we install GraphFrames, which is a PySpark package for dealing with graphs in a distributed fashion. Colab notebooks run on a single machine so we will work in local mode, i.e. there is no cluster of machines and both the master and worker processes run on a single machine. This will help us debug and iron out the code we will use in the second half of this assignment on an actual cluster mode in GCP.

The installation in the next cell should take about 1 minute in a fresh environment. Don't worry about going  over the 90 seconds limit for the assignment because in our testing enviroment all of these requirements will already be met.

In [51]:
# These will already be installed in the testing environment so disregard the
# amount of time (~1 minute) it takes to install.
!pip install -q pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
!pip install -q graphframes
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
graphframes_jar = 'https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.2-s_2.12/graphframes-0.8.2-spark3.2-s_2.12.jar'
spark_jars = '/usr/local/lib/python3.8/dist-packages/pyspark/jars'
!wget -N -P $spark_jars $graphframes_jar

openjdk-8-jdk-headless is already the newest version (8u392-ga-1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.
--2024-02-29 16:08:00--  https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.2-s_2.12/graphframes-0.8.2-spark3.2-s_2.12.jar
Resolving repos.spark-packages.org (repos.spark-packages.org)... 18.173.166.39, 18.173.166.6, 18.173.166.12, ...
Connecting to repos.spark-packages.org (repos.spark-packages.org)|18.173.166.39|:443... connected.
HTTP request sent, awaiting response... 304 Not Modified
File ‘/usr/local/lib/python3.8/dist-packages/pyspark/jars/graphframes-0.8.2-spark3.2-s_2.12.jar’ not modified on server. Omitting download.



In [52]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from graphframes import *

In [53]:
# Initializing spark context
# create a spark context and session
conf = SparkConf().set("spark.ui.port", "4050")
conf.set("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
sc = pyspark.SparkContext(conf=conf)
sc.addPyFile(str(Path(spark_jars) / Path(graphframes_jar).name))
spark = SparkSession.builder.getOrCreate()

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=pyspark-shell, master=local[*]) created by __init__ at <ipython-input-7-e586420c6456>:5 

You can easily check the current version and get the link of the web interface. In the Spark UI, you can monitor the progress of your job and debug the performance bottlenecks (if your Colab is running with a **local runtime**).

In [None]:
spark

## Copy some wiki data

As mentioned above, all wiki dumps were preprocessed and placed in a shared google storage bucket. To access the files in Colab, you will first have to authenticate with google storage, and then copy the data to your local environment.

**IMPORTANT NOTE**

Before you start working with the wiki data, you first have to go over the
"Working with GCP.pdf" Provided to you under the gcp folder in the same zip file as this folder. In that pdf you will redeem your credits for the GCP and create your instance. You have to do this procedure because the files will be mounted to you through GCP process.
Notice that you have to change the project_id variable in the second cell below.

In GCP, the storage will already be mounted on the cluster machines and we will show you how to access it.


In [None]:
# Authenticate your user
# The authentication should be done with the email connected to your GCP account
from google.colab import auth
import signal

AUTH_TIMEOUT = 300000

def handler(signum, frame):
  raise Exception("Authentication timeout!")

signal.signal(signal.SIGALRM, handler)
signal.alarm(AUTH_TIMEOUT)

try:
   auth.authenticate_user()
except:
   pass

In [None]:
# Copy one wikidumps files
import os
from pathlib import Path
from google.colab import auth
## RENAME the project_id to yours project id from the project you created in GCP
project_id = 'mapreducepagerank'
!gcloud config set project {project_id}

data_bucket_name = 'wikidata20210801_preprocessed'
try:
    if os.environ["wikidata_preprocessed"] is not None:
        pass
except:
      !mkdir wikidumps
      !gsutil -u {project_id} cp gs://{data_bucket_name}/multistream1_preprocessed.parquet "wikidumps/"


# Processing wikipedia

Let's look at our data before transforming it to RDD.

In [None]:
from pathlib import Path
import os

try:
    if os.environ["wikidata_preprocessed"] is not None:
      path = os.environ["wikidata_preprocessed"]+"/wikidumps/*"
except:
      path = "wikidumps/*"

parquetFile = spark.read.parquet(path)

In [None]:
# take the 'text' and 'id' or the first 1000 rows and create an RDD from it
doc_text_pairs = parquetFile.limit(1000).select("text", "id").rdd
print(doc_text_pairs.take(1))

[Row(text='\'\'\'Anarchism\'\'\' is a political philosophy and movement that is sceptical of authority and rejects all involuntary, coercive forms of hierarchy. Anarchism calls for the abolition of the state, which it holds to be undesirable, unnecessary, and harmful. As a historically far-left movement, it is usually described alongside libertarian Marxism as the libertarian wing (libertarian socialism) of the socialist movement and has a strong historical association with anti-capitalism and socialism.\n\nThe history of anarchy goes back to prehistory, when humans arguably lived in anarchic societies long before the establishment of formal states, realms or empires. With the rise of organised hierarchical bodies, scepticism toward authority also rose, but it was not until the 19th century that a self-conscious political movement emerged. During the latter half of the 19th and the first decades of the 20th century, the anarchist movement flourished in most parts of the world and had a

In [None]:
NUM_BUCKETS = 124
def token2bucket_id(token):
  return int(_hash(token),16) % NUM_BUCKETS
bucket_name = "bucket-mr-project-ir-david"
base_dir = "base_dir"

english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links",
                    "may", "first", "see", "history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

NUM_BUCKETS = 124
def token2bucket_id(token):
  return int(_hash(token),16) % NUM_BUCKETS

# PLACE YOUR CODE HERE

def word_count(text, id):
  ''' Count the frequency of each word in `text` (tf) that is not included in
  `all_stopwords` and return entries that will go into our posting lists.
  Parameters:
  -----------
    text: str
      Text of one document
    id: int
      Document id
  Returns:
  --------
    List of tuples
      A list of (token, (doc_id, tf)) pairs
      for example: [("Anarchism", (12, 5)), ...]
  '''
  tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
  len_of_doc = len(tokens)
  # YOUR CODE HERE
  word_counts = Counter(tokens)
    # Filter out stopwords and create tuples of (token, (doc_id, tf))
  result = [(token, (id, tf)) for token, tf in word_counts.items() if token not in all_stopwords]
  return result

def doc_len(text, id):
  tokens = [token.group() for token in RE_WORD.finditer(text.lower())]# should i remove stop word???
  return id,len(tokens)
def reduce_word_counts(unsorted_pl):
  ''' Returns a sorted posting list by wiki_id.
  Parameters:
  -----------
    unsorted_pl: list of tuples
      A list of (wiki_id, tf) tuples
  Returns:
  --------
    list of tuples
      A sorted posting list.
  '''
  # YOUR CODE HERE
  # Sort the posting list by wiki_id
  sorted_pl = sorted(unsorted_pl, key=lambda x: x[0])
  return sorted_pl

def calculate_df(postings):
  ''' Takes a posting list RDD and calculate the df for each token.
  Parameters:
  -----------
    postings: RDD
      An RDD where each element is a (token, posting_list) pair.
  Returns:
  --------
    RDD
      An RDD where each element is a (token, df) pair.
  '''
  # YOUR CODE HERE
  # Calculate the length of each posting list for each token
  token_document_counts = postings.map(lambda x: (x[0], len(x[1])))

  return token_document_counts

def partition_postings_and_write(postings):
  ''' A function that partitions the posting lists into buckets, writes out
  all posting lists in a bucket to disk, and returns the posting locations for
  each bucket. Partitioning should be done through the use of `token2bucket`
  above. Writing to disk should use the function  `write_a_posting_list`, a
  static method implemented in inverted_index_colab.py under the InvertedIndex
  class.
  Parameters:
  -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
  Returns:
  --------
    RDD
      An RDD where each item is a posting locations dictionary for a bucket. The
      posting locations maintain a list for each word of file locations and
      offsets its posting list was written to. See `write_a_posting_list` for
      more details.
  '''
  # YOUR CODE HERE
  def write_partition_to_disk(partition):
        ''' Writes a partition of posting lists to disk and returns the posting
        locations for each word in the partition.
        Parameters:
        -----------
            partition: iterator
                An iterator of (bucket_id, posting_lists) tuples.
        Returns:
        --------
            tuple
                A tuple containing the bucket ID and the posting locations dictionary
                for the partition.
        '''
        bucket_id, posting_lists = partition
        bucket_id , posting_locs = InvertedIndex.write_a_posting_list((bucket_id, posting_lists),base_dir,bucket_name)
        return posting_locs
  # Partition the posting lists into buckets
  bucketed_postings = postings.map(lambda x: (token2bucket_id(x[0]), [x])) \
                               .reduceByKey(lambda a, b: a + b)

    # Write out each bucket to disk and collect the posting locations
  posting_locations = bucketed_postings.map(write_partition_to_disk)

  return posting_locations




In [None]:
#create dict for len of docs
dict_len_docs_rdd = doc_text_pairs.map(lambda x: doc_len(x[0], x[1]))

# Collect the RDD as a dictionary
dict_len_docs = dict_len_docs_rdd.collectAsMap()
# word counts map
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# filtering postings and calculate df
postings_filtered = postings.filter(lambda x: len(x[1])>15)
w2df = calculate_df(postings_filtered)

w2df_dict = w2df.collectAsMap()
# partition posting lists and write out
posting_locs_list = partition_postings_and_write(postings_filtered).collect()


# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)
for posting_loc in posting_locs_list:
  for k, v in posting_loc.items():
    super_posting_locs[k].extend(v)




# Create inverted index instance
inverted = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict
inverted.document_len = dict_len_docs
# write the global stats out
inverted.write_index(base_dir, 'inverted_index_small',bucket_name)
# # upload to gs
# index_src = "index.pkl"
# index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
# !gsutil cp $index_src $index_dst

In [None]:
# load_from_bucket
# get textIndex.pkl from bucket
from google.cloud import storage
import pickle
bucket_name = 'bucket-mr-project-ir-david'
file_path = "base_dir/inverted_index_small.pkl"
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_path)
contents = blob.download_as_bytes()
inverted = pickle.loads(contents)

In [None]:
import numpy as np
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
import string

def preprocess_query(query):
    # Download NLTK resources if not already downloaded //make sure documents are preprocess in the same way
    nltk.download('punkt')
    nltk.download('stopwords')
    english_stopwords = frozenset(stopwords.words('english'))
    corpus_stopwords = ["category", "references", "also", "external", "links",
                    "may", "first", "see", "history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became"]

    all_stopwords = english_stopwords.union(corpus_stopwords)
    # Initialize Porter Stemmer
    #stemmer = PorterStemmer()

    # Tokenize the query
    tokens = word_tokenize(query)

    # Remove punctuation
    tokens = [token for token in tokens if token not in string.punctuation]

    # Remove stop words
    stop_words = set(stopwords.words('english'))
    filtered_tokens = [token for token in tokens if token.lower() not in all_stopwords]

    # Stemming
    #stemmed_tokens = [stemmer.stem(token) for token in filtered_tokens]

    # Perform additional preprocessing if needed

    return filtered_tokens

# def vectorize_query(query, inverted_index):
#     # Preprocess the query
#     preprocessed_query = preprocess_query(query)


#     # Initialize a vector for the query
#     query_vector = np.zeros(len(inverted_index.posting_locs))# to do:create vector in size of the query

#     # Calculate TF-IDF for the query terms
#     for term in preprocessed_query:

#         if term in inverted_index.posting_locs:
#             # Calculate TF (term frequency) for the query term

#             index = inverted.dictIndexTerm[term]
#             tf = preprocessed_query.count(term)/len(preprocessed_query)

#             # Calculate TF-IDF score for the query term
#             tf_idf = tf * 1

#             # Assign TF-IDF score to the corresponding dimension in the query vector
#             query_vector[index] = tf_idf

#     return query_vector
def vectorize_query(query, inverted_index):
    # Preprocess the query
    preprocessed_query = preprocess_query(query)

    dict_tokens_unq = Counter(preprocessed_query)
    # Initialize a vector for the query
    query_vector = np.zeros(len(dict_tokens_unq))# to do:create vector in size of the query
    counter = 0
    # Calculate TF-IDF for the query terms
    for term, freq in dict_tokens_unq.items():

        if inverted_index.posting_locs.get(term) is not None:
            # Calculate TF (term frequency) for the query term

            tf = freq/len(preprocessed_query)

            # Calculate TF-IDF score for the query term
            tf_idf = tf * 1

            # Assign TF-IDF score to the corresponding dimension in the query vector
            query_vector[counter] = tf_idf
            counter += 1
    return query_vector


def vectorize_documents(inverted_index,query_tokens_unq):
    # Initialize a dictionary to store document vectors
    document_vectors = {}



    counter = 0
    # Calculate TF-IDF for each document
    for term in query_tokens_unq:
        #if term not in inverted.df.keys()
        if inverted_index.df.get(term) is None:
          counter += 1
          continue
        # posting_list = inverted.read_a_posting_list(base_dir, term, bucket_name)
        posting_list = inverted.read_a_posting_list("",term, bucket_name)
        num_of_docs = len(inverted_index.document_len.items())
        print(str(num_of_docs)+"num_of_docs")
        df_of_term = inverted_index.df[term]
        idf = np.log(num_of_docs/df_of_term)  # Adding 1 to avoid division by zero
        print(idf)
        for doc_id, tf in posting_list:
            tf = tf / (inverted.document_len[doc_id])#remove 0.1
            # Calculate TF-IDF score for the term in the document
            tf_idf = tf * idf

            # If the document vector already exists, update it
            if doc_id in document_vectors:
                document_vectors[doc_id][counter] = tf_idf
            # Otherwise, create a new document vector
            else:
                document_vector = np.zeros(len(query_tokens_unq))
                document_vectors[doc_id] = document_vector
                document_vectors[doc_id][counter] = tf_idf
        counter += 1
    return document_vectors
def cosine_similarity(v1, v2):
    """Compute cosine similarity between two vectors."""
    dot_product = np.dot(v1, v2)
    norm_v1 = np.linalg.norm(v1)
    norm_v2 = np.linalg.norm(v2)
    return dot_product / (norm_v1 * norm_v2)

def search(query, inverted_index, doc_titles = None, k=10):
    """Search for documents based on a query using cosine similarity."""
    # Preprocess query (e.g., tokenize, remove stop words, etc.)
    # Vectorize the query

    query_vector = vectorize_query(query,inverted)
    processed_query = preprocess_query(query)
    list_tokens_unq = list(Counter(processed_query).keys())
    vectorize_documents_res = vectorize_documents(inverted,list_tokens_unq)

    print("Vectors of the first 10 documents:")
    for doc_id, doc_vector in islice(vectorize_documents_res.items(), 10):
        print(f"Document ID: {doc_id}, Vector: {doc_vector}")


    # Compute cosine similarity between the query vector and all document vectors
    similarities = {}
    for doc_id, doc_vector in vectorize_documents_res.items():
        similarities[doc_id] = cosine_similarity(query_vector, doc_vector)

    # Sort documents by similarity and return the top k results
    top_results = sorted(similarities.items(), key=lambda x: x[1], reverse=True)[:k]

    # Retrieve titles for the top results
    #results_with_titles = [(doc_id, doc_titles[doc_id]) for doc_id, _ in top_results]
    #results_with_titles = [(doc_id, doc_titles[doc_id]) for doc_id, _ in top_results]
    return top_results



In [None]:
# @title Default title text
#test
first_10_items = list(inverted.document_len.items())[:10]

# Print the first 10 items
print(first_10_items)

query = "jewish singer"
proces_query = preprocess_query(query)
print(proces_query)
query_vector = vectorize_query(query,inverted)
print(query_vector)
print(search(query,inverted))



[(12, 5848), (25, 5811), (39, 2517), (290, 1430), (303, 10948), (305, 6225), (307, 11284), (308, 8763), (309, 1408), (316, 5427)]
['jewish', 'singer']
[0.5 0.5]


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


1000num_of_docs
2.1286317858706076
1000num_of_docs
2.120263536200091
Vectors of the first 10 documents:
Document ID: 303, Vector: [0.00077772 0.        ]
Document ID: 339, Vector: [0.00278071 0.        ]
Document ID: 573, Vector: [0.00030975 0.        ]
Document ID: 599, Vector: [0.00051766 0.        ]
Document ID: 624, Vector: [0.00043327 0.00021578]
Document ID: 628, Vector: [0.00061646 0.        ]
Document ID: 678, Vector: [0.01051176 0.        ]
Document ID: 689, Vector: [0.00067543 0.        ]
Document ID: 700, Vector: [0.00019321 0.        ]
Document ID: 717, Vector: [0.00024671 0.        ]
[(984, 0.9999980605171862), (2382, 0.9999980605171861), (738, 0.999998060517186), (1438, 0.9802227868442028), (1688, 0.9491809580183561), (2195, 0.9491809580183561), (2185, 0.9481844604998539), (1216, 0.9481844604998538), (2406, 0.9481844604998538), (624, 0.9481844604998537)]
