In [1]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [2]:
import pyspark
import sys
from collections import Counter, OrderedDict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from timeit import timeit
from pathlib import Path
import pickle
import pandas as pd
import numpy as np
from google.cloud import storage

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# Fetch the NLTK 'stopwords' corpus so it is available locally.
nltk.download('stopwords')


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [3]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [4]:
# Display the `spark` object. It is not created anywhere in this notebook, so
# it is presumably the SparkSession pre-defined by the cluster's kernel — TODO confirm.
spark

In [5]:
bucket_name = 'project_bucket_sy'
full_path = f"gs://{bucket_name}/"

# Collect the gs:// URI of every object under wiki_files/, skipping the
# cluster init script and the folder placeholder entry itself.
client = storage.Client()
blobs = client.list_blobs(bucket_name, prefix='wiki_files/')
skipped_blob_names = ('wiki_files/graphframes.sh', 'wiki_files/')
paths = [
    full_path + blob.name
    for blob in blobs
    if blob.name not in skipped_blob_names
]


In [6]:
def idx_to_pickle(name, object_to_save, folder="postings_gcp"):
    ''' Pickle an object to a local file and upload that file to the bucket.
    Parameters:
    -----------
    name: str
      Base name of the pickle. The object is written locally to '<name>.pkl'
      and uploaded as '<folder>/<name>.pkl'.
    object_to_save: object
      Any picklable Python object.
    folder: str
      Destination prefix inside the bucket named by the module-level
      `bucket_name`. Defaults to 'postings_gcp', the location this helper
      always used before the parameter existed.
    Returns:
    --------
    None
    '''
    local_file = f"{name}.pkl"
    with open(local_file, "wb") as f:
        pickle.dump(object_to_save, f)

    # The local file is kept on disk after the upload.
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(f"{folder}/{name}.pkl")
    blob.upload_from_filename(local_file)

## Page Rank

In [7]:
def generate_graph(pages):
    ''' Build the directed link graph of the wiki pages.
    Parameters:
    -----------
    pages: RDD
      An RDD where each row is one wikipedia article: the article 'id'
      followed by its 'anchor_text', a sequence of links whose first entry is
      the id of the linked page.
    Returns:
    --------
    edges: RDD
      An RDD of (source page id, destination page id) pairs, one per distinct
      link between two pages.
    vertices: RDD
      An RDD of single-element lists [page id], one per distinct page that
      appears as the source or the destination of at least one edge.
    '''
    def outgoing_edges(page):
        # page[0] is the article id, page[1] its links; link[0] is the target id.
        source_id = page[0]
        return [(source_id, link[0]) for link in page[1]]

    edges = pages.flatMap(outgoing_edges).distinct()

    sources = edges.map(lambda edge: edge[0])
    targets = edges.map(lambda edge: edge[1])
    # Wrap every id in a list so each vertex is a one-column row.
    vertices = sources.union(targets).distinct().map(lambda page_id: [page_id])

    return edges, vertices

In [8]:
# Tunable settings for the PageRank job.
NUM_PARTITIONS = 124      # partitions used for the edge and vertex DataFrames
RESET_PROBABILITY = 0.15  # resetProbability passed to GraphFrames pageRank
MAX_ITERATIONS = 6        # number of PageRank iterations

pages_links = spark.read.parquet(*paths).select("id", "anchor_text").rdd
# construct the graph
edges, vertices = generate_graph(pages_links)
# compute PageRank
edgesDF = edges.toDF(['src', 'dst']).repartition(NUM_PARTITIONS, 'src')
verticesDF = vertices.toDF(['id']).repartition(NUM_PARTITIONS, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=RESET_PROBABILITY, maxIter=MAX_ITERATIONS)
pr = pr_results.vertices.select("id", "pagerank")
# Cache the sorted result: it is consumed by several actions (the CSV write and
# show() here, toPandas() later) and would otherwise be recomputed each time.
pr = pr.sort(col('pagerank').desc()).cache()
# coalesce(1) merges the sorted partitions without a shuffle, so the single
# output file keeps the descending-rank order; repartition(1) reshuffles the
# rows and does not guarantee it. mode="overwrite" lets the cell be re-run
# when the output folder already exists.
pr.coalesce(1).write.csv(f'gs://{bucket_name}/pr', mode="overwrite", compression="gzip")
pr.show()



+-------+------------------+
|     id|          pagerank|
+-------+------------------+
|3434750|457.41334873015523|
| 697535|202.85551757098216|
|5042916|196.35504895032895|
|  11867|196.01560639219883|
|  10568|186.05472770354592|
|  32927|171.91829684576572|
|  31717| 159.2992402460553|
|   9316|154.17530698230544|
|5843419|150.70351437530363|
|  21241|148.18532871695962|
|4689264|138.65936331783217|
| 645042|128.40023485080863|
|  68253|127.30337056091777|
| 528282|121.84510758522758|
|  30680|120.19739963087184|
|  14533|118.99042460188433|
|  17867|109.61910246969148|
|   3392|107.20045199126344|
|  21148| 106.1672215975165|
|  15573| 97.65279791456467|
+-------+------------------+
only showing top 20 rows



                                                                                

In [16]:
# Convert the PageRank results to a pandas DataFrame indexed by page id.
pr_pandas = pr.toPandas().set_index('id')

                                                                                

In [18]:
# Build the {page id: [pagerank]} lookup straight from the frame's rows.
# This is the same mapping `pr_pandas.T.to_dict('list')` produces, without
# first transposing the frame into one column per page, which is very slow
# and memory-hungry for a frame with one row per wiki page.
pr_dict = dict(zip(pr_pandas.index, pr_pandas.values.tolist()))

In [33]:
import pickle

# Write the PageRank lookup to a local pickle file, then upload that file to
# the bucket under page_rank/.
name = "pagerank_dict"
local_pickle = f"{name}.pkl"
with open(local_pickle, "wb") as pickle_file:
    pickle.dump(pr_dict, pickle_file)

client = storage.Client()
bucket = client.bucket(bucket_name)
blob_posting_locs = bucket.blob(f"page_rank/{name}.pkl")
blob_posting_locs.upload_from_filename(local_pickle)

## Page View

In [None]:
# # Paths
# # Using user page views (as opposed to spiders and automated traffic) for the 
# # month of August 2021
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path) 
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
pv_clean = f'{p.stem}.pkl'
# # Download the file (2.3GB) 
!wget -N $pv_path
# # Filter for English pages, and keep just two fields: article ID (3) and monthly 
# # total number of page views (5). Then, remove lines with article id or page 
# # view values that are not a sequence of digits.
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
# # Create a Counter (dictionary) that sums up the pages views for the same 
# # article, resulting in a mapping from article id to total page views.
wid2pv = Counter()
with open(pv_temp, 'rt') as f:
    for line in f:
        parts = line.split(' ')
        wid2pv.update({int(parts[0]): int(parts[1])})
# # write out the counter as binary file (pickle it)
# with open(pv_clean, 'wb') as f:
#       pickle.dump(wid2pv, f)
# # read in the counter
# # with open(pv_clean, 'rb') as f:
# #   wid2pv = pickle.loads(f.read())

--2023-01-09 19:18:55--  https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2
Resolving dumps.wikimedia.org (dumps.wikimedia.org)... 208.80.154.142, 2620:0:861:2:208:80:154:142
Connecting to dumps.wikimedia.org (dumps.wikimedia.org)|208.80.154.142|:443... connected.
HTTP request sent, awaiting response... 304 Not Modified
File ‘pageviews-202108-user.bz2’ not modified on server. Omitting download.



In [None]:
# idx_to_pickle(wid2pv)

In [9]:
# Write the page-view Counter to a local pickle file, then upload that file to
# the bucket under page_view/.
name = "pageview"
local_pickle = f"{name}.pkl"
with open(local_pickle, "wb") as pickle_file:
    pickle.dump(wid2pv, pickle_file)

client = storage.Client()
bucket = client.bucket(bucket_name)
blob_posting_locs = bucket.blob(f"page_view/{name}.pkl")
blob_posting_locs.upload_from_filename(local_pickle)

In [None]:
# Completion marker: prints once execution reaches the end of the notebook.
print("done")