# Imports & Setup

In [1]:
!gcloud dataproc clusters list --region us-central1

NAME          PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-ec23  GCE       4                                             RUNNING  us-central1-a


In [2]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [3]:
import pyspark
import sys
import itertools
from itertools import islice, count, groupby
from math import log
from operator import itemgetter
from collections import Counter, OrderedDict, defaultdict
import pandas as pd
import os
import re
import math
import numpy as np
import nltk
from nltk.stem.porter import *
from nltk.stem import PorterStemmer
from nltk.corpus import stopwords
from time import time
from heapq import heappop, heappush, heapify
from threading import Thread
from flask import Flask, request, jsonify
import pickle
import requests
from pathlib import Path
from google.cloud import storage
nltk.download('stopwords')

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [4]:
# initialization cluster
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Mar  9 09:17 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from graphframes import *

In [6]:
spark

In [7]:
# access to the bucket
bucket_name = 'yuval_206542839' 
full_path = f"gs://{bucket_name}/"
paths=[]

client = storage.Client()
blobs = client.list_blobs(bucket_name)
for b in blobs:
    if b.name != 'graphframes.sh' and b.name.endswith(".parquet"):
        paths.append(full_path+b.name)

***GCP setup is complete!***

# Building an inverted index

In [8]:
# entire corpus 
parquetFile = spark.read.parquet(*paths)
doc_text_pairs = parquetFile.select("text","id").rdd

                                                                                

In [9]:
# Count number of wiki pages
num_documents = parquetFile.count()

                                                                                

In [10]:
# check if inverted_index_gcp.py is uploaded to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [11]:
# adding python module to the cluster
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [12]:
from inverted_index_gcp import *
from inverted_index_gcp import InvertedIndex

***preproccesing***
remove stop words (enslish & corpus)
filter by ReGex
PorterStemmer
create functions that will calculate tf, idf, document lengths
and functions that will write posting list to bucket




In [13]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links",
                    "may", "first", "see", "history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

NUM_BUCKETS = 124
def token2bucket_id(token):
  return int(_hash(token),16) % NUM_BUCKETS

porter = PorterStemmer()

def word_count(text, id):
  ''' Count the frequency of each word in `text` (tf) that is not included in
  `all_stopwords` and return entries that will go into our posting lists.
  Parameters:
  -----------
    text: str
      Text of one document
    id: int
      Document id
  Returns:
  --------
    List of tuples
      A list of (token, (doc_id, tf)) pairs
      for example: [("Anarchism", (12, 5)), ...]
  '''
  tokens = [token.group() for token in RE_WORD.finditer(text.lower()) if token.group() not in all_stopwords]
    
  # Apply stemming to the tokens
  stemmed_tokens = [porter.stem(token) for token in tokens]
    
  # Count the frequency of each word using `Counter` from the collections module
  freq = Counter(stemmed_tokens)

  # Convert the `freq` dictionary to a list of tuples as required
  result = [(token, (id, count)) for token, count in freq.items()]

  return result

def length_doc(text, id):
  ''' Calculate the length of a document.
  Parameters:
  -----------
    text: str
      Text of one document
    id: int
      Document id
  Returns:
  --------
    tuple (doc_id, doc_length) 
  '''
  tokens = [token.group() for token in RE_WORD.finditer(text.lower()) if token.group() not in all_stopwords]
    
  # Apply stemming to the tokens
  stemmed_tokens = [porter.stem(token) for token in tokens]

  return (id,len(stemmed_tokens))



def reduce_word_counts(unsorted_pl):
  ''' Returns a sorted posting list by wiki_id.
  Parameters:
  -----------
    unsorted_pl: list of tuples
      A list of (wiki_id, tf) tuples
  Returns:
  --------
    list of tuples
      A sorted posting list.
  '''

  # Sort the unsorted posting list by wiki_id
  sorted_pl = sorted(unsorted_pl, key=lambda x: x[0])
  return sorted_pl



def calculate_df(postings):
  ''' Takes a posting list RDD and calculate the df for each token.
  Parameters:
  -----------
    postings: RDD
      An RDD where each element is a (token, posting_list) pair.
  Returns:
  --------
    RDD
      An RDD where each element is a (token, df) pair.
  '''
  return postings.map(lambda x: (x[0], len(x[1])))



def partition_postings_and_write(postings, bucket_name):
  ''' A function that partitions the posting lists into buckets, writes out
  all posting lists in a bucket to disk, and returns the posting locations for
  each bucket. Partitioning is done using the `token2bucket_id` function.

  Parameters:
  -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
    bucket_name: str
      The name of the bucket where the posting lists will be stored.

  Returns:
  --------
    RDD
      An RDD where each item is a posting locations dictionary for a bucket.
  '''
  # Map postings to buckets based on token
  postings_bucketed = postings.map(lambda posting: (token2bucket_id(posting[0]), posting))

  # Group postings by bucket ID
  postings_grouped = postings_bucketed.groupByKey().mapValues(list)

  # Write postings to disk and collect location information
  def write_postings_to_disk(postings_list):
      # Pass `bucket_name` to `write_a_posting_list` method
      return InvertedIndex.write_a_posting_list(postings_list, "postings_gcp_body/", bucket_name)

  # Apply the function to each group of postings and collect the results
  posting_locations = postings_grouped.map(write_postings_to_disk)

  return posting_locations


def calculate_idf2(postings, total_documents):
  ''' Takes a posting list RDD, calculates the IDF for each token.
  Parameters:
  -----------
    postings: RDD
      An RDD where each element is a (token, posting_list) pair.
    total_documents: int
      Total number of documents in the corpus.

  Returns:
  --------
    RDD
      An RDD where each element is a (token, idf) pair.
  '''
  # Calculate Document Frequency (DF) for each token
  df_rdd = calculate_df(postings)

  # Calculate Inverse Document Frequency (IDF) for each token
  idf_rdd = df_rdd.map(lambda x: (x[0], log(total_documents / x[1])))

  return idf_rdd


In [14]:
# word counts as flat map
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings_filtered = word_counts.groupByKey().mapValues(reduce_word_counts)

# filtering postings and calculate df
FILTER_MIN = 25
postings_filtered = postings_filtered.filter(lambda x: len(x[1])>FILTER_MIN)

w2df = calculate_df(postings_filtered)

In [15]:
# כתיבה לבאקט, לא להריץ שוב, עבד!!

# w2df_path = "gs://yuval_206542839/postings_gcp_body/w2df"

# numPartitions = 1000
# repartitionedRDD = w2df.repartition(numPartitions)

# # Save the repartitioned RDD to GCS
# repartitionedRDD.saveAsTextFile(w2df_path)

In [16]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("sparkContext") \
    .getOrCreate()

# Access SparkContext from SparkSession
sc = spark.sparkContext

# read w2df from bucket
path = "gs://yuval_206542839/postings_gcp_body/w2df"
w2df_dict_str = sc.textFile(path)

24/03/09 21:46:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [17]:
# Function to safely parse the strings to list
def safe_parse_to_dict(s):
    try:
        key, value = eval(s)
        return {key: value}
    except:
        # Return an empty dictionary or None in case of error
        return {}

In [18]:
# לקח חצי דקה בערך

# Parse each string into a tuple and filter out any failures
w2df_lst = w2df_dict_str.map(safe_parse_to_dict).collect()

                                                                                

In [19]:
# create dict from the list
w2df_dict = {key: value for d in w2df_lst for key, value in d.items()}

In [20]:
# calculate doc_length
doc_length = doc_text_pairs.map(lambda row: length_doc(row['text'], row['id']))

In [21]:
# לא להריץ שוב !!!!!

# Sum all document lengths
# total_length = doc_length.map(lambda x: x[1]).reduce(lambda a, b: a + b)

In [22]:
# # לא להריץ שוב !!!!!

# # Count the number of documents
# num_documents = 6348910

# # Calculate average length
# doc_avg_len = total_length / num_documents if num_documents else 0

In [23]:
# # write to bucket
# # לא להריץ שוב !!!

# dl_path = "gs://yuval_206542839/postings_gcp_body/doc_length_dict"

# numPartitions = 1000
# repartitionedRDD = doc_length.repartition(numPartitions)

# # Save the repartitioned RDD to GCS
# repartitionedRDD.saveAsTextFile(dl_path)

In [24]:
# read doc_length_dict from bucket
path = "gs://yuval_206542839/postings_gcp_body/doc_length_dict"
doc_length_str = sc.textFile(path)

In [25]:
# Parse each string into a tuple and filter out any failures
doc_length_lst = doc_length_str.map(safe_parse_to_dict).collect()

                                                                                

In [26]:
# dictionary
doc_length_dict = {key: value for d in doc_length_lst for key, value in d.items()}

In [27]:
# # לא להריץ שוב !!!
# # write posting list to bucket
# _ = partition_postings_and_write(postings_filtered, "yuval_206542839").collect()

In [28]:
# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp_body'):
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)

In [30]:
# # לא להריץ שוב !!!
# # Create inverted index instance
# inverted_body = InvertedIndex()

# # Adding the posting locations dictionary to the inverted index
# inverted_body.posting_locs = super_posting_locs

# # Add the token - df dictionary to the inverted index
# inverted_body.df = w2df_dict
# inverted_body.dict_len = doc_length_dict
# inverted_body.N = num_documents
# inverted_body.AVG = doc_avg_len

# # write the global stats out
# inverted_body.write_index('.', 'index_body')

# # upload to gs
# index_src = "index_body.pkl"
# index_dst = f'gs://{bucket_name}/postings_gcp_body/{index_src}'
# !gsutil cp $index_src $index_dst

In [31]:
!gsutil ls -lh $index_dst

gs://dataproc-staging-us-central1-860733016545-rvegqajb/
gs://dataproc-temp-us-central1-860733016545-hd7yfavc/
gs://yuval_206542839/


In [32]:
!pip install google-cloud-storage
!pip install flask

[0m

In [33]:
bucket_name = "yuval_206542839"
index_src = "index_body"

# import inverted index from the bucket
base_dir = "postings_gcp_body/"
index_body = InvertedIndex.read_index(base_dir, f'{index_src}', bucket_name)

SEARCH ENGINE

In [34]:
def preprocess_query(query):
    """
    Preprocesses the query by removing stopwords, stemming, and tokenization.
    """
    
    # Stopwords list
    stop_words = set(stopwords.words('english')) 
    
    # Stemmer
    stemmer = PorterStemmer() 
    
    # lower case
    query = query.lower() 
    
    # Remove non-alphabetic characters
    query = re.sub(r'[^a-zA-Z\s]', '', query) 
   
    # Tokenize
    tokens = query.split() 

    # Remove stopwords and stem
    tokens = [stemmer.stem(word) for word in tokens if word not in stop_words]
    
    return tokens


def query_to_vector(query_tokens, index):
    """
    Transforms a query into a vector using TF-IDF weights from the inverted index.
    :param query_tokens: List of tokens in the query
    :param index: The inverted index object
    :return: A dictionary representing the query vector, with terms as keys and TF-IDF as values.
    """
    # Calculate TF for the query
    query_tf = Counter(query_tokens)
    query_vector = {}

    for term, tf in query_tf.items():
        # Calculate TF-IDF, assuming the document contains the term
        if term in index.df:
            query_vector[term] = tf

    return query_vector


def cosine_similarity(query_vector,index):
    """
    Calculates the cosine similarity between a query vector and document vectors,
    where document vectors are represented by posting lists of tokens that appear both in the documents and the query.
    :param query_vector: The query vector represented as a dictionary.
    :param posting_lists: A dictionary of posting lists, with terms as keys and lists of (doc_id, tf_idf) as values.
    :return: A list of tuples, each tuple being (doc_id, cosine_similarity_score).
    """
    # Initialize document scores and norms
    doc_scores = Counter()
    doc_norms = Counter()

    # Calculate scores for each term in the query
    for term, query_weight in query_vector.items():
        if term in index.df:
            posting = index.read_a_posting_list("",term,"yuval_206542839")
            for doc_id, tf_idf in posting:
                doc_scores[doc_id] += query_weight * tf_idf
                doc_norms[doc_id] += tf_idf ** 2

    # Calculate query norm
    sum_total = 0
    for weight in query_vector.values():
        sum_total += weight**2
    query_norm = math.sqrt(sum_total)

    # Calculate final cosine similarity scores
    similarities = []
    for doc_id, score in doc_scores.items():
        doc_norm = math.sqrt(doc_norms[doc_id])
        if query_norm == 0 or doc_norm == 0:
            similarity = 0
        else:
            similarity = score / (query_norm * doc_norm)
        similarities.append((doc_id, similarity))

    return similarities[:1000]


def bm25_score(query_vector, index, k1=1.5, b=0.75):
    """
    Calculates the BM25 score between a query vector and document vectors.
    :param query_vector: The query vector represented as a dictionary with terms and their TF-IDF weights.
    :param index: The inverted index object containing document frequencies and other relevant data.
    :param k1: The scaling factor for term frequency. Typically between 1.2 and 2.0.
    :param b: The document length normalization factor. Typically close to 0.75.
    :return: A list of tuples, each tuple being (doc_id, BM25_score).
    """
    doc_scores = Counter()

    for term, query_tf in query_vector.items():
        if term in index.df:
            posting = index.read_a_posting_list("", term, "yuval_206542839")
            for doc_id, doc_tf in posting:
                # IDF calculation for the term
                idf = math.log((index.N - index.df[term] + 0.5) / (index.df[term] + 0.5) + 1)
                
                # Term frequency normalization and scaling
                B = 1 - b + b * (doc_length_dict[doc_id] / index.AVG)
                tf_scaled = (doc_tf * (k1 + 1)) / (doc_tf + k1 * B )
                # Accumulate the BM25 score for the document
                doc_scores[doc_id] += idf * tf_scaled

    # Convert scores to a sorted list of tuples (doc_id, score)
    sorted_scores = doc_scores.most_common()  # This will sort documents by their BM25 score in descending order
    
    return sorted_scores[:1000]  # Return top 100 documents, adjust as needed




def return_result(page_ids):
    """
    recieve list of page_idf
    return list (doc_id,title)
    """
    titles = []
    for page_id in page_ids:
        url = f'https://en.wikipedia.org/?curid={page_id}'
        response = requests.get(url)
        if response.status_code == 200:
            # Assuming the title is easily extractable from the response content
            # This is a simplification; actual extraction may require parsing the HTML or using an API
            title_start = response.text.find('<title>') + 7
            title_end = response.text.find('</title>', title_start)
            title = response.text[title_start:title_end].replace(" - Wikipedia", "")
            titles.append((page_id,title))
            
    return titles
            



def my_search(query):
    """
    Performs the search operation, including query preprocessing, document retrieval,
    and document ranking.
    """
    index_src_title = "index"
    index_src_body = "index_body"
    
    base_dir_title = "postings_gcp/"
    base_dir_body = "postings_gcp_body/"
    bucket_name = "yuval_206542839"
    
    index_title = InvertedIndex.read_index(base_dir_title, f'{index_src_title}', bucket_name)
    index_body = InvertedIndex.read_index(base_dir_body, f'{index_src_body}', bucket_name)
    
    tokens = preprocess_query(query)
    query_vector_title = query_to_vector(tokens, index_title)
    query_vector_body = query_to_vector(tokens, index_body)
    
    sim_cosine = cosine_similarity(query_vector_title, index_title) 
    sim_bm25 = bm25_score(query_vector_body, index_body, k1=1.5, b=0.75)
    
    result = calculate_result(sim_bm25,sim_cosine,0.6,0.4)
    
    result = [doc_id for doc_id, _ in result]

    result = result[:100]
    
    result = return_result(result)
    return result



def calculate_result(bm25_scores,cosine_similarity_scores,weight_TITLE,weight_BODY):
    # Convert lists to dictionaries
    
    bm25_dict = {key: value for key, value in bm25_scores}
    cosine_similarity_dict = {key: value for key, value in cosine_similarity_scores}

    # Normalize BM25 scores using min-max normalization
    min_bm25 =bm25_scores[-1][1]
    max_bm25 =bm25_scores[0][1]
    normalized_bm25_dict = {doc_id: (score - min_bm25) / (max_bm25 - min_bm25) for doc_id, score in bm25_dict.items()}

    # Initialize a dictionary to store the final weighted scores
    final_scores = {}

    # Calculate the weighted score for documents appearing in either or both dictionaries
    all_doc_ids = set(bm25_dict.keys()).union(set(cosine_similarity_dict.keys()))
    for doc_id in all_doc_ids:
        bm25_score = normalized_bm25_dict.get(doc_id, 0)
        cosine_score = cosine_similarity_dict.get(doc_id, 0)
        final_scores[doc_id] = weight_BODY * bm25_score + weight_TITLE * cosine_score

    # Optional: Sort the final scores dictionary by score in descending order to see the highest ranked documents first
    sorted_final_scores = sorted(final_scores.items(), key=lambda x: x[1], reverse=True)

    # Display the sorted final scores
    return sorted_final_scores


In [35]:
q = "hello, running"
my_search(q)

[(65476855, 'Hello FM (Ghana)'),
 (28719199, 'Hello Hey'),
 (6644261, 'Hello Kitty: Roller Rescue'),
 (59612068, 'Hello Molly'),
 (13834, '"Hello, World!" program'),
 (34043293, 'Hello Dummy!'),
 (49272424, 'Hello Gorgeous'),
 (7956284, 'Hello (Aya Ueto song)'),
 (28063614, 'List of Hello Kitty animated series'),
 (12409428, 'Say Hello, Wave Goodbye'),
 (33361179, 'Hello! Canada'),
 (61882884, 'Hello Youmzain'),
 (3162118, 'Hello, Larry'),
 (1480256, 'Hello! Morning'),
 (1895881, 'Hello, Dolly! (song)'),
 (7375587, 'Hello Tomorrow'),
 (40434533, 'The Oh Hellos'),
 (46740416, 'The Oh, Hello Show'),
 (1379058, 'Hello Muddah, Hello Fadduh (A Letter from Camp)'),
 (1206479, 'Hello, Goodbye'),
 (3748733, 'Hello Sailor'),
 (7531070, 'Hello Kitty no Hanabatake'),
 (3333421, 'Hello (disambiguation)'),
 (16393064, 'The Adventures of Hello Kitty &amp; Friends'),
 (3047381, 'Hello Mary Lou'),
 (42663426, 'Hello Hello'),
 (11896394, 'Hello Sunshine (Super Furry Animals song)'),
 (40778942, 'Hello 