In [1]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [2]:
import pyspark
import sys
from collections import Counter, OrderedDict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from timeit import timeit
from pathlib import Path
import pickle
import pandas as pd
import numpy as np
from google.cloud import storage
from pyspark.sql import SparkSession

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

In [3]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [4]:
# Display the active SparkSession. `spark` is never created in this notebook, so it
# is presumably provided by the cluster's PySpark/Jupyter kernel — confirm.
spark

In [5]:
# GCS bucket that holds the Wikipedia parquet shards; set it to a bucket you can access.
bucket_name = 'bucket-ir-project-nicoleayelet'
full_path = f"gs://{bucket_name}/"

# gs:// URIs of every object whose name starts with 'multi'. The `prefix` filter is
# applied server-side, so outputs this notebook writes to the same bucket
# (pr/, page_rank/, ...) are never listed or transferred.
client = storage.Client()
blobs = client.list_blobs(bucket_name, prefix='multi')
paths = [full_path + blob.name for blob in blobs]

# Fail here with a clear message instead of later inside spark.read.parquet().
assert paths, f"no objects starting with 'multi' found in gs://{bucket_name}"

In [6]:
def generate_graph(pages):
    ''' Compute the directed graph generated by wiki links.

    Parameters:
    -----------
      pages: RDD
        An RDD where each row is one Wikipedia article as (id, anchor_text), and
        anchor_text is a sequence of (target_id, text) pairs. A null/None
        anchor_text is treated as "no outgoing links".
    Returns:
    --------
      edges: RDD
        An RDD where each row is an edge (source_page_id, destination_page_id)
        of the directed graph created by the Wikipedia links. No duplicates.
      vertices: RDD
        An RDD where each row is a vertex (node) as the 1-tuple (page_id,). It
        covers every source article and every link target. No duplicates.
    '''
    # `row[1] or ()` keeps articles with a null anchor_text from raising TypeError.
    edges = pages.flatMap(
        lambda row: ((row[0], link[0]) for link in (row[1] or ()))
    ).distinct()

    # Every article is a vertex even if it has no links; every link target is one too.
    vertices = pages.flatMap(
        lambda row: [(link[0],) for link in (row[1] or ())] + [(row[0],)]
    ).distinct()

    return edges, vertices

In [7]:
# Load each article's id and its outgoing anchor links as an RDD of rows.
pages_links = spark.read.parquet(*paths).select("id", "anchor_text").rdd

# edges: distinct (src, dst) page-id pairs; vertices: distinct (id,) 1-tuples.
edges, vertices = generate_graph(pages_links)

# GraphFrame expects 'src'/'dst' edge columns and an 'id' vertex column.
# 124 partitions is a cluster tuning choice, not a correctness requirement.
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)

# PageRank with reset (random-jump) probability 0.15 and a fixed budget of 6 iterations.
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)

# (id, pagerank) sorted by descending score. Cached because several actions use it
# (the CSV write and show() here, toPandas() later). Without the cache, each of
# those actions may recompute the lineage.
pr = (
    pr_results.vertices
    .select("id", "pagerank")
    .sort(col('pagerank').desc())
    .cache()
)

# Write a single gzip CSV file. repartition(1) shuffles rows and does NOT preserve
# the earlier sort, so the rows are re-sorted inside that single partition.
# mode("overwrite") lets the cell be re-run without failing on the existing path.
(
    pr.repartition(1)
    .sortWithinPartitions(col('pagerank').desc())
    .write.mode("overwrite")
    .csv(f'gs://{bucket_name}/pr', compression="gzip")
)

# Display the 20 highest-ranked pages.
pr.show()


24/03/07 11:43:35 WARN YarnAllocator: Container from a bad node: container_1709795963840_0003_01_000006 on host: cluster-0fc4-w-1.c.ir-project-nicoleayelet.internal. Exit status: 137. Diagnostics: [2024-03-07 11:43:35.120]Container killed on request. Exit code is 137
[2024-03-07 11:43:35.121]Container exited with a non-zero exit code 137. 
[2024-03-07 11:43:35.121]Killed by external signal
.
24/03/07 11:43:35 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 6 for reason Container from a bad node: container_1709795963840_0003_01_000006 on host: cluster-0fc4-w-1.c.ir-project-nicoleayelet.internal. Exit status: 137. Diagnostics: [2024-03-07 11:43:35.120]Container killed on request. Exit code is 137
[2024-03-07 11:43:35.121]Container exited with a non-zero exit code 137. 
[2024-03-07 11:43:35.121]Killed by external signal
.
24/03/07 11:43:35 ERROR YarnScheduler: Lost executor 6 on cluster-0fc4-w-1.c.ir-project-nicoleayelet.internal: Container from a bad

+-------+------------------+
|     id|          pagerank|
+-------+------------------+
|3434750| 9917.889528786589|
|  10568| 5387.609470096127|
|  32927| 5284.298441243099|
|  30680|5130.3860058361915|
|5843419| 4959.648354746547|
|  68253| 4771.279909629094|
|  31717| 4488.233081199122|
|  11867|4148.1548821766155|
|  14533|3998.1437395714443|
| 645042| 3533.109297540148|
|  17867| 3247.460763251462|
|5042916|2993.2014451081504|
|4689264|2983.5765511544078|
|  14532|  2935.97852902425|
|  25391| 2904.764828582978|
|   5405| 2892.629843363032|
|4764461| 2835.556558193232|
|  15573|2785.0334936159047|
|   9316|2783.2072557286942|
|8569916| 2776.450966760946|
+-------+------------------+
only showing top 20 rows



                                                                                

In [None]:
# Collect the (id, pagerank) table to the driver, indexed by page id.
pr_pandas = pr.toPandas().set_index('id')

# Map each page id to a one-element list [pagerank]. The dict is built straight from
# the column. Transposing first would create a frame with one column per page,
# which is very slow at millions of pages.
pr_dict = {
    page_id: [score]
    for page_id, score in zip(pr_pandas.index.tolist(), pr_pandas['pagerank'].tolist())
}


In [22]:
# Save the id -> [pagerank] mapping as a local pickle, then upload that file to
# gs://<bucket_name>/page_rank/pagerank_dict.pkl.
name = "pagerank_dict"
with open(f"{name}.pkl", mode="wb") as pickle_file:
    pickle.dump(pr_dict, pickle_file)

client = storage.Client()
bucket = client.bucket(bucket_name)
blob_posting_locs = bucket.blob("page_rank/" + f"{name}.pkl")
blob_posting_locs.upload_from_filename(filename=f"{name}.pkl")

## Document ID → title dictionary

In [6]:
# Read every parquet shard listed in `paths` with all columns. It is used below for
# the row count and for the id -> title dictionary.
parquetFile = spark.read.parquet(*paths)

                                                                                

In [7]:
# Total number of rows across all shards (presumably one row per article).
parquetFile.count()

                                                                                

6348910

In [8]:
def construct_dict_doc_id_title(df=None):
    """Build a {doc_id: title} dictionary from a DataFrame's 'id' and 'title' columns.

    Parameters
    ----------
    df : pyspark.sql.DataFrame, optional
        Source with 'id' and 'title' columns. Defaults to the notebook-level
        ``parquetFile``, so existing zero-argument calls behave as before.

    Returns
    -------
    dict
        Mapping from id to title. Every row is collected to the driver; if an id
        occurs more than once, the last collected row wins.
    """
    if df is None:
        df = parquetFile
    # Each collected Row is a 2-field tuple, so dict() treats it as a key/value pair.
    return dict(df.select("id", "title").rdd.collect())

# Build the id -> title dictionary. This collects every (id, title) pair to the
# driver, so the driver needs enough memory to hold the whole mapping.
dict_doc_id_title = construct_dict_doc_id_title()

                                                                                

In [10]:
# Save the id -> title mapping as a local pickle, then upload that file to
# gs://<bucket_name>/title_id_dict/dict_doc_id_and_title.pkl.
name = "dict_doc_id_and_title"
with open(f"{name}.pkl", mode="wb") as pickle_file:
    pickle.dump(dict_doc_id_title, pickle_file)

client = storage.Client()
bucket = client.bucket(bucket_name)
blob_posting_locs = bucket.blob("title_id_dict/" + f"{name}.pkl")
blob_posting_locs.upload_from_filename(filename=f"{name}.pkl")

## Document page-view dictionary

In [6]:
# # Paths
# # Using user page views (as opposed to spiders and automated traffic) for the 
# # month of August 2021
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path) 
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
pv_clean = f'{p.stem}.pkl'
# # Download the file (2.3GB) 
!wget -N $pv_path
# # Filter for English pages, and keep just two fields: article ID (3) and monthly 
# # total number of page views (5). Then, remove lines with article id or page 
# # view values that are not a sequence of digits.
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
# # Create a Counter (dictionary) that sums up the pages views for the same 
# # article, resulting in a mapping from article id to total page views.
wid2pv = Counter()
with open(pv_temp, 'rt') as f:
    for line in f:
        parts = line.split(' ')
        wid2pv.update({int(parts[0]): int(parts[1])})

--2024-03-07 14:23:12--  https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2
Resolving dumps.wikimedia.org (dumps.wikimedia.org)... 208.80.154.71, 2620:0:861:3:208:80:154:71
Connecting to dumps.wikimedia.org (dumps.wikimedia.org)|208.80.154.71|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2503235912 (2.3G) [application/octet-stream]
Saving to: ‘pageviews-202108-user.bz2’


2024-03-07 14:33:29 (3.87 MB/s) - ‘pageviews-202108-user.bz2’ saved [2503235912/2503235912]



In [7]:
# Save the article id -> total page views Counter as a local pickle, then upload
# that file to gs://<bucket_name>/PageView/PageView.pkl.
name = "PageView"
with open(f"{name}.pkl", mode="wb") as pickle_file:
    pickle.dump(wid2pv, pickle_file)

client = storage.Client()
bucket = client.bucket(bucket_name)
blob_posting_locs = bucket.blob("PageView/" + f"{name}.pkl")
blob_posting_locs.upload_from_filename(filename=f"{name}.pkl")