<h1 align="center"><b>Our Backend</b></h1>

<h2 align="center"><b>Setup:</b></h2>

In [None]:
!gcloud dataproc clusters list --region us-central1
# List the Dataproc clusters in us-central1; the output below shows the cluster used for this project.

NAME          PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE  SCHEDULED_STOP
cluster-0016  GCE       2                                             RUNNING  us-central1-a


In [None]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes
# Install the external Python packages this notebook needs: the Google Cloud Storage client (pinned) and the GraphFrames Python bindings.
# NOTE(review): graphframes is not pinned; its version should match the graphframes jar present on the cluster — consider pinning it.

[0m

In [None]:
#  Basic imports

import os
import sys
import pickle
from collections import *
import nltk
import pyspark
import itertools
from itertools import islice, count, groupby
from google.cloud import storage
from operator import itemgetter

from pyspark.sql.functions import col
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext

from graphframes import *



In [None]:
# Verify that the cluster initialization script placed the GraphFrames jar in Spark's jar directory.
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Jan 10 20:25 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [None]:
# Data Configuration & Loading

bucket_name = '319134458_214906935'
full_path = f"gs://{bucket_name}/"

# Input corpus: every object at the top level of the bucket except the cluster
# init script. Objects under a "folder" prefix are skipped because this notebook
# later writes its own outputs into the same bucket (postings_gcp/, dl/,
# id_to_title/, pr/). Listing those here would make spark.read.parquet fail
# when the notebook is re-run.
client = storage.Client()
paths = [
    full_path + blob.name
    for blob in client.list_blobs(bucket_name)
    if blob.name != 'graphframes.sh' and '/' not in blob.name
]
assert paths, f"no input files found in {full_path}"

~ GCP setup is complete! ~

<h2 align="center"><b>Creating the Inverted Index:</b></h2>

In [None]:
# configuration:

import hashlib
import re
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords

# Fetch NLTK's stopword lists. NLTK reports the package as already up to date when it is cached locally (see the output below).
nltk.download('stopwords')

# Constants and Helpers
NUM_BUCKETS = 124


def _hash(s):
  """
    Generates a 5-byte BLAKE2b hex digest for a given string.

    Args:
        s (str): The input string to hash.

    Returns:
        str: A 10-character hexadecimal string.
    """
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()


def token2bucket_id(token):
  """
    Maps a token to a specific bucket index using its hash value.

    Args:
        token (str): The token (word) to map.

    Returns:
        int: The bucket ID, ranging from 0 to NUM_BUCKETS - 1.
    """
    return int(_hash(token), 16) % NUM_BUCKETS

# Stopwords: NLTK's English list plus frequent, low-signal Wikipedia terms.
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = [
    "category", "references", "also", "external", "links",
    "may", "first", "see", "history", "people", "one", "two",
    "part", "thumb", "including", "second", "following",
    "many", "however", "would", "became",
]

# Every token in this frozenset is dropped before indexing.
all_stopwords = english_stopwords | frozenset(corpus_stopwords)

# Token pattern: a '#', '@' or word character, followed by 2-24 word characters.
# Each of those may be preceded by a single apostrophe or hyphen, so a match is
# at least 3 characters long.
# NOTE: the pattern has no word-boundary anchors, so finditer splits very long runs into several matches.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [None]:
# Read the entire corpus directly from the Google Storage bucket. Keep an RDD of
# (text, id) rows: x[0] is the text and x[1] is the page id in the pipeline below.
parquetFile = spark.read.parquet(*paths)
doc_text_pairs = parquetFile.select(["text", "id"]).rdd

In [None]:
# Sanity check: count the loaded Wikipedia pages (the full dump should contain more than 6M).
parquetFile.count()

                                                                                

6348910

In [None]:
# Move (quietly) to the cluster home directory and confirm that the inverted-index helper module is there.
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [None]:
# Spark runs our code on many machines at once, so the helper module has to be shipped to every node with addFile.
# Adding SparkFiles' root directory to sys.path also lets the driver import it (see the import in the next cell).
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0, SparkFiles.getRootDirectory())


In [None]:
from inverted_index_gcp import *

In [None]:
# preprocessing functions:

from collections import Counter

def partition_postings_and_write(postings, bucket_name):
    ''' Partitions posting lists into buckets and writes to disk.

    Parameters:
    -----------
      postings: RDD of (term, posting_list) pairs.
      bucket_name: str
        Passed through to `InvertedIndex.write_a_posting_list` together with
        the 'postings_gcp' directory name.
    Returns:
    --------
      An RDD with one element per bucket: whatever
      `InvertedIndex.write_a_posting_list` returns for that bucket. The writes
      happen only when an action (e.g. `collect()`) is run on it.
    '''
    def _write_bucket(bucket):
        # bucket = (bucket_id, iterable of (term, posting_list)) after groupByKey
        return InvertedIndex.write_a_posting_list(bucket, 'postings_gcp', bucket_name)

    return (
        postings
        .map(lambda term_pl: (token2bucket_id(term_pl[0]), term_pl))
        # Key is the bucket id, so all terms sharing a bucket are grouped together.
        .groupByKey()
        .map(_write_bucket)
    )


def word_count(text, id):
    ''' Count the frequency of each word in `text` (tf) that is not included in
    `all_stopwords` and return entries that will go into our posting lists.

    Parameters:
    -----------
      text: str
        Raw document text. It is lower-cased and tokenized with RE_WORD.
      id:
        The document's wiki id, copied into every returned entry.
    Returns:
    --------
      list of (term, (id, tf)) tuples, one per distinct non-stopword term,
      in order of each term's first occurrence in the text.
    '''
    term_freqs = Counter(
        match.group()
        for match in RE_WORD.finditer(text.lower())
        if match.group() not in all_stopwords
    )
    return [(term, (id, tf)) for term, tf in term_freqs.items()]


def reduce_word_counts(unsorted_pl):
    ''' Returns a sorted posting list by wiki_id.

    `unsorted_pl` is an iterable of (wiki_id, tf) tuples. Tuples compare by
    wiki_id first, with ties broken by tf. The input is not modified; a new
    list is returned.
    '''
    posting_list = list(unsorted_pl)
    posting_list.sort()
    return posting_list


def calculate_df(postings):
    ''' Takes a posting list RDD and calculate the df for each token.

    `postings` holds (token, posting_list) pairs built from `word_count`
    output, which emits one (doc_id, tf) entry per document containing the
    token. The length of a posting list is therefore the token's document
    frequency. Returns (token, df) pairs.
    '''
    return postings.mapValues(lambda posting_list: len(posting_list))


In [None]:
# Inverted Index Pipeline:

# Keep only terms that appear in more than MIN_DF documents. Rarer terms are
# left out of the written posting lists and out of w2df_dict.
MIN_DF = 50

# word counts map: (term, (doc_id, tf)) for every distinct term of every doc
word_counts = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
# group by term -> posting list sorted by doc_id
postings = word_counts.groupByKey().mapValues(reduce_word_counts)
# filtering postings and calculate df
postings_filtered = postings.filter(lambda x: len(x[1]) > MIN_DF)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()  # term -> df, collected to the driver
# partition posting lists and write out; collect() is the action that triggers the writes
_ = partition_postings_and_write(postings_filtered, bucket_name).collect()


                                                                                

In [None]:
# Merge the posting-location pickles written under postings_gcp/ into a single
# mapping, keyed like each pickle's posting_locs (presumably term -> locations).
# NOTE(review): pickle.load can execute arbitrary code. That is acceptable here only
# because these files were produced by this notebook in our own bucket.
super_posting_locs = defaultdict(list)
pickle_blobs = (
    blob
    for blob in client.list_blobs(bucket_name, prefix='postings_gcp')
    if blob.name.endswith("pickle")
)
for blob in pickle_blobs:
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
    for term, locs in posting_locs.items():
        super_posting_locs[term].extend(locs)

In [None]:
# Create inverted index instance
inverted = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict
# write the global stats out
inverted.write_index('.', 'index')
# upload to gs
index_src = "index.pkl"
index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
!gsutil cp $index_src $index_dst

Copying file://index.pkl [Content-Type=application/octet-stream]...
/ [1 files][ 18.5 MiB/ 18.5 MiB]                                                
Operation completed over 1 objects/18.5 MiB.                                     


In [None]:
# Confirm that the index file reached the bucket and check its size.
!gsutil ls -lh $index_dst

 18.49 MiB  2026-01-11T12:37:32Z  gs://319134458_214906935/postings_gcp/index.pkl
TOTAL: 1 objects, 19384795 bytes (18.49 MiB)


<h2 align="center"><b>Creating DL for BM25 and ID-to-Title Map:</b></h2>

In [None]:
#  Calculate Document Length (DL)
# A document's length is the sum of its term frequencies in `word_counts`, i.e.
# the number of RE_WORD tokens left after stopword removal. Documents with no
# such tokens get no row.

DL_rdd = word_counts \
    .map(lambda x: (x[1][0], x[1][1])) \
    .reduceByKey(lambda a, b: a + b)

# reduceByKey already yields exactly one (id, dl) pair per id. A further
# groupBy("id").sum("dl") would only add a redundant shuffle.
dl_df = DL_rdd.toDF(["id", "dl"])

# Write one gzip CSV sorted by descending length. The sort is applied *after*
# repartition(1): sorting globally and then calling repartition(1) loses the
# order, because the repartition shuffle reorders rows.
dl_final = dl_df.repartition(1).sortWithinPartitions(col('dl').desc())
dl_final.write.csv(f'gs://{bucket_name}/dl', compression="gzip", mode="overwrite")

                                                                                

In [None]:
#  Save ID to Title Map:
# Write (id, title) for every page that has a title, as a single gzip-compressed CSV.
id_title_df = (
    parquetFile
    .select("id", "title")
    .filter(col("title").isNotNull())
)
id_title_df.repartition(1).write.csv(
    f'gs://{bucket_name}/id_to_title', compression="gzip", mode="overwrite"
)

                                                                                

<h2 align="center"><b>PageRank:</b></h2>

In [None]:
# Calculate PageRank:

from pyspark.sql.functions import col
import pickle

def generate_graph(pages):
    ''' Compute the directed graph generated by wiki links.
    Parameters:
    -----------
      pages: RDD
        An RDD where each row is one wikipedia article as (id, anchor_text).
        anchor_text is an iterable of link objects exposing the target page's
        `.id`, or None / empty when the page has no links.
    Returns:
    --------
      edges: RDD
        An RDD where each row represents an edge in the directed graph created by
        the wikipedia links. The first entry is the source page id and the
        second entry is the destination page id. No duplicates are present.
      vertices: RDD
        An RDD where each row represents a vertex (node) in the directed graph
        created by the wikipedia links, as a 1-tuple (id,). No duplicates are
        present. Only pages that take part in at least one edge are included.
    '''
    # Page 12 linking to [100, 200, 100] yields (12, 100), (12, 200), (12, 100).
    # distinct() then collapses repeated links, because edges are unweighted.
    # `row[1] or []` treats a missing (None) anchor_text as "no links" instead of
    # failing the whole job with a TypeError.
    edges = pages.flatMap(
        lambda row: [(row[0], link.id) for link in (row[1] or [])]
    ).distinct()

    # Both endpoints of every edge are vertices. Each id is wrapped in a 1-tuple
    # so the RDD converts to a single-column DataFrame via toDF(['id']).
    vertices = (
        edges
        .flatMap(lambda edge: [edge[0], edge[1]])
        .distinct()
        .map(lambda vertex_id: (vertex_id,))
    )

    return edges, vertices

In [None]:
# Read (id, anchor_text) for every page from this notebook's bucket. Use the
# standard lower-case gs:// scheme and `bucket_name` instead of a hard-coded name.
pages_links = spark.read.parquet(f"gs://{bucket_name}/multistream*") \
    .select("id", "anchor_text").rdd

# Number of partitions used for the graph's edge and vertex DataFrames.
GRAPH_PARTITIONS = 124

# construct the graph
edges, vertices = generate_graph(pages_links)
# compute PageRank (resetProbability 0.15, i.e. damping factor 0.85; 6 iterations)
edgesDF = edges.toDF(['src', 'dst']).repartition(GRAPH_PARTITIONS, 'src')
verticesDF = vertices.toDF(['id']).repartition(GRAPH_PARTITIONS, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.select("id", "pagerank")
# Write one gzip CSV sorted by descending PageRank.
# - The sort comes *after* repartition(1), since a global sort followed by
#   repartition(1) is reordered by the shuffle.
# - mode="overwrite" matches the other outputs, so the cell can be re-run.
pr = pr.repartition(1).sortWithinPartitions(col('pagerank').desc())
pr.write.csv(f'gs://{bucket_name}/pr', compression="gzip", mode="overwrite")

