# Imports & Setup

In [None]:
# List the Dataproc clusters in us-central1 as a quick check that the Google
# Cloud APIs are reachable from this notebook.
# If the following command generates an error, you probably didn't enable
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster.
!gcloud dataproc clusters list --region us-central1

In [2]:
# Quietly install the Cloud Storage client (version pinned) and GraphFrames
# (version not pinned).
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [3]:
import pyspark
import sys
import numpy as np
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
import hashlib
#from google.colab import auth



# Download the NLTK stopword corpus read by stopwords.words('english') below.
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [4]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [5]:
# Display the Spark session. `spark` is not created in this notebook —
# presumably it is provided by the cluster's PySpark kernel (TODO confirm).
spark

In [6]:
# Put your bucket name below and make sure you can access it without an error
bucket_name = 'ln3250'
full_path = f"gs://{bucket_name}/"

client = storage.Client()
blobs = client.list_blobs(bucket_name)
# Keep only the objects whose name starts with 'm', as full gs:// URIs.
paths = [full_path + b.name for b in blobs if b.name[0] == 'm']

In [7]:
# adding our python module to the cluster
# The file is registered with the SparkContext, and the SparkFiles root
# directory is put first on sys.path — presumably so the import below (and the
# same import on the workers) resolves to that copy (TODO confirm).
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())
from inverted_index_gcp import InvertedIndex

In [8]:
# Read every parquet path collected in `paths` into a single DataFrame.
wiki_corpus = spark.read.parquet(*paths)

                                                                                

# Inverted indexes

In [9]:

# Stopwords removed by tokenize(): NLTK's English list plus the hand-picked
# corpus-specific terms below.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links",
                    "may", "first", "see", "history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
# Token pattern: one of '#', '@' or a word character, followed by 2 to 24
# repetitions of (optional apostrophe or hyphen, then a word character), so
# every match is at least three characters long.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

def _hash(s):
    """
    :param s: term
    :return: hashed bucket_id
    """
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

def tokenize(text):
    """
    Lower-case the text, extract RE_WORD matches and drop stopwords.
    :param text: text
    :return: list of tokens in order of appearance, without any term found
             in all_stopwords
    """
    kept_terms = []
    for match in RE_WORD.finditer(text.lower()):
        term = match.group()
        if term in all_stopwords:
            continue
        kept_terms.append(term)
    return kept_terms


def word_count(text, doc_id):
    """
    Count how many times each term occurs in a single document.
    :param text: text
    :param doc_id: doc_id
    :return: list of (term, (doc_id, cnt)) tuples, one per distinct term
             produced by tokenize(text)
    """
    term_freqs = Counter(tokenize(text))
    pairs = []
    for term, cnt in term_freqs.items():
        pairs.append((term, (doc_id, cnt)))
    return pairs


def doc_len(text, doc_id):
    """
    Measure a document's length in tokens (after stopword removal).
    :param text: text
    :param doc_id: doc_id
    :return: tuple of (doc_id, number of tokens returned by tokenize)
    """
    n_tokens = len(tokenize(text))
    return (doc_id, n_tokens)


def reduce_word_counts(unsorted_pl):
    """
    Order a posting list by its second field, largest first.
    :param unsorted_pl: iterable of pairs whose second element is the sort key
    :return: new list sorted in descending order of that second element;
             entries with equal keys keep their incoming order
    """
    ordered = list(unsorted_pl)
    ordered.sort(key=itemgetter(1), reverse=True)
    return ordered

def token2bucket_id(token, bucket_num):
    """
    Map a token to a bucket number using its hash.
    :param token: token
    :param bucket_num: num of buckets to create
    :return: integer in range(bucket_num): the token's hex hash modulo bucket_num
    """
    hex_digest = _hash(token)
    hashed_value = int(hex_digest, 16)
    return hashed_value % bucket_num

def partition_postings_and_write(postings, bucket_name, bucket_num, ii = None):
    """
    writing the posting list to the bucket
    :param postings: RDD of pairs whose first element is the term used for
                     bucketing
    :param bucket_name: name passed through to ii.write_a_posting_list
    :param bucket_num: number of hash buckets the terms are spread over
    :param ii: object providing write_a_posting_list; when None (the default)
               a fresh InvertedIndex is created for this call
    :return: RDD holding one write_a_posting_list result per non-empty bucket
    """
    # Build the default per call: a default of InvertedIndex() in the signature
    # is evaluated once at definition time and then shared by every call.
    if ii is None:
        ii = InvertedIndex()
    # Key every posting by the hash bucket of its term, then gather each bucket.
    bucket_docs = postings.map(lambda x: (token2bucket_id(x[0], bucket_num), x)).groupByKey()
    return bucket_docs.map(lambda x: ii.write_a_posting_list(x, bucket_name)) # GCP line

def calculate_df(posting_lists):
    """
    Replace every posting list with its length.
    :param posting_lists: key/value collection exposing mapValues, whose
                          values are posting lists
    :return: the same keys paired with len(posting list)
    """
    def _posting_count(posting_list):
        return len(posting_list)

    return posting_lists.mapValues(_posting_count)


def doc_norm(text, doc_id, df, N):
    """
    Compute the tf-idf vector norm of one document (used later for cosine
    similarity).
    :param text: text
    :param doc_id: doc id
    :param df: lookup from term to document frequency (must support `in` and
               indexing); terms missing from it are skipped
    :param N: corpus size
    :return: tuple of (doc_id, norm)
    """
    tokens = tokenize(text)
    dl = len(tokens)
    # One squared tf-idf weight per distinct term that has a document frequency.
    squared_weights = (
        (cnt / dl * np.log(N/df[term]))**2
        for term, cnt in Counter(tokens).items()
        if term in df
    )
    return doc_id, np.sqrt(sum(squared_weights))


def index_writing(ii, bucket_name):
  """
  write the inverted index instance as a pkl to the bucket
  :param ii: index object; only its write_index method is used here
  :param bucket_name: bucket name, also used to build the pickle's file name
  :return: None
  """
  # write_index receives '.' and the base name; presumably it creates
  # index_<bucket_name>.pkl in the working directory (TODO confirm).
  ii.write_index('.', f'index_{bucket_name}')
  index_src = f"index_{bucket_name}.pkl"
  index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
  # IPython shell escape: copy the local pickle into the bucket's postings_gcp/
  # folder. The $names are interpolated from the two locals above.
  !gsutil cp $index_src $index_dst

def postings_writing(bucket_name):
    """
    Merge the posting-location pickles stored under postings_gcp in a bucket.
    Uses the module-level storage `client`.
    :param bucket_name: bucket to scan
    :return: defaultdict(list) in which, for every key found in the pickled
             dicts, the values from all files are concatenated
    """
    merged_locs = defaultdict(list)
    candidates = client.list_blobs(bucket_name, prefix='postings_gcp')
    # Only the blobs whose name ends with "pickle" hold posting locations.
    for blob in (cand for cand in candidates if cand.name.endswith("pickle")):
        with blob.open("rb") as fh:
            locs = pickle.load(fh)
        for key, key_locs in locs.items():
            merged_locs[key].extend(key_locs)
    return merged_locs


In [10]:
def create_index(wiki_corpus, bucket_name, index_on, bucket_num,  filter_size):
    """
    create the inverted index and the postings
    :param wiki_corpus: wiki corpus (Spark DataFrame)
    :param bucket_name: bucket to write stuff into
    :param index_on: "anchor" to index the anchor_text column; any other value
                     (e.g. "title", "text") is used directly as the name of
                     the column to index
    :param bucket_num: number of buckets
    :param filter_size: a term is kept only if its posting list has more than
                        filter_size entries
    :return: None; the finished index is handed to index_writing
    """
    # Build (item_text, doc_id) pairs. For anchors, the elements of anchor_text
    # are flattened and treated as (id, text) pairs — presumably the linked
    # page's id and the link text (TODO confirm); texts sharing an id are
    # joined with spaces and each pair is flipped to (text, id).
    if index_on == "anchor":
        doc_item_pairs = wiki_corpus.select(index_on+"_text").rdd\
        .flatMap(lambda x: x['anchor_text']).reduceByKey(lambda x, y: x + " " + y)\
        .map(lambda x: (x[1], x[0]))
    else:
        doc_item_pairs = wiki_corpus.select(index_on, "id").rdd
    
    # N for the idf term is the number of rows in the corpus DataFrame.
    corpus_size = wiki_corpus.count()
    word_counts = doc_item_pairs.flatMap(lambda x: word_count(x[0], x[1]))
    postings = word_counts.groupByKey().mapValues(reduce_word_counts)
    postings_filtered = postings.filter(lambda x: len(x[1]) > filter_size)
    # collect() is the action that actually runs the per-bucket writes; the
    # collected value itself is not used afterwards.
    posting_locs_list = partition_postings_and_write(postings_filtered, bucket_name, bucket_num).collect()
    # Reads posting locations back from the bucket, so it presumably relies on
    # the files produced by the step above — keep this order.
    super_posting_locs = postings_writing(bucket_name)

    d2dl = doc_item_pairs.map(lambda x: doc_len(x[0], x[1]))
    w2df = calculate_df(postings_filtered)

    # Both mappings are pulled to the driver as plain dicts.
    d2dl_dict = dict(d2dl.collect())
    w2df_dict = w2df.collectAsMap()

    # w2df_dict is captured by the lambda below and used for the idf lookups.
    d2tfidf_norm = doc_item_pairs.map(lambda x: doc_norm(x[0], x[1], w2df_dict, corpus_size))
    d2tfidf_norm_dict = d2tfidf_norm.collectAsMap()

    # Assemble the index object from the collected pieces.
    inverted = InvertedIndex()
    inverted.posting_locs = super_posting_locs

    inverted.df = w2df_dict
    inverted.dl = d2dl_dict
    inverted.d_norms = d2tfidf_norm_dict

    index_writing(inverted, bucket_name)

In [None]:
#Create inverted indexes
# Arguments per call: corpus, destination bucket, field to index, number of
# hash buckets for the postings, and the posting-list length threshold
# (filter_size) a term must exceed to be kept.
create_index(wiki_corpus, 'title_idx', "title", 124, filter_size = 0)
print("title index created")
create_index(wiki_corpus, 'anchor_idx', "anchor", 248, filter_size = 20)
print("anchor index created")
create_index(wiki_corpus, 'body_idx_dan_2', "text", 248, filter_size = 50)
print("body index created")

[Stage 5:====>                                                  (10 + 16) / 124]

In [None]:
# Marker cell: prints a completion message.
print("finished")

# Page rank

In [None]:
def generate_graph(pages):
    """
    Build the edge and vertex collections of the link graph.
    :param pages: RDD-like collection of (source_id, links) pairs, where the
                  first element of every link is the destination id
    :return: (edges, vertices) — distinct (source_id, dest_id) tuples, and
             distinct 1-tuples (id,) for every id that appears in an edge
    """
    def outgoing_edges(page):
        source_id = page[0]
        return [(source_id, link[0]) for link in page[1]]

    edges = pages.flatMap(outgoing_edges).distinct()
    node_ids = edges.flatMap(lambda edge: edge).distinct()
    vertices = node_ids.map(lambda node_id: (node_id,))
    return edges, vertices

In [None]:
# Use wiki_corpus, the DataFrame loaded from the parquet files earlier in this
# notebook (no variable named `corpus` is defined here).
pages_links = wiki_corpus.select("id", "anchor_text").rdd
# construct the graph 
edges, vertices = generate_graph(pages_links)
# compute PageRank
# Both DataFrames are repartitioned into 124 partitions on their id column
# before being handed to GraphFrames.
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
# Keep only (id, pagerank), highest rank first.
pr = pr_results.vertices.select("id", "pagerank")
pr = pr.sort(col('pagerank').desc())

In [None]:
# Store the page rank in bucket
# repartition(1) collapses the result into a single partition before the
# gzip-compressed CSV write. Note the destination bucket is hard-coded here and
# differs from `bucket_name`.
pr.repartition(1).write.csv('gs://title_inverted_index/pr', compression="gzip")

# Page views

In [None]:
# Paths
# Using user page views (as opposed to spiders and automated traffic) for the 
# month of August 2021
bucket_name = 'ln3250'
# Paths
# Using user page views (as opposed to spiders and automated traffic) for the 
# month of August 2021
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path) 
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
pv_clean = f'{p.stem}.pkl'
# Download the file (2.3GB) 
!wget -N $pv_path
# Filter for English pages, and keep just two fields: article ID (3) and monthly 
# total number of page views (5). Then, remove lines with article id or page 
# view values that are not a sequence of digits.
!bzcat $pv_name | grep "^en.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
# Create a Counter (dictionary) that sums up the pages views for the same 
# article, resulting in a mapping from article id to total page views.
wid2pv = Counter()
with open(pv_temp, 'rt') as f:
  for line in f:
    parts = line.split(' ')
    wid2pv.update({int(parts[0]): int(parts[1])})
# write out the counter as binary file (pickle it)
with open(pv_clean, 'wb') as f:
  pickle.dump(wid2pv, f)


# read in the counter
# with open(pv_clean, 'rb') as f:
#   wid2pv = pickle.loads(f.read())
# Create a Storage client
auth.authenticate_user()
storage_client = storage.Client()
# Get a reference to the bucket
bucket = storage_client.bucket(bucket_name)
# Create a new blob in the bucket
blob = bucket.blob('page_views.pkl')
# Serialize the object and write it to the blob
blob.upload_from_filename(pv_clean)


# Create a Storage client
storage_client = storage.Client()
# Get a reference to the bucket
bucket = storage_client.bucket(bucket_name)
# Get a reference to the blob
blob = bucket.blob('page_views.pkl')
# Download the contents of the blob
data_bytes = blob.download_as_string()
# Deserialize the data
data = pickle.loads(data_bytes)
print(data[57974])

# ID to title mapping

In [None]:
# Build a pandas Series of titles indexed by article id (index named 'id').
id2title = (
    wiki_corpus.select("id", "title")
    .toPandas()
    .set_index("id")["title"]
)
id2title

In [None]:
# Open the file where the id -> title mapping is stored and pickle the mapping
# into it. The context manager closes the file even if pickle.dump raises.
with open('id2title.pkl', 'wb') as file:
    pickle.dump(id2title, file)