# Setup

In [1]:
# if the following command generates an error, you probably didn't enable 
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster
!gcloud dataproc clusters list --region us-central1

NAME          PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-8096  GCE       3                                             RUNNING  us-central1-a


In [2]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [3]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
import builtins
import math
from nltk.stem import *


stemmer = PorterStemmer()


import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [4]:
# if nothing prints here you forgot to include the initialization script when starting the cluster
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Mar  7 14:39 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [6]:

spark

# Building an Inverted Index for title with stemming and ngram

### Check access to the bucket; set "bucket_name" to the bucket that holds the input data

In [None]:
# Put your bucket name below and make sure you can access it without an error
bucket_name = '209706803_body'
full_path = f"gs://{bucket_name}/"

# Index artifacts that may sit in the same bucket as the input data and must not
# be handed to spark.read.parquet: posting-location pickles (".pickle"), the
# global index file (".pkl") and, presumably, the binary posting-list files (".bin").
NON_DATA_SUFFIXES = ('.pickle', '.pkl', '.bin')

client = storage.Client()
paths = [
    full_path + b.name
    for b in client.list_blobs(bucket_name)
    if b.name != 'graphframes.sh'
    and not b.name.startswith("postings_gcp")
    and not b.name.endswith(NON_DATA_SUFFIXES)
]

In [12]:

# Read every data shard listed above, then keep only the two columns the
# title index needs, as an RDD of Row(title=..., id=...).
parquetFile = spark.read.parquet(*paths)
title_id_df = parquetFile.select("title", "id")
doc_title_pairs_limit = title_id_df.rdd

                                                                                

In [13]:
# Count number of wiki pages. count() only returns a single integer to the
# driver, but it has to scan every parquet shard, so it can be slow.
parquetFile.count()


                                                                                

Row(title='Foster Air Force Base', id=4045403)


### Nir's imports – make sure the InvertedIndex class exists and is imported

In [17]:
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [18]:
# adding our python module to the cluster: sc.addFile ships
# inverted_index_gcp.py with the Spark job, and prepending the SparkFiles root
# directory to sys.path makes the shipped copy importable in this process.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [19]:
from inverted_index_gcp import InvertedIndex

#### Depending on the kind of tokenization desired, change the arguments of the function tokenizer

In [20]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)


def tokenizer(text, stem=True, ngram=True):
    """Turn `text` into a list of index terms.

    Words are matched with RE_WORD on the lower-cased text, then:
      * stem=True,  ngram=False -> stopwords dropped, remaining words Porter-stemmed
      * stem=False, ngram=True  -> bigrams over ALL words (stopwords are kept)
      * stem=True,  ngram=True  -> stopwords dropped, words stemmed, then bigrams
      * stem=False, ngram=False -> stopwords dropped only
    A bigram is the two words joined by a single space.

    NOTE(review): the ngram-only mode is the only one that keeps stopwords;
    confirm that this is intentional.
    """
    words = [match.group() for match in RE_WORD.finditer(text.lower())]

    if ngram and not stem:
        # Stopwords are intentionally left in for this mode (original behaviour).
        return [f"{first} {second}" for first, second in nltk.bigrams(words)]

    kept = [word for word in words if word not in all_stopwords]
    if stem:
        kept = [stemmer.stem(word) for word in kept]
    if ngram:
        kept = [f"{first} {second}" for first, second in nltk.bigrams(kept)]
    return kept

# In the code below, depending on the index being created, change the variables "BASE_DIR" and "BUCKET_NAME"

In [21]:


NUM_BUCKETS = 124


def token2bucket_id(token):
  """Return the bucket (0 .. NUM_BUCKETS-1) that a token's posting list goes to.

  The bucket is derived from the token's `_hash` hex digest, so a given
  token always maps to the same bucket.
  """
  digest_as_int = int(_hash(token), 16)
  return digest_as_int % NUM_BUCKETS


# Output location of the posting lists; change these per index being built.
BASE_DIR = "POSTING_LISTS_TITLE"
BUCKET_NAME = "209706803"



def word_count(text, id):
  ''' Build the posting-list entries contributed by a single document.

  `text` is split with `tokenizer(text)` using that function's default
  arguments, so the terms may be stemmed and/or bigrams and stopword handling
  follows those defaults. Each distinct term is paired with its term frequency.
  Parameters:
  -----------
    text: str
      Text of one document
    id: int
      Document id
  Returns:
  --------
    List of tuples
      One (term, (doc_id, tf)) pair per distinct term, in order of first
      occurrence, e.g. [("some term", (12, 5)), ...]
  '''
  term_freqs = Counter(tokenizer(text))
  return [(term, (id, tf)) for term, tf in term_freqs.items()]


def reduce_word_counts(unsorted_pl):
  ''' Order one term's posting list by document id.
  Parameters:
  -----------
    unsorted_pl: iterable of tuples
      (wiki_id, tf) tuples in any order
  Returns:
  --------
    list of tuples
      A new list of the same tuples sorted by wiki_id (stable, so entries
      with equal ids keep their input order).
  '''
  posting_list = list(unsorted_pl)
  posting_list.sort(key=itemgetter(0))
  return posting_list


def calculate_df(postings):
  ''' Compute the document frequency of every term.

  The df of a term is the length of its posting list, i.e. the number of
  (doc_id, tf) entries it holds.
  Parameters:
  -----------
    postings: RDD
      An RDD where each element is a (token, posting_list) pair.
  Returns:
  --------
    RDD
      An RDD where each element is a (token, df) pair.
  '''
  return postings.mapValues(len)


def partition_postings_and_write(postings, BASE_DIR, bucket_name=None):
  ''' Partition the posting lists into buckets, write each bucket out, and
  return the posting locations of every bucket.

  Each term is assigned a bucket with `token2bucket_id`. All posting lists of
  a bucket are written with `write_a_posting_list`, a static method of the
  InvertedIndex class in inverted_index_gcp.py.
  Parameters:
  -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
    BASE_DIR: str
      Directory (object prefix) the posting files are written under.
    bucket_name: str or None
      GCS bucket to write to. When None (the default), the notebook-level
      BUCKET_NAME is used, which is what earlier versions always did.
  Returns:
  --------
    RDD
      An RDD where each item is a posting locations dictionary for a bucket. The
      posting locations maintain a list for each word of file locations and
      offsets its posting list was written to. See `write_a_posting_list` for
      more details.
  '''
  # Resolve the default on the driver so the closure shipped to the executors
  # captures a concrete string.
  target_bucket = BUCKET_NAME if bucket_name is None else bucket_name

  # (bucket_id, (w, posting_list)): the items are already (w, posting_list) pairs
  bucketed_postings = postings.map(lambda w_pl: (token2bucket_id(w_pl[0]), w_pl))

  # (bucket_id, iterable of (w, posting_list))
  grouped_buckets = bucketed_postings.groupByKey()

  # Write every bucket and yield its posting-locations dictionary.
  return grouped_buckets.map(
      lambda b_w_pl: InvertedIndex().write_a_posting_list(b_w_pl, BASE_DIR, target_bucket))


# Before running the code below, check the tokenizer arguments used in `word_count` and in the variable "id_title_length_dict", and adjust them so that both match

In [23]:
# SHIR AND NITZAN - TITLE INVERTED INDEX WITH STEMMING AND NGRAM CALCULATIONS

# Minimum document frequency a term needs in order to be kept in the index.
# With 1 every term is kept; raise it to prune rare terms.
MIN_DF = 1

# time the index creation time
t_start = time()
# word counts map: (term, (doc_id, tf)) for every term of every title
word_counts = doc_title_pairs_limit.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# drop terms whose posting list (= df) is shorter than MIN_DF
postings_filtered = postings.filter(lambda x: len(x[1]) >= MIN_DF)

# Title length (number of index terms) per doc id. This must use the same
# tokenizer arguments as word_count, otherwise lengths and postings disagree.
# collectAsMap builds the dict directly, avoiding a second driver-side copy.
id_title_length_dict = doc_title_pairs_limit.map(
    lambda row: (row.id, len(tokenizer(row.title)))).collectAsMap()

w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()
# partition posting lists and write out
_ = partition_postings_and_write(postings_filtered, BASE_DIR).collect()
index_const_time = time() - t_start

                                                                                

In [24]:
# test index construction time
print(index_const_time)

187.73400020599365


## Change "POSTING_DIRECTORY" to the name of the directory the posting lists were written to (the BASE_DIR used above)

In [22]:
# collect all posting lists locations into one super-set

POSTING_DIRECTORY = "POSTING_LISTS_TITLE"

# The posting files are written by partition_postings_and_write to BUCKET_NAME.
# That bucket can differ from `bucket_name` (the bucket the input data is read
# from), so the output bucket is the one listed here.
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(BUCKET_NAME, prefix=POSTING_DIRECTORY):
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    # pickle.load can execute arbitrary code; acceptable only because this
    # bucket holds our own index output.
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)

In [23]:
# Create inverted index instance  


########## we may want to address the fact that we are not updating the term_total attribute #########
inverted = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict
######## Add the DL dict
inverted.DL = id_title_length_dict
# write the global stats out
inverted.write_index('.', 'index')
# upload to gs
index_src = "index.pkl"
index_dst = f'gs://{bucket_name}/{POSTING_DIRECTORY}/{index_src}'
!gsutil cp $index_src $index_dst

Copying file://index.pkl [Content-Type=application/octet-stream]...
/ [1 files][  1.5 KiB/  1.5 KiB]                                                
Operation completed over 1 objects/1.5 KiB.                                      


In [24]:
!gsutil ls -lh $index_dst

  1.48 KiB  2024-03-05T22:44:17Z  gs://209706803/POSTING_LISTS_TITLE/index.pkl
TOTAL: 1 objects, 1513 bytes (1.48 KiB)


## Tests to check that the attributes are set correctly

In [39]:
inverted.DL

1090

In [None]:
inverted.posting_locs

In [None]:
inverted.df