***Important*** DO NOT CLEAR THE OUTPUT OF THIS NOTEBOOK AFTER EXECUTION!!!

In [None]:
# if the following command generates an error, you probably didn't enable 
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster
!gcloud dataproc clusters list --region us-central1

# Imports & Setup

In [None]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
import json

import hashlib
def _hash(s):
    """Return a short hexadecimal fingerprint of the string ``s``.

    ``s`` is UTF-8 encoded and hashed with BLAKE2b using a 5-byte digest, so
    the result is always 10 hexadecimal characters.
    """
    encoded = bytes(s, encoding='utf8')
    hasher = hashlib.blake2b(encoded, digest_size=5)
    return hasher.hexdigest()

nltk.download('stopwords')

In [None]:
# if nothing prints here you forgot to include the initialization script when starting the cluster
!ls -l /usr/lib/spark/jars/graph*

In [None]:
from pyspark.sql import *
import pyspark.sql.functions as pyfunc
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
spark

In [None]:
# Put your bucket name below and make sure you can access it without an error
bucket_name = 'inverted_index_creation'
full_path = f"gs://{bucket_name}/"

client = storage.Client()
blobs = client.list_blobs(bucket_name)

# Keep every object in the bucket except the cluster init script and anything
# whose name contains one of the folder markers below.
paths = [
    full_path + blob.name
    for blob in blobs
    if blob.name != 'graphframes.sh'
    and not any(marker in blob.name for marker in ("index/", "postings_gcp/", "page_rank/"))
]
paths

## Extracting title, anchor, text

In [None]:
# Read every path collected in `paths` into one DataFrame.
parquetFile = spark.read.parquet(*paths)

# One (field, id) RDD per document field; each is indexed separately below.
doc_text_pairs = parquetFile.select("text", "id").rdd
doc_title_pairs = parquetFile.select("title", "id").rdd
# Later cells iterate `anchor_text` per page and read each entry as (target id, text).
pages_links = parquetFile.select("anchor_text","id").rdd

We will count the number of pages to make sure we are looking at the entire corpus. The number of pages should be more than 6M

In [None]:
# Count number of wiki pages
parquetFile.count()

Let's import the inverted index module. Note that you need to use the staff-provided version called `inverted_index_gcp.py`, which contains helper functions for writing and reading the posting files, similar to the Colab version, but with the writing done to a Google Cloud Storage bucket.

In [None]:
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py

In [None]:
# adding our python module to the cluster
# addFile registers the module with the Spark job; inserting SparkFiles' root
# directory at the front of sys.path makes `inverted_index_gcp` importable
# in the next cell.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [None]:
from inverted_index_gcp import InvertedIndex

In [None]:
# NLTK's English stopword list, frozen so membership tests are cheap.
english_stopwords = frozenset(stopwords.words('english'))
# Extra words dropped in addition to the NLTK list.
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]
# Every tokenizing helper below filters against this combined set.
all_stopwords = english_stopwords.union(corpus_stopwords)
# Token pattern: a first character that is '#', '@' or a word character,
# followed by 2 to 24 word characters, each optionally preceded by a single
# apostrophe or hyphen.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

In [None]:
# with open('queries_train_partial.json') as f:
#     data = json.load(f)

# filtered_sorted_query_tokens=[token.group() for token in RE_WORD.finditer(''.join(data.keys()).lower())]
# query_trains_without_stopwords=set([word for word in filtered_sorted_query_tokens if word not in all_stopwords])

In [None]:
# Number of buckets the posting lists are spread over.
NUM_BUCKETS = 124


def token2bucket_id(token):
    """Map ``token`` to a bucket id in ``range(NUM_BUCKETS)``.

    The hexadecimal digest returned by ``_hash`` is read as an integer and
    reduced modulo ``NUM_BUCKETS``.
    """
    hex_digest = _hash(token)
    return int(hex_digest, 16) % NUM_BUCKETS

def word_count(text, id):
    """Count term frequencies of ``text`` and tag each with ``id``.

    The text is lower-cased, tokenized with ``RE_WORD`` and stripped of every
    token found in ``all_stopwords``.

    Returns:
        A list of ``(term, (id, tf))`` tuples, one per distinct kept term, in
        order of first appearance.
    """
    term_freq = Counter()
    for match in RE_WORD.finditer(text.lower()):
        term = match.group()
        if term not in all_stopwords:
            term_freq[term] += 1
    return [(term, (id, freq)) for term, freq in term_freq.items()]

def reduce_word_counts(unsorted_pl):
    """Return the entries of ``unsorted_pl`` as a list sorted by their first element.

    The sort is ascending and stable: entries with equal first elements keep
    their relative order.
    """
    ordered = list(unsorted_pl)
    ordered.sort(key=itemgetter(0))
    return ordered

def anchor_word_count(text,id):
    """Tokenize ``text`` and pair the kept tokens with ``id``.

    Tokens come from ``RE_WORD`` applied to the lower-cased text; tokens in
    ``all_stopwords`` are dropped. Duplicates and order are preserved.

    Returns:
        A one-element list ``[(id, tokens)]``.
    """
    kept = []
    for match in RE_WORD.finditer(text.lower()):
        word = match.group()
        if word in all_stopwords:
            continue
        kept.append(word)
    return [(id, kept)]

def reduce_anchor_word_count(anchor_text):
    """Identity reducer: return ``anchor_text`` unchanged."""
    return anchor_text

def doc_2_len_doc(text, doc_id):
    """Compute the length of a document in non-stopword tokens.

    The text is lower-cased and tokenized with ``RE_WORD``; tokens found in
    ``all_stopwords`` are not counted.

    Returns:
        A one-element list ``[(doc_id, length)]``.
    """
    length = sum(
        1
        for match in RE_WORD.finditer(text.lower())
        if match.group() not in all_stopwords
    )
    return [(doc_id, length)]

def reduce_doc_2_len_doc(doc_len):
    """Identity reducer: return ``doc_len`` unchanged."""
    return doc_len

def doc_to_title(title, doc_id):
    """Swap the arguments into a one-element list ``[(doc_id, title)]``."""
    pair = (doc_id, title)
    return [pair]

def reduce_doc_to_title(doc_len):
    """Identity reducer: return ``doc_len`` (the grouped values) unchanged."""
    return doc_len

def calculate_df(postings):
    """Replace each value of ``postings`` by its length.

    Args:
        postings: an object exposing ``mapValues`` (a pair RDD in this
            notebook) whose values support ``len``.

    Returns:
        Whatever ``postings.mapValues`` returns, with ``len`` applied to every value.
    """
    return postings.mapValues(len)

def partition_postings_and_write(postings,bucket_name):
    """Group posting lists by bucket id and write each bucket out.

    Each ``(term, posting_list)`` record is keyed by ``token2bucket_id(term)``.
    Records sharing a bucket id are gathered into a list, and every
    ``(bucket_id, [(term, posting_list), ...])`` pair is handed to
    ``InvertedIndex.write_a_posting_list`` together with ``bucket_name``.

    Returns:
        The RDD of values returned by ``write_a_posting_list``, one per bucket.
        Callers in this notebook read each one as a dict of key -> list.
    """
    inverted_index = InvertedIndex()

    def to_bucket(posting):
        term, posting_list = posting[0], posting[1]
        return (token2bucket_id(term), (term, posting_list))

    def write_bucket(bucket):
        return inverted_index.write_a_posting_list(bucket, bucket_name)

    grouped = postings.map(to_bucket).groupByKey()
    buckets = grouped.map(lambda pair: (pair[0], list(pair[1])))
    return buckets.map(write_bucket)

## Title Inverted Index

In [None]:
# create inverted index for title
# NOTE: this folder name is only printed below; it is not passed to the writer.
title_index_bucket_folder = full_path+'postings_gcp'
# (term, (doc_id, tf)) pairs from every title, grouped per term and sorted by doc_id.
title_res = doc_title_pairs.flatMap(lambda x: word_count(x[0],x[1]))
title_postings = title_res.groupByKey().mapValues(reduce_word_counts)
# Document frequency = length of each term's posting list.
w2df_title=calculate_df(title_postings)

# Collect the term -> df mapping to the driver.
w2df_dict_title = w2df_title.collectAsMap()
# stats = storage.Blob(bucket={full_path}, name=title_postings).exists(client)
# print(stats)
print(title_index_bucket_folder)
# try:
#     ! mkdir {title_index_drive_folder}
# except:
#     pass
# Write the posting lists bucket by bucket and collect what the writer returned per bucket.
title_posting_locs_list = partition_postings_and_write(title_postings,bucket_name).collect()
title_posting_locs_list

In [None]:
# Merge the per-bucket location dicts into a single key -> locations mapping.
super_posting_locs_title = defaultdict(list)
for bucket_locs in title_posting_locs_list:
    for term, locs in bucket_locs.items():
        super_posting_locs_title[term].extend(locs)
super_posting_locs_title

In [None]:
! gsutil mv gs://inverted_index_creation/postings_gcp/* gs://inverted_index_creation/title_postings_gcp

In [None]:
# Create inverted index instance
inverted_title = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_title.posting_locs = super_posting_locs_title
# Add the token - df dictionary to the inverted index
inverted_title.df = w2df_dict_title
# inverted_title.term_total = title_postings.map(lambda x: calculate_total_term(x[0], x[1]))
# inverted_title._N = parquetFile.count()
# write the global stats out
# Written to the current working directory under the base name
# 'title_inverted_index'; the .pkl file name below is the one copied to the bucket.
inverted_title.write_index('.', 'title_inverted_index')
index_src = "title_inverted_index.pkl"
index_dst = f'gs://{bucket_name}/title_postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

In [None]:
from inverted_index_gcp import InvertedIndex, read_posting_list
# Sanity check: load the title index back from /home/dataproc/.
# NOTE(review): the variable is called `inverted_text` although it holds the
# *title* index here; the same name is reassigned to the text index further down.
inverted_text = InvertedIndex.read_index(base_dir= '/home/dataproc/',name='title_inverted_index')

In [None]:
inverted_text.posting_locs

## Text Inverted Index

In [None]:
# (term, (doc_id, tf)) pairs from every document body, grouped per term and
# sorted by doc_id.
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# filtering postings and calculate df
# Only terms whose posting list has more than 50 entries are kept.
postings_filtered = postings.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()
# partition posting lists and write out
posting_locs_list = partition_postings_and_write(postings_filtered,bucket_name).collect()
print(posting_locs_list)

In [None]:
# Fold the per-bucket location dicts into one key -> locations mapping.
super_posting_locs = defaultdict(list)
for bucket_locs in posting_locs_list:
    for term, locs in bucket_locs.items():
        super_posting_locs[term].extend(locs)

In [None]:
! gsutil mv gs://inverted_index_creation/postings_gcp/* gs://inverted_index_creation/text_postings_gcp

In [None]:
# Create inverted index instance
# NOTE(review): the name `inverted` is reused for the anchor index later on.
inverted = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict
# write the global stats out
# Written to the current working directory, then copied to the bucket's
# text_postings_gcp/ folder.
inverted.write_index('.', 'text_inverted_index')
index_src = "text_inverted_index.pkl"
index_dst = f'gs://{bucket_name}/text_postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

## Anchor Inverted Index

In [None]:
def combine_dict_items(lst):
    """Sum the values of ``(key, value)`` pairs that share a key.

    Args:
        lst: iterable of ``(key, value)`` pairs.

    Returns:
        A list of ``(key, total)`` tuples, one per distinct key, in order of
        the key's first appearance.
    """
    merged = {}
    for doc_id, tf in lst:
        if doc_id not in merged:
            merged[doc_id] = tf
            continue
        merged[doc_id] += tf
    return list(merged.items())

In [None]:
def calc_total_word_tf(word,posting_list):
    """Return ``(word, total)`` where ``total`` sums the second element of every posting.

    Args:
        word: the term; passed through unchanged.
        posting_list: iterable of two-element entries whose second element is a count.
    """
    return word, sum(tf for _, tf in posting_list)

In [None]:
# anchor_rdd = pages_links.flatMap(lambda x: x[0]).flatMap(lambda x: anchor_word_count(x[1],x[0]))
# anchor_df = anchor_rdd.groupByKey().mapValues(reduce_anchor_word_count)
# For every page y and every anchor x on it, count the words of the anchor
# text x[1] and tag them with the pair (y[1], x[0]), i.e. (page id, anchor's id).
# Each element of anchor_rdd is one anchor's list of (word, ((page id, anchor id), tf)).
anchor_rdd = pages_links.flatMap(lambda y: map(lambda x: word_count(x[1],(y[1],x[0])),y[0]))
# Flatten the per-anchor lists into single (word, ((page id, anchor id), tf)) records.
anchor_word_counts = anchor_rdd.flatMap(lambda x:x)
# Keep only the anchor's id and the tf: (word, (anchor id, tf)).
anchor_words_not_unif = anchor_word_counts.mapValues(lambda y:(y[0][1],y[1]))

In [None]:
# Per word: sort the (id, tf) entries by id, then sum the tf of entries that
# share an id, giving one (id, total tf) posting per id.
anchor_postings = anchor_words_not_unif.groupByKey().mapValues(lambda x:combine_dict_items(sorted(list(x),key=lambda x:x[0])))
# anchor_postings_filtered = anchor_postings.filter(lambda x: len(x[1])>50)
# anchor_w2df = calculate_df(anchor_postings_filtered)
# df is computed on the unfiltered postings; the filtered variant is disabled above.
anchor_w2df = calculate_df(anchor_postings)
anchor_w2df_dict = anchor_w2df.collectAsMap()


In [None]:
# Write the same posting lists the anchor df dictionary was computed from.
# `anchor_postings_filtered` is only assigned in commented-out code, so
# referencing it here raised a NameError.
anchor_posting_locs_list = partition_postings_and_write(anchor_postings, bucket_name).collect()

In [None]:
# Fold the per-bucket location dicts into one key -> locations mapping.
anchor_super_posting_locs = defaultdict(list)
for bucket_locs in anchor_posting_locs_list:
    for term, locs in bucket_locs.items():
        anchor_super_posting_locs[term].extend(locs)

In [None]:
! gsutil mv gs://inverted_index_creation/postings_gcp/* gs://inverted_index_creation/anchor_postings_gcp

In [None]:
# Build the anchor index object. NOTE(review): this rebinds `inverted`, which
# an earlier cell used for the text index.
inverted = InvertedIndex()
inverted.posting_locs = anchor_super_posting_locs
inverted.df = anchor_w2df_dict
# Total tf per word, summed over that word's whole posting list.
totals = anchor_postings.map(lambda x:calc_total_word_tf(x[0],x[1]))
inverted.term_total=totals.collectAsMap()

# Write the index to the current working directory, then copy the .pkl to the
# bucket's anchor_postings_gcp/ folder.
inverted.write_index('.', 'anchor_inverted_index')
index_src = "anchor_inverted_index.pkl"
index_dst = f'gs://{bucket_name}/anchor_postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

## Dump pkl to index folder


### Method for creating a pkl file and persisting it to the bucket

In [None]:
import pickle
def write_pkl_and_persist(index_src,persisted_dict):
    """Pickle ``persisted_dict`` to a local file and copy it to the bucket.

    Args:
        index_src: local file name to write; also used as the object name
            under ``index/`` in the bucket.
        persisted_dict: the object to pickle.

    Relies on the notebook-level ``bucket_name`` and on IPython's ``!`` shell
    syntax, so it only works when run inside IPython/Jupyter.
    """
#     index_src ="doc_to_title.pkl"
    with open(index_src, 'wb') as out_f:
        pickle.dump(persisted_dict, out_f)
    # Destination: gs://<bucket_name>/index/<index_src>
    index_dst = f'gs://{bucket_name}/index/{index_src}'
    !gsutil cp $index_src $index_dst

## Doc_id to title


In [None]:
#create doc_id to title dict
# Emit (doc_id, title) pairs and group them by doc_id.
doc_to_title_rdd = doc_title_pairs.flatMap(lambda x: doc_to_title(x[0],x[1]))
doc_id_title_rdd = doc_to_title_rdd.groupByKey().mapValues(reduce_doc_to_title)
# Pull the result to the driver as a pandas frame with columns `_1` (key) and
# `_2` (grouped values).
doc_title_df = doc_id_title_rdd.toDF().toPandas()
# Takes element [0][0] of the grouped value, presumably the single title of
# the document. TODO confirm the shape toDF() gives to grouped values.
doc_title_df['title'] = doc_title_df['_2'].apply(lambda x: x[0][0])

In [None]:
# Build the doc_id -> title mapping from the `_1` and `title` columns.
doc_id_to_title_dict = dict(zip(doc_title_df['_1'], doc_title_df['title']))
# import pickle
# index_src ="doc_to_title.pkl"
# with open(index_src, 'wb') as out_f:
#     pickle.dump(doc_id_to_title_dict, out_f)
# index_dst = f'gs://{bucket_name}/{index_src}'
# !gsutil cp $index_src $index_dst
# Pickle the mapping locally and copy it to the bucket's index/ folder.
write_pkl_and_persist("doc_to_title.pkl",doc_id_to_title_dict)

## Doc_id to Len dict


In [None]:
# Emit (doc_id, number of non-stopword tokens) per document body, then group by doc_id.
doc_to_doc_len = doc_text_pairs.flatMap(lambda x: doc_2_len_doc(x[0],x[1]))
doc_lens = doc_to_doc_len.groupByKey().mapValues(reduce_doc_2_len_doc)

In [None]:
# Pull the grouped lengths to the driver; columns are `_1` (key) and `_2` (grouped values).
count_df = doc_lens.toDF().toPandas()
# Takes element [0][0] of the grouped value, presumably the single length of
# the document. TODO confirm the shape toDF() gives to grouped values.
count_df['count'] = count_df['_2'].apply(lambda x: x[0][0])

In [None]:
# Build the doc_id -> document length mapping from the `_1` and `count` columns.
count_dict = dict(zip(count_df['_1'], count_df['count']))
# with open(path_to_drive+"/doc_len_dict.pkl", 'wb') as out_f:
#   pickle.dump(count_dict, out_f)
# Pickle the mapping locally and copy it to the bucket's index/ folder.
write_pkl_and_persist("doc_len_dict.pkl",count_dict)

## Page Rank

In [None]:
# we needed more workers so we updated the worker amount
# !gcloud dataproc clusters update cluster-7b90 \
#     --region=us-central1 \
#     --num-workers=4

In [None]:
# Page Rank
def generate_graph(pages):
    """Build the edge and vertex collections of the link graph.

    Args:
        pages: RDD of records where index 0 is the page id and index 1 is an
            iterable of link objects exposing an ``id`` attribute.

    Returns:
        ``(edges, vertices)``: ``edges`` holds distinct ``(source_id, target_id)``
        tuples; ``vertices`` holds one 1-tuple ``(node_id,)`` for every distinct
        id that appears on either end of an edge.
    """
    def outgoing_links(page):
        source_id, anchors = page[0], page[1]
        return [(source_id, anchor.id) for anchor in anchors]

    edges = pages.flatMap(outgoing_links).distinct()
    endpoints = edges.flatMap(lambda edge: [edge[0], edge[1]])
    vertices = endpoints.distinct().map(lambda node_id: (node_id, ))
    return edges, vertices

In [None]:
# construct the graph 
# pages_links_reversed = parquetFile.limit(1000).select("id","anchor_text").rdd
# Column order is (id, anchor_text): generate_graph reads index 0 as the page
# id and index 1 as its links.
pages_links_reversed = parquetFile.select("id","anchor_text").rdd
edges, vertices = generate_graph(pages_links_reversed)

In [None]:
# `full_path` already ends with "/", so no extra separator is added here;
# the previous '/page_rank' suffix produced a "gs://<bucket>//page_rank" path.
page_rank_drive_folder = full_path+'page_rank'
# Name the columns and spread both frames over 124 partitions keyed on the
# column they are named after.
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')


In [None]:
# Run 10 iterations of PageRank with reset probability 0.15 on the link graph.
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=10)
pr = pr_results.vertices.select("id", "pagerank")
# Highest-ranked pages first.
pr = pr.sort(pyfunc.col('pagerank').desc())
# Write one gzip-compressed CSV.
# NOTE(review): repartition(1) shuffles, so the sorted order may not survive
# in the written file; also confirm this write succeeds when the folder
# already exists (re-running the cell).
pr.repartition(1).write.csv(page_rank_drive_folder, compression="gzip")
pr.show()

## word_idf creation


In [None]:
from inverted_index_gcp import InvertedIndex, read_posting_list
# Load the text index from /home/dataproc/; its `df` dictionary drives the
# idf computation below.
inverted_text = InvertedIndex.read_index(base_dir= '/home/dataproc/',name='text_inverted_index')

In [None]:
! gsutil -m cp -r "gs://inverted_index_creation/anchor_postings_gcp" .
! gsutil -m cp -r "gs://inverted_index_creation/text_postings_gcp" .
! gsutil -m cp -r "gs://inverted_index_creation/anchor_postings_gcp" .

In [None]:
# Hard-coded corpus size used in the idf formula.
# NOTE(review): presumably the value returned by parquetFile.count(); confirm
# it matches the corpus being indexed.
N = 6348910
# N=len(inverted_text.df)
# print(N)
import numpy as np
# idf(w) = ln((N - df + 0.5) / (df + 0.5) + 1) for every term in the text index.
idf_calc_dict = defaultdict(float)
for w, w_df in inverted_text.df.items(): 
    idf_calc_dict[w] = np.log(((N - w_df + 0.5) / (w_df + 0.5)) + 1)
idf_calc_dict

In [None]:
def doc_nf(text, id):
    """Compute the normalisation factor of one document.

    The text is lower-cased, tokenized with ``RE_WORD`` and stripped of
    ``all_stopwords``. Each distinct term gets the weight
    ``(tf / doc_len) * idf`` with idf looked up in ``idf_calc_dict``
    (0 for unknown terms). The factor is ``1 / sum(weight ** 2)`` rounded to
    5 decimals.

    Args:
        text: raw document text.
        id: document id, returned unchanged.

    Returns:
        A one-element list ``[(id, nf)]`` with ``nf`` as a Python float.
        ``nf`` is ``0.0`` when the squared-weight sum is zero (empty document
        or no term with a non-zero idf); previously that case divided by zero
        and stored ``inf``.
    """
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
    filtered_tokens = [tok for tok in tokens if tok not in all_stopwords]
    doc_len = len(filtered_tokens)
    count = Counter(filtered_tokens)
    # `.get` never raises KeyError, so the former try/except was dead code
    # (and would have left `nf_score` unbound had it ever fired).
    squared_weights = [np.power((wc/doc_len) * idf_calc_dict.get(term, 0), 2)
                       for term, wc in count.items()]
    norm = np.sum(squared_weights)
    if norm == 0:
        # Nothing to normalise by; avoid the division by zero.
        return [(id, 0.0)]
    # NOTE(review): this is 1/sum(w^2), not 1/sqrt(sum(w^2)); confirm the
    # consumer of doc_id_nf.pkl expects this form.
    nf_score = np.round(1/norm, 5)
    return [(id,float(nf_score))]

def reduce_doc_nf(doc_nf):
    """Identity reducer: return the argument unchanged.

    Note that the parameter name shadows the ``doc_nf`` function inside this body.
    """
    return doc_nf

In [None]:
# Emit (doc_id, normalisation factor) per document body, then group by doc_id.
doc_nf_spark = doc_text_pairs.flatMap(lambda x: doc_nf(x[0],x[1]))
doc_nf_rdd = doc_nf_spark.groupByKey().mapValues(reduce_doc_nf)

# doc_nf_df = doc_nf_rdd.toDF().toPandas()
# doc_nf_df

In [None]:
# Pull the grouped factors to the driver; columns are `_1` (key) and `_2` (grouped values).
doc_nf_pd = doc_nf_rdd.toDF().toPandas()
# Takes element [0][0] of the grouped value, presumably the single factor of
# the document. TODO confirm the shape toDF() gives to grouped values.
doc_nf_pd['nf'] = doc_nf_pd['_2'].apply(lambda x: x[0][0])
doc_nf_pd

In [None]:
# Build the doc_id -> normalisation factor mapping from the `_1` and `nf` columns.
doc_id_nf_dict = dict(zip(doc_nf_pd['_1'], doc_nf_pd['nf']))
# with open(path_to_drive+"/doc_len_dict.pkl", 'wb') as out_f:
#   pickle.dump(count_dict, out_f)
# Pickle the mapping locally and copy it to the bucket's index/ folder.
write_pkl_and_persist("doc_id_nf.pkl",doc_id_nf_dict)

## PageView

In [None]:
#  Paths
from pathlib import Path
# Using user page views (as opposed to spiders and automated traffic) for the 
# month of August 2021
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path) 
# File name of the download, and names derived from its stem for the filtered
# text file and the pickle.
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
# NOTE: `pv_clean` is only referenced by commented-out code below.
pv_clean = f'{p.stem}.pkl'
# Download the file (2.3GB) 
!wget -N $pv_path
# Filter for English pages, and keep just two fields: article ID (3) and monthly 
# total number of page views (5). Then, remove lines with article id or page 
# view values that are not a sequence of digits.
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
# Create a Counter (dictionary) that sums up the page views for the same
# article, resulting in a mapping from article id to total page views.
wid2pv = Counter()
with open(pv_temp, 'rt') as f:
    for line in f:
        # Each line is "<article id> <page views>".
        parts = line.split(' ')
        wid2pv.update({int(parts[0]): int(parts[1])})
# Pickle the counter locally and copy it to the bucket's index/ folder.
# The argument used to be `pvclean`, a name that is never defined, so the
# call raised a NameError; the object to persist is the counter itself.
write_pkl_and_persist('page_views.pkl',wid2pv)
# read in the counter
# with open(pv_clean, 'rb') as f:
#   wid2pv = pickle.loads(f.read())

In [None]:
# Rebuild the article id -> total page views counter from the filtered file.
wid2pv = Counter()
with open(pv_temp, 'rt') as pv_file:
    for row in pv_file:
        fields = row.split(' ')
        wid2pv[int(fields[0])] += int(fields[1])

In [None]:
# write out the counter as binary file (pickle it)
# with open(pv_clean, 'wb') as f:
#     pickle.dump(wid2pv, f)
# Pickles `wid2pv` to page_views.pkl and copies it to the bucket's index/ folder.
write_pkl_and_persist('page_views.pkl',wid2pv)