## **Imports & Setup**

In [None]:
# if the following command generates an error, you probably didn't enable
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster
!gcloud dataproc clusters list --region us-central1

In [None]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
import math
from nltk.stem import WordNetLemmatizer
import builtins
import numpy as np
from nltk.tokenize import word_tokenize

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')
nltk.download('wordnet')



In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
# Bucket that is listed for input files here and that receives the pickled
# index files written further down.
bucket_name = '209234103_final'
full_path = f"gs://{bucket_name}/"

# Keep the gs:// URI of every object in the bucket whose name contains
# "multistream".
client = storage.Client()
blobs = client.list_blobs(bucket_name)
paths = [full_path + b.name for b in blobs if "multistream" in b.name]

In [None]:
# Read all of the selected "multistream" objects as parquet into one DataFrame.
# NOTE(review): `spark` is not created in any cell shown here — presumably the
# SparkSession is pre-defined by the Dataproc PySpark kernel; confirm.
parquetFile = spark.read.parquet(*paths)

                                                                                

In [None]:
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [None]:
# Register our python module with Spark via SparkContext.addFile, then put
# Spark's files root directory first on sys.path so `inverted_index_gcp` can be
# imported in the next cell.
# NOTE(review): `sc` is not created in any cell shown here — presumably the
# SparkContext is pre-defined by the kernel; confirm.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [None]:
from inverted_index_gcp import InvertedIndex

## **Set Data**

In [None]:
# Column projections of the corpus DataFrame, exposed as RDDs.
# Row field order follows the select() order: (text, id), (title, id) and
# (id, anchor_text). Only Pages_Links is used by the cells shown below
# (it is the input to generate_graph).
Body = parquetFile.select("text", "id").rdd
Title = parquetFile.select("title", "id").rdd
Pages_Links = parquetFile.select("id","anchor_text").rdd

### Generate Graph

In [None]:
# Build the directed wiki-link graph (edges and vertices) from the pages RDD
def generate_graph(pages):
    ''' Compute the directed graph generated by wiki links.

    Parameters:
    -----------
      pages: RDD
        An RDD where each row consists of one wikipedia article with 'id'
        (position 0) and 'anchor_text' (position 1). 'anchor_text' is iterated
        as a collection of links whose first entry is the destination page id.
        A missing (None) or empty 'anchor_text' is treated as a page with no
        outgoing links.
    Returns:
    --------
      edges: RDD
        An RDD where each row represents an edge in the directed graph created
        by the wikipedia links. The first entry is the source page id and the
        second entry is the destination page id. Repeated links inside one
        page are collapsed, so edges are unique as long as every page id
        appears in a single input row.
      vertices: RDD
        An RDD where each row is a 1-tuple holding the id of a vertex (node)
        that appears as the source or destination of at least one edge.
        No duplicates are present.
    '''

    def outgoing_edges(page):
        # Set comprehension removes repeated links to the same destination;
        # `or ()` lets a null anchor_text yield no edges instead of raising.
        source_id, links = page[0], page[1]
        return {(source_id, link[0]) for link in (links or ())}

    edges = pages.flatMap(outgoing_edges)
    # Every endpoint of every edge becomes a vertex; distinct() removes repeats.
    vertices = edges.flatMap(lambda edge: [(edge[0],), (edge[1],)]).distinct()
    return edges, vertices


## **PageRank Dict**

In [None]:
# construct the graph
edges, vertices = generate_graph(Pages_Links)
# compute PageRank
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.select("id", "pagerank").rdd
# pr = pr.sort(col('pagerank').desc())
PageRank = pr.collectAsMap()

PageRank_file = 'PageRank_index.pkl'
with open(PageRank_file,'wb') as f:
    pickle.dump(PageRank,f)

index_src = "PageRank_index.pkl"
index_dst = f'gs://{bucket_name}/PageRank/{index_src}'
!gsutil cp $index_src $index_dst
!gsutil ls -lh $index_dst

## **Documents to Title Dict**

In [None]:
doc_to_title_dict = parquetFile.select("id", "title").rdd
doc_to_title_dict = doc_to_title_dict.collectAsMap()
doc_to_title_dict_file = 'doc_to_title_dict_index.pkl'
with open(doc_to_title_dict_file,'wb') as f:
    pickle.dump(doc_to_title_dict,f)

index_src = "doc_to_title_dict_index.pkl"
index_dst = f'gs://{bucket_name}/doc_to_title_dict/{index_src}'
!gsutil cp $index_src $index_dst
!gsutil ls -lh $index_dst

## **PageView Dict**

In [None]:
# Paths
# Using user page views (as opposed to spiders and automated traffic) for the 
# month of August 2021
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path) 
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
pv_clean = f'{p.stem}.pkl'
# Download the file (2.3GB) 
!wget -N $pv_path
# Filter for English pages, and keep just two fields: article ID (3) and monthly 
# total number of page views (5). Then, remove lines with article id or page 
# view values that are not a sequence of digits.
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
# Create a Counter (dictionary) that sums up the pages views for the same 
# article, resulting in a mapping from article id to total page views.
wid2pv = Counter()
with open(pv_temp, 'rt') as f:
    for line in f:
        parts = line.split(' ')
        wid2pv.update({int(parts[0]): int(parts[1])})
# write out the counter as binary file (pickle it)
PageViews_file = 'PageViews_index.pkl'
with open(PageViews_file, 'wb') as f:
    pickle.dump(wid2pv, f)
    
index_src = "PageViews_index.pkl"
index_dst = f'gs://{bucket_name}/PageViews/{index_src}'
!gsutil cp $index_src $index_dst
!gsutil ls -lh $index_dst
# read the counter back in
# with open(PageViews_file, 'rb') as f:
#   wid2pv = pickle.loads(f.read())

## **Word2Vec Model**

In [None]:
!pip install gensim

In [None]:
import logging
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
import gensim.downloader as api
import json
info = api.info()

In [None]:
# Fetch the metadata gensim's downloader reports for the chosen vector set.
wiki_info = api.info('glove-wiki-gigaword-300')
# json.dumps only builds a string; a bare expression in the middle of a cell
# is never displayed, so print it to actually show the metadata.
print(json.dumps(wiki_info, indent=4))
# With return_path=True the downloader returns the local file path instead of
# a loaded model. The return value is discarded here, so this call appears to
# serve only as an explicit download/cache step before the real load.
api.load('glove-wiki-gigaword-300', return_path=True)
# Load the vectors into memory; this is the object pickled in the next cell.
Word2VEC_Model = api.load("glove-wiki-gigaword-300")

In [None]:
Word2VEC_Model_file = 'Word2VEC_Model_index.pkl'
with open(Word2VEC_Model_file, 'wb') as f:
    pickle.dump(Word2VEC_Model, f)
    
index_src = "Word2VEC_Model_index.pkl"
index_dst = f'gs://{bucket_name}/Word2VEC_Model/{index_src}'
!gsutil cp $index_src $index_dst
!gsutil ls -lh $index_dst