In [None]:
# Environment check: list the Dataproc clusters available in region us-central1.
!gcloud dataproc clusters list --region us-central1

In [None]:
# !pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
import math

import hashlib
def _hash(s):
    """Return a short hex digest of `s`: BLAKE2b over its UTF-8 bytes, 5-byte digest (10 hex chars)."""
    payload = bytes(s, encoding='utf8')
    hasher = hashlib.blake2b(payload, digest_size=5)
    return hasher.hexdigest()

# Fetch the NLTK stopword corpus; `stopwords.words('english')` reads it when building `english_stopwords`.
nltk.download('stopwords')

In [16]:
import findspark
findspark.init() 
from pyspark.sql import *
import pyspark.sql.functions as pyfunc
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
# Locate every parquet shard stored in the project bucket.
bucket_name = '201640042_project' 
full_path = f"gs://{bucket_name}/"

client = storage.Client()
blobs = client.list_blobs(bucket_name)
# Fully-qualified gs:// URIs so Spark can read the shards directly.
paths = [full_path + b.name for b in blobs if b.name.endswith('parquet')]

In [6]:
# Load all parquet shards into one DataFrame.
# NOTE(review): `spark` is not created in this notebook; presumably it is the
# SparkSession pre-provided by the Dataproc Jupyter kernel.
parquetFile = spark.read.parquet(*paths)
# Column order matters: downstream lambdas index these rows positionally
# (x[0] = field value, x[1] = id).
doc_text_pairs = parquetFile.select("text", "id").rdd
doc_title_pairs = parquetFile.select("title", "id").rdd
doc_anchor_pairs = parquetFile.select("anchor_text", "id").rdd
# Corpus size N, used as the idf numerator in get_tfidf and stored as InvertedIndex._N.
n_pages = parquetFile.count()

                                                                                

In [7]:
# Distribute the project's inverted-index helper module to the Spark executors
# and put the SparkFiles root on the driver's sys.path so it can be imported next.
# NOTE(review): `sc` (SparkContext) is presumably pre-provided by the kernel.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [8]:
from inverted_index_gcp import *

In [9]:
# Stopwords: NLTK's English list plus frequent Wikipedia boilerplate terms.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = [
    "category", "references", "also", "external", "links",
    "may", "first", "see", "history", "people", "one", "two",
    "part", "thumb", "including", "second", "following",
    "many", "however", "would", "became",
]
all_stopwords = english_stopwords | frozenset(corpus_stopwords)

# A token is a '#', '@' or word character followed by 2-24 word characters,
# each optionally preceded by an apostrophe or hyphen (so >= 3 chars long).
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

# Number of hash buckets posting lists are sharded into (see token2bucket_id).
NUM_BUCKETS = 124

# Optional (disabled): restrict the vocabulary to terms from the training queries.
# import json
# with open('/home/dataproc/queries_train.json') as json_file:
#     data = json.load(json_file)
# train_tokens = [token.group() for token in RE_WORD.finditer(' '.join(data.keys()).lower())]
# train_filtered_vocab = set([tok for tok in train_tokens if (tok not in all_stopwords)])

def token2bucket_id(token):
    """Map `token` to a bucket index in [0, NUM_BUCKETS) via its BLAKE2b hex digest."""
    digest_as_int = int(_hash(token), 16)
    return digest_as_int % NUM_BUCKETS


def get_tokens(text, doc_id):
    """Support function - tokenizes one document and drops stopwords.

    Parameters:
    -----------
    text: str or None
      Text of one document. None or "" yields an empty token list instead
      of raising (a null field would otherwise fail the whole Spark job).
    doc_id: int
      Document id
    Returns:
    --------
    List of tuples
      A single-element list [(doc_id, filtered_token_list)], shaped for
      use with RDD.flatMap.
    """
    if not text:
        return [(doc_id, [])]
    # RE_WORD only matches runs of at least 3 characters, so no empty tokens
    # can occur and no extra emptiness check is needed.
    tokens = (match.group() for match in RE_WORD.finditer(text.lower()))
    filtered_tokens = [tok for tok in tokens if tok not in all_stopwords]
    return [(doc_id, filtered_tokens)]


def word_count(doc_id, tokens):
    """Build posting entries (term frequencies) for one document.

    Parameters:
    -----------
    doc_id: int
      Document id
    tokens: iterable of str
      The document's tokens (stopword filtering happens upstream in get_tokens).
    Returns:
    --------
    List of tuples
      One (token, (doc_id, tf)) pair per distinct token, in first-seen order,
      e.g. [("anarchism", (12, 5)), ...]
    """
    tf_by_token = Counter(tokens)
    entries = []
    for term, freq in tf_by_token.items():
        entries.append((term, (doc_id, freq)))
    return entries

def get_doc_len(doc_id, tokens):
    """Filtered length of one document, shaped for RDD.flatMap.

    Parameters:
    -----------
    doc_id: int
      Document id
    tokens: iterable of str
      The document's tokens (a set when building the anchor index, so the
      length is then the number of distinct terms).
    Returns:
    --------
    List of tuples
      A single-element list [(doc_id, doc_length)].
    """
    length = 0
    for occurrences in Counter(tokens).values():
        length += occurrences
    return [(doc_id, length)]
    

def get_tfidf(tf, df, doc_len, n_docs=None):
    """Length-normalized tf-idf weight of one term in one document.

        weight = (tf / doc_len) * log2(n_docs / (df + 1))

    Parameters:
    -----------
    tf: int
      Occurrences of the term in the document.
    df: int
      Number of documents containing the term.
    doc_len: int or None
      Filtered document length. 0 or None (e.g. a missing key after a
      leftOuterJoin) yields 0.0 instead of raising.
    n_docs: int, optional
      Corpus size. Defaults to the notebook-global `n_pages`; pass it
      explicitly to use the function without that global.
    Returns:
    --------
    float
      The weight. It is negative when df + 1 > n_docs.
    """
    if not doc_len:
        return 0.0
    if n_docs is None:
        n_docs = n_pages
    # math.log(x, 2) (not log2) keeps results bit-identical to earlier runs.
    return tf / doc_len * math.log(n_docs / (df + 1), 2)

In [10]:
# get an id -> title dict; it is attached to each inverted index so query results can be shown with titles.
# NOTE(review): collectAsMap materializes every title in driver memory.
id2title = doc_title_pairs.map(lambda x: (x[1], x[0])).collectAsMap()

                                                                                

In [None]:
# Build, write and upload one inverted index per entry of rdd_dict
# (index name -> RDD of (field_value, id) rows). Only the anchor index is
# enabled in this run; the commented-out dict lists the body/title variants.
# rdd_dict = {"full_body_index": doc_text_pairs, "full_title_index": doc_title_pairs, "full_anchor_index": doc_anchor_pairs}
rdd_dict = {"full_anchor_index": doc_anchor_pairs}
for rdd_name, rdd_pairs in rdd_dict.items():
    # for anchors - need to handle duplicated anchors found on different pages pointing to the same page with similar or different anchor text
    # Anchor rows are (anchor_text, id): x[0] is the page's list of anchors, and
    # each anchor is read as (x[0], x[1]) = (target id, anchor text) -- presumably
    # Row(id, text); confirm against the parquet schema. Tokens are keyed by the
    # *target* page, concatenated across all linking pages, then de-duplicated
    # with set(), so every anchor term gets tf == 1 for its target page.
    if (rdd_name == "full_anchor_index"):
        print("running anchorIndex")
        doc_tok = rdd_pairs.flatMap(lambda x: x[0]).flatMap(lambda x: get_tokens(x[1], x[0])).reduceByKey(lambda x,y: x+y).mapValues(set)
    else:
        # body/title rows are (text, id)
        doc_tok = rdd_pairs.flatMap(lambda x: get_tokens(x[0], x[1]))
    
    # calculate document length for later tf normalization
    # (for the anchor index this is the number of distinct anchor terms)
    doc_length = doc_tok.flatMap(lambda x: get_doc_len(x[0], x[1]))
    # calculate term frequency by document
    # NOTE(review): doc_tok is not cached, so it is recomputed for doc_length and word_counts.
    word_counts = doc_tok.flatMap(lambda x: word_count(x[0], x[1]))
    if (rdd_name == "full_body_index"):
        # Drop rare body terms: keep a term only if its posting list has >= 50 entries
        # (assumes reduce_word_counts returns one (doc_id, tf) entry per document).
        print("running bodyIndex with uncommon_words filter\n")
        postings = word_counts.groupByKey().mapValues(reduce_word_counts).filter(lambda x: len(x[1]) >= 50)
    else:
        postings = word_counts.groupByKey().mapValues(reduce_word_counts)
    # Calculate term document frequency
    w2df = calculate_df(postings)
    # Calculate norm of each document - get tf from posting, df from w2df, doc length and calculate tfidf^2 per doc_id, term and sum by doc_id
    # Record shapes through the chain:
    #   flatMapValues              -> (term, (doc_id, tf))
    #   leftOuterJoin(w2df)        -> (term, ((doc_id, tf), df))
    #   map                        -> (doc_id, (term, tf, df))
    #   leftOuterJoin(doc_length)  -> (doc_id, ((term, tf, df), doc_len))
    #   map                        -> (doc_id, (tf, df, doc_len))
    # then tfidf^2 is summed per doc_id and rounded to 6 decimals.
    # NOTE(review): no square root is taken, so this stores the *squared* L2 norm;
    # confirm the cosine-similarity code expects that.
    doc2norm = postings.flatMapValues(lambda x: x).leftOuterJoin(w2df) \
        .map(lambda x: (x[1][0][0], (x[0], x[1][0][1], x[1][1]))) \
        .leftOuterJoin(doc_length).map(lambda x: (x[0], (x[1][0][1], x[1][0][2], x[1][1]))) \
        .mapValues(lambda x: math.pow(get_tfidf(x[0], x[1], x[2]), 2)) \
        .reduceByKey(lambda x,y: x+y).mapValues(lambda x: round(x, 6))

    # save doc_id: (doc_norm, doc_len) RDD for cosin similarity calculations
    # (inner join: documents with no postings have no norm and are dropped here)
    doc_data = doc2norm.join(doc_length)

    # write posting to bin files
    # (helper from inverted_index_gcp; presumably returns an RDD, so collect() triggers the write job)
    _ = partition_postings_and_write(postings, bucket_name, rdd_name).collect()
    print(f"PostingLocs created for {rdd_name}\n")
    
    # Merge every posting-location pickle stored under the `rdd_name` prefix
    # into one token -> [locations] map.
    # NOTE(review): stale *.pickle files left under this prefix by earlier runs
    # (or by another index whose name starts with rdd_name) are merged too.
    super_posting_locs = defaultdict(list)
    for blob in client.list_blobs(bucket_name, prefix=rdd_name):
        if not blob.name.endswith("pickle"):
            continue
        with blob.open("rb") as f:
            posting_locs = pickle.load(f)
            for k, v in posting_locs.items():
                super_posting_locs[k].extend(v)
                
    # Create inverted index instance
    inverted = InvertedIndex()
    # Adding the posting locations dictionary to the inverted index
    inverted.posting_locs = super_posting_locs
    # Add the token - df dictionary to the inverted index
    inverted.df.update(w2df.collectAsMap())
    print(f"document frequancy created for {rdd_name}\n")
    # Count number of docs
    inverted._N = n_pages
    # Get each document length and norm
    inverted.doc_data.update(doc_data.collectAsMap())
    print(f"doc data created for {rdd_name}\n")
    # save titles to return results
    # (the full id -> title map is embedded in every index written below)
    inverted.doc2title = id2title
    # write the global stats out
    inverted.write_index('.', rdd_name)
    print(f"{rdd_name} written\n")
    
    # upload to gs
    # Assumes write_index('.', name) produced ./<name>.pkl -- verify in inverted_index_gcp.
    index_src = f"{rdd_name}.pkl"
    index_dst = f'gs://{bucket_name}/{rdd_name}/{index_src}'
    !gsutil cp $index_src $index_dst
    print(f"{rdd_name} uploaded to bucket\n")
    
    !gsutil ls -lh $index_dst

running anchorIndex


                                                                                

PostingLocs created for full_anchor_index



                                                                                

document frequancy created for full_anchor_index



                                                                                

doc data created for full_anchor_index

full_anchor_index written

Copying file://full_anchor_index.pkl [Content-Type=application/octet-stream]...
==> NOTE: You are uploading one or more large file(s), which would run          
significantly faster if you enable parallel composite uploads. This
feature can be enabled by editing the
"parallel_composite_upload_threshold" value in your .boto
configuration file. However, note that if you do this large files will
be uploaded as `composite objects
<https://cloud.google.com/storage/docs/composite-objects>`_,which
means that any user who downloads such objects will need to have a
compiled crcmod installed (see "gsutil help crcmod"). This is because
without a compiled crcmod, computing checksums on composite objects is
so slow that gsutil disables downloads of composite objects.

/ [1 files][364.6 MiB/364.6 MiB]                                                
Operation completed over 1 objects/364.6 MiB.                                    
full

In [12]:
# Report every index built by the loop above. Reading `rdd_dict` instead of the
# leaked loop variable `rdd_name` avoids hidden state and lists all enabled indices.
print(f"done with {', '.join(rdd_dict)}")

done with full_anchor_index


In [10]:
def generate_graph(pages):
    '''Build the directed graph induced by wiki links.

    Parameters:
    -----------
    pages: RDD
      Rows of (page_id, anchor_list); every anchor entry exposes the linked
      page's id through its `.id` attribute.
    Returns:
    --------
    edges: RDD
      Distinct (source_page_id, destination_page_id) pairs, one per link.
    vertices: RDD
      Distinct 1-tuples (page_id,) for every page appearing as a link
      source or destination.
    '''
    def _links_of(page):
        # Expand one page row into its outgoing (src, dst) edges.
        src_id, anchors = page[0], page[1]
        return [(src_id, anchor.id) for anchor in anchors]

    edges = pages.flatMap(_links_of).distinct()
    vertices = (
        edges.flatMap(lambda edge: (edge[0], edge[1]))
        .distinct()
        .map(lambda page_id: (page_id,))
    )
    return edges, vertices

In [12]:
# Start the PageRank timer; pr_time is taken in a later cell.
# NOTE(review): the timer spans several cells, so pr_time also includes any
# idle time between manual cell executions.
t_start = time()
# construct the graph 
edges, vertices = generate_graph(parquetFile.select("id", "anchor_text").rdd)
# Convert to DataFrames with the column names GraphFrame requires
# (edges: src/dst, vertices: id), each repartitioned into 124 partitions.
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')


                                                                                

NameError: name 'GraphFrame' is not defined

In [None]:
# Run PageRank for a fixed 6 iterations with reset probability 0.15
# (damping factor 0.85), keeping only the id and score columns.
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.select("id", "pagerank")


                                                                                

NameError: name 'col' is not defined

In [18]:
# Order pages by descending PageRank, stop the timer and preview the top rows.
# NOTE(review): this rebinds `pr`, so the cell is not idempotent; the sort is
# lazy and only executes at show(), after pr_time has been recorded.
pr = pr.sort(pyfunc.col('pagerank').desc())
pr_time = time() - t_start
pr.show()



+-------+------------------+
|     id|          pagerank|
+-------+------------------+
|3434750| 9913.728782160777|
|  10568| 5385.349263642033|
|  32927| 5282.081575765273|
|  30680|  5128.23370960412|
|5843419| 4957.567686263868|
|  68253|4769.2782653551585|
|  31717|  4486.35018054831|
|  11867| 4146.414650912772|
|  14533|3996.4664408854983|
| 645042|3531.6270898037437|
|  17867| 3246.098390604142|
|5042916|  2991.94573916618|
|4689264|2982.3248830417483|
|  14532|2934.7468292031704|
|  25391| 2903.546223513398|
|   5405| 2891.416329154636|
|4764461|2834.3669873326617|
|  15573| 2783.865118158838|
|   9316|2782.0396464137684|
|8569916|2775.2861918400167|
+-------+------------------+
only showing top 20 rows



                                                                                

In [22]:
# Collect the (id, pagerank) rows to the driver as a {page_id: pagerank} dict.
pagerank = pr.rdd.collectAsMap()

                                                                                

In [27]:
# Persist the {page_id: pagerank} dict as a pickle at the bucket root.
# Consumers must trust this object: unpickling can execute arbitrary code.
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob("pagerank_org.pkl")
with blob.open("wb") as f:
    pickle.dump(pagerank, f)