***Important*** DO NOT CLEAR THE OUTPUT OF THIS NOTEBOOK AFTER EXECUTION!!!

In [1]:
# Sanity check: list the Dataproc clusters in region us-central1.
# If this command generates an error, you probably didn't enable the cluster
# security option "Allow API access to all Google Cloud services"
# (Manage Security → Project Access) when setting up the cluster.
!gcloud dataproc clusters list --region us-central1

NAME          PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-dfa7  GCE       2                                             RUNNING  us-central1-a


# Imports & Setup

In [2]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [3]:
import math
import pickle
import re
import sys
from collections import Counter, OrderedDict, defaultdict

import nltk
import pyspark
from google.cloud import storage
from nltk.corpus import stopwords
from nltk.stem.porter import *

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# Fetch NLTK's English stopword list (reported as already up-to-date when present);
# the call's return value is the cell's displayed output.
nltk.download(info_or_id='stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [4]:
# Verify the GraphFrames jar is present in Spark's jar directory.
# If nothing prints here you forgot to include the initialization script when starting the cluster.
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Mar  7 13:05 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [6]:
# Put your bucket name below and make sure you can access it without an error
bucket_name = 'bucket_title'
full_path = f"gs://{bucket_name}/"

client = storage.Client()
blobs = client.list_blobs(bucket_name)
# Every parquet shard in the bucket becomes an input path for the corpus.
paths = [full_path + b.name for b in blobs if b.name.endswith('parquet')]

# Fail here with a clear message instead of letting spark.read.parquet() fail
# later with a schema-inference error on an empty path list.
if not paths:
    raise FileNotFoundError(f"No parquet files found in {full_path} - check bucket_name")

# Building an inverted index

In [7]:
parquetFile = spark.read.parquet(*paths)
# Total number of documents; used later as N in the idf term and stored in the index.
n_pages = parquetFile.count()
# RDD of (text, id) rows consumed by the tokenization stage.
doc_text_pairs = parquetFile.select("text", "id").rdd

                                                                                

In [8]:
# Move to the home directory and verify the InvertedIndex helper module is there.
# If nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir.
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [9]:
# Distribute our python module to the cluster (sc.addFile) and make the
# SparkFiles root directory importable from this driver process.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
spark_files_root = SparkFiles.getRootDirectory()
# Guard keeps the cell idempotent: re-running it must not keep prepending
# the same directory to sys.path.
if spark_files_root not in sys.path:
    sys.path.insert(0, spark_files_root)

In [10]:
from inverted_index_gcp import InvertedIndex

In [11]:
# Stopwords = NLTK's English list plus corpus-specific words listed here.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = [
    "category", "references", "also", "external", "links",
    "may", "first", "see", "history", "people", "one", "two",
    "part", "thumb", "including", "second", "following",
    "many", "however", "would", "became",
]
all_stopwords = english_stopwords | frozenset(corpus_stopwords)

# A token starts with '#', '@' or a word character and is followed by 2-24
# word characters, each optionally preceded by an apostrophe or hyphen
# (so every token is at least 3 characters long).
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

# Number of buckets posting lists are hashed into (see token2bucket_id).
NUM_BUCKETS = 124

def token2bucket_id(token):
    """Return the bucket index in [0, NUM_BUCKETS) that `token` is written to."""
    digest_as_int = int(_hash(token), base=16)
    return digest_as_int % NUM_BUCKETS

def get_tokens(text, doc_id):
    """Support function - tokenize one document and drop stopwords.

    Parameters:
    -----------
    text: str or None
      Text of one document. An empty or None text (presumably a null `text`
      value in the parquet input) yields an empty token list instead of raising.
    doc_id: int
      Document id
    Returns:
    --------
    List of tuples
      A single-element list [(doc_id, filtered_token_list)], shaped for flatMap.
      Tokens are lowercased RE_WORD matches not in all_stopwords.
    """
    if not text:
        return [(doc_id, [])]
    # RE_WORD only matches tokens of 3+ characters, so no empty-string check is needed.
    tokens = (match.group() for match in RE_WORD.finditer(text.lower()))
    filtered_tokens = [tok for tok in tokens if tok not in all_stopwords]
    return [(doc_id, filtered_tokens)]

def calculate_df(postings):
    ''' Takes a posting list RDD and calculate the df for each token.
    Parameters:
    -----------
      postings: RDD
        An RDD where each element is a (token, posting_list) pair.
    Returns:
    --------
      RDD
        An RDD where each element is a (token, df) pair, where df is the
        number of entries in the token's posting list.
    '''
    # mapValues (rather than map) leaves keys untouched, so Spark keeps the
    # partitioner of `postings` (e.g. from groupByKey). A later join keyed by
    # token can then avoid an extra shuffle.
    return postings.mapValues(len)

def reduce_word_counts(unsorted_pl):
    ''' Materialize a posting list and order it by document id.
    Parameters:
    -----------
      unsorted_pl: iterable of tuples
        (wiki_id, tf) entries in arbitrary order (e.g. the values of a groupByKey).
    Returns:
    --------
      list of tuples
        A new list with the same entries in ascending wiki_id order; the sort is
        stable, so entries with equal ids keep their relative order.
    '''
    posting_list = list(unsorted_pl)
    posting_list.sort(key=lambda entry: entry[0])
    return posting_list

def word_count(tokens, id):
    ''' Build posting-list entries (term frequencies) for one document.
    No stopword filtering happens here; in this notebook `tokens` is the
    already-filtered list produced by `get_tokens`.
    Parameters:
    -----------
      tokens: list of str
        Tokens of one document
      id: int
        Document id
    Returns:
    --------
      List of tuples
        A list of (token, (doc_id, tf)) pairs in first-occurrence order,
        for example: [("anarchism", (12, 5)), ...]
    '''
    tf_by_token = Counter(tokens)
    entries = []
    for token, tf in tf_by_token.items():
        entries.append((token, (id, tf)))
    return entries

def get_doc_len(doc_id, tokens):
    """Count a document's filtered length (number of tokens).

    The count is used for tf normalization (see get_tfidf) and is stored
    alongside the document norm in the index.

    Parameters
    ----------
    doc_id : int
        Document id.
    tokens : iterable of str
        The document's filtered tokens.

    Returns
    -------
    list of tuple
        A single-element list [(doc_id, doc_length)], shaped for flatMap.
    """
    # Count with len() rather than sum(): this notebook runs
    # `from pyspark.sql.functions import *`, which rebinds the global `sum`
    # to Spark's column aggregate, and that cannot be applied to Python values.
    token_seq = tokens if isinstance(tokens, (list, tuple)) else list(tokens)
    return [(doc_id, len(token_seq))]
    
def partition_postings_and_write(postings,base_dir):
    ''' Bucket posting lists by token hash and write each bucket out.

    Every (token, posting_list) pair is assigned a bucket with
    `token2bucket_id`. Pairs sharing a bucket are grouped, and each group is
    handed, together with `base_dir` and the global `bucket_name`, to
    `InvertedIndex.write_a_posting_list` (from inverted_index_gcp.py).
    Parameters:
    -----------
      postings: RDD
        An RDD where each item is a (w, posting_list) pair.
      base_dir: str
        Passed through to `write_a_posting_list`; presumably the output
        directory/prefix the bucket files are written under.
    Returns:
    --------
      RDD
        One item per bucket: whatever `write_a_posting_list` returns for it.
        Transformations are lazy, so nothing is written until an action
        such as `.collect()` runs.
    '''
    def _write_bucket(bucket_and_pairs):
        # bucket_and_pairs is (bucket_id, iterable of (token, posting_list)).
        return InvertedIndex.write_a_posting_list(bucket_and_pairs, base_dir, bucket_name)

    keyed_by_bucket = postings.map(lambda pair: (token2bucket_id(pair[0]), (pair[0], pair[1])))
    return keyed_by_bucket.groupByKey().map(_write_bucket)

def get_tfidf(tf, df, doc_len, n_docs=None):
    """Length-normalized tf-idf weight of one term in one document.

    Computes ``tf / doc_len * log2(n_docs / (df + 1))``.

    Parameters
    ----------
    tf : int
        Frequency of the term in the document.
    df : int or None
        Number of documents containing the term. ``None`` (what a
        ``leftOuterJoin`` yields when the term has no df entry) gives 0.0.
    doc_len : int or None
        Filtered token count of the document. 0 or ``None`` gives 0.0
        instead of a division error or TypeError.
    n_docs : int, optional
        Corpus size N. Defaults to the notebook-global ``n_pages``.

    Returns
    -------
    float
    """
    if not doc_len or df is None:
        return 0.0
    corpus_size = n_pages if n_docs is None else n_docs
    return tf / doc_len * math.log(corpus_size / (df + 1), 2)

In [None]:
index_name = "full_body_index"
# Terms that appear in fewer documents than this are dropped from the index.
MIN_DF = 50

# (doc_id, [filtered tokens]) per document; doc_text_pairs rows are (text, id).
doc_tok = doc_text_pairs.flatMap(lambda x: get_tokens(x[0], x[1]))

# (doc_id, doc_len) for later tf normalization.
# get_doc_len's signature is (doc_id, tokens) and doc_tok rows are (doc_id, tokens),
# so pass them in row order. Swapping them would hand the int doc id in as `tokens`.
doc_length = doc_tok.flatMap(lambda x: get_doc_len(x[0], x[1]))
# (token, (doc_id, tf)): term frequency by document
word_counts = doc_tok.flatMap(lambda x: word_count(x[1], x[0]))
print("running bodyIndex with uncommon_words filter\n")
# (token, [(doc_id, tf), ...] sorted by doc_id), keeping only terms with df >= MIN_DF
postings = (word_counts.groupByKey()
            .mapValues(reduce_word_counts)
            .filter(lambda x: len(x[1]) >= MIN_DF))
# (token, df): term document frequency
w2df = calculate_df(postings)

# (doc_id, L2 norm of the document's tf-idf vector): join tf with df and doc_len,
# square each tf-idf weight, sum per document, then take the square root.
# `** 2` is used instead of pow(): `from pyspark.sql.functions import *` rebinds
# `pow` in this notebook to Spark's column function, which fails on plain floats.
doc2norm = (
    postings.flatMapValues(lambda pl: pl)                          # (token, (doc_id, tf))
    .leftOuterJoin(w2df)                                           # (token, ((doc_id, tf), df))
    .map(lambda x: (x[1][0][0], (x[0], x[1][0][1], x[1][1])))      # (doc_id, (token, tf, df))
    .leftOuterJoin(doc_length)                                     # (doc_id, ((token, tf, df), doc_len))
    .mapValues(lambda v: get_tfidf(v[0][1], v[0][2], v[1]) ** 2)   # (doc_id, tfidf^2)
    .reduceByKey(lambda a, b: a + b)
    .mapValues(math.sqrt)
)

In [ ]:
# Save the doc_id -> (doc_norm, doc_len) RDD for cosine-similarity calculations.
doc_data = doc2norm.join(doc_length)
tfidf_location = f"{index_name}/tfidf"
# Writing is lazy; collect() forces it. The per-bucket return values are unused.
# NOTE(review): keys here are integer doc ids, so token2bucket_id/_hash receive
# non-str keys - confirm they accept them.
_ = partition_postings_and_write(doc_data, tfidf_location).collect()

# Merge every per-bucket location pickle written under tfidf_location.
super_doc_data_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix=tfidf_location):
    if blob.name.endswith("pickle"):
        with blob.open("rb") as f:
            posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_doc_data_locs[k].extend(v)

In [ ]:
# Write the term posting lists to bin files (lazy; collect() forces the write).
postings_location = f"{index_name}/postings"
_ = partition_postings_and_write(postings, postings_location).collect()
print(f"PostingLocs created for {index_name}\n")

# Merge every per-bucket posting-locations pickle written under postings_location.
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix=postings_location):
    if blob.name.endswith("pickle"):
        with blob.open("rb") as f:
            posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs[k].extend(v)

In [ ]:
# Create inverted index instance
inverted = InvertedIndex(base_dir=index_name,name=index_name,bucket_name=bucket_name)
# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted.df.update(w2df.collectAsMap())
print(f"document frequancy created for {index_name}\n")
# Count number of docs
inverted._N = n_pages
# Get each document length and norm
inverted.doc_data = super_doc_data_locs
print(f"doc data created for {index_name}\n")
# save titles to return results
# inverted.docID_to_title_dict = id2title
# write the global stats out
inverted.write_index()
print(f"{index_name} written\n")
# upload to gs
index_src = f"{index_name}.pkl"
index_dst = f'gs://{bucket_name}/{index_name}/{index_src}'
!gsutil cp $index_src $index_dst
print(f"{index_name} uploaded to bucket\n")

!gsutil ls -lh $index_dst