In [None]:
# List the Dataproc clusters in us-central1 to confirm the cluster is RUNNING.
!gcloud dataproc clusters list --region us-central1

NAME          PLATFORM  WORKER_COUNT  PREEMPTIBLE_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-a9c1  GCE       2                                       RUNNING  us-central1-a


In [None]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
import gc

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# Fetch NLTK's stopword corpus (used to build english_stopwords); a no-op if already present.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
# Verify the GraphFrames jar is on Spark's classpath.
# If nothing prints here you forgot to include the initialization script when starting the cluster.
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Nov 21 17:33 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
# Display the active SparkSession. `spark` (and `sc`) are not created in this notebook;
# presumably they are provided by the Dataproc PySpark kernel.
spark

In [None]:
# GCS bucket holding the parquet input; every index file below is also uploaded here.
# Set your own bucket name and make sure you can list it without an error.
bucket_name = 'irproject2066'
full_path = f"gs://{bucket_name}/"

client = storage.Client()
# gs:// URIs of every parquet shard in the bucket, in listing order.
paths = [
    full_path + blob.name
    for blob in client.list_blobs(bucket_name)
    if blob.name.endswith('.parquet')
]

In [None]:
# Load every parquet shard found in the bucket as one DataFrame.
parquetFile = spark.read.parquet(*paths)
# RDD of (text, id) rows. Note the column order: text first, id second.
doc_text_pairs = parquetFile.select("text", "id").rdd

# Total number of documents; used as N in the idf term of createNorm.
N_temp = parquetFile.count()

In [None]:
# if nothing prints here you forgot to upload the file inverted_index_gcp_final.py to the home dir
# (write_to_memory.py, which is also added to Spark in the next cell, must be there too).
%cd -q /home/dataproc
!ls inverted_index_gcp_final.py

inverted_index_gcp.py


In [None]:
# Ship our helper modules to the Spark executors and make them importable on the driver.
for module_file in ("inverted_index_gcp_final.py", "write_to_memory.py"):
    sc.addFile(f"/home/dataproc/{module_file}")

# All files added with sc.addFile share one SparkFiles root directory, so a single
# sys.path entry covers both modules. The guard keeps re-runs of this cell from
# inserting the same directory again and again.
spark_files_root = SparkFiles.getRootDirectory()
if spark_files_root not in sys.path:
    sys.path.insert(0, spark_files_root)

In [None]:
from inverted_index_gcp_final import InvertedIndex
from write_to_memory import *

# Body Index

In [None]:
# Stopwords removed by every tokenizing helper below: NLTK's English list plus
# extra corpus-specific words.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
# Token pattern: a '#', '@' or word character followed by 2-24 word characters,
# each optionally preceded by an apostrophe or hyphen. Every token therefore has at
# least 3 word-ish characters. Longer runs are matched in consecutive pieces, and
# leftover tails shorter than 3 characters are dropped.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

# Number of buckets the posting lists are partitioned into when written to GCS.
NUM_BUCKETS = 124
def token2bucket_id(token):
  """Assign `token` to one of NUM_BUCKETS buckets using its stable _hash digest."""
  digest_hex = _hash(token)
  return int(digest_hex, 16) % NUM_BUCKETS

def word_count(text, id):
  """Count the non-stopword tokens of one document body.

  Returns a list of (token, (id, tf)) pairs, one per distinct non-stopword
  token, in order of first occurrence in the lower-cased text.
  """
  term_freqs = Counter(match.group() for match in RE_WORD.finditer(text.lower()))
  return [(term, (id, tf)) for term, tf in term_freqs.items() if term not in all_stopwords]

def reduce_word_counts(unsorted_pl):
  """Return the posting list as a new list, stably sorted by doc id (first tuple element)."""
  return sorted(unsorted_pl, key=itemgetter(0))

def calculate_df(postings):
  """Map an RDD of (term, posting_list) to (term, df), where df = len(posting_list)."""
  def _term_df(term_and_postings):
    return (term_and_postings[0], len(term_and_postings[1]))

  # TODO: rare-term filtering could be added here.
  return postings.map(_term_df)

In [None]:
def partition_postings_and_write(postings, directory):
  """Group (term, posting_list) pairs into NUM_BUCKETS buckets and write each bucket.

  Each bucket (bucket_id, iterable of (term, posting_list)) is passed to
  InvertedIndex.write_a_posting_list together with bucket_name and `directory`.
  Returns a lazy RDD of the per-bucket results; nothing is written until an
  action such as collect() runs.
  """
  def _keyed_by_bucket(term_postings):
    term, posting_list = term_postings[0], term_postings[1]
    return (token2bucket_id(term), (term, posting_list))

  def _write_bucket(bucket):
    return InvertedIndex.write_a_posting_list(bucket, bucket_name, directory)

  grouped = postings.map(_keyed_by_bucket).groupByKey()
  return grouped.map(_write_bucket)

In [None]:
# word counts map
# Build the body index: (term, (doc_id, tf)) pairs -> per-term posting lists sorted
# by doc_id, keeping only terms whose posting list has more than 50 entries.
# NOTE: RDD transformations are lazy, so "done1".."done4" print before any Spark work
# runs; the actual computation happens in collectAsMap() and collect().
# NOTE(review): postings_filtered is not persisted, so the whole pipeline is
# recomputed for each of the two actions. Consider .persist(StorageLevel.MEMORY_AND_DISK).
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
print("done1")
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
print("done2")
postings_filtered = postings.filter(lambda x: len(x[1])>50)
print("done3")
# Terms whose posting lists have <= 50 entries are absent from w2df_dict.
w2df = calculate_df(postings_filtered)
print("done4")
w2df_dict = w2df.collectAsMap()
print("done5")
# Writes the bucketed posting lists with 'postings_body' as the target directory. The
# collected return value is not used afterwards; the posting locations are re-read
# from GCS in the next cell.
_ = partition_postings_and_write(postings_filtered,'postings_body').collect()
print("done6")

In [None]:
# Merge the per-bucket posting-location pickles under postings_body/ into one
# mapping of term -> list of locations.
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_body'):
  if blob.name.endswith("pickle"):
    with blob.open("rb") as f:
      posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)

In [None]:
# Assemble the body InvertedIndex from the merged posting locations and the df map,
# write it to the current directory, and upload body_index.pkl next to the posting files.
inverted = InvertedIndex()
inverted.posting_locs = super_posting_locs
inverted.df = w2df_dict
inverted.write_index('.', 'body_index')

index_src = "body_index.pkl"
index_dst = f'gs://{bucket_name}/postings_body/{index_src}'
!gsutil cp $index_src $index_dst

# Free driver memory. w2df_dict is kept because createNorm still reads it.
del super_posting_locs
del _
del word_counts
del postings
del w2df
del postings_filtered
gc.collect()

Computing the TF-IDF vector norm of each document's body text:

In [None]:
import math
def createNorm(text):
  """Return the L2 norm of a document's body TF-IDF vector.

  tf is normalised by the document length (number of non-stopword tokens) and
  idf = log2(N / df), with N = N_temp and df read from w2df_dict.

  Terms missing from w2df_dict are skipped. These are the terms dropped by the
  posting-list length filter (> 50) when the body index was built. They have no
  posting list in the index, and looking them up would raise KeyError. They still
  count toward the document length.
  """
  tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
  counter = Counter(tokens)

  N = N_temp
  term_freqs = {term: tf for term, tf in counter.items() if term not in all_stopwords}
  # Accumulate with explicit loops: the builtin `sum` may be shadowed in this
  # notebook by `from pyspark.sql.functions import *`.
  doc_len = 0
  for tf in term_freqs.values():
    doc_len += tf

  sum_of_squares = 0
  for term, tf in term_freqs.items():
    df = w2df_dict.get(term)
    if not df:
      continue  # not in the (filtered) body index
    weight = (tf / doc_len) * math.log(N / df, 2)
    sum_of_squares += math.pow(weight, 2)
  return math.sqrt(sum_of_squares)

# {doc_id: body TF-IDF norm} for every document, collected to the driver.
# doc_text_pairs rows are (text, id), hence x[1] is the id and x[0] the text.
# NOTE(review): createNorm closes over the large w2df_dict, which is serialized with
# every task. Consider sc.broadcast for it.
docs_norms = doc_text_pairs.map(lambda x: (x[1],createNorm(x[0]))).collectAsMap()

# write_to_memory comes from the write_to_memory module; presumably it pickles the dict
# to ./body_TFIDFnorma2.pkl, the file uploaded below. TODO confirm.
write_to_memory(docs_norms,'.','body_TFIDFnorma2')

doc_norma_src = "body_TFIDFnorma2.pkl"
doc_norma_dst = f'gs://{bucket_name}/general_files/{doc_norma_src}'
!gsutil cp $doc_norma_src $doc_norma_dst

del docs_norms
del w2df_dict
gc.collect()

Saving each document's length (its number of non-stopword tokens):

In [None]:
def DL_calc(id, text):
    """Return (id, n), where n is the number of non-stopword tokens in `text`."""
    matches = (m.group() for m in RE_WORD.finditer(text.lower()))
    kept = [tok for tok in matches if tok not in all_stopwords]
    # len() of a list rather than sum(...): `sum` may be shadowed by the pyspark star-import.
    return (id, len(kept))

# {doc_id: non-stopword token count} for every document, collected to the driver.
temp_DL=doc_text_pairs.map(lambda x:DL_calc(x[1],x[0])).collectAsMap()
# presumably pickles temp_DL to ./DL_bodyIndex.pkl, the file uploaded below. TODO confirm.
write_to_memory(temp_DL,'.','DL_bodyIndex')

DL_bodyIndex_src = "DL_bodyIndex.pkl"
DL_bodyIndex_src_dst = f'gs://{bucket_name}/general_files/{DL_bodyIndex_src}'
!gsutil cp $DL_bodyIndex_src $DL_bodyIndex_src_dst

# doc_text_pairs is not used by the title and anchor sections.
del temp_DL
del doc_text_pairs
gc.collect()

# Title Index

In [None]:
# RDD of (id, title) rows. The column order is id first, unlike doc_text_pairs.
doc_title_pairs = parquetFile.select("id", "title").rdd

In [None]:
def word_count_title(text, id):
  """Count the non-stopword tokens of one document title.

  Returns a list of (token, (id, tf)) pairs, one per distinct non-stopword token.
  """
  tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
  counter = Counter(tokens)
  return [(c, (id, counter[c])) for c in counter if c not in all_stopwords]

# doc_title_pairs rows are (id, title), but word_count_title expects (text, id),
# so the title (x[1]) goes first. Passing x[0] first fed the integer id in as text.
word_counts_title = doc_title_pairs.flatMap(lambda x: word_count_title(x[1], x[0]))

def reduce_word_counts_title(unsorted_pl):
  """Return a title posting list of (doc_id, tf) pairs, stably sorted by doc_id."""
  ordered = list(unsorted_pl)
  ordered.sort(key=itemgetter(0))
  return ordered

# Per-term title posting lists sorted by doc_id. Unlike the body index, no df threshold is applied.
postings_title = word_counts_title.groupByKey().mapValues(reduce_word_counts_title)

def calculate_df_title(postings):
  """Map an RDD of (term, posting_list) to (term, df), with df = len(posting_list)."""
  def _with_df(pair):
    term, posting_list = pair[0], pair[1]
    return (term, len(posting_list))
  return postings.map(_with_df)


# df of every title term, collected to the driver as {term: df} for the title index.
w2df_title = calculate_df_title(postings_title)
w2df_dict_title = w2df_title.collectAsMap()

In [None]:
# Write the title posting lists (bucketed by partition_postings_and_write) under 'postings_title'.
posting_locs_list_title = partition_postings_and_write(postings_title,'postings_title').collect()

# Merge the per-bucket posting-location pickles into the title mapping.
super_posting_locs_title = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_title'):
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      # Extend the *title* mapping. The body's super_posting_locs was already
      # deleted, and the title index would otherwise have no locations.
      super_posting_locs_title[k].extend(v)

In [None]:
#create an Inverted Index
inverted_title = InvertedIndex()
inverted_title.posting_locs = super_posting_locs_title
inverted_title.df = w2df_dict_title
inverted_title.write_index('.', 'title_index')

title_index_src = "title_index.pkl"
title_index_dst = f'gs://{bucket_name}/postings_title/{title_index_src}'
!gsutil cp $title_index_src $title_index_dst

del super_posting_locs_title
del w2df_dict_title
del posting_locs_list_title
del postings_title
del w2df_title
del word_counts_title
gc.collect()

Creating a dictionary that maps each doc_id to its document title:

In [None]:
# {doc_id: title} for every document (doc_title_pairs rows are (id, title)).
title_pairs = doc_title_pairs.collectAsMap()
# presumably pickles the dict to ./title_and_id.pkl, the file uploaded below. TODO confirm.
write_to_memory(title_pairs,'.','title_and_id')

title_id_src = "title_and_id.pkl"
title_id_src_dst = f'gs://{bucket_name}/general_files/{title_id_src}'
!gsutil cp $title_id_src $title_id_src_dst

del title_pairs
del doc_title_pairs
gc.collect()

# Anchor Index

In [None]:
# RDD of (id, anchor_text) rows; only the anchor_text column (w[1]) is used below.
doc_anchor_pairs = parquetFile.select("id", "anchor_text").rdd

In [None]:
def helperFunc1(d_id, s_id=None):
    """Pair a source id with the second field of every entry in `d_id`.

    Returns a list of (s_id, entry[1]) tuples. `s_id` used to be an undefined
    global, which raised NameError as soon as the result was iterated. It is now
    an explicit, optional parameter. The only call site in this notebook is
    commented out.
    """
    return [(s_id, entry[1]) for entry in d_id]

# Unused alternative, kept for reference (helperFunc1 is not called anywhere else):
# position1 = doc_anchor_pairs.flatMapValues(lambda w: helperFunc1(w[0], w[1])).collect()

# Flatten each document's anchor_text list into its individual entries.
# tuple_creator below reads entry[0] as the doc id and entry[1] as the anchor text;
# presumably entry[0] is the id of the page the anchor links to. TODO confirm the schema.
position1 = doc_anchor_pairs.flatMap(lambda w: w[1])
# print(position1)

def tuple_creator(doc_id, text):
    """Turn one anchor entry into (term, (doc_id, tf)) pairs.

    Emits one pair per distinct non-stopword term, where tf is the term's count
    in `text`, matching word_count. Emitting a pair per occurrence would shuffle
    identical copies that the downstream set() only discards.
    """
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
    counter = Counter(tokens)
    return [(term, (doc_id, tf)) for term, tf in counter.items() if term not in all_stopwords]

# (term, (doc_id, tf)) pairs from every anchor entry, grouped per term.
position2 = position1.flatMap(lambda x: tuple_creator(x[0],x[1]))
position3 = position2.groupByKey()
# Drop exact-duplicate (doc_id, tf) pairs.
# NOTE(review): the same doc_id can still appear several times with different tf values
# (e.g. when several anchors point at it). Posting lists may therefore hold duplicate
# doc ids and are unordered (unlike the body/title lists), and temp_df counts entries
# rather than distinct documents. Consider merging per doc_id and sorting before writing.
position4 = position3.mapValues(lambda x: list(set(x)))
# df per anchor term = number of entries in its deduplicated posting list.
temp_df = position4.map(lambda x: (x[0], len(x[1]))).collectAsMap()

# NUM_BUCKETS and token2bucket_id are already defined, identically, in the Body Index
# section. partition_postings_and_write (used below) looks token2bucket_id up as a
# global, so those definitions are reused here instead of being redefined.


# Write the anchor posting lists (bucketed by partition_postings_and_write) under 'postings_anchor'.
posting_locs_list = partition_postings_and_write(position4,'postings_anchor').collect()
super_posting_locs = defaultdict(list)

# Merge the per-bucket posting-location pickles for the anchor index.
for blob in client.list_blobs(bucket_name, prefix='postings_anchor'):
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)

# Build the anchor InvertedIndex from the merged locations and temp_df.
inverted = InvertedIndex()
inverted.posting_locs = super_posting_locs
inverted.df = temp_df

inverted.write_index('.', 'anchor_index')

# Upload anchor_index.pkl next to the anchor posting files.
anchor_index_src = "anchor_index.pkl"
anchor_index_dst = f'gs://{bucket_name}/postings_anchor/{anchor_index_src}'
!gsutil cp $anchor_index_src $anchor_index_dst