In [None]:
# Sanity check: list the Dataproc clusters in us-central1.
# If this command fails, you probably did not enable the cluster security option
# "Allow API access to all Google Cloud services" under
# Manage Security → Project Access when creating the cluster.
!gcloud dataproc clusters list --region us-central1

In [None]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# Fetch NLTK's stopword corpus; the English list is read later via stopwords.words('english').
nltk.download('stopwords')

In [None]:
# Sanity check: the graphframes JAR must be present in Spark's jar directory.
# If nothing prints here, you forgot to include the initialization script when starting the cluster.
!ls -l /usr/lib/spark/jars/graph*

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
# Display the active SparkSession as a quick sanity check.
# NOTE(review): `spark` is never assigned in this notebook — presumably it is
# injected by the Dataproc PySpark kernel.
spark

In [None]:
# Put your bucket name below and make sure you can access it without an error
bucket_name = '316608942a'

full_path = f"gs://{bucket_name}/"
client = storage.Client()
blobs = client.list_blobs(bucket_name)

# Collect only the Wikipedia parquet dumps. This notebook later writes its own
# index artifacts into the same bucket (posting-location *.pickle files and
# postings_gcp/index_*.pkl), and feeding those to spark.read.parquet would make
# a re-run fail. The old "everything except graphframes.sh" rule is subsumed.
# NOTE(review): assumes every input dump's name ends in ".parquet" — confirm.
paths = [full_path + b.name for b in blobs if b.name.endswith('.parquet')]
assert paths, f"no .parquet files found in gs://{bucket_name}/"

In [None]:
def _columns_rdd(df, *columns):
  """Project DataFrame `df` onto `columns` and return the result as an RDD of Rows."""
  return df.select(*columns).rdd

# One DataFrame over all dumps; each index reads only the columns it needs.
parquetFile = spark.read.parquet(*paths)
doc_text_pairs = _columns_rdd(parquetFile, "text", "id")      # Row(text, id)  -> body index
doc_title_pairs = _columns_rdd(parquetFile, "title", "id")    # Row(title, id) -> title index
doc_anchor_pairs = _columns_rdd(parquetFile, "anchor_text")   # Row(anchor_text) -> anchor index

In [None]:
# Sanity check: inverted_index_gcp.py must sit in /home/dataproc.
# If nothing prints here, you forgot to upload the file inverted_index_gcp.py to the home dir.
%cd -q /home/dataproc
!ls inverted_index_gcp.py

In [None]:
# Distribute our inverted-index module to every executor, and put the SparkFiles
# root on the driver's import path so `import inverted_index_gcp` works here too.
spark.sparkContext.addFile("/home/dataproc/inverted_index_gcp.py")
spark_files_dir = SparkFiles.getRootDirectory()
sys.path.insert(0, spark_files_dir)

In [None]:
from inverted_index_gcp import InvertedIndex

In [None]:
def anchor_change(doc_anchor):
  ''' Return the distinct (target_id, anchor_text) pairs of one page's links.

  Parameters:
  -----------
    doc_anchor: Row
      A row whose first field is the page's `anchor_text` list; every element
      unpacks into a (target doc id, anchor text) pair.
  Returns:
  --------
    list of tuples
      Distinct (target_id, text) pairs in first-seen order; [] when the
      anchor list is missing (null) or empty.
  '''
  anchors = doc_anchor[0] or []
  # dict.fromkeys de-duplicates in O(n) while keeping first-seen order; a
  # list-membership check would be O(n^2) on link-heavy pages.
  return list(dict.fromkeys((target_id, text) for target_id, text in anchors))

def _join_anchor_texts(target_and_texts):
  """Turn (target_id, [texts]) into (space-joined text, target_id)."""
  target_id, texts = target_and_texts
  return (" ".join(texts), target_id)

# Per target document: all distinct anchor texts pointing at it, as one string.
# distinct() drops identical (target_id, text) pairs coming from different pages.
doc_anchor_pairs_new = (
    doc_anchor_pairs
    .flatMap(anchor_change)
    .distinct()
    .groupByKey()
    .mapValues(list)
    .map(_join_anchor_texts)
)

In [None]:
# Stopwords: NLTK's English list plus frequent corpus-specific terms.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = [
    "category", "references", "also", "external", "links",
    "may", "first", "see", "history", "people", "one", "two",
    "part", "thumb", "including", "second", "following",
    "many", "however", "would", "became",
]
all_stopwords = english_stopwords | frozenset(corpus_stopwords)

# A token is a word, '#' or '@' character followed by 2-24 word characters,
# each optionally preceded by a single apostrophe or hyphen.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

# Number of buckets the posting lists are partitioned into when written out.
NUM_BUCKETS = 124

def token2bucket_id(token):
  """Map `token` deterministically to a bucket id in [0, NUM_BUCKETS)."""
  digest = _hash(token)
  return int(digest, 16) % NUM_BUCKETS

def tokenize(text):
    """
    Split `text` into lowercase RE_WORD tokens, dropping stopwords.

    Parameters:
    -----------
    text: string, the text to tokenize.

    Returns:
    -----------
    list of str: the non-stopword tokens in order of appearance (duplicates kept).
    """
    tokens = []
    for match in RE_WORD.finditer(text.lower()):
        token = match.group()
        if token not in all_stopwords:
            tokens.append(token)
    return tokens

def word_count(text, id):
  ''' Build the posting-list entries one document contributes.

  Counts how often each non-stopword token of `text` occurs (its tf in this
  document), using exactly the tokenization of `tokenize`.
  Parameters:
  -----------
    text: str
      Text of one document
    id: int
      Document id
  Returns:
  --------
    List of tuples
      One (token, (doc_id, tf)) pair per distinct token, in order of first
      appearance; tokens are lowercase, e.g. [("anarchism", (12, 5)), ...]
  '''
  term_freqs = Counter(tokenize(text))
  return [(term, (id, tf)) for term, tf in term_freqs.items()]

def reduce_word_counts(unsorted_pl):
  ''' Sort one token's posting list by wiki_id (stable for equal ids).
  Parameters:
  -----------
    unsorted_pl: iterable of tuples
      (wiki_id, tf) tuples in any order
  Returns:
  --------
    list of tuples
      A new list with the same tuples ordered by wiki_id.
  '''
  posting_list = list(unsorted_pl)
  posting_list.sort(key=itemgetter(0))
  return posting_list

def calculate_df(postings):
  ''' Compute each token's document frequency.

  df(token) is the length of its posting list; word_count emits one entry per
  distinct token per document, so that equals the number of documents
  containing the token.
  Parameters:
  -----------
    postings: RDD
      Elements are (token, posting_list) pairs.
  Returns:
  --------
    RDD
      Elements are (token, df) pairs.
  '''
  return postings.mapValues(len)

def partition_postings_and_write(postings,path):
  ''' Group posting lists into NUM_BUCKETS buckets and write each bucket out.

  Every (token, posting_list) pair is routed to bucket token2bucket_id(token);
  each bucket is then handed to InvertedIndex.write_a_posting_list (from
  inverted_index_gcp.py) together with `bucket_name` and `path`.
  Parameters:
  -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
    path: str
      Index directory name forwarded to write_a_posting_list (e.g. 'body_index').
  Returns:
  --------
    RDD
      One item per bucket: whatever write_a_posting_list returns for it —
      presumably that bucket's posting-locations dict; confirm in
      inverted_index_gcp.py.
  '''
  def _with_bucket(w_pl):
    # key each (w, posting_list) by the bucket its token hashes to
    return (token2bucket_id(w_pl[0]), w_pl)

  def _write_bucket(b_w_pl):
    # b_w_pl is (bucket_id, iterable of (w, posting_list))
    return InvertedIndex.write_a_posting_list(b_w_pl, bucket_name, path)

  buckets = postings.map(_with_bucket).groupByKey()
  return buckets.map(_write_bucket)

def calc_dl(tokens,doc_id):
  '''Return (doc_id, document length), the length being the number of tokens.'''
  length = len(tokens)
  return doc_id, length

def calc_total_term(postings):
  '''Return a driver-side dict {token: total tf of the token over all documents}.

  Parameters:
  -----------
    postings: RDD of (token, posting_list) pairs, posting entries being (doc_id, tf).
  '''
  totals = postings.mapValues(helpFunc)
  return totals.collectAsMap()

def helpFunc(posting):
  '''Add up the tf field (index 1) of every entry in a posting list.'''
  # explicit loop on purpose: `sum` is shadowed by `from pyspark.sql.functions import *`
  total = 0
  for entry in posting:
    total += entry[1]
  return total

def doc_norm(text,doc_id,body_dl,N,body_df):
  ''' Return (doc_id, L2 norm of the document's tf-idf vector).

  Each term weight is (tf / |d|) * log(N / df), where |d| is the number of
  tokens tokenize(text) produces. Terms absent from `body_df` (e.g. filtered
  out of the index) contribute nothing.
  Parameters:
  -----------
    text: str
      Raw document text.
    doc_id: int
      Document id, returned unchanged.
    body_dl: any
      Not read by this function; kept for call compatibility.
    N: int
      Number of documents in the collection (the idf numerator).
    body_df: dict
      token -> document frequency.
  Returns:
  --------
    tuple
      (doc_id, norm as float); 0.0 when no token has a df entry.
  '''
  import math  # local import: the notebook's import cell never imports math
  tokens = tokenize(text)
  doc_len = len(tokens)
  squares = 0.0  # not `sum`: avoid shadowing the builtin
  for term, tf in Counter(tokens).items():
    if term in body_df:
      squares += ((tf / doc_len) * math.log(N / body_df[term])) ** 2
  return (doc_id, math.sqrt(squares))


In [None]:
min_body=50
!mkdir body_index title_index anchor_index

In [None]:
def _doc_norms(pairs, dl, df):
  '''Collect {doc_id: tf-idf norm} for an RDD of (text, doc_id) rows.

  N is the number of documents in `dl`. `df` is broadcast once rather than
  being pickled into every task's closure (these dicts are large).
  '''
  n_docs = len(dl)
  df_bc = spark.sparkContext.broadcast(df)
  # doc_norm does not read its dl argument, so pass None instead of shipping the dict.
  norms = pairs.map(lambda x: doc_norm(x[0], x[1], None, n_docs, df_bc.value)).collectAsMap()
  df_bc.unpersist()
  return norms

#body
word_count_bodys = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
posting_lists_body = word_count_bodys.groupByKey().mapValues(reduce_word_counts)
# keep only terms that appear in more than min_body documents
filter_bodys = posting_lists_body.filter(lambda x: len(x[1])>min_body)
#global statistics
body_df = calculate_df(filter_bodys).collectAsMap()
posting_locs_body = partition_postings_and_write(filter_bodys,'body_index').collect()
body_total_term = calc_total_term(filter_bodys)
body_dl = doc_text_pairs.map(lambda x: calc_dl(tokenize(x[0]),x[1])).collectAsMap()
body_norm = _doc_norms(doc_text_pairs, body_dl, body_df)
super_posting_locs_body = defaultdict(list)

#title
word_count_titles = doc_title_pairs.flatMap(lambda x: word_count(x[0], x[1]))
posting_lists_title = word_count_titles.groupByKey().mapValues(reduce_word_counts)
#global statistics
title_df = calculate_df(posting_lists_title).collectAsMap()
posting_locs_title = partition_postings_and_write(posting_lists_title,'title_index').collect()
title_total_term = calc_total_term(posting_lists_title)
title_dl = doc_title_pairs.map(lambda x: calc_dl(tokenize(x[0]),x[1])).collectAsMap()
title_norm = _doc_norms(doc_title_pairs, title_dl, title_df)
super_posting_locs_title = defaultdict(list)

#anchor
word_count_anchors = doc_anchor_pairs_new.flatMap(lambda x: word_count(x[0], x[1]))
posting_lists_anchor = word_count_anchors.groupByKey().mapValues(reduce_word_counts)
#global statistics
anchor_df = calculate_df(posting_lists_anchor).collectAsMap()
posting_locs_anchor = partition_postings_and_write(posting_lists_anchor,'anchor_index').collect()
anchor_total_term = calc_total_term(posting_lists_anchor)
anchor_dl = doc_anchor_pairs_new.map(lambda x: calc_dl(tokenize(x[0]),x[1])).collectAsMap()
anchor_norm = _doc_norms(doc_anchor_pairs_new, anchor_dl, anchor_df)
super_posting_locs_anchor = defaultdict(list)


In [None]:
def _load_posting_locs(prefix):
  '''Merge every posting-locations pickle stored under `prefix` in the bucket.

  Returns a fresh defaultdict(list) mapping token -> list of stored locations,
  so re-running this cell never appends the same locations twice.
  '''
  merged = defaultdict(list)
  for blob in client.list_blobs(bucket_name, prefix=prefix):
    if not blob.name.endswith("pickle"):
      continue
    with blob.open("rb") as f:
      for token, locs in pickle.load(f).items():
        merged[token].extend(locs)
  return merged

# Each index was written by partition_postings_and_write with its own path
# ('body_index', 'title_index', 'anchor_index'); read each one's pickles
# separately, otherwise all three indexes get mixed into identical dicts.
# NOTE(review): assumes InvertedIndex.write_a_posting_list uploads its pickles
# under f"{path}/" in the bucket — confirm in inverted_index_gcp.py.
super_posting_locs_body = _load_posting_locs('body_index/')
super_posting_locs_title = _load_posting_locs('title_index/')
super_posting_locs_anchor = _load_posting_locs('anchor_index/')

for _name, _locs in [('body', super_posting_locs_body),
                     ('title', super_posting_locs_title),
                     ('anchor', super_posting_locs_anchor)]:
  assert _locs, f"no posting-location pickles found for the {_name} index; check the prefix"

NameError: ignored

In [None]:
def _assemble_index(posting_locs, df, dl, norm, term_total):
  '''Create an InvertedIndex and attach the precomputed statistics to it.'''
  index = InvertedIndex()
  index.posting_locs = posting_locs
  index.df = df
  index.DL = dl
  index.norma = norm
  index.term_total = term_total
  return index

index_body = _assemble_index(super_posting_locs_body, body_df, body_dl, body_norm, body_total_term)
index_title = _assemble_index(super_posting_locs_title, title_df, title_dl, title_norm, title_total_term)
index_anchor = _assemble_index(super_posting_locs_anchor, anchor_df, anchor_dl, anchor_norm, anchor_total_term)

# All three indexes are persisted with write_index; the upload cell expects ./index_*.pkl.
index_body.write_index('.', 'index_body')
index_anchor.write_index('.', 'index_anchor')
index_title.write_index('.', 'index_title')

In [None]:
# upload to gs
index_src = "index_body.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

index_src = "index_anchor.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

index_src = "index_title.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

In [None]:
!gsutil ls -lh $index_dst