In [None]:
# Sanity check: list the Dataproc clusters in us-central1.
# If this command errors, you probably didn't enable the cluster security option
# "Allow API access to all Google Cloud services" under
# Manage Security → Project Access when setting up the cluster.
!gcloud dataproc clusters list --region us-central1

# Imports & Setup

In [None]:
# Install the GCS client (pinned) and graphframes; -q keeps the install output short.
# NOTE(review): graphframes is not version-pinned — consider pinning it for reproducibility.
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
import io
import gzip
import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()
import numpy as np
nltk.download('stopwords')
!pip install gcsfs
import gcsfs
from contextlib import closing

In [None]:
# Verify the graphframes jar is present in Spark's jar directory.
# If nothing prints here, you forgot to include the initialization script when starting the cluster.
!ls -l /usr/lib/spark/jars/graph*

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
# Display the active SparkSession (not created in this notebook — presumably provided by the cluster's PySpark kernel)
spark

In [None]:
# Put your bucket name below and make sure you can access it without an error
bucket_name = 'ryan316597145'
full_path = f"gs://{bucket_name}/"

client = storage.Client()
# Only the corpus parquet shards (object names starting with "multi") are read.
# Filtering server-side with `prefix` avoids listing every other object in the
# bucket (indexes, pickles, models, ...) and already excludes 'graphframes.sh'.
paths = [full_path + corpus_blob.name
         for corpus_blob in client.list_blobs(bucket_name, prefix="multi")]
# Fail here with a clear message rather than inside spark.read.parquet(*[]).
assert paths, f"no corpus files found under {full_path}multi*"
        

# Building inverted indexes

Here, we read the entire corpus into an RDD directly from the Google Cloud Storage bucket and use the code from Colab to construct the inverted indexes.

In [None]:
# Load every corpus shard listed in `paths` into a single Spark DataFrame.
parquetFile = spark.read.parquet(*paths)


In [None]:
# making 3 RDDs, one per inverted index (body, title, anchor)
doc_text_pairs = parquetFile.select("text", "id").rdd
doc_title_pairs = parquetFile.select("title", "id").rdd
# Positional access: x[0] is expected to be the integer page id and x[3] the list
# of anchor entries (each exposing `.id` and `.text`, as used by the anchor index).
# `bool(x[3])` drops both empty lists and null (None) anchor lists; `x[3] != []`
# let None through, which then made the anchor index's flatMap fail.
text_pairs = (parquetFile.rdd
              .filter(lambda x: bool(x[3]) and type(x[0]) == int)
              .map(lambda row: row[3]))

We count the number of pages to make sure we are looking at the entire corpus. The number of pages should be more than 6M.

In [None]:
# Count number of wiki pages (should be more than 6M for the full corpus)
parquetFile.count()

Let's import the inverted index module:

In [None]:
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
# (%cd -q changes the kernel's working directory to /home/dataproc without printing it)
%cd -q /home/dataproc
!ls inverted_index_gcp.py

In [None]:
# adding our python module to the cluster: sc.addFile distributes it with the job,
# and putting the SparkFiles root on sys.path makes it importable on the driver too
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [None]:
from inverted_index_gcp import InvertedIndex
from inverted_index_gcp import MultiFileReader

#### Functions used for making inverted indexes

In [None]:
# Stopwords: NLTK's English list plus frequent Wikipedia boilerplate terms.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = [
    "category", "references", "also", "external", "links",
    "may", "first", "see", "history", "people", "one", "two",
    "part", "thumb", "including", "second", "following",
    "many", "however", "would", "became",
]
all_stopwords = english_stopwords | frozenset(corpus_stopwords)

# A token: one '#', '@' or word character followed by 2-24 word characters,
# each optionally preceded by a single apostrophe or hyphen.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

NUM_BUCKETS = 124


def token2bucket_id(token):
    """Map `token` to a bucket id in [0, NUM_BUCKETS) using its hex hash digest."""
    digest_as_int = int(_hash(token), 16)
    return digest_as_int % NUM_BUCKETS
def _tokenize(text):
    """Lower-case `text`, extract RE_WORD tokens and drop everything in all_stopwords.

    A missing value (None, e.g. a null text/title/anchor cell) is treated as an
    empty string so a single bad row does not fail the whole Spark job.
    """
    if text is None:
        return []
    tokens = (match.group() for match in RE_WORD.finditer(text.lower()))
    return [token for token in tokens if token not in all_stopwords]


def termcount(text):
    """Return (token, term frequency) pairs for `text` as a dict_items view."""
    return Counter(_tokenize(text)).items()


def doc_length(text, id):
    """Return [(id, number of non-stopword tokens in `text`)]."""
    return [(id, len(_tokenize(text)))]


def word_count(text, id):
    """Return [(token, (id, tf))] for every distinct non-stopword token in `text`."""
    return [(token, (id, tf)) for token, tf in Counter(_tokenize(text)).items()]

def reduce_word_counts(unsorted_pl):
    """Return a new list of the posting entries ordered by their first field (doc id).

    The sort is stable, so entries with equal doc ids keep their input order.
    """
    ordered = list(unsorted_pl)
    ordered.sort(key=itemgetter(0))
    return ordered

def calculate_df(postings):
    """Map each (token, posting_list) entry to (token, document frequency).

    The document frequency is simply the posting list's length.
    """
    def _token_df(entry):
        return entry[0], len(entry[1])

    return postings.map(_token_df)

def _partition_postings_and_write(postings, index_name, bucket_offset):
    """Group postings into hash buckets and write each bucket via InvertedIndex.

    Each index shifts its bucket ids by a different multiple of NUM_BUCKETS.
    token2bucket_id returns values in [0, NUM_BUCKETS), so the bucket-id ranges
    of the indexes never overlap, even if NUM_BUCKETS changes.

    Args:
        postings: RDD of (token, posting_list) pairs.
        index_name: index name passed to InvertedIndex.write_a_posting_list
            ('body', 'title', 'anchor' or 'titlenames').
        bucket_offset: value added to token2bucket_id(token).

    Returns:
        A lazy RDD holding one write_a_posting_list result per bucket. Nothing is
        written until an action such as collect() runs it.
    """
    keyed = postings.map(lambda x: (token2bucket_id(x[0]) + bucket_offset, (x[0], x[1])))
    buckets = keyed.groupByKey().mapValues(list)
    return buckets.map(lambda x: InvertedIndex.write_a_posting_list(x, bucket_name, index_name))


# function for every inverted index with different hash values
def partition_postings_and_write0(postings):
    """Write body-index posting lists (bucket ids 0 .. NUM_BUCKETS-1)."""
    return _partition_postings_and_write(postings, 'body', 0)


def partition_postings_and_write1(postings):
    """Write title-index posting lists (bucket ids shifted by NUM_BUCKETS)."""
    return _partition_postings_and_write(postings, 'title', NUM_BUCKETS)


def partition_postings_and_write2(postings):
    """Write anchor-index posting lists (bucket ids shifted by 2 * NUM_BUCKETS)."""
    return _partition_postings_and_write(postings, 'anchor', 2 * NUM_BUCKETS)


def partition_postings_and_write3(postings):
    """Write title-names posting lists (bucket ids shifted by 3 * NUM_BUCKETS)."""
    return _partition_postings_and_write(postings, 'titlenames', 3 * NUM_BUCKETS)

### Body Index

In [None]:
## Body Index

def _inverse_tf_norm(text):
    """Return 1 / ||tf||_2 for the document's term-frequency vector.

    Documents with no indexable tokens (empty or only stopwords) get 0.0 instead
    of 1/0 = inf, which would otherwise be stored in docs_normal together with a
    numpy divide-by-zero warning.
    """
    norm = np.sqrt(np.sum([tf ** 2 for _, tf in termcount(text)]))
    return 1 / norm if norm > 0 else 0.0


# Body terms must appear in more than this many documents to be indexed.
BODY_MIN_DF = 50

# time the index creation time
t_start = time()
# doc_id -> normalizing factor 1/||tf||
docs_normal = doc_text_pairs.map(lambda x: (x[1], _inverse_tf_norm(x[0])))
docs_normal_dict = docs_normal.collectAsMap()
# doc_id -> number of non-stopword tokens
doc_len = doc_text_pairs.flatMap(lambda x: doc_length(x[0], x[1]))
doc_length_dict = doc_len.collectAsMap()
# token -> [(doc_id, tf), ...] ordered by doc_id
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# drop rare terms, then calculate df
postings_filtered = postings.filter(lambda x: len(x[1]) > BODY_MIN_DF)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()
# partition posting lists and write out (collect() triggers the lazy write job)
_ = partition_postings_and_write0(postings_filtered).collect()
index_const_time = time() - t_start

In [None]:
# report how long the body index took to build (seconds, from time())
print("body index took", index_const_time)

In [None]:
# Merge every posting-location pickle written for the body index into one
# mapping (keys presumably tokens); per-bucket location lists are concatenated.
super_posting_locs1 = defaultdict(list)
body_loc_blobs = client.list_blobs(bucket_name, prefix='postings_gcp_body')
for loc_blob in body_loc_blobs:
    if loc_blob.name.endswith("pickle"):
        with loc_blob.open("rb") as fh:
            for key, locations in pickle.load(fh).items():
                super_posting_locs1[key].extend(locations)

In [None]:
# Create inverted index instance
Body_Index = InvertedIndex('body')
# Add docs normalizing factor
Body_Index.docs_normal = docs_normal_dict
# Adding the posting locations dictionary to the inverted index
Body_Index.posting_locs = super_posting_locs1
# Add the token - df dictionary to the inverted index
Body_Index.df = w2df_dict
# Add the total length of each document
Body_Index.doc_length = doc_length_dict
# Add number of docs in corpus
Body_Index.number_of_docs = len(doc_length_dict.keys())
# write the global stats out
Body_Index.write_index('.', 'Body_Index')
# upload to gs
index_src = "Body_Index.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp_body/{index_src}'
!gsutil cp $index_src $index_dst

In [None]:
 # confirm the uploaded index object exists and show its size
 !gsutil ls -lh $index_dst

### Title Index

In [None]:
## Title index
# Titles are short, so unlike the body index every term is kept (no df filter).

t_start = time()  # start timing the index construction

# doc_id -> number of non-stopword title tokens (each row is (title, id))
doc_len = doc_title_pairs.flatMap(lambda row: doc_length(*row))
doc_length_dict = doc_len.collectAsMap()

# token -> [(doc_id, tf), ...] ordered by doc_id
word_counts = doc_title_pairs.flatMap(lambda row: word_count(*row))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
postings_filtered = postings  # same name as in the body index, but unfiltered

# token -> document frequency
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()

# bucket and write the posting lists (collect() forces the lazy job to run)
_ = partition_postings_and_write1(postings_filtered).collect()
index_const_time = time() - t_start

In [None]:
# report how long the title index took to build (seconds, from time())
print("title index took", index_const_time)

In [None]:
# Merge every posting-location pickle written for the title index into one
# mapping (keys presumably tokens); per-bucket location lists are concatenated.
# 'postings_gcp_title' is also a prefix of 'postings_gcp_titlenames', so those
# objects are skipped explicitly to keep another index's locations out of this one.
super_posting_locs2 = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp_title'):
    if blob.name.startswith('postings_gcp_titlenames'):
        continue
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs2[k].extend(v)

In [None]:
# Create inverted index instance
Title_Index = InvertedIndex('title')
# Adding the posting locations dictionary to the inverted index
Title_Index.posting_locs = super_posting_locs2
# Add the token - df dictionary to the inverted index
Title_Index.df = w2df_dict
# Add the total length of each document
Title_Index.doc_length = doc_length_dict
# write the global stats out
Title_Index.write_index('.', 'Title_Index')
# upload to gs
index_src = "Title_Index.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp_title/{index_src}'
!gsutil cp $index_src $index_dst

In [None]:
 # confirm the uploaded index object exists and show its size
 !gsutil ls -lh $index_dst

### Anchor Index

In [None]:
## Anchor index

# time the index creation time
t_start = time()
# flatten the per-page anchor lists into individual anchor entries
anchor_text = text_pairs.flatMap(lambda x: x)
# (anchor text, id carried by the anchor entry — presumably the linked-to page)
anchor_id_text = anchor_text.map(lambda x: (x.text, x.id))
# word counts per anchor entry, e.g. ('political', (23040, 1))
raw_counts = anchor_id_text.flatMap(lambda x: word_count(x[0], x[1]))
# Many anchors share the same id, so the same (token, id) pair is emitted many
# times. Sum those tfs first so each posting list holds every id once and
# df (= posting-list length) counts distinct ids, not anchor occurrences.
counts = (raw_counts
          .map(lambda x: ((x[0], x[1][0]), x[1][1]))
          .reduceByKey(lambda a, b: a + b)
          .map(lambda x: (x[0][0], (x[0][1], x[1]))))
# token -> [(id, tf), ...] ordered by id
postings = counts.groupByKey().mapValues(reduce_word_counts)
w2df = calculate_df(postings)
w2df_dict = w2df.collectAsMap()
# partition posting lists and write out (collect() triggers the lazy write job)
_ = partition_postings_and_write2(postings).collect()
index_const_time = time() - t_start

In [None]:
# report how long the anchor index took to build (seconds, from time())
print("anchor index took", index_const_time)

In [None]:
# Merge every posting-location pickle written for the anchor index into one
# mapping (keys presumably tokens); per-bucket location lists are concatenated.
super_posting_locs3 = defaultdict(list)
anchor_loc_blobs = client.list_blobs(bucket_name, prefix='postings_gcp_anchor')
for loc_blob in anchor_loc_blobs:
    if loc_blob.name.endswith("pickle"):
        with loc_blob.open("rb") as fh:
            for key, locations in pickle.load(fh).items():
                super_posting_locs3[key].extend(locations)

In [None]:
# Create inverted index instance
Anchor_Index = InvertedIndex('anchor')
# Adding the posting locations dictionary to the inverted index
Anchor_Index.posting_locs = super_posting_locs3
# Add the token - df dictionary to the inverted index
Anchor_Index.df = w2df_dict
# write the global stats out
Anchor_Index.write_index('.', 'Anchor_Index')
# upload to gs
index_src = "Anchor_Index.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp_anchor/{index_src}'
!gsutil cp $index_src $index_dst

In [None]:
 # confirm the uploaded index object exists and show its size
 !gsutil ls -lh $index_dst

### Title Names Index

In [None]:
## Titles names' index
# Collect a dict on the driver from the first two parquet columns of every row
# (presumably id -> title — TODO confirm the column order).
t_start = time()
newrdd = parquetFile.rdd
keypair_rdd = newrdd.map(itemgetter(0, 1))
parquetFile_dict = keypair_rdd.collectAsMap()
index_const_time = time() - t_start

In [None]:
# report how long the title-names dict took to build (seconds, from time())
print("docs names' index took", index_const_time)


In [None]:
# Create inverted index instance
TitleNames_Index = InvertedIndex('titlenames')
# Add the docs names
TitleNames_Index.doc_name = parquetFile_dict
# write the global stats out
TitleNames_Index.write_index('.', 'TitleNames_Index')
# upload to gs
index_src = "TitleNames_Index.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp_titlenames/{index_src}'
!gsutil cp $index_src $index_dst

In [None]:
 # confirm the uploaded index object exists and show its size
 !gsutil ls -lh $index_dst

### Importing the Indexes

In [None]:
# Importing Body Index from Bucket
# NOTE: pickle.load can execute code embedded in the file — only load objects this project wrote.
bucket_name = 'ryan316597145'
file_name = 'Body_Index.pkl'  # object name used when the body index was uploaded
# Create a GCSFileSystem object
fs = gcsfs.GCSFileSystem()
with fs.open(f'gs://{bucket_name}/postings_gcp_body/{file_name}', 'rb') as f:
    Body_Index = pickle.load(f)
print(Body_Index)

In [None]:
# Importing Title Index from Bucket
# NOTE: pickle.load can execute code embedded in the file — only load objects this project wrote.
bucket_name = 'ryan316597145'
file_name = 'Title_Index.pkl'  # object name used when the title index was uploaded
# Create a GCSFileSystem object
fs = gcsfs.GCSFileSystem()
with fs.open(f'gs://{bucket_name}/postings_gcp_title/{file_name}', 'rb') as f:
    Title_Index = pickle.load(f)
print(Title_Index)

In [None]:
# Importing Anchor Index from Bucket
# NOTE: pickle.load can execute code embedded in the file — only load objects this project wrote.
bucket_name = 'ryan316597145'
file_name = 'Anchor_Index.pkl'  # object name used when the anchor index was uploaded
# Create a GCSFileSystem object
fs = gcsfs.GCSFileSystem()
with fs.open(f'gs://{bucket_name}/postings_gcp_anchor/{file_name}', 'rb') as f:
    Anchor_Index = pickle.load(f)
print(Anchor_Index)

In [None]:
# Importing Titles names' Index from Bucket
# NOTE: pickle.load can execute code embedded in the file — only load objects this project wrote.
bucket_name = 'ryan316597145'
file_name = 'TitleNames_Index.pkl'  # object name used when the title-names index was uploaded
# Create a GCSFileSystem object
fs = gcsfs.GCSFileSystem()
with fs.open(f'gs://{bucket_name}/postings_gcp_titlenames/{file_name}', 'rb') as f:
    Titlesnames_Index = pickle.load(f)
print(Titlesnames_Index)

### Creating PageRank & PageViews Dictionaries

In [None]:
# PageRank dictionary: id -> PageRank, read from a gzipped, header-less CSV
# whose two columns are (id, PageRank).
client = storage.Client()
bucket = client.get_bucket("ryan316597145")
blob = bucket.blob("pr/part-00000-4e51edea-48bb-4366-97ed-4102b6ea0c7f-c000.csv.gz")
# download_as_bytes replaces the deprecated download_as_string
content = blob.download_as_bytes()
# let pandas decompress the gzip stream instead of decoding the whole file to a str first
data = pd.read_csv(io.BytesIO(content), compression='gzip', header=None,
                   names=["id", "PageRank"])

PageRank_dict = dict(zip(data.id, data.PageRank))

In [None]:
# PageViews dictionary: wid2pv maps page id -> page views
# (the file name suggests August 2021 user page views).

client = storage.Client()
bucket = client.get_bucket("ryan316597145")
temp_blob = bucket.blob("pageviews/pageviews-202108-user-4dedup.txt")
clean_blob = bucket.blob("pageviews/pageviews-202108-user.pkl")
# download_as_bytes replaces the deprecated download_as_string
temp_content = temp_blob.download_as_bytes()

# Each line is "<page id> <views>"; views for a repeated id are summed.
wid2pv = Counter()
for line in temp_content.decode().split('\n'):
    parts = line.split(' ')
    # skip blank lines and lines missing the second field instead of raising IndexError
    if len(parts) < 2 or not parts[0] or not parts[1]:
        continue
    wid2pv[int(parts[0])] += int(parts[1])

# write out the counter as binary file (pickle it)
with io.BytesIO() as f:
    pickle.dump(wid2pv, f)
    f.seek(0)
    clean_blob.upload_from_file(f, content_type='application/octet-stream')

# read the counter back from the bucket (round-trips the uploaded pickle)
with io.BytesIO() as f:
    clean_blob.download_to_file(f)
    f.seek(0)
    wid2pv = pickle.load(f)

# the dict name: wid2pv

### Creating Association Model

In [None]:
# Install gensim pinned to 3.5.0 for the Word2Vec / KeyedVectors API used below.
# NOTE(review): gcsfs is already imported in the setup cell; re-importing it is a no-op.
!pip install gensim==3.5.0
import gcsfs

In [None]:
# word2vec: save the trained model's word vectors in binary word2vec format
from gensim.models import Word2Vec, KeyedVectors

# NOTE(review): `model` (a trained gensim Word2Vec) is not created anywhere in this
# notebook. TODO: add the training/loading step before this cell.
if 'model' not in globals():
    raise NameError("`model` is undefined: train or load a gensim Word2Vec model before saving it")
model.wv.save_word2vec_format('model.bin', binary=True)

In [None]:
# uploading the locally saved model.bin to the root of the bucket
!gsutil cp model.bin gs://ryan316597145/

In [None]:
# Downloading the word2vec model from the bucket and loading it as KeyedVectors
import tempfile
from gensim.models import KeyedVectors

storage_client = storage.Client()
bucket = storage_client.bucket('ryan316597145')
blob = bucket.blob('model.bin')
with tempfile.NamedTemporaryFile() as temp:
    blob.download_to_file(temp)
    # gensim reopens the file by name, so push every buffered byte to disk first
    temp.flush()
    newmodel = KeyedVectors.load_word2vec_format(temp.name, binary=True)

In [None]:
# size of input data (total, human-readable)
!gsutil du -sh "gs://wikidata_preprocessed/"

In [None]:
# size of every object in the project bucket plus a grand total (human-readable)
!gsutil du -ch gs://ryan316597145