***Important*** DO NOT CLEAR THE OUTPUT OF THIS NOTEBOOK AFTER EXECUTION!!!

In [1]:
# Sanity check: list the Dataproc clusters in region us-central1.
# If the following command generates an error, you probably didn't enable
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster.
!gcloud dataproc clusters list --region us-central1

NAME          PLATFORM  WORKER_COUNT  PREEMPTIBLE_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-1ddb  GCE       4                                       RUNNING  us-central1-a


# Imports & Setup

In [2]:
# Install the Google Cloud Storage client (pinned to 1.43.0) and graphframes.
# NOTE(review): graphframes is installed without a version pin — consider pinning it for reproducibility.
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [3]:
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import math
import pandas as pd
from google.cloud import storage

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# Fetch the NLTK stopword corpus; it is read further down through stopwords.words('english').
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [4]:
# List the graphframes jar(s) under the Spark jars directory.
# If nothing prints here you forgot to include the initialization script when starting the cluster.
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Jan 13 17:08 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
# Stemming switch: when True, a PorterStemmer instance is created and passed to the tokenizing helpers.
apply_stemming = False
stemmer = PorterStemmer() if apply_stemming else None

In [7]:
# If apply_stemming is True, another bucket is selected
bucket_name = ('final-project-full-corpus-tf-sorted-64500-stem' if apply_stemming
               else 'final-project-full-corpus-tf-sorted-64500')
full_path = f"gs://{bucket_name}/"

client = storage.Client()
blobs = client.list_blobs(bucket_name)

# Collect the gs:// path of every blob in the bucket, skipping the cluster
# init script and anything whose name starts with "postings_gcp" or "pr".
paths = []
for b in blobs:
    if b.name == 'graphframes.sh' or b.name.startswith(("postings_gcp", "pr")):
        continue
    paths.append(full_path + b.name)

# Building an inverted index

Here, we read the entire corpus into an RDD directly from the Google Cloud Storage bucket, and use your code from Colab to construct an inverted index.

In [8]:
# Load every file listed in `paths` into a single Spark DataFrame.
# NOTE(review): `spark` is not defined in this notebook — presumably the SparkSession provided by the kernel; confirm.
parquetFile = spark.read.parquet(*paths)

                                                                                

In [None]:
# RDD of ("id", "title") rows, one per page
title_doc_data = parquetFile.select("id", "title").rdd

In [None]:
# RDD of ("id", "text") rows, one per page
body_doc_data = parquetFile.select("id", "text").rdd

In [None]:
# RDD of ("id", "anchor_text") rows, one per page.
# NOTE(review): anchor_text is presumably a list of (target doc id, anchor text) pairs — confirm against the parquet schema.
anchor_doc_data_raw = parquetFile.select("id", "anchor_text").rdd

We will count the number of pages to make sure we are looking at the entire corpus. The number of pages should be more than 6M.

In [9]:
# Count the number of wiki pages (rows) in the loaded DataFrame
parquetFile.count()

                                                                                

6348910

Let's import the inverted index module. Note that you need to use the staff-provided version called `inverted_index_gcp.py`, which contains helper functions for writing and reading the posting files, similar to the Colab version, but with writing done to a Google Cloud Storage bucket.

In [11]:
# Add our Python module to the cluster through the SparkContext
sc.addFile("/home/dataproc/inverted_index_gcp.py")
# Put the SparkFiles root directory first on the import path so the module can be imported by name
sys.path.insert(0, SparkFiles.getRootDirectory())

In [12]:
from inverted_index_gcp import InvertedIndex

Auxiliary functions to calculate different parameters of the inverted index

In [13]:
# Stopwords: NLTK's English list plus a hand-picked list of corpus-specific words.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = [
    "category", "references", "also", "external", "links",
    "may", "first", "see", "history", "people", "one", "two",
    "part", "thumb", "including", "second", "following",
    "many", "however", "would", "became",
]
all_stopwords = english_stopwords | frozenset(corpus_stopwords)

# Token pattern: one leading '#', '@' or word character, followed by 2 to 24
# word characters, each optionally preceded by an apostrophe or a hyphen.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

###########################################################
def word_count(text, id, use_stemming=False, stemmer=None):
  ''' Compute the term frequency (tf) of every non-stopword token in `text`
  and emit one posting entry per distinct token.
  Parameters:
  -----------
    text: str
      Text of one document. It is lower-cased before tokenization.
    id: int
      Document id
    use_stemming: boolean
      When True, every kept token is replaced by `stemmer.stem(token)`
      before counting.
    stemmer: PorterStemmer or None
      Object with a `stem` method; only used when use_stemming=True.
  Returns:
  --------
    list of tuples
      A list of (token, (doc_id, tf)) pairs, in order of first occurrence.
  '''
  kept = []
  for match in RE_WORD.finditer(text.lower()):
    word = match.group()
    if word not in all_stopwords:
      kept.append(word)
  if use_stemming:
    kept = [stemmer.stem(word) for word in kept]
  # Counter keeps first-seen order, so the output order follows the text
  term_freqs = Counter(kept)
  return [(token, (id, freq)) for token, freq in term_freqs.items()]

####################################
def reduce_word_counts(unsorted_pl):
  ''' Order a posting list by term frequency, highest first.
  Parameters:
  -----------
    unsorted_pl: iterable of tuples
      (wiki_id, tf) tuples in arbitrary order.
  Returns:
  --------
    list of tuples
      The same tuples sorted by tf in descending order; entries with equal
      tf keep their original relative order.
  '''
  ordered = list(unsorted_pl)
  ordered.sort(key=lambda entry: entry[1], reverse=True)
  return ordered

###########################
def calculate_df(postings):
  ''' Compute the document frequency (df) of every token.
  Parameters:
  -----------
    postings: RDD
      An RDD of (token, posting_list) pairs.
  Returns:
  --------
    RDD
      An RDD of (token, df) pairs, where df is the length of the token's
      posting list.
  '''
  def posting_list_length(posting_list):
    return len(posting_list)

  return postings.mapValues(posting_list_length)

# Number of buckets the tokens are hashed into
NUM_BUCKETS = 124
def token2bucket_id(token):
  ''' Map a token to a bucket id in the range [0, NUM_BUCKETS) based on its `_hash` digest. '''
  digest_value = int(_hash(token), 16)
  return digest_value % NUM_BUCKETS

#####################################################
def partition_postings_and_write(postings, base_dir):
  ''' Group the posting lists into buckets and write each bucket out.
  A token's bucket is chosen with `token2bucket_id`; each bucket is written
  with `InvertedIndex.write_a_posting_list`. The target bucket is taken from
  the notebook-level variable `bucket_name`.
  Parameters:
  -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
    base_dir: str
      Passed through to `InvertedIndex.write_a_posting_list`.
  Returns:
  --------
    RDD
      An RDD with one item per bucket: whatever `write_a_posting_list`
      returns for that bucket (presumably the posting locations — see that
      method for details).
  '''
  def to_bucket_entry(posting):
    # key by bucket id; wrap the pair in a list so entries can be concatenated
    return token2bucket_id(posting[0]), [(posting[0], posting[1])]

  def write_bucket(bucket):
    return InvertedIndex.write_a_posting_list(bucket, base_dir, bucket_name)

  keyed_by_bucket = postings.map(to_bucket_entry)
  merged_buckets = keyed_by_bucket.reduceByKey(lambda left, right: left + right)
  return merged_buckets.map(write_bucket)

#####################################################
def tokenize(text, use_stemming=False, stemmer=None):
  """
    Split a text into lower-cased tokens, dropping stopwords and optionally stemming.
    Parameters:
    -----------
    text: string, the text to tokenize.
    use_stemming: boolean flag; when True each kept token is passed through stemmer.stem
    stemmer: object with a `stem` method (e.g. PorterStemmer); only used when use_stemming=True
    Returns:
    -----------
    list of tokens, in order of appearance.
  """
  tokens = []
  for match in RE_WORD.finditer(text.lower()):
    candidate = match.group()
    if candidate in all_stopwords:
      continue
    tokens.append(candidate)
  if not use_stemming:
    return tokens
  return [stemmer.stem(word) for word in tokens]

#########################
def calculate_idf(index):
  ''' Calculate the idf value of every term in an inverted index.
  Parameters:
  -----------
    index: object with `dl` and `df` attributes
      `len(index.dl)` is used as the number of documents in the corpus and
      `index.df` maps each token to its document frequency.
  Returns:
  --------
    defaultdict(float)
      Maps token -> log10(num_docs / df). Looking up a token that is not in
      the index returns 0.0.
  Raises:
  -------
    ZeroDivisionError if a token has df == 0.
  '''
  num_docs_corpus = len(index.dl)
  idf_token_dict = defaultdict(float)       # returns zero if key is not found in dictionary
  for token, df in index.df.items():
    # math.log10 is more accurate than math.log(x, 10); for example
    # math.log(1000, 10) evaluates to 2.9999999999999996 instead of 3.0.
    idf_token_dict[token] = math.log10(num_docs_corpus / df)
  return idf_token_dict

##############################
def group_links(posting_list):
  ''' Auxiliary function to group anchor's links.
  Counts how many times each first element (pair[0]) occurs in
  `posting_list` and returns (key, count) tuples sorted by count in
  descending order; keys with equal counts stay in first-seen order.
  '''
  link_counter = Counter(pair[0] for pair in posting_list)
  return link_counter.most_common()

In [None]:
# Builds a dictionary of doc_id and its title
id_title_pairs = title_doc_data.map(lambda row: (row[0], row[1]))
# collectAsMap returns the (id, title) pairs as one Python dict
id_title_dict = id_title_pairs.collectAsMap()

# Pickle the dictionary to a local file ...
with open('titles.pkl', 'wb') as f:
    pickle.dump(id_title_dict, f)

# ... and upload that file to the bucket as postings_gcp/titles.pkl
bucket = client.bucket(bucket_name)
blob = bucket.blob('postings_gcp/titles.pkl')
blob.upload_from_filename('titles.pkl')

In [15]:
# Builds title inverted index
word_counts = title_doc_data.flatMap(lambda x: word_count(x[1], x[0], apply_stemming, stemmer))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
postings_filtered = postings.filter(lambda x: len(x[1]) > 0)    # filter out blank titles after tokenization
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()
_ = partition_postings_and_write(postings_filtered, 'postings_gcp/title').collect()

                                                                                

In [16]:
# Merge the pickled posting-location dictionaries found under postings_gcp/title/ into a single dictionary
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp/title/'):
  # only blobs whose name ends with "pickle" are loaded; everything else under the prefix is skipped
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    # a key may appear in several files, so its location lists are concatenated
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)

In [None]:
# Create inverted index instance
inverted_title = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_title.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted_title.df = w2df_dict
# write the global stats out
inverted_title.write_index('postings_gcp', 'title_index', bucket_name)

                                                                                

In [None]:
# Builds body inverted index
word_counts = body_doc_data.flatMap(lambda x: word_count(x[1], x[0], apply_stemming, stemmer))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
postings_filtered = postings.filter(lambda x: len(x[1]) > 50)           # filter out rare words
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()

# Calculates length of all documents puts them in dictionary of doc_id and its length, in addition
# calculates avg. length of all documents (to be used by BM25 ranking function)
index_body_tok = body_doc_data.map(lambda row: (row[0], tokenize(row[1], apply_stemming, stemmer)))
index_body_tok = index_body_tok.filter(lambda row: len(row[1]) > 0)
w2dl = index_body_tok.map(lambda x: (x[0], len(x[1])))
dl_dict = w2dl.collectAsMap()

sum_and_count = w2dl.map(lambda row: (row[1], 1)).reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
sum_dl_body = sum_and_count[0]
N_docs_body = sum_and_count[1]
AVG_DL_body = sum_dl_body / N_docs_body

_ = partition_postings_and_write(postings_filtered, 'postings_gcp/body').collect()

In [None]:
# Merge the pickled posting-location dictionaries found under postings_gcp/body/ into a single dictionary
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp/body/'):
  # only blobs whose name ends with "pickle" are loaded; everything else under the prefix is skipped
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    # a key may appear in several files, so its location lists are concatenated
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)

In [None]:
# Create inverted index instance
inverted_body = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_body.posting_locs = super_posting_locs
# Add token - df dictionary to the inverted index
inverted_body.df = w2df_dict
# Add doc_id - doc length dictionary to the inverted index
inverted_body.dl = dl_dict
# Average length of doc in inverted index
inverted_body.avg_dl = AVG_DL_body
# write the global stats out
inverted_body.write_index('postings_gcp', 'body_index', bucket_name)

In [None]:
# Builds dictionary of term and it's IDF value for body inverted index
term_idf_body_dict = calculate_idf(inverted_body)

with open('term_idf_body.pkl', 'wb') as f:
    pickle.dump(term_idf_body_dict, f)

bucket = client.bucket(bucket_name)
blob = bucket.blob('postings_gcp/term_idf_body.pkl')
blob.upload_from_filename('term_idf_body.pkl')

In [None]:
# Builds anchor inverted index
anchor_doc_data = anchor_doc_data_raw.flatMap(lambda x: x[1])
word_counts = anchor_doc_data.flatMap(lambda x: word_count(x[1], x[0], apply_stemming, stemmer))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
postings_filtered = postings.filter(lambda x: len(x[1]) > 0)    # filter out empty link lists after tokenization
grouped_postings = postings_filtered.map(lambda x: (x[0], group_links(x[1])))
w2df = calculate_df(grouped_postings)
w2df_dict = w2df.collectAsMap()
_ = partition_postings_and_write(grouped_postings, 'postings_gcp/anchor').collect()

In [None]:
# Merge the pickled posting-location dictionaries found under postings_gcp/anchor/ into a single dictionary
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp/anchor/'):
  # only blobs whose name ends with "pickle" are loaded; everything else under the prefix is skipped
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    # a key may appear in several files, so its location lists are concatenated
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)

In [None]:
# Create inverted index instance
inverted_anchor = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_anchor.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted_anchor.df = w2df_dict
# write the global stats out
inverted_anchor.write_index('postings_gcp', 'anchor_index', bucket_name)

In [None]:
def generate_graph(pages):
  ''' Derive the edges and vertices of the link graph from `pages`.
  Parameters:
  -----------
    pages: RDD
      Each item is indexable as (source_id, links), where every element of
      `links` has the target id as its first element.
  Returns:
  --------
    (edges, vertices)
      edges: RDD of distinct (source_id, target_id) pairs.
      vertices: RDD of distinct (id, id) pairs, one per id that appears as
      either end of an edge.
  '''
  def outgoing_edges(page):
    source = page[0]
    return [(source, link[0]) for link in page[1]]

  def endpoints(edge):
    return [(edge[0], edge[0]), (edge[1], edge[1])]

  edges = pages.flatMap(outgoing_edges).distinct()
  vertices = edges.flatMap(endpoints).distinct()
  return edges, vertices

In [None]:
# Construct the graph: distinct (src, dst) edges and the vertices that appear in them
edges, vertices = generate_graph(anchor_doc_data_raw)
# Compute PageRank
# Both DataFrames are repartitioned into 124 partitions by their id column
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
# Fixed number of iterations (6) with reset probability 0.15
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.select("id", "pagerank")
# Highest PageRank first
pr = pr.sort(col('pagerank').desc())
# Write the result as gzip-compressed CSV under gs://<bucket_name>/pr, from a single partition.
# NOTE(review): repartition(1) shuffles after the sort, so the row order in the CSV may not be preserved — confirm.
pr.repartition(1).write.csv(f'gs://{bucket_name}/pr', compression="gzip")
# Display the first rows of the sorted result
pr.show()

In [None]:
# turns pr into dictionary of doc_id and its PageRank
pgrank_rdd = pr.rdd
id_pgrank_pairs = pgrank_rdd.map(lambda row: (row[0], row[1]))
id_pgrank_dict = id_pgrank_pairs.collectAsMap()

with open('pagerank.pkl', 'wb') as f:
    pickle.dump(id_pgrank_dict, f)

bucket = client.bucket(bucket_name)
blob = bucket.blob('postings_gcp/pagerank.pkl')
blob.upload_from_filename('pagerank.pkl')

In [None]:
# Using user page views (as opposed to spiders and automated traffic) for the month of August 2021
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path) 
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
pv_clean = f'{p.stem}.pkl'
# Download the file (2.3GB) 
!wget -N $pv_path
# Filter for English pages, and keep just two fields: article ID (3) and monthly 
# total number of page views (5). Then, remove lines with article id or page 
# view values that are not a sequence of digits.
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
# Create a Counter (dictionary) that sums up the pages views for the same 
# article, resulting in a mapping from article id to total page views.
wid2pv = Counter()
with open(pv_temp, 'rt') as f:
  for line in f:
    parts = line.split(' ')
    wid2pv.update({int(parts[0]): int(parts[1])})
# Convert counter to defaultdict
page_view_dict = defaultdict(int)
for doc_id, view in wid2pv.items():
  page_view_dict[doc_id] = view

with open("pageview.pkl", 'wb') as f:
  pickle.dump(page_view_dict, f)

bucket = client.bucket(bucket_name)
blob = bucket.blob('postings_gcp/pageview.pkl')
blob.upload_from_filename('pageview.pkl')