# **Imports & Setup**

In [None]:
# Install the GCS client (version pinned) and GraphFrames (used in the PageRank section).
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# Fetch NLTK's stopword corpus, needed by `stopwords.words('english')` in the
# index-functions cell (the call returns True on success, as the output shows).
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
# Access to the bucket: collect the gs:// paths of the raw Wikipedia dump files.
bucket_name = '209502079'
full_path = f"gs://{bucket_name}/"

client = storage.Client()
blobs = client.list_blobs(bucket_name)

# Prefixes of objects in the bucket that are NOT part of the parquet dump
# (helper scripts and artifacts this notebook uploads) and therefore must not
# be passed to spark.read.parquet.
# 'pr/' is where the PageRank cell writes its gzipped CSV output; without it,
# re-running this cell after PageRank would feed CSV files to the parquet reader.
prefixes_to_ignore = ['graphframes.sh', 'postings_gcp', 'pagerank', 'pr/', 'title', 'body', 'id_to_title_dict', 'title_lengths', 'document_lengths', 'queries_train']

# str.startswith accepts a tuple, so one call tests every prefix.
ignored_prefixes = tuple(prefixes_to_ignore)
paths = [full_path + b.name for b in blobs if not b.name.startswith(ignored_prefixes)]

In [None]:
# Load every dump file collected above into one Spark DataFrame.
# NOTE(review): `spark` is not defined in this notebook; presumably it is the
# SparkSession pre-created by the Dataproc Jupyter kernel — confirm.
parquetFile = spark.read.parquet(*paths)

                                                                                

In [None]:
# Count number of wiki pages (rows in the loaded dump); the value is the cell output.
parquetFile.count()

                                                                                

6348910

In [None]:
# Sanity check that the helper module is present in the home directory.
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [None]:
# adding our python module to the cluster
# sc.addFile registers the file to be distributed to the nodes running this job;
# putting SparkFiles' root directory first on sys.path lets the next cell import it.
# NOTE(review): `sc` is not defined in this notebook; presumably it is the kernel's SparkContext.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [None]:
from inverted_index_gcp import InvertedIndex

### **index functions**:

In [None]:
# Stopwords dropped by `word_count`: NLTK's English list plus a hand-picked list
# of corpus words (presumably high-frequency, low-information terms in the
# Wikipedia dump).
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links",
                    "may", "first", "see", "history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became"]

# frozenset.union returns a frozenset, so membership tests are O(1).
all_stopwords = english_stopwords.union(corpus_stopwords)
# Token pattern: a word character, '#' or '@', followed by 2-24 word characters,
# each optionally preceded by a single apostrophe or hyphen. A match therefore
# has at least 3 word-ish characters (words shorter than that never match), and
# finditer splits longer runs into consecutive matches.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

# Number of buckets (posting-list file groups) the index is split into.
NUM_BUCKETS = 124

def token2bucket_id(token):
  ''' Return the bucket index in [0, NUM_BUCKETS) for `token`.

  Built on the deterministic `_hash` digest rather than the built-in `hash`,
  so every Spark worker assigns a given token to the same bucket.
  '''
  hex_digest = _hash(token)
  return int(hex_digest, base=16) % NUM_BUCKETS

def word_count(text, id):
  ''' Count the frequency of each word in `text` (tf) that is not included in
  `all_stopwords` and return entries that will go into our posting lists.

  Tokens are the `RE_WORD` matches of the lower-cased text, so every token in
  the result is lower-case.

  Parameters:
  -----------
    text: str or None
      Text of one document. A missing (None) or empty text yields no
      postings instead of raising inside the Spark task.
    id: int
      Document id
  Returns:
  --------
    List of tuples
      A list of (token, (doc_id, tf)) pairs, one per distinct non-stopword
      token, in order of first occurrence,
      for example: [("anarchism", (12, 5)), ...]
  '''
  if not text:
    return []
  tokens = (match.group() for match in RE_WORD.finditer(text.lower()))
  # Drop stopwords while counting instead of building a full dict first.
  token_counts = Counter(token for token in tokens if token not in all_stopwords)
  return [(token, (id, tf)) for token, tf in token_counts.items()]

def reduce_word_counts(unsorted_pl):
  ''' Return one token's postings ordered by document id.

  Parameters:
  -----------
    unsorted_pl: iterable of tuples
      (wiki_id, tf) postings in arbitrary order, e.g. the grouped values
      produced by `groupByKey`.
  Returns:
  --------
    list of tuples
      A new list of the same tuples sorted ascending by wiki_id. The sort is
      stable, so postings with equal ids keep their input order.
  '''
  by_doc_id = itemgetter(0)
  return sorted(unsorted_pl, key=by_doc_id)

def calculate_df(postings):
  ''' Compute the document frequency (df) of every token.

  Parameters:
  -----------
    postings: RDD
      An RDD of (token, posting_list) pairs.
  Returns:
  --------
    RDD
      A lazily evaluated RDD of (token, df) pairs, where df is the length of
      the token's posting list. For the posting lists built in this notebook
      (one (doc_id, tf) entry per document), that is the number of documents
      containing the token.
  '''
  def _token_with_df(entry):
    return entry[0], len(entry[1])

  return postings.map(_token_with_df)

def partition_postings_and_write(postings, base_dir, bucket_name='209502079'):
  ''' Partition the posting lists into buckets, write out every bucket's
  posting lists, and return the posting locations of each bucket.

  Tokens are assigned to buckets with `token2bucket_id`. Each bucket is
  written with `InvertedIndex.write_a_posting_list`, the static method of the
  `InvertedIndex` class imported from inverted_index_gcp.py.

  The returned RDD is lazy: nothing is written until an action (such as
  `collect()`) is run on it.

  Parameters:
  -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
    base_dir: str
      Directory/prefix for the posting-list files; passed unchanged to
      `write_a_posting_list`.
    bucket_name: str
      Name of the GCS bucket; passed unchanged to `write_a_posting_list`.
  Returns:
  --------
    RDD
      An RDD where each item is the posting-locations dictionary returned by
      `write_a_posting_list` for one bucket. The posting locations maintain a
      list for each word of file locations and offsets its posting list was
      written to. See `write_a_posting_list` for more details.
  '''
  def write_bucket(bucket_id, token_postings):
    """Write every (token, posting_list) pair of one bucket; return their locations."""
    return InvertedIndex.write_a_posting_list((bucket_id, token_postings), base_dir, bucket_name)

  # Key every (token, posting_list) pair by the bucket its token hashes to.
  bucketed_postings = postings.map(lambda x: (token2bucket_id(x[0]), (x[0], x[1])))

  # One write per bucket: gather the bucket's pairs into a list and write them together.
  return bucketed_postings.groupByKey().map(lambda x: write_bucket(x[0], list(x[1])))

# **index for title**:

In [None]:
# Create a folder named "title_index" if it doesn't exist.
# exist_ok=True makes this idempotent without a separate exists() check, and it
# raises FileExistsError if a regular file already occupies the name (instead
# of silently continuing and failing later in write_index).
base_dir_title = 'title_index'
os.makedirs(base_dir_title, exist_ok=True)

In [None]:
# Build the title index: token -> (doc_id, tf) postings sorted by doc_id.
wiki_pages_for_work = parquetFile
title_text_pairs = wiki_pages_for_work.select("title", "id").rdd

# Tokenize titles, count occurrences, and filter stopwords
# -> one (token, (doc_id, tf)) entry per distinct non-stopword token of each title
title_word_counts = title_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))

# Partitioning Postings and Writing to Disk
# No df threshold is applied here, so every title token gets a posting list.
title_postings = title_word_counts.groupByKey().mapValues(reduce_word_counts)
# collect() triggers the job: posting lists are written via write_a_posting_list
# with base_dir 'title_index/posting', and one locations dict per bucket is returned.
title_posting_locs_list = partition_postings_and_write(title_postings, base_dir='title_index/posting').collect()

# Merging Posting Locations
# Combine the per-bucket dicts into a single token -> list-of-locations mapping.
title_posting_locs = defaultdict(list)
for title_posting_loc in title_posting_locs_list:
    for k, v in title_posting_loc.items():
        title_posting_locs[k].extend(v)

# Create a new InvertedIndex instance for titles
title_inverted = InvertedIndex()
title_inverted.posting_locs = title_posting_locs
title_inverted.df = calculate_df(title_postings).collectAsMap()

# Writing Global index for title to Disk
# NOTE(review): the next cell uploads "title_index/index_for_title.pkl", so this
# presumably writes <base_dir>/<name>.pkl locally — confirm in inverted_index_gcp.py.
title_inverted.write_index(base_dir_title, 'index_for_title')

In [None]:
# Upload the pickled title index to the same relative path inside the bucket.
index_src = "title_index/index_for_title.pkl"
index_dst = f'gs://{bucket_name}/{index_src}'
!gsutil cp $index_src $index_dst

In [None]:
# Verify the upload: long listing with a human-readable size.
!gsutil ls -lh $index_dst

# **index for body**:

In [None]:
# Create a folder named "body_index" if it doesn't exist.
# exist_ok=True makes this idempotent without a separate exists() check, and it
# raises FileExistsError if a regular file already occupies the name (instead
# of silently continuing and failing later in write_index).
base_dir_body = 'body_index'
os.makedirs(base_dir_body, exist_ok=True)

In [None]:
# Build the body index: token -> (doc_id, tf) postings over the full article text.
wiki_pages_for_work = parquetFile
doc_text_pairs = wiki_pages_for_work.select("text", "id").rdd

# Tokenization and Word Counting
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))

# Partitioning Postings and Writing to Disk
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# word_count emits one entry per document for a token, so this keeps only tokens
# with df > 50; rarer tokens are not indexed for the body.
postings_filtered = postings.filter(lambda x: len(x[1])>50)
# collect() triggers the job and returns one posting-locations dict per bucket.
posting_locs_list = partition_postings_and_write(postings_filtered, base_dir='body_index/posting').collect()

# Merging Posting Locations
super_posting_locs = defaultdict(list)
for posting_loc in posting_locs_list:
    for k, v in posting_loc.items():
        super_posting_locs[k].extend(v)

# Creating an Inverted Index
inverted = InvertedIndex()
inverted.posting_locs = super_posting_locs
# df comes from the filtered postings, so it covers exactly the tokens written above.
inverted.df = calculate_df(postings_filtered).collectAsMap()

# Writing Global index for body to Disk
inverted.write_index(base_dir_body, 'index_for_body')

In [None]:
# Upload the pickled body index to the same relative path inside the bucket.
index_src = "body_index/index_for_body.pkl"
index_dst = f'gs://{bucket_name}/{index_src}'
!gsutil cp $index_src $index_dst

In [None]:
# Verify the upload: long listing with a human-readable size.
!gsutil ls -lh $index_dst

# **PageRank**:

In [None]:
def generate_graph(pages):
  ''' Compute the directed graph generated by wiki links.
  Parameters:
  -----------
    pages: RDD
      An RDD where each row consists of one wikipedia articles with 'id' and
      'anchor_text'.
  Returns:
  --------
    edges: RDD
      An RDD where each row represents an edge in the directed graph created by
      the wikipedia links. The first entry should the source page id and the
      second entry is the destination page id. No duplicates should be present.
    vertices: RDD
      An RDD where each row represents a vetrix (node) in the directed graph
      created by the wikipedia links. No duplicates should be present.
  '''
  # Extract edges from anchor texts
  edges = pages.flatMap(lambda row: [(row.id, anchor.id) for anchor in row.anchor_text])

  # Remove duplicate edges
  edges = edges.distinct()

  # Extract vertices
  vertices_src = pages.map(lambda row: Row(id=row.id))
  vertices_dst = edges.map(lambda pair: Row(id=pair[1]))
  vertices = vertices_src.union(vertices_dst)

  # Remove duplicate vertices
  vertices = vertices.distinct()

  return edges, vertices

In [None]:
# Build the link graph from each page's outgoing anchors.
pages_links = parquetFile.select("id", "anchor_text").rdd
# construct the graph
edges, vertices = generate_graph(pages_links)
# compute PageRank
# GraphFrames requires edge columns named src/dst and a vertex column named id.
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
# resetProbability=0.15 is the random-jump probability (damping factor 0.85);
# maxIter=6 runs a fixed number of iterations.
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.select("id", "pagerank")
pr = pr.sort(col('pagerank').desc())
# coalesce(1) concatenates the sorted partitions without a shuffle, so the single
# output file keeps the descending order; repartition(1) shuffles and does not.
# NOTE(review): Spark's default save mode raises if gs://<bucket>/pr already
# exists, so a re-run needs the old output removed (or mode='overwrite').
pr.coalesce(1).write.csv(f'gs://{bucket_name}/pr', compression="gzip")
pr.show()

In [None]:
# Convert PageRank DataFrame to dictionary
# {page_id: pagerank}, collected to the driver (one entry per graph vertex).
pr_dict = pr.rdd.map(lambda row: (row['id'], row['pagerank'])).collectAsMap()

# Save dictionary as a pickle file
pickle_path = 'pagerank_dict.pkl'
with open(pickle_path, 'wb') as f:
    pickle.dump(pr_dict, f)

# Upload pickle file to GCP bucket (bucket root)
!gsutil cp {pickle_path} gs://{bucket_name}/

# **dictionaries**

In [None]:
# Re-bind the full dump DataFrame for the dictionary cells that follow.
wiki_pages_for_work = parquetFile

In [None]:
# (text, id) rows — input for calculate_document_lengths.
doc_text_pairs = wiki_pages_for_work.select("text", "id").rdd

In [None]:
# (title, id) rows — input for calculate_title_lengths and calculate_id_to_title_dict.
title_text_pairs = wiki_pages_for_work.select("title", "id").rdd

In [None]:
def calculate_document_lengths(rdd):
    """Map every document id to the number of whitespace-separated words in its text.

    NOTE: this counts raw ``str.split()`` words, not the stopword-filtered
    ``RE_WORD`` tokens that the body index is built from.

    Parameters
    ----------
    rdd : RDD of rows with ``"id"`` and ``"text"`` fields.

    Returns
    -------
    dict
        ``{id: word_count}``, collected to the driver.
    """
    def _id_and_length(row):
        return row["id"], len(row["text"].split())

    return rdd.map(_id_and_length).collectAsMap()

In [None]:
# Call the function to calculate document lengths
# (one dict entry per page, held in driver memory).
document_lengths = calculate_document_lengths(doc_text_pairs)

In [None]:
# Save dictionary as a pickle file
# ({doc_id: word count}; written to the local working directory first)
pickle_path = 'document_lengths.pkl'
with open(pickle_path, 'wb') as f:
    pickle.dump(document_lengths, f)

# Upload pickle file to GCP bucket (bucket root)
!gsutil cp {pickle_path} gs://{bucket_name}/

In [None]:
def calculate_title_lengths(rdd):
    """Map every document id to the number of whitespace-separated words in its title.

    Parameters
    ----------
    rdd : RDD of rows with ``"id"`` and ``"title"`` fields.

    Returns
    -------
    dict
        ``{id: title_word_count}``, collected to the driver.
    """
    def _id_and_title_length(row):
        return row["id"], len(row["title"].split())

    return rdd.map(_id_and_title_length).collectAsMap()

In [None]:
# Call the function to calculate title lengths
# (one dict entry per page, held in driver memory).
title_lengths = calculate_title_lengths(title_text_pairs)

In [None]:
# Save dictionary as a pickle file
# ({doc_id: title word count}; written to the local working directory first)
pickle_path = 'title_lengths.pkl'
with open(pickle_path, 'wb') as f:
    pickle.dump(title_lengths, f)

# Upload pickle file to GCP bucket (bucket root)
!gsutil cp {pickle_path} gs://{bucket_name}/

In [None]:
def calculate_id_to_title_dict(rdd):
    """Build a ``{doc_id: title}`` mapping from every row of ``rdd``.

    Parameters
    ----------
    rdd : RDD of rows exposing ``id`` and ``title`` attributes.

    Returns
    -------
    dict
        ``{id: title}``, collected to the driver.
    """
    id_title_pairs = rdd.map(lambda page: (page.id, page.title))
    return id_title_pairs.collectAsMap()

In [None]:
# Call the function to calculate the dictionary
# (doc_id -> title for every page, held in driver memory).
id_to_title_dict = calculate_id_to_title_dict(title_text_pairs)

In [None]:
# Save dictionary as a pickle file
# ({doc_id: title}; written to the local working directory first)
pickle_path = 'id_to_title_dict.pkl'
with open(pickle_path, 'wb') as f:
    pickle.dump(id_to_title_dict, f)

# Upload pickle file to GCP bucket (bucket root)
!gsutil cp {pickle_path} gs://{bucket_name}/