In [None]:
from builtins import *
import sys
from collections import Counter, OrderedDict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from timeit import timeit
from pathlib import Path
import pickle
import pandas as pd
import numpy as np
from google.cloud import storage

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# Download the NLTK 'stopwords' package; when it is already present NLTK just
# reports that it is up-to-date.
nltk.download('stopwords')


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes



In [None]:
from graphframes import *
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles

In [None]:
# TODO: Put your bucket name below and make sure you can access it without an error
# Name of the Cloud Storage bucket that this notebook reads from and writes to.
bucket_name = 'fadlonbucket'

# Cloud Storage client used for the listing below.
client = storage.Client()
# Listing of the blobs whose names start with 'postings_gcp' in the bucket.
# NOTE(review): nothing in this cell iterates `blobs` (the loop below is
# commented out); if list_blobs is lazy, no request is sent here and bucket
# access is not actually verified - confirm.
blobs = client.list_blobs(bucket_name, prefix='postings_gcp')
# # print all the blobs in the bucket
# for b in blobs:
#     print(b.name)

In [None]:
import pickle

def save_index_to_pickle(name, object_to_save):
  """Pickle an object to a local file and upload that file to the bucket.

  Parameters:
  -----------
    name: str
      Base name of the output; the object is written locally to `<name>.pkl`
      and uploaded as `postings_gcp/<name>.pkl`.
    object_to_save: object
      Any picklable object.

  The destination bucket is taken from the notebook-level `bucket_name`. The
  local pickle file is left on disk after the upload.
  """
  local_path = f"{name}.pkl"
  with open(local_path, "wb") as out_file:
    pickle.dump(object_to_save, out_file)

  # A fresh client is created on every call, so the function does not depend
  # on any client object defined in another cell.
  gcs_client = storage.Client()
  target_blob = gcs_client.bucket(bucket_name).blob(f"postings_gcp/{name}.pkl")
  target_blob.upload_from_filename(local_path)

# **Page Rank**

In [None]:
def generate_graph(pages):
  ''' Build the directed link graph induced by the links between wiki pages.
  Parameters:
  -----------
    pages: RDD
      An RDD with one row per wikipedia article. The first entry of a row is
      the article 'id'; the second entry ('anchor_text') is a collection of
      links, each of which carries the destination page id as its first entry.
  Returns:
  --------
    edges: RDD
      An RDD of (source page id, destination page id) pairs, one per link in
      the graph. Duplicate pairs are removed.
    vertices: RDD
      An RDD where each row is a one-element list holding the id of a vertex
      (node) of the graph: every page that has a row in `pages` or is the
      destination of some link. Duplicate ids are removed.
  '''
  # Vertices: ids of the pages themselves plus ids of every linked-to page.
  source_ids = pages.map(lambda row: row[0])
  target_ids = pages.flatMap(lambda row: row[1]).map(lambda link: link[0])
  vertices = source_ids.union(target_ids).distinct().map(lambda vid: [vid])

  # Edges: pair each page id with each of its links, then keep only the
  # destination id of the link.
  page_link_pairs = pages.flatMapValues(lambda links: links)
  edges = page_link_pairs.map(lambda pair: (pair[0], pair[1][0])).distinct()
  return edges, vertices

In [None]:
# Load only the two columns needed to build the link graph.
pages_links = (
    spark.read.parquet("gs://wikidata_preprocessed/*")
    .select("id", "anchor_text")
    .rdd
)

# Build the graph and wrap it in a GraphFrame; edges are partitioned by their
# source id and vertices by their id.
edges, vertices = generate_graph(pages_links)
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)

# Run PageRank for a fixed number of iterations and order the pages from the
# highest score to the lowest.
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = (
    pr_results.vertices
    .select("id", "pagerank")
    .sort(col('pagerank').desc())
)

# Write the scores as a single gzip-compressed CSV part under the bucket.
# NOTE(review): repartition(1) reshuffles the rows, so the sorted order is
# presumably not guaranteed in the written file - confirm if consumers rely on it.
pr.repartition(1).write.csv(f'gs://{bucket_name}/pr', compression="gzip")

pr.show()



+-------+------------------+
|     id|          pagerank|
+-------+------------------+
|3434750|  9917.88952878659|
|  10568| 5387.609470096126|
|  32927| 5284.298441243097|
|  30680|5130.3860058361915|
|5843419| 4959.648354746549|
|  68253|4771.2799096290955|
|  31717| 4488.233081199125|
|  11867| 4148.154882176618|
|  14533|3998.1437395714415|
| 645042|3533.1092975401466|
|  17867|3247.4607632514617|
|5042916|2993.2014451081495|
|4689264|2983.5765511544096|
|  14532| 2935.978529024251|
|  25391|2904.7648285829778|
|   5405|2892.6298433630336|
|4764461|2835.5565581932315|
|  15573| 2785.033493615905|
|   9316|2783.2072557286947|
|8569916|2776.4509667609464|
+-------+------------------+
only showing top 20 rows



                                                                                

In [None]:
# Collect the PageRank scores on the driver as a pandas frame. Transposing the
# id-indexed frame turns every article id into a column, so to_dict('list')
# yields a mapping of the form {id: [pagerank]}.
pr_pandas = pr.toPandas()
pr_dict = pr_pandas.set_index('id').T.to_dict('list')

In [None]:
# save_index_to_pickle takes the output name first and the object second;
# calling it with the dictionary alone raises a TypeError. The scores are
# written to pagerank.pkl and uploaded as postings_gcp/pagerank.pkl.
# NOTE(review): the name 'pagerank' was chosen here - adjust it if consumers
# expect a different file name.
save_index_to_pickle('pagerank', pr_dict)

# **Page View**

In [None]:
# Paths
# Using user page views (as opposed to spiders and automated traffic) for the 
# month of August 2021
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path) 
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
pv_clean = f'{p.stem}.pkl'
# Download the file (2.3GB) 
!wget -N $pv_path
# Filter for English pages, and keep just two fields: article ID (3) and monthly 
# total number of page views (5). Then, remove lines with article id or page 
# view values that are not a sequence of digits.
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
# Create a Counter (dictionary) that sums up the pages views for the same 
# article, resulting in a mapping from article id to total page views.
wid2pv = Counter()
with open(pv_temp, 'rt') as f:
  for line in f:
    parts = line.split(' ')
    wid2pv.update({int(parts[0]): int(parts[1])})

In [None]:
# save_index_to_pickle takes the output name first and the object second;
# calling it with the Counter alone raises a TypeError. Using p.stem as the
# name makes the helper write f'{p.stem}.pkl', i.e. the file named by pv_clean.
save_index_to_pickle(p.stem, wid2pv)