## Welcome to our preprocessing - here we calculate our indices and write them to disk

### Imports & Setup

In [None]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes
!pip install -U regex

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage

import hashlib
#Change the hash function so title/body/anchor terms will get the same value for the same term
def _hash(s):
    if s.endswith('_t') or s.endswith('_a'): s = s[:-2]
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
spark
# conf = (SparkConf()
#     .set("spark.driver.maxResultSize", "4g"))

In [None]:
# Put your bucket name below and make sure you can access it without an error
bucket_name = '207024878'
# Google Cloud Storage client. Later cells call `client.list_blobs(...)` to
# find the posting-location pickles, so the client has to exist before them.
client = storage.Client()


Here we read the entire corpus into a Spark DataFrame, directly from the Google Cloud Storage bucket.

In [None]:
full_path = "gs://wikidata_preprocessed/*"
parquetFile = spark.read.parquet(full_path)


                                                                                

Let's import our inverted index class.

In [None]:
#look for the file
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [None]:
# adding our python module to the cluster
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

22/01/10 19:49:29 WARN org.apache.spark.SparkContext: The path /home/dataproc/inverted_index_gcp.py has been added already. Overwriting of added paths is not supported in the current version.


In [None]:
from inverted_index_gcp import *

# Building inverted indices:

## First - index for the titles: (for binary index)

We will count the number of pages to make sure we are looking at the entire corpus. The number of pages should be more than 6M

In [None]:
doc_title_pairs = parquetFile.select("title", "id").rdd

In [None]:
#save pickle - dictionary of doc_id, title for the retrival 
with open('doc_title_dict.pkl', 'wb') as handle:
    pickle.dump(doc_title_pairs.collectAsMap(), handle, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
# Count number of wiki pages
parquetFile.count()

### Tokenize 

In [None]:
# find regex pattern s

def get_html_pattern():
    """Return the regex source plugged into the HTMLTAG group of RE_TOKENIZE."""
    return "<(\"[^\"]\"|'[^']|[^'\">])*>"

def get_time_pattern():
    """Return the regex source plugged into the TIME group of RE_TOKENIZE.

    Written as a raw string so that `\\.` reaches the regex engine without
    relying on Python's handling of invalid string escapes (which emits a
    DeprecationWarning/SyntaxWarning). The resulting text is unchanged.
    """
    pattern = r"((?:[01][0-2]|2[0-4])(?:\.)?(?:[0-4][0-9])|(?:[0-1]?[0-9]|2[0-5]):(?:([0-5][0-9])):(?:([0-9][0-9]))?$)((AM|am|a\.m\.|PM|p\.m\.)?$|([AP][M]|[ap]\.[m]\.))"
    return pattern

def get_number_pattern():
    """Return the regex source plugged into the NUMBER group of RE_TOKENIZE.

    Written as a raw string so that `\\w`, `\\d`, `\\S`, etc. reach the regex
    engine without relying on Python's handling of invalid string escapes
    (which emits a DeprecationWarning/SyntaxWarning). The text is unchanged.
    """
    pattern = r"(?<![\w\+\-,\.])[\+\-]?\d{1,3}((,\d{3})|\d)(\.\d+)?(?!\S?[\w\+\-])"
    return pattern
 
def get_percent_pattern():
    """Return the regex source plugged into the PERCENT group of RE_TOKENIZE.

    Same text as the number pattern with a literal `%` before the trailing
    lookahead. Written as a raw string so the backslash sequences are not
    treated as (invalid) Python string escapes; the text is unchanged.
    """
    pattern = r"(?<![\w\+\-,\.])[\+\-]?\d{1,3}((,\d{3})|\d)(\.\d+)?%(?!\S?[\w\+\-])"
    return pattern

def get_date_pattern():
    """Return the regex source plugged into the DATE group of RE_TOKENIZE.

    Spaces are written as `\\ ` so they survive when the pattern is embedded in
    a regex compiled with re.VERBOSE. Written as a raw string so `\\ ` and
    `\\d` are not treated as (invalid) Python string escapes; the text is
    unchanged.
    """
    pattern = r"((([12][0-9]|(30)|[1-9])\ )?(Apr(il?)?|Jun(e?)?|Sep(tember?)?|Nov(ember?)?)(\ ([12][0-9],|(30,)|[1-9],))?((\ \d\d\d\d)))|((Jan(uary?)?|Mar(ch?)?|May?|Jul(y?)?|Aug(ust?)?|Oct(ober?)?|Dec(ember?)?)(\ ([12][0-9],|3[10],|[1-9],))?((\ \d\d\d\d)))|((([1][0-9]|2[0-8]|[0-9])\ )?(Feb(ruary?)?)(\ ([1][0-9],|2[0-8],|[0-9],))?((\ \d\d\d\d)))"
    return pattern

def get_word_pattern():
    """Return the regex source plugged into the WORD group of RE_TOKENIZE.

    Two alternatives: hyphen-joined runs of word characters, or a run of word
    characters with an optional inner apostrophe that is not preceded by '-'.
    Written as a raw string so `\\w` is not treated as an (invalid) Python
    string escape; the text is unchanged.
    """
    pattern = r"(\w+(?:-\w+)+)|(?<!-)(\w+'?\w*)"
    return pattern


# Master tokenizer regex: one named group per token category, tried in the
# order listed (HTMLTAG, DATE, TIME, PERCENT, NUMBER, WORD, SPACE, OTHER).
# It is compiled with re.VERBOSE, so the `# ...` lines inside the literal are
# regex comments and unescaped whitespace in the pattern is ignored.
# `filter_text` keeps only the text captured by the WORD group.
RE_TOKENIZE = re.compile(rf"""
(
    # parsing html tags
     (?P<HTMLTAG>{get_html_pattern()})                                  
    # dates
    |(?P<DATE>{get_date_pattern()})
    # time
    |(?P<TIME>{get_time_pattern()})
    # Percents
    |(?P<PERCENT>{get_percent_pattern()})
    # Numbers
    |(?P<NUMBER>{get_number_pattern()})
    # Words
    |(?P<WORD>{get_word_pattern()})
    # space
    |(?P<SPACE>[\s\t\n]+) 
    # everything else
    |(?P<OTHER>.)
)
""", 
re.MULTILINE | re.IGNORECASE | re.VERBOSE | re.UNICODE)

def filter_text(text):
    """Return, in order of appearance, the matched text of every named group
    of RE_TOKENIZE that is not one of the discarded categories.

    Discarded categories: HTMLTAG, DATE, TIME, PERCENT, NUMBER, SPACE, OTHER.
    """
    discarded = ['HTMLTAG', 'DATE', 'TIME', 'PERCENT', 'NUMBER', 'SPACE', 'OTHER']
    kept = []
    for match in RE_TOKENIZE.finditer(text):
        for group_name, value in match.groupdict().items():
            if value is None or group_name in discarded:
                continue
            kept.append(value)
    return kept

# NLTK's English stopword list, frozen for fast membership tests.
english_stopwords = frozenset(stopwords.words('english'))
# Extra words that `tokenize` should also drop for this corpus.
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]

# Candidate word: one leading '#', '@' or word character, followed by 2 to 24
# word characters, each optionally preceded by an apostrophe or hyphen
# (so a match is at least 3 characters long).
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

# Every token that `tokenize` filters out.
all_stopwords = english_stopwords.union(corpus_stopwords)

def tokenize(text):
    """
    Split a text into a list of tokens and drop the stopwords.

    The text is lower-cased, candidate words are extracted with RE_WORD, the
    candidates are re-joined with single spaces and passed through
    `filter_text`, and finally anything in `all_stopwords` is removed.

    Parameters:
    -----------
    text: string, the text to tokenize.

    Returns:
    -----------
    list of str, the surviving tokens in their original order.
    """
    lowered = text.lower()
    candidates = (match.group() for match in RE_WORD.finditer(lowered))
    words = filter_text(' '.join(candidates))
    kept = []
    for word in words:
        if word in all_stopwords:
            continue
        kept.append(word)
    return kept

### Word counts - Term frequency

In [None]:
# NOTE(review): these four definitions repeat, value for value, the ones in the
# "Tokenize" cell above (english_stopwords, corpus_stopwords, RE_WORD,
# all_stopwords). Keep the two copies in sync, or drop one of them, so that
# indexing and tokenization can never disagree on the stopword set.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]


RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

all_stopwords = english_stopwords.union(corpus_stopwords)

def word_count(text, id):
    ''' Compute the term frequency (tf) of every token of `text` that survives
    `tokenize` (i.e. is not in `all_stopwords`) and return the entries that go
    into the posting lists.
    Parameters:
    -----------
      text: str
        Text of one document
      id: int
        Document id
    Returns:
    --------
      List of tuples
        A list of (token, (doc_id, tf)) pairs, one per distinct token,
        for example: [("anarchism", (12, 5)), ...]
    '''
    term_frequencies = Counter(tokenize(text))
    entries = []
    for term, tf in term_frequencies.items():
        entries.append((term, (id, tf)))
    return entries


In [None]:
word_counts_title = doc_title_pairs.flatMap(lambda x: word_count(x[0], x[1]))

### Posting list - Reduce and sort word count to get the posting list :

In [None]:
def sort_word_counts(unsorted_pl):
    ''' Return the posting list as a new list in ascending order.
    Parameters:
    -----------
    unsorted_pl: iterable of tuples
      (wiki_id, tf) tuples in arbitrary order
    Returns:
    --------
    list of tuples
      The same tuples sorted by wiki_id (ties broken by tf).
    '''
    ordered = list(unsorted_pl)
    ordered.sort()
    return ordered

In [None]:
postings_title = word_counts_title.groupByKey().mapValues(sort_word_counts)

### add _t at the end of title tokens

In [None]:
postings_filtered_title = postings_title
postings_filtered_title = postings_filtered_title.map(lambda x:(x[0]+'_t', x[1]))

### Document frequency

In [None]:
def calculate_df(postings):
    ''' Compute the document frequency (df) of every token.
    Parameters:
    -----------
    postings: RDD
      (token, posting_list) pairs.
    Returns:
    --------
    RDD
      (token, df) pairs, where df is the length of the token's posting list.
    '''
    return postings.mapValues(lambda posting_list: len(posting_list))

In [None]:
# get df
w2df_title = calculate_df(postings_filtered_title)
w2df_title_dict = w2df_title.collectAsMap()

### partition_postings_and_write

In [None]:
# Number of buckets the posting lists are spread over.
NUM_BUCKETS = 124
def token2bucket_id(token):
    """Map a token to a bucket id in [0, NUM_BUCKETS) from its hex `_hash`."""
    hex_digest = _hash(token)
    return int(hex_digest, 16) % NUM_BUCKETS

def partition_postings_and_write(postings, ftype=""):
    ''' Group the posting lists into buckets and hand every bucket to
    `InvertedIndex.write_a_posting_list` (imported from inverted_index_gcp).
    The bucket of a token is chosen by `token2bucket_id`.
    Parameters:
    -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
    ftype: str
      Forwarded unchanged to `write_a_posting_list` as its `ftype` keyword,
      together with the global `bucket_name`.
    Returns:
    --------
    RDD
      One item per bucket: whatever `write_a_posting_list` returns for it
      (presumably the bucket's posting-locations info -- see
      `write_a_posting_list` for details). Nothing is written until the
      returned RDD is evaluated.
    '''
    def attach_bucket_id(token_and_postings):
        return (token2bucket_id(token_and_postings[0]), token_and_postings)

    def write_bucket(bucket_and_postings):
        return InvertedIndex.write_a_posting_list(bucket_and_postings, bucket_name, ftype=ftype)

    bucketed = postings.map(attach_bucket_id).groupByKey()
    return bucketed.map(write_bucket)

In [None]:
_ = partition_postings_and_write(postings_filtered_title, ftype='Title_index').collect()

### Merge to one dict

In [None]:
for blob in client.list_blobs(bucket_name, prefix='postings_gcp'):
    if blob.name.endswith("pickle"):
        print(blob.name)

In [None]:
# Merge every posting-locations pickle found under the 'postings_gcp' prefix
# of the bucket into one {term: [locations...]} mapping for the title index.
# NOTE(review): `client` is not assigned in this cell; it is expected to be a
# google.cloud.storage client -- confirm it exists before running.
super_posting_locs_title = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp'):
    # Only blobs whose name ends with "pickle" are loaded.
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        # A term's locations may be spread over several pickles; concatenate.
        for k, v in posting_locs.items():
            super_posting_locs_title[k].extend(v)

### Write the index pickle to disk:

In [None]:
title_docs_tokenized = doc_title_pairs.map(lambda x:(x[1], tokenize(x[0])))

In [None]:
# Create inverted index instance
inverted_title = InvertedIndex(docs = title_docs_tokenized.collectAsMap())
# Adding the posting locations dictionary to the inverted index
inverted_title.posting_locs = super_posting_locs_title
# Add the token - df dictionary to the inverted index
inverted_title.df = w2df_title_dict
# write the global stats out
inverted_title.write_index('.', 'Title_index')

# upload to gs
index_src = "Title_index.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

### reading_index - just to check that everything is good

In [None]:
readed_index = InvertedIndex().read_index('.', 'Title_index')

In [None]:
TUPLE_SIZE = 6
TF_MASK = 2 ** 16 - 1 # Masking the 16 low bits of an integer
from contextlib import closing

def read_posting_list(inverted, w):
    """Read the posting list of term `w` back from the index files.

    `inverted.df[w]` entries of TUPLE_SIZE bytes each are read from the
    locations in `inverted.posting_locs[w]`. In every entry the first 4 bytes
    are the big-endian doc id and the remaining bytes the big-endian tf.

    Returns a list of (doc_id, tf) tuples in stored order.
    """
    with closing(MultiFileReader()) as reader:
        locations = inverted.posting_locs[w]
        raw = reader.read(locations, inverted.df[w] * TUPLE_SIZE)
        entries = []
        for start in range(0, inverted.df[w] * TUPLE_SIZE, TUPLE_SIZE):
            doc_id = int.from_bytes(raw[start:start + 4], 'big')
            tf = int.from_bytes(raw[start + 4:start + TUPLE_SIZE], 'big')
            entries.append((doc_id, tf))
        return entries

In [None]:
read_posting_list(readed_index, 'leppänen_t')

## Now - index for the anchors: (for binary index)

In [None]:
doc_list_of_acnchors = parquetFile.select("id", "anchor_text").rdd 

                                                                                

### join anchor link to text

In [None]:
page_pointer_text = doc_list_of_acnchors.flatMap(lambda x:x[1]).reduceByKey(lambda x,y:x + '. ' + y)
#cast doc_anchor_pairs to be the same format as - title/body
from pyspark.sql import Row
from pyspark.sql import SQLContext
doc_anchor_pairs = page_pointer_text.map(lambda x: Row(anchor_text=x[1], id=x[0]))

### Word counts - Term frequency

In [None]:
word_counts_anchors = doc_anchor_pairs.flatMap(lambda x: word_count(x[0], x[1]))

### Posting list - Reduce and sort word count to get the posting list :

In [None]:
postings_anchors = word_counts_anchors.groupByKey().mapValues(sort_word_counts)

In [None]:
postings_filtered_anchors = postings_anchors

### add _a at the end of anchors

In [None]:
postings_filtered_anchors = postings_filtered_anchors.map(lambda x:(x[0]+'_a', x[1]))

### Document frequency

In [None]:
w2df_anchors = calculate_df(postings_filtered_anchors)
w2df_anchors_dict = w2df_anchors.collectAsMap()

### Write Index pkl to disc:

In [None]:
anchor_docs_tokenize = doc_anchor_pairs.map(lambda x: (x[1], tokenize(x[0])))

### partition_postings_and_write

In [None]:
_ = partition_postings_and_write(postings_filtered_anchors, ftype='Anchor_index').collect()

### Merge to one dict

In [None]:
# Merge every posting-locations pickle found under the 'postings_gcp' prefix
# into one {term: [locations...]} mapping for the anchor index.
# NOTE(review): the scan is not restricted to anchor files, so pickles written
# by earlier index builds under the same prefix may be merged in as well --
# confirm against the naming used by `write_a_posting_list`.
super_posting_locs_anchors = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp'):
    # Only blobs whose name ends with "pickle" are loaded.
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs_anchors[k].extend(v)

In [None]:
# Create inverted index instance
inverted_anchors = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_anchors.posting_locs = super_posting_locs_anchors
# Add the token - df dictionary to the inverted index
inverted_anchors.df = w2df_anchors_dict
# write the global stats out
inverted_anchors.write_index('.', 'Anchor_index')

# upload to gs
index_src = "Anchor_index.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

In [None]:
# readed_index = InvertedIndex().read_index('postings_gcp/postings_gcp', 'Anchor_index') ## read 

# Now - body index: (for tfidf index - cosine similarity)

In [None]:
doc_text_pairs = parquetFile.select("text", "id").rdd

### Word counts - Term frequency

In [None]:
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))

### Posting list - Reduce and sort word count to get the posting list :

In [None]:
postings = word_counts.groupByKey().mapValues(sort_word_counts)

### Filter

In [None]:
# filter out rare words, words that appear in threshold(=50) or fewer documents
threshold = 50
postings_filtered = postings.filter(lambda x: len(x[1])>threshold)

### Document frequency

In [None]:
# # get df
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()

In [None]:
import numpy as np
corpus_len = parquetFile.count()

### take df dict and replace df scores with idf scores

In [None]:
idf_dict = {token:np.log10(corpus_len/val) for token,val in w2df_dict.items()}

### Tokenize docs to insert into the index

In [None]:
ls_tokens_filtered = postings_filtered.map(lambda x:x[0]).collect()

In [None]:
def get_filtered_tokens(token_list):
    """Keep only the tokens that appear in `ls_tokens_filtered`.

    Parameters:
    -----------
    token_list: list of str
      A document's text as a list of tokens.
    Returns:
    --------
    list of str
      The tokens of `token_list` (order and duplicates preserved) that are
      also in the global `ls_tokens_filtered`.
    """
    # `ls_tokens_filtered` is a list, so testing membership against it directly
    # scans the whole vocabulary for every token. One set build per call makes
    # each lookup constant-time.
    allowed = frozenset(ls_tokens_filtered)
    return [token for token in token_list if token in allowed]

In [None]:
body_docs_tokenized = doc_text_pairs.map(lambda x: (x[1], tokenize(x[0])))

In [None]:
# body_docs_tokenized = body_docs_tokenized.map(lambda x: (x[0], get_filtered_tokens(x[1])))

In [None]:
# doc = body_docs_tokenized.collectAsMap()

In [None]:
# # Create inverted index instance
inverted_tfidf = InvertedIndex()

In [None]:
# Document length (number of tokens kept by `tokenize`) for every doc id.
# Lengths are computed inside Spark, so only (doc_id, int) pairs are collected
# to the driver rather than every document's full token list. The values have
# to be numbers: `idf_product` divides tf by DL_dict[doc_id] and `calc_avgdl`
# sums the values.
DL_dict = defaultdict(int)
for doc_id, doc_len in body_docs_tokenized.mapValues(len).collect():
    DL_dict[doc_id] = doc_len

In [None]:
inverted_tfidf.DL = DL_dict 

In [None]:
#calculate posting list with tf*idf instead of tf -
tfidf_counts = word_counts.filter(lambda x: x[0] in idf_dict)

# d = 10**6


def idf_product(x, d, idf_dict):
    """Turn a (token, (doc_id, tf)) entry into (token, (doc_id, scaled tf-idf)).

    tf is first divided by the document's length taken from the global
    `DL_dict`, then multiplied by `idf_dict[token]`. The product is multiplied
    by 10**d and rounded, so it can be stored as an integer.
    """
    token = x[0]
    posting = x[1]
    doc_id = posting[0]
    normalized_tf = posting[1] / DL_dict[doc_id]
    weight = normalized_tf * idf_dict[token]
    scaled = int(np.round(weight * 10**d, d))  # *10**d to get an integer
    return token, (doc_id, scaled)

tfidf_counts = tfidf_counts.map(lambda x: idf_product(x, 6, idf_dict))

In [None]:
#group tfidf_counts to get posting list of (token - list(doc_id, tf*idf, doc_id, tf*idf))
tfidf_postings = tfidf_counts.groupByKey().mapValues(sort_word_counts)

In [None]:
#write posting lists to bin files, add idf to the name
_ = partition_postings_and_write(tfidf_postings, ftype='Body_index').collect()

In [None]:
# Merge every posting-locations pickle found under the 'postings_gcp' prefix
# into one {term: [locations...]} mapping for the body (tf-idf) index.
# NOTE(review): the scan is not restricted to body files, so pickles written
# by earlier index builds under the same prefix may be merged in as well --
# confirm against the naming used by `write_a_posting_list`.
super_posting_locs_tfidf = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp'):
    # Only blobs whose name ends with "pickle" are loaded.
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs_tfidf[k].extend(v)

### Calculate norm for each document:

In [None]:
# if we want to read the index from disk
# read_inverted_tfidf = InvertedIndex().read_index('postings_gcp/postings_gcp', 'Body_index') 
# read_posting_list(inverted_tfidf, 'movement')

In [None]:
corpus_len = parquetFile.count()

In [None]:
# idf (log10 of N/df) per term of the body index. The df mapping is taken from
# `w2df_dict`, the same dict that is stored as the body index's `df`;
# `read_inverted_tfidf` only exists in a commented-out cell, so reading its
# `.df` here raises NameError on a fresh kernel.
idf_dict = {token:np.log10(corpus_len/val) for token,val in w2df_dict.items()}

In [None]:
def product_idf(x):
    """Map a (token, (doc_id, tf)) entry to (doc_id, tf * idf).

    The idf comes from the global `idf_dict`; a token that is missing from it
    contributes a weight of 0.
    """
    token = x[0]
    doc_id, tf = x[1]
    if token in idf_dict:
        weight = tf * idf_dict[token]
    else:
        weight = 0
    return (doc_id, weight)


In [None]:
word_doc_tfidf = word_counts.map(product_idf)

In [None]:
doc_tfidf_ls = word_doc_tfidf.groupByKey().mapValues(list)

In [None]:
# Euclidean norm of each document's list of tf-idf weights, collected as
# {doc_id: norm}. `np.linalg.norm` is used because the notebook never imports
# an `LA` alias.
doc_norm_dict = doc_tfidf_ls.map(lambda x:(x[0], np.linalg.norm(x[1]))).collectAsMap()

Putting it all together

In [None]:
# Adding the posting locations dictionary to the inverted index
inverted_tfidf.posting_locs = super_posting_locs_tfidf
# Add the token - df dictionary to the inverted index
inverted_tfidf.df = w2df_dict
#add dictionary of norm for each document
inverted_tfidf.doc_norm_dict = doc_norm_dict
# write the global stats out
inverted_tfidf.write_index('.', 'Body_index')

# upload to gs
index_src = "Body_index.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

# The final index - bm25 index: (for all sections - body, title, anchor)

### First - load the indices built so far, so that the calculations below can run after the earlier indices have been written and read back.

In [None]:
inverted_tfidf = InvertedIndex().read_index('postings_gcp/postings_gcp/', 'Body_index')
inverted_anchors = InvertedIndex().read_index('postings_gcp/postings_gcp/', 'Anchor_index')
inverted_title = InvertedIndex().read_index('postings_gcp/postings_gcp/', 'Title_index')

### Calculate the idf component (different from the cosine idf)

In [2]:
corpus_len = parquetFile.count()

def idf_bm25(x, w2df):
    """Return (token, BM25 idf) for one (token, posting_list) entry.

    Only the token (x[0]) is used. With N the global `corpus_len` and n the
    token's document frequency `w2df[token]`, the idf is
    ln((N - n + 0.5) / (n + 0.5) + 1).
    """
    term = x[0]
    doc_freq = w2df[term]
    total_docs = corpus_len
    ratio = (total_docs - doc_freq + 0.5) / (doc_freq + 0.5)
    return (term, np.log(ratio + 1))


idf_bm25_body = postings_filtered.map(lambda x: (idf_bm25(x, w2df_dict)))
idf_bm25_anchors = postings_filtered_anchors.map(lambda x: idf_bm25(x, w2df_anchors_dict))
idf_bm25_title = postings_filtered_title.map(lambda x: idf_bm25(x, w2df_title_dict))

In [None]:
#get the entire dictionary for each section
bm25_idf_body_dict = idf_bm25_body.collectAsMap()
bm25_idf_anchors_dict = idf_bm25_anchors.collectAsMap()
bm25_idf_title_dict = idf_bm25_title.collectAsMap()

In [None]:
# Calculate DL (document length) for anchors: doc_id -> number of
# whitespace-separated words in the page's joined anchor text.
# NOTE(review): this length counts raw whitespace-split words, whereas the
# anchor postings are built with `tokenize` (which drops stopwords and short
# tokens) -- confirm that the mismatch is intended.
dl_rdd_anc = doc_anchor_pairs.map(lambda x:(x[1],len(x[0].split())))
dl_anc = defaultdict(int)
for i in dl_rdd_anc.collect():
    dl_anc[i[0]] = i[1]

In [None]:
DL_body = inverted_tfidf.DL
DL_anchors = dl_anc
DL_title = inverted_title.DL

In [None]:
#calculate avgdl: 
def calc_avgdl(DL):
    """Return the average document length: the mean of the values of `DL`.

    `DL` maps doc_id -> document length. An empty mapping raises
    ZeroDivisionError.
    """
    return sum(DL.values()) / len(DL)

avgdl_body = calc_avgdl(DL_body)
avgdl_anchors = calc_avgdl(DL_anchors)
avgdl_title = calc_avgdl(DL_title)

###BM 25 calculation functions:

In [None]:

def calc_bm25(x, idf_dict, DL, avgdl):
    """Convert a (token, posting_list) entry into BM25-scored postings.

    For every (doc_id, tf) in the posting list the score is
        idf * tf * (k1 + 1) / (tf + k1 * (1 - b + b * DL[doc_id] / avgdl))
    with k1 = 1.5 and b = 0.75. Scores are multiplied by 100 and rounded so
    they are stored as integers.

    Returns (token, [(doc_id, int_score), ...]) in posting-list order.
    """
    k1 = 1.5
    b = 0.75
    token = x[0]

    def scaled_score(doc_id, tf):
        numerator = tf*(k1+1)
        denominator = tf+k1*(1-b+b*(DL[doc_id]/avgdl))
        score = (idf_dict[token]*numerator)/denominator
        return int(np.round(score*10**2, 0))

    return token, [(doc_id, scaled_score(doc_id, tf)) for doc_id, tf in x[1]]

In [None]:
postings_bm25_body = postings_filtered.map(lambda x: calc_bm25(x, bm25_idf_body_dict, DL_body, avgdl_body))
postings_bm25_anchors = postings_filtered_anchors.map(lambda x: calc_bm25(x, bm25_idf_anchors_dict, DL_anchors, avgdl_anchors))
postings_bm25_title = postings_filtered_title.map(lambda x: calc_bm25(x, bm25_idf_title_dict, DL_title, avgdl_title))

###Union all to get one rdd

In [None]:
union_posting = postings_bm25_body.union(postings_bm25_anchors).union(postings_bm25_title)

###write to disk

In [None]:
_ = partition_postings_and_write(union_posting, ftype="BM25").collect()

In [None]:
# Merge every posting-locations pickle found under the 'postings_gcp' prefix
# into one {term: [locations...]} mapping for the combined BM25 index.
# NOTE(review): the BM25 postings use the same term keys as the body, title
# and anchor postings. If the location pickles of those earlier writes also
# live under this prefix, their locations are appended to the same keys here
# and would mix files of different indices -- confirm against the naming used
# by `write_a_posting_list`, or restrict the scan to the BM25 files.
posting_locs_bm25_union = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='postings_gcp'):
    # Only blobs whose name ends with "pickle" are loaded.
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            posting_locs_bm25_union[k].extend(v)

In [None]:
# Spot check: df of one title term in the merged df dict.
# NOTE(review): `w2df_dict_super_duper` is first assigned in the next cell, so
# on a fresh top-to-bottom run this lookup raises NameError.
w2df_dict_super_duper['leppänen_t']

In [None]:
# Combined df mapping for the BM25 index: body terms plus title ('_t') and
# anchor ('_a') terms. Start from a copy: plain assignment would alias
# `w2df_dict`, and the updates below would then add the title/anchor terms to
# the body df dict itself.
w2df_dict_super_duper = dict(w2df_dict)
w2df_dict_super_duper.update(w2df_title_dict)
w2df_dict_super_duper.update(w2df_anchors_dict)

In [None]:
# Create inverted index instance
inverted_bm25 = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_bm25.posting_locs = posting_locs_bm25_union
# # Add the token - df dictionary to the inverted index
inverted_bm25.df = w2df_dict_super_duper
# write the global stats out
inverted_bm25.write_index('.', 'BM25_index')


# upload to gs
index_src = "BM25_index.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

# That's it, all the indices are on disk

### Mount files to drive:

In [None]:
!gsutil ls -lh $index_dst

In [None]:
!gsutil ls gs://'207024878'/postings_gcp

In [None]:
!mkdir postings_gcp
!gsutil -m cp -r gs://'207024878'/postings_gcp/ "postings_gcp/"

# PageRank:

Compute PageRank for the entire English Wikipedia

In [None]:
# Compute PageRank over the link graph of the corpus and write it to the
# bucket as a single gzip-compressed CSV under 'pr'.
t_start = time()
# Re-read the corpus, keeping only the page id and its outgoing anchors.
pages_links = spark.read.parquet("gs://wikidata_preprocessed/*").select("id", "anchor_text").rdd
# construct the graph
# NOTE(review): `generate_graph` is not defined in the visible part of this
# notebook -- confirm where it comes from. It is expected to return an edges
# RDD and a vertices RDD (they are converted with toDF(['src', 'dst']) and
# toDF(['id']) below).
edges, vertices = generate_graph(pages_links)
# compute PageRank
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.select("id", "pagerank")
# Highest-ranked pages first.
pr = pr.sort(col('pagerank').desc())
pr.repartition(1).write.csv(f'gs://{bucket_name}/pr', compression="gzip")
# Elapsed time up to and including the CSV write; `pr.show()` is not timed.
pr_time = time() - t_start
pr.show()

# PageViews:

In [None]:
# Paths
# Using user page views (as opposed to spiders and automated traffic) for the 
# month of August 2021
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path) 
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
pv_clean = f'{p.stem}.pkl'
# Download the file (2.3GB) 
!wget -N $pv_path
# Filter for English pages, and keep just two fields: article ID (3) and monthly 
# total number of page views (5). Then, remove lines with article id or page 
# view values that are not a sequence of digits.
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
# Sum the page views per article id into a Counter, giving a mapping from
# article id to total page views. Each line of the temp file holds
# "<article id> <views>".
wid2pv = Counter()
with open(pv_temp, 'rt') as views_file:
    for record in views_file:
        fields = record.split(' ')
        wid2pv[int(fields[0])] += int(fields[1])
# Persist the counter as a binary (pickle) file.
with open(pv_clean, 'wb') as out_file:
    pickle.dump(wid2pv, out_file)
# # read in the counter
# with open(pv_clean, 'rb') as f:
#   wid2pv = pickle.loads(f.read())