In [None]:
# Sanity check: list the Dataproc clusters in us-central1 to confirm this
# notebook is allowed to call Google Cloud APIs.
# If the following command generates an error, you probably didn't enable
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster.
!gcloud dataproc clusters list --region us-central1

NAME          PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-fbc8  GCE       2                                             RUNNING  us-central1-a


In [None]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# Fetch NLTK's stopword corpus; the English list feeds `all_stopwords`, which
# `tokenize` uses to drop stopwords.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
# Display the active SparkSession.
# NOTE(review): `spark` is never created in this notebook; presumably it is
# pre-initialised by the Dataproc PySpark kernel — confirm.
spark

In [None]:
# Collect the gs:// URIs of every Parquet file in the bucket; these form the
# corpus read by the cells below.
bucket_name = 'new315071910'
full_path = f"gs://{bucket_name}/"

client = storage.Client()
blobs = client.list_blobs(bucket_name)
paths = [full_path + blob.name for blob in blobs if blob.name.endswith('.parquet')]

# Fail here with a clear message if nothing matched, rather than inside
# spark.read.parquet() further down.
assert paths, f"no .parquet files found in gs://{bucket_name}/"

In [None]:
# Load every Parquet shard and keep (text, id) rows as an RDD:
# row[0] is the article text, row[1] its id.
parquetFile = spark.read.parquet(*paths)
doc_text_pairs = parquetFile.select(["text", "id"]).rdd

                                                                                

In [None]:
# Stopwords: NLTK's English list plus frequent corpus-specific terms.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = [
    "category", "references", "also", "external", "links",
    "may", "first", "see", "history", "people", "one", "two",
    "part", "thumb", "including", "second", "following",
    "many", "however", "would", "became",
]
all_stopwords = english_stopwords.union(corpus_stopwords)

# A token starts with '#', '@' or a word character, followed by 2-24 word
# characters, each optionally preceded by a single apostrophe or hyphen.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)


def tokenize(text):
    """Lower-case `text` and return its RE_WORD tokens, skipping stopwords."""
    tokens = []
    for match in RE_WORD.finditer(text.lower()):
        word = match.group()
        if word not in all_stopwords:
            tokens.append(word)
    return tokens

In [None]:
DL_dict = doc_text_pairs.map(lambda row: (row[1], len(tokenize(row[0])))).collectAsMap()
with open('DL_dict.pkl', 'wb') as handle:
   pickle.dump(DL_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)
DL_dict_src = "DL_dict.pkl"
DL_dict_dst = f'gs://{bucket_name}/{DL_dict_src}'
!gsutil cp $DL_dict_src $DL_dict_dst

In [None]:
parquetFile = spark.read.parquet(*paths)
doc_title_pairs_for_dict = parquetFile.select("id", "title").rdd
docs_titles_big_dict = doc_title_pairs_for_dict.collectAsMap()
with open('docs_to_title_dict.pkl', 'wb') as handle:
   pickle.dump(docs_titles_big_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)

index_src_title = "docs_to_title_dict.pkl"
index_dst_title = f'gs://{bucket_name}/{index_src_title}'
!gsutil cp $index_src_title $index_dst_title

In [None]:
index_src_title = "docs_to_title_dict.pkl"
index_dst_title = f'gs://{bucket_name}/{index_src_title}'
!gsutil cp $index_src_title $index_dst_title

In [None]:
# Number of distinct doc ids in the title map (collectAsMap keeps one entry per id).
len(docs_titles_big_dict)

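# PageRank

Build the directed article-link graph from each page's `anchor_text` and score the pages with GraphFrames PageRank.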

In [None]:
def generate_graph(pages):
  ''' Compute the directed graph generated by wiki links.

  Parameters:
  -----------
    pages: RDD
      An RDD where each row is one Wikipedia article as (id, anchor_text).
      anchor_text is an iterable of link rows whose first field is the
      destination page id. A missing (None) anchor_text is treated as a page
      with no outgoing links.
  Returns:
  --------
    edges: RDD
      An RDD of (source_id, destination_id) tuples. Each (source, destination)
      pair appears once per source row, so no duplicates are produced as long
      as page ids are unique across rows.
    vertices: RDD
      An RDD of single-element lists [page_id], one per distinct id that is the
      source or destination of some edge. Pages with no links in or out are
      not included.
  '''
  def _page_edges(page):
    # One page -> its distinct outgoing edges (a page may link to the same
    # target more than once).
    src_id, anchors = page[0], page[1]
    return {(src_id, anchor[0]) for anchor in (anchors or [])}

  edges = pages.flatMap(_page_edges)
  # Every edge endpoint becomes a vertex; wrap each id in a list so that
  # toDF(['id']) sees a one-column row.
  vertices = edges.flatMap(lambda edge: edge).distinct().map(lambda v: [v])
  return edges, vertices

In [None]:
# PageRank parameters.
NUM_PARTITIONS = 124       # partitions for the edge / vertex DataFrames
RESET_PROBABILITY = 0.15   # random-jump probability (1 - damping factor)
MAX_ITER = 6               # fixed number of PageRank iterations

pages_links = parquetFile.select("id", "anchor_text").rdd
edges, vertices = generate_graph(pages_links)
edgesDF = edges.toDF(['src', 'dst']).repartition(NUM_PARTITIONS, 'src')
verticesDF = vertices.toDF(['id']).repartition(NUM_PARTITIONS, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=RESET_PROBABILITY, maxIter=MAX_ITER)
pr = pr_results.vertices.select("id", "pagerank")
pr = pr.sort(col('pagerank').desc())

# repartition(1) is a shuffle and does not preserve the preceding sort, so
# re-sort inside the single partition before writing. mode="overwrite" lets
# the cell be re-run without failing on the existing output directory.
(pr.repartition(1)
   .sortWithinPartitions(col('pagerank').desc())
   .write.csv(f'gs://{bucket_name}/pr', compression="gzip", mode="overwrite"))

In [None]:
pageview = 'https://dumps.wikimedia.org/other/pageview_complete/2023/2023-12/pageviews-20231207-user.bz2'
p = Path(pageview)
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
pageview_pkl_file = f'{p.stem}.pkl'
!wget -N $pageview
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
wid2pv = Counter()
with open(pv_temp, 'rt') as f:
  for line in f:
    parts = line.split(' ')
    wid2pv.update({int(parts[0]): int(parts[1])})
with open(pageview_pkl_file, 'wb') as f:
  pickle.dump(wid2pv, f)

# Upload the pageview pickle to GCS, then delete the local copy.
# NOTE(review): this targets bucket "315071910", not the `bucket_name`
# ("new315071910") used by the other cells — confirm which is intended.
# A separate variable is used so the global `bucket_name` is not silently
# overwritten for any cell re-run afterwards.
pv_bucket_name = "315071910"
index_src = pageview_pkl_file
index_dst = f'gs://{pv_bucket_name}/{index_src}'

bucket = client.bucket(pv_bucket_name)
blob = bucket.blob(index_src)
blob.upload_from_filename(index_src)
# Remove the local file only after the upload has completed; the upload above
# is the single copy to GCS (a later `gsutil cp` of a deleted file would fail).
os.remove(index_src)
index_dst