# PAGERANK FOR AMAZON BOOK REVIEWS
## AMD - University of Milan
### Fatemeh Amirian 34015A

# PHASE ZERO: SET UP

In [1]:
%pip install pyspark graphframes kaggle findspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
# Standard library imports
import os
import time
import zipfile
from collections import defaultdict
from math import sqrt
import numpy as np
# Third-party imports
import findspark
import matplotlib.pyplot as plt
from graphframes import GraphFrame
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    DoubleType,
    LongType,
    StringType,
    StructField,
    StructType
)

from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
import re


In [3]:
# Create Hadoop directory and download winutils.exe
import urllib.request

# Create the hadoop/bin directory
hadoop_dir = "C:\\hadoop\\bin"
os.makedirs(hadoop_dir, exist_ok=True)

# Download winutils.exe
winutils_url = "https://github.com/steveloughran/winutils/raw/master/hadoop-2.7.1/bin/winutils.exe"
winutils_path = os.path.join(hadoop_dir, "winutils.exe")

if not os.path.exists(winutils_path):
    print("Downloading winutils.exe...")
    urllib.request.urlretrieve(winutils_url, winutils_path)
    print("Downloaded winutils.exe successfully!")
else:
    print("winutils.exe already exists!")

winutils.exe already exists!


In [4]:
# Set Java path for Windows (adjust path based on your Java installation)
os.environ["JAVA_HOME"] = r"C:\Program Files\Eclipse Adoptium\jdk-8.0.452.9-hotspot"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 8g pyspark-shell"

In [5]:
try:
    spark.stop()
except:
    pass

try:
    sc.stop()  
except:
    pass


# Initialize findspark first
findspark.init()

# Set Hadoop home properly for Windows
os.environ["HADOOP_HOME"] = "C:\\hadoop"
os.environ["PATH"] = os.environ["PATH"] + ";C:\\hadoop\\bin"

# Setup Spark with GraphFrames support
spark = SparkSession.builder \
    .appName("PageRankForAmazonBookReviews") \
    .master("local[*]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12") \
    .getOrCreate()

sc = spark.sparkContext

# Test GraphFrames import
from graphframes import GraphFrame
print("GraphFrames imported successfully!")

GraphFrames imported successfully!


In [6]:
spark

Downloading the dataset!

In [None]:
# Replace "xxxxxx" with your actual Kaggle username and API key
os.environ['KAGGLE_USERNAME'] = "xxxxxx"
os.environ['KAGGLE_KEY'] = "xxxxxx"

# Download Amazon Books Review dataset
!kaggle datasets download -d mohamedbakhet/amazon-books-reviews

In [None]:
# prompt: unzip the data

with zipfile.ZipFile('amazon-books-reviews.zip', 'r') as zip_ref:
    zip_ref.extractall('.')

# PHASE ONE : DATA PROCESSING

In [7]:
# reading the dataset and selecting the required columns
selected_book_reviews = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("quote", '"') \
    .option("escape", '"') \
    .option("multiLine", "true") \
    .csv("D:\AMD\data\Books_rating.csv") \
    .select("Id", "Title", "User_id", "review/score")


# getting a glimpse of the dataset
selected_book_reviews.printSchema()
selected_book_reviews.cache()
selected_book_reviews.show(5, truncate=False)


root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- review/score: double (nullable = true)

+----------+------------------------------+--------------+------------+
|Id        |Title                         |User_id       |review/score|
+----------+------------------------------+--------------+------------+
|1882931173|Its Only Art If Its Well Hung!|AVCGYZL8FQQTD |4.0         |
|0826414346|Dr. Seuss: American Icon      |A30TK6U7DNS82R|5.0         |
|0826414346|Dr. Seuss: American Icon      |A3UH4UZ4RSVO82|5.0         |
|0826414346|Dr. Seuss: American Icon      |A2MVUWT453QH61|4.0         |
|0826414346|Dr. Seuss: American Icon      |A22X4XUPKF66MR|4.0         |
+----------+------------------------------+--------------+------------+
only showing top 5 rows



In [8]:
# how many rows do we have in the dataset?
print(f"Total number of reviews: {selected_book_reviews.count()}")

Total number of reviews: 3000000


In [9]:
# do we have any missing values in the columns we selected?
selected_book_reviews.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in selected_book_reviews.columns]).show()

+---+-----+-------+------------+
| Id|Title|User_id|review/score|
+---+-----+-------+------------+
|  0|  208| 561787|           0|
+---+-----+-------+------------+



In [10]:
# we drop the rows with missing User_id values
selected_book_reviews = selected_book_reviews.na.drop(subset=["User_id"])

In [11]:
# we need to fill the missing values in the Title column with a placeholder - unknown
selected_book_reviews = selected_book_reviews.fillna("unknown", subset=["Title"])

# let's check again for missing values
selected_book_reviews.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in selected_book_reviews.columns]).show()

+---+-----+-------+------------+
| Id|Title|User_id|review/score|
+---+-----+-------+------------+
|  0|    0|      0|           0|
+---+-----+-------+------------+



In [12]:
# we wanna check to see if we have any duplicate reviews by the same user for the same book Id?
duplicate_reviews = selected_book_reviews.groupBy("User_id", "Id").count().filter("count > 1")

# seeing the first few duplicate reviews
print("Duplicate reviews (User_id, Book_id):")
duplicate_reviews.show(5)

# counting the number of duplicate reviews (user-book id pairs)
num_duplicate_reviews = duplicate_reviews.count()
print(f"Number of user-book pairs with duplicate reviews: {num_duplicate_reviews}")


Duplicate reviews (User_id, Book_id):
+--------------+----------+-----+
|       User_id|        Id|count|
+--------------+----------+-----+
| AWLFVCT9128JV|B000PBZH5M|    2|
|A3LN7H4OQS176F|B000PDFO2Q|    2|
|A391X5MVRLB5WR|B000N63OYK|    2|
|A3A6575YNHTF7X|0333168968|    2|
|A1F0YPF7GDUVT1|B000870E8S|    2|
+--------------+----------+-----+
only showing top 5 rows

Number of user-book pairs with duplicate reviews: 31159


In [13]:
# removing duplicates based on User_id and Id
cleaned_reviews = selected_book_reviews.dropDuplicates(["User_id", "Id"])
print(f"Number of reviews after removing duplicates: {cleaned_reviews.count()}")

Number of reviews after removing duplicates: 2397614


In [14]:
# checking to see how many unique books and user Ids we have in the dataset
unique_books = cleaned_reviews.select("Id").distinct().count()
unique_users = cleaned_reviews.select("User_id").distinct().count()

print(f"Unique books: {unique_books:,}")
print(f"Unique users: {unique_users:,}")


# See books with most reviews - we will likely see some of these books in the top lists 
cleaned_reviews.groupBy("Id", "Title") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(5)

Unique books: 216,023
Unique users: 1,008,972
+----------+--------------------+-----+
|        Id|               Title|count|
+----------+--------------------+-----+
|B000IEZE3G|Harry Potter and ...| 3663|
|B000ILIJE0|The Hobbit There ...| 3576|
|B000NWU3I4|The Hobbitt, or t...| 3562|
|B000PC54NG|          The Hobbit| 3540|
|B000NWQXBA|          The Hobbit| 3535|
+----------+--------------------+-----+
only showing top 5 rows



### **Attention!!!** Here we notice something important. We seem to have similar books (same titles) but with different book ids. we need to test this hypothesis. 

First we normalize the titles (at least as much as we can). <br>
1. Make all of them lower case 
2. Remove versions such as (CD) - (Audio Book) **but we will exculde generic titles such as poems because in that case we would lose information**
<br>

The idea is that I want to see the true books by their content and not by their version. If i am ranking the books by their importance, I dont care if it was paper back or hard cover and people do tend to leave diffrent reviwes for diferent editions of the same book. <br>
However, Perfect normalization will not be achieved in this attempt but it will make our ranking more meaningful.

In [15]:
# normalizing the titles
# define a list of generic titles that we want to keep as is - i took a wild guess with this based on my experince in reading books
# also we make it into a function because we will need it for mappings

def normalize_titles(df, title_column="Title"):

    # Define generic titles to preserve as-is
    generic_titles = [
        "poems", "selected poems", "collected poems", 
        "essays", "letters", "stories", "collected stories",
        "short stories", "anthology", "complete works",
        "selected works", "collected works", "memoir",
        "biography", "autobiography", "diary", "journals"
    ]
    
    # Create UDF to check if title contains any generic terms
    @F.udf(returnType=BooleanType())
    def contains_generic_title(title):
        if title is None:
            return False
        for generic in generic_titles:
            # Check if title contains generic phrase as a whole word
            if re.search(r'\b' + re.escape(generic) + r'\b', title):
                return True
        return False
    
    # First lowercase all titles
    df_with_lowercase = df.withColumn(
        "LowercasedTitle", 
        F.lower(F.col(title_column))
    )
    
    # Apply normalization
    df_normalized = df_with_lowercase.withColumn(
        "Title_Norm",  # Using a consistent column name
        F.when(
            contains_generic_title(F.col("LowercasedTitle")),
            F.col("LowercasedTitle")
        ).otherwise(
            F.trim(F.regexp_replace(
                F.col("LowercasedTitle"), 
                r"\[.*?\]|\(.*?\)|audiobook|unabridged|cd", 
                ""
            ))
        )
    )
    
    # Drop the intermediate column to keep the DataFrame clean
    df_normalized = df_normalized.drop("LowercasedTitle")
    
    return df_normalized


# Apply the normalization function to cleaned_reviews
cleaned_reviews_norm = normalize_titles(cleaned_reviews)

# Count unique values in both columns
orig_count = cleaned_reviews_norm.select("Title").distinct().count()
norm_count = cleaned_reviews_norm.select("Title_Norm").distinct().count()

print(f"Unique original titles: {orig_count:,}")
print(f"Unique normalized titles: {norm_count:,}")

# what percent of titles were reduced?
print(f"Reduction: {orig_count - norm_count:,} ({(orig_count - norm_count)/orig_count*100:.2f}%)")

# show some examples to see if we did a good job
print("\nSample data with normalized titles:")
cleaned_reviews_norm.select("Title", "Title_Norm").distinct().orderBy(F.rand()).show(10, truncate=False)

Unique original titles: 206,712
Unique normalized titles: 198,919
Reduction: 7,793 (3.77%)

Sample data with normalized titles:
+--------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------+
|Title                                                                                                                                             |Title_Norm                                                                                                                            |
+--------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------+
|Pinball: The Lure o

Since This looks somewhat okay, we abondon the told title colomns and continue with our new colomn.

In [16]:
# Rename NormalizedTitle to Title_Norm
cleaned_reviews_norm = cleaned_reviews_norm.withColumnRenamed("NormalizedTitle", "Title_Norm")

In [17]:
cleaned_reviews_norm = cleaned_reviews_norm.drop("Title")
cleaned_reviews_norm = cleaned_reviews_norm.drop("LowercasedTitle")

In [18]:
# Verify the change
print("Schema after replacing Title with normalized version:")
cleaned_reviews_norm.printSchema()

Schema after replacing Title with normalized version:
root
 |-- Id: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- review/score: double (nullable = true)
 |-- Title_Norm: string (nullable = false)



In [19]:
cleaned_reviews_norm.show(5, truncate=False)

+----------+---------------------+------------+-----------------------------------------------------------+
|Id        |User_id              |review/score|Title_Norm                                                 |
+----------+---------------------+------------+-----------------------------------------------------------+
|0375765263|A005300013WHNZLTK9N6F|3.0         |cracking the ap calculus ab and bc exams, 2006-2007 edition|
|B000GQG7D2|A00540411RKGTDNU543WS|5.0         |the hobbit                                                 |
|B0008849SS|A008059932M4DUB2IWDB8|5.0         |seven pillars of wisdom,: a triumph                        |
|0451521196|A00891092QIVH4W1YP46A|2.0         |wuthering heights                                          |
|B0006AQ4LI|A00891092QIVH4W1YP46A|2.0         |wuthering heights                                          |
+----------+---------------------+------------+-----------------------------------------------------------+
only showing top 5 rows



#### Now the next problem is that titles sometimes have different book IDs (despite being the same book) and we need to verify this before thinking of a solution.

In [20]:
# grouping books by Title and count how many distinct IDs exist for each title
title_id_counts = cleaned_reviews_norm.groupBy("Title_Norm").agg(F.countDistinct("Id").alias("distinct_id_count"))

# Filter titles with more than one distinct ID
titles_with_same_name_diff_id = title_id_counts.filter(col("distinct_id_count") > 1)

print("Number of titles with the same title but different IDs:")
duplicate_titles = titles_with_same_name_diff_id.count()
print(duplicate_titles)

# top ten books with the most distinct IDs - same title
print("\nTop five books with the same title and different IDs ordered by the count of distinct ids:")
titles_with_same_name_diff_id.orderBy(col("distinct_id_count").desc()).show(10, truncate=False)

Number of titles with the same title but different IDs:
12121

Top five books with the same title and different IDs ordered by the count of distinct ids:
+----------------------------+-----------------+
|Title_Norm                  |distinct_id_count|
+----------------------------+-----------------+
|emma                        |20               |
|persuasion                  |18               |
|wuthering heights           |17               |
|great expectations          |16               |
|selected poems              |15               |
|jane eyre                   |14               |
|poems                       |14               |
|the white company           |13               |
|systematic theology         |13               |
|sermons on several occasions|13               |
+----------------------------+-----------------+
only showing top 10 rows



For the purpose of truly having distincs Human - Book reviews and also easier handling in the functions, we will create integer indexes. 

In [21]:
from pyspark.sql.window import Window

def add_generated_ids_norm(df):

    title_window = Window.orderBy("Title_Norm")
    titles_with_id = df.select("Title_Norm").distinct() \
                      .withColumn("book_id", F.dense_rank().over(title_window))

    user_window = Window.orderBy("User_id")
    users_with_id = df.select("User_id").distinct() \
                     .withColumn("user_id_int", F.dense_rank().over(user_window))

    result = df.join(titles_with_id, "Title_Norm") \
               .join(users_with_id, "User_id") \
               .select("User_id", "user_id_int", "Id", "Title_Norm", "book_id", 
                       F.col("review/score").cast("double").alias("score"))

    return result


df_with_ids = add_generated_ids_norm(cleaned_reviews_norm)
df_with_ids.show(10)

+--------------------+-----------+----------+--------------------+-------+-----+
|             User_id|user_id_int|        Id|          Title_Norm|book_id|score|
+--------------------+-----------+----------+--------------------+-------+-----+
|A00274963RTZUW5BU...|          6|B0006Y8M7S|how to win friend...|  72106|  5.0|
|A00274963RTZUW5BU...|          6|B00086Q244|how to win friend...|  72106|  5.0|
|A00540411RKGTDNU5...|         17|B000NWU3I4|the hobbitt, or t...| 158532|  5.0|
|A00540411RKGTDNU5...|         17|B000GQG7D2|          the hobbit| 158528|  5.0|
|A00540411RKGTDNU5...|         17|B000NWQXBA|          the hobbit| 158528|  5.0|
|A00540411RKGTDNU5...|         17|B000NDSX6C|          the hobbit| 158528|  5.0|
|A00540411RKGTDNU5...|         17|B000H9R1Q0|          the hobbit| 158528|  5.0|
|A00540411RKGTDNU5...|         17|B000PC54NG|          the hobbit| 158528|  5.0|
|A00540411RKGTDNU5...|         17|B000ILIJE0|the hobbit there ...| 158530|  5.0|
|A00540411RKGTDNU5...|      

Checking to see if there is any collision?

In [22]:
book_collisions = df_with_ids.select("book_id", "Title_Norm").distinct().groupBy("book_id").count().filter(F.col("count") > 1).count()
user_collisions = df_with_ids.select("user_id_int", "User_id").distinct().groupBy("user_id_int").count().filter(F.col("count") > 1).count()

print(f"Book collisions: {book_collisions}")
print(f"User collisions: {user_collisions}")

Book collisions: 0
User collisions: 0


In [23]:
# only keeping what we need 
indexed_data = df_with_ids.select("user_id_int", "book_id", "Title_Norm", "score")
indexed_data.show(10)

+-----------+-------+--------------------+-----+
|user_id_int|book_id|          Title_Norm|score|
+-----------+-------+--------------------+-----+
|          6|  72106|how to win friend...|  5.0|
|          6|  72106|how to win friend...|  5.0|
|         17| 158532|the hobbitt, or t...|  5.0|
|         17| 158528|          the hobbit|  5.0|
|         17| 158528|          the hobbit|  5.0|
|         17| 158528|          the hobbit|  5.0|
|         17| 158528|          the hobbit|  5.0|
|         17| 158528|          the hobbit|  5.0|
|         17| 158530|the hobbit there ...|  5.0|
|         17| 158529|the hobbit or the...|  5.0|
+-----------+-------+--------------------+-----+
only showing top 10 rows



Here we can see that we run into more duplicate human-book pairs which makes sense since we basically turned many different book variations into one id, so we remove duplicates again.

In [24]:
# Remove duplicate user-book pairs
indexed_data_unique = indexed_data.dropDuplicates(["user_id_int", "book_id"])

a = indexed_data.count()
b = indexed_data_unique.count()

print("Original count:", a)
print("After removing duplicates:", b)
print("Duplicates removed:", a-b)

Original count: 2397614
After removing duplicates: 1931845
Duplicates removed: 465769


In [25]:
indexed_data_unique.show(5)

+-----------+-------+--------------------+-----+
|user_id_int|book_id|          Title_Norm|score|
+-----------+-------+--------------------+-----+
|          1| 181419|   this calder range|  5.0|
|          9| 146495|the breadman's he...|  5.0|
|         37|  79182|jane eyre / wuthe...|  5.0|
|         45|  80657|johnny tremain: a...|  5.0|
|         93|  39854| dawn in eclipse bay|  3.0|
+-----------+-------+--------------------+-----+
only showing top 5 rows



In [27]:
# Count unique books
unique_books = indexed_data_unique.select("book_id").distinct().count()

# Count unique users
unique_users = indexed_data_unique.select("user_id_int").distinct().count()

# Calculate average reviews per user
reviews_per_user = indexed_data_unique.groupBy("user_id_int").count()
avg_reviews_per_user = reviews_per_user.select(F.avg("count")).first()[0]

# Print the statistics
print(f"Number of unique books: {unique_books:,}")
print(f"Number of unique users: {unique_users:,}")
print(f"Average reviews per user: {avg_reviews_per_user:.2f}")

# Additional distribution statistics
reviews_distribution = reviews_per_user.select(
    F.min("count").alias("min_reviews"),
    F.expr("percentile(count, 0.25)").alias("Q1"),
    F.expr("percentile(count, 0.5)").alias("Q2"),
    F.expr("percentile(count, 0.75)").alias("Q3"),
    F.max("count").alias("max_reviews")
)

reviews_distribution.show()

# Show top 10 most active reviewers
print("\nTop 10 Most Active Reviewers:")
reviews_per_user.orderBy(F.col("count").desc()).show(10)

Number of unique books: 198,919
Number of unique users: 1,008,972
Average reviews per user: 1.91
+-----------+---+---+---+-----------+
|min_reviews| Q1| Q2| Q3|max_reviews|
+-----------+---+---+---+-----------+
|          1|1.0|1.0|1.0|       5243|
+-----------+---+---+---+-----------+


Top 10 Most Active Reviewers:
+-----------+-----+
|user_id_int|count|
+-----------+-----+
|      35080| 5243|
|     859798| 3297|
|     870831| 1524|
|     246677| 1374|
|     208408| 1071|
|     148621| 1060|
|     448655|  859|
|     164799|  779|
|     695950|  713|
|      96987|  684|
+-----------+-----+
only showing top 10 rows



**ATTENTION** <br>
Because during debugging the spark session would often crash and i had to restart the kernel, i saved the final data so each time that i would re-run the spark, i wouldnt have to re-run the data processing too. <br>
be careful this snippet of code will make and save inside the relative directory. You might wanna skip the next 2 blocks.

In [28]:
# Convert to pandas DataFrame for reliable local storage
final_data = indexed_data_unique  # Ensure we use the right DataFrame
pandas_df = final_data.toPandas()

# Create directory if needed
os.makedirs("pandas_data", exist_ok=True)

# Save as CSV using pandas (no Hadoop dependency)
pandas_df.to_csv("pandas_data/final_data.csv", index=False)

# Also save as pickle for faster reloading
pandas_df.to_pickle("pandas_data/final_data.pkl")

print(f"Successfully saved DataFrame with {len(pandas_df)} rows")

Successfully saved DataFrame with 1931845 rows


ONLY USE IF THE SPARK CRASHES AND YOU ARE TRYING TO RESTART THE SESSION. 

In [None]:

# Read the final data back from the saved files

final_data = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv("pandas_data/final_data.csv")
final_data.show(5)


## PHASE ONE (1.1): MAPPINGS

In [29]:
# title - book id
def create_title_mapping(df):
    # Select only the needed columns and get distinct pairs
    title_rows = df.select("book_id", "Title_Norm").distinct().collect()
    
    # Convert to dictionary
    title_mapping = {row["book_id"]: row["Title_Norm"] for row in title_rows}
    
    print(f"Created mapping for {len(title_mapping)} unique books")
    return title_mapping

# Create the mapping
title_mapping = create_title_mapping(final_data)

# Preview a few entries
print("\nSample title mappings:")
for book_id, title in list(title_mapping.items())[:4]:
    print(f"Book ID: {book_id} ---> Title: {title}")

Created mapping for 198919 unique books

Sample title mappings:
Book ID: 9436 ---> Title: aftershocks
Book ID: 120026 ---> Title: rats
Book ID: 162847 ---> Title: the lost continent
Book ID: 70848 ---> Title: how to argue and win every time


In [30]:
# average rating - book id
def create_book_rating_mapping(df):
    # Group by book_id and calculate average score
    avg_ratings = df.groupBy("book_id") \
                    .agg(F.avg("score").alias("avg_rating"),
                         F.count("score").alias("num_ratings")) \
                    .collect()
    
    # Convert to dictionary
    rating_mapping = {row["book_id"]: {
                          "avg_rating": float(row["avg_rating"]),
                          "num_ratings": row["num_ratings"]
                      } for row in avg_ratings}
    
    print(f"Created rating mapping for {len(rating_mapping)} books")
    return rating_mapping

book_rating_mapping = create_book_rating_mapping(final_data)

# Preview a few entries
print("\nSample book rating mappings:")
sample_books = list(book_rating_mapping.keys())[:5]
for book_id in sample_books:
    rating_info = book_rating_mapping[book_id]
    print(f"Book ID: {book_id} ---> Rating: {rating_info['avg_rating']:.2f}/5 ({rating_info['num_ratings']} reviews)")

Created rating mapping for 198919 books

Sample book rating mappings:
Book ID: 120706 ---> Rating: 4.39/5 (442 reviews)
Book ID: 103357 ---> Rating: 5.00/5 (1 reviews)
Book ID: 89476 ---> Rating: 3.87/5 (231 reviews)
Book ID: 18654 ---> Rating: 3.93/5 (41 reviews)
Book ID: 7754 ---> Rating: 5.00/5 (6 reviews)


We need genres for two thing:
1. showing the genre of top books across all models.
2. making topic sensitive pagerank
<br>


First we read it from the second csv file and we normalize its titles for comparing against ours.

In [31]:
# book genre - book id

book_data = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("quote", '"') \
    .option("escape", '"') \
    .option("multiLine", "true") \
    .csv("D:/AMD/data/Books_data.csv") \
    .select("Title", "categories")  

book_data_norm = normalize_titles(book_data)

genre_by_title = {row["Title_Norm"]: row["categories"] 
                  for row in book_data_norm.select("Title_Norm", "categories").collect() 
                  if row["categories"] is not None}

print(f"Loaded metadata for {len(genre_by_title)} books with genres")

def create_genre_mapping(df):
    books = df.select("book_id", "Title_Norm").distinct().collect()
    
    genre_mapping = {row["book_id"]: genre_by_title.get(row["Title_Norm"], "Unknown") 
                    for row in books}
    
    matched = sum(1 for genre in genre_mapping.values() if genre != "Unknown")
    print(f"Created genre mapping for {len(genre_mapping)} books")
    print(f"Matched genres: {matched} ({matched/len(genre_mapping)*100:.1f}%)")
    
    return genre_mapping

genre_mapping = create_genre_mapping(final_data)

# Preview a few entries
print("\nSample genre mappings:")
for book_id in list(genre_mapping.keys())[:4]:
    title = title_mapping.get(book_id, "Unknown")
    genre = genre_mapping[book_id]
    print(f"Book ID: {book_id} ---> Genre: {genre}")

Loaded metadata for 165975 books with genres
Created genre mapping for 198919 books
Matched genres: 161574 (81.2%)

Sample genre mappings:
Book ID: 9436 ---> Genre: ['Fiction']
Book ID: 120026 ---> Genre: ['Art']
Book ID: 162847 ---> Genre: ['Union catalogs']
Book ID: 70848 ---> Genre: ['Language Arts & Disciplines']


What genres are most common? You can use this info for gerring topic sensitive pagerank later.

In [32]:
# Get the top 10 genres
genre_counts = {}
for genre in genre_mapping.values():
    genre_counts[genre] = genre_counts.get(genre, 0) + 1

sorted_genres = sorted(genre_counts.items(), key=lambda x: x[1], reverse=True)

print("\nTop 10 genres by book count:")
for i, (genre, count) in enumerate(sorted_genres[:10]):
    print(f"{i+1}. {genre}: {count} books")


Top 10 genres by book count:
1. Unknown: 37345 books
2. ['Fiction']: 21731 books
3. ['Religion']: 8995 books
4. ['History']: 8874 books
5. ['Juvenile Fiction']: 6267 books
6. ['Biography & Autobiography']: 6037 books
7. ['Business & Economics']: 5410 books
8. ['Computers']: 4173 books
9. ['Social Science']: 3552 books
10. ['Juvenile Nonfiction']: 3221 books


# **PHASE TWO: BIULDING THE GRAPH**

### What is the Graph logic?

Writing a fucntion that makes the graph - Books connected if they share 2 or more reviews from at least 2 unique people. <br>
BOOKA <--> BOOKB if USER-A and USER-B reviewed both A and B

One more think to point out is that pagerank needs directed graph and since our book network has no direction, we have to double each link and make it **bidirectional**.
<br>
BOOK1 -> BOOK2
BOOK1 <- BOOK2

In [52]:
from pyspark.sql import functions as F
from graphframes import GraphFrame


def build_book_graph(df, threshold, purpose):
    # 1. Prepare user-book pairs (already unique)
    user_books = df.select("user_id_int", "book_id").cache()

    # 2. Generate book-book pairs for users, count shared users (OPTIMIZED)
    book_pairs = user_books.alias("ub1").join(
        user_books.alias("ub2"),
        F.col("ub1.user_id_int") == F.col("ub2.user_id_int")
    ).filter(
        F.col("ub1.book_id") < F.col("ub2.book_id")
    ).select(
        F.col("ub1.book_id").alias("src"),
        F.col("ub2.book_id").alias("dst")
    )

    edges_df = book_pairs.groupBy("src", "dst").count() \
        .filter(F.col("count") >= threshold) \
        .select("src", "dst").cache()

    # 3. Make undirected (bidirectional) edges
    edges_df = edges_df.union(edges_df.select(F.col("dst").alias("src"), F.col("src").alias("dst")))

    # 4. Vertices DataFrame
    vertices_df = edges_df.select("src").union(edges_df.select("dst")).distinct().withColumnRenamed("src", "id")

    # 5. Build GraphFrame
    graph = GraphFrame(vertices_df, edges_df)

    if purpose == "pagerank_builtin":
        # Only return GraphFrame for built-in PageRank
        return {"graph": graph}
    elif purpose == "pagerank_rdd":
        # Return RDD of edges and the number of nodes for RDD-based algorithms
        edges_rdd = graph.edges.rdd.map(lambda row: (row["src"], row["dst"]))
        nodes_count = vertices_df.count()
        return {"edges_rdd": edges_rdd, "nodes_count": nodes_count}
    elif purpose == "pagerank_python":
        # Return local lists for pure Python algorithms (be careful with large graphs!)
        links = [(row["src"], row["dst"]) for row in edges_df.collect()]
        pages = [row["id"] for row in vertices_df.collect()]
        return {"pages": pages, "links": links}
    else:
        raise ValueError(f"Unknown purpose: {purpose}")

# **PHASE THREE: OBTAINING A BENCHMRK**

Non personalized classic pagerak from the graphframe. Its results will be a testing ground for us.

If you refer to the graphframe user giude you can see this: <br>
link: https://graphframes.io/docs/_site/user-guide.html#pagerank

You can see we have 2 oprions, either fixed iteration or fixed tolerance (not at the same time). I Went with fixed iterations, you can easily test the other one as well.

In [35]:
data_1 = build_book_graph(final_data, threshold=2, purpose="pagerank_builtin")



In [36]:
# Count vertices and edges in the graph (just to know the size of the graph)
num_vertices = data_1['graph'].vertices.count()
num_edges = data_1['graph'].edges.count()

print(f"Number of vertices in the graph: {num_vertices:,}")
print(f"Number of edges in the graph: {num_edges:,}")

Number of vertices in the graph: 47,488
Number of edges in the graph: 4,084,494


In [37]:
# since the graph was bidirectional, we can divide the number of edges by 2 to get the actual number of connections
print(f"Number of unique connections (edges/2): {num_edges // 2:,}")

Number of unique connections (edges/2): 2,042,247


In [38]:
# running the built-in PageRank algorithm in oder to get a benchmark
# ATTENTION: This may take some time and it is slow, run this at your own risk!

result_one = data_1['graph'].pageRank(resetProbability=0.15, maxIter=20)



In [39]:
# First extract the top 20 book IDs - the benchmark for PageRank
# ATTENTION: This one too will take long, run this at your own risk!

top_20_books = result_one.vertices.orderBy(result_one.vertices.pagerank.desc()).limit(20).collect()
top_20_ids = [row["id"] for row in top_20_books]



In [40]:
# Create a formatted table with all required information
print("\nTop 20 Books by PageRank:")
print("-" * 20)
print(f"{'#':^4} | {'Title':^40} | {'Genre':^20} | {'Avg Rating':^10} | {'PageRank':^10}")
print("-" * 20)

for i, row in enumerate(top_20_books):
    book_id = row["id"]
    pagerank_score = row["pagerank"]
    
    # Get metadata from your mappings
    title = title_mapping.get(book_id, "Unknown Title")
    title_display = (title[:37] + "...") if len(title) > 40 else title
    
    # Get genre - limited to 20 chars
    genre = genre_mapping.get(book_id, "Unknown")
    genre_display = (genre[:17] + "...") if len(genre) > 20 else genre
    
    # Get rating info
    rating_info = book_rating_mapping.get(book_id, {"avg_rating": 0.0})
    avg_rating = rating_info["avg_rating"]
    
    # Print formatted row
    print(f"{i+1:^4} | {title_display:<40} | {genre_display:<20} | {avg_rating:^10.2f} | {pagerank_score:.6f}")

print(f"\nTop 20 book IDs: {top_20_ids}")


Top 20 Books by PageRank:
--------------------
 #   |                  Title                   |        Genre         | Avg Rating |  PageRank 
--------------------
 1   | harry potter and the sorcerer's stone    | ['Juvenile Fiction'] |    4.69    | 56.752445
 2   | blink: the power of thinking without ... | ['Business & Econ... |    3.70    | 41.615154
 3   | the catcher in the rye                   | ['Young Adult Fic... |    3.92    | 41.596783
 4   | five people you meet in heaven           | ['Fiction']          |    4.16    | 39.573220
 5   | john adams                               | ['Electronic books'] |    4.68    | 38.860889
 6   | night                                    | ['Juvenile Fiction'] |    4.55    | 35.996820
 7   | the great gatsby                         | ['Fiction']          |    4.15    | 35.926736
 8   | guns, germs, and steel: the fates of ... | ['History']          |    4.01    | 35.807039
 9   | the tipping point: how little things ... | ['Reference']   

The above list is our benchmark, ideally we want the result of our following models to be identical or at least very close to this.

# **PHASE FOUR: ATTEMPTING TO WRITE THE ALGORITHM FROM SCRATCH**

First we write a function using only python, just to see if we can implement this logic ourselves or not.

In [45]:
def pagerank_python(pages, links, beta=0.85, max_iter=100, tol=1e-6, silent=False):

    if not pages:
        raise ValueError("Pages list cannot be empty")
    if not links:
        raise ValueError("Links list cannot be empty")
    if not (0 <= beta <= 1):
        raise ValueError(f"Beta must be between 0 and 1, got {beta}")
    if max_iter <= 0:
        raise ValueError(f"Max iterations must be positive, got {max_iter}")
    if tol <= 0:
        raise ValueError(f"Tolerance must be positive, got {tol}")
    
    N = len(pages)
    if not silent:
        print(f"Starting PageRank with {N:,} pages")
    
    ranks = {p: 1.0 / N for p in pages}
    initial_sum = sum(ranks.values())
    if not silent:
        print(f"Initialized ranks (sum={initial_sum:.8f})")
    
    # Step 2: Build adjacency list
    adjacency = defaultdict(list)
    for src, dst in links:
        adjacency[src].append(dst)
    
    nodes_with_outlinks = len(adjacency)
    total_edges = len(links)
    if not silent:
        print(f"Built adjacency list ({nodes_with_outlinks:,} nodes with outlinks, {total_edges:,} total edges)")
        print(f"Starting PageRank iterations...")
        print("-" * 60)
    
    for iteration in range(max_iter):
        new_ranks = {p: (1 - beta) / N for p in pages}

        for src in pages:
            neighbors = adjacency.get(src, [])
            if not neighbors:
                continue
            share = ranks[src] / len(neighbors)
            for dst in neighbors:
                new_ranks[dst] += beta * share

        diff = (sum((new_ranks[p] - ranks[p])**2 for p in pages))**0.5  # L2 norm
        rank_sum = sum(new_ranks.values())
        
        if not silent:
            print(f"Iteration {iteration + 1:3d}: L2 diff={diff:.6f}, sum={rank_sum:.8f}")
        
        ranks = new_ranks
        if diff < tol:
            break

    if not silent:
        print("-" * 60)
        print(f"Converged after {iteration + 1} iterations")
        
        # Final validation
        final_sum = sum(ranks.values())
        if abs(final_sum - 1.0) > 1e-6:
            print(f"Warning: Final sum ({final_sum:.8f}) deviates from 1.0")
        else:
            print(f"Rank sum validation passed")
    
    return sorted(ranks.items(), key=lambda x: -x[1]), iteration + 1

In [42]:
# first we make the graph for this specific function
data_2 = build_book_graph(final_data, threshold=2, purpose="pagerank_python")

In [46]:
result_two = pagerank_python(data_2['pages'], data_2['links'], beta=0.85, max_iter=100, tol=1e-6, silent=False)

Starting PageRank with 47,488 pages
Initialized ranks (sum=1.00000000)
Built adjacency list (47,488 nodes with outlinks, 4,084,494 total edges)
Starting PageRank iterations...
------------------------------------------------------------
Iteration   1: L2 diff=0.010299, sum=1.00000000
Iteration   2: L2 diff=0.003460, sum=1.00000000
Iteration   3: L2 diff=0.000818, sum=1.00000000
Iteration   4: L2 diff=0.000437, sum=1.00000000
Iteration   5: L2 diff=0.000266, sum=1.00000000
Iteration   6: L2 diff=0.000181, sum=1.00000000
Iteration   7: L2 diff=0.000130, sum=1.00000000
Iteration   8: L2 diff=0.000101, sum=1.00000000
Iteration   9: L2 diff=0.000079, sum=1.00000000
Iteration  10: L2 diff=0.000065, sum=1.00000000
Iteration  11: L2 diff=0.000053, sum=1.00000000
Iteration  12: L2 diff=0.000044, sum=1.00000000
Iteration  13: L2 diff=0.000037, sum=1.00000000
Iteration  14: L2 diff=0.000031, sum=1.00000000
Iteration  15: L2 diff=0.000026, sum=1.00000000
Iteration  16: L2 diff=0.000022, sum=1.0000

In [47]:
# TOP 20 Books by Custom PageRank - PURE PYTHON implementation


custom_pagerank_sorted, iterations_to_converge = result_two  
custom_pagerank_top20 = custom_pagerank_sorted[:20]  
custom_pagerank_ids = [book_id for book_id, score in custom_pagerank_top20]  

# Create a formatted table with all required information
print(f"\nTop 20 Books by Custom PageRank (converged in {iterations_to_converge} iterations):")
print("-" * 60)
print(f"{'#':^4} | {'Title':^40} | {'Genre':^20} | {'Avg Rating':^10} | {'PageRank':^10}")
print("-" * 100)

for i, (book_id, pagerank_score) in enumerate(custom_pagerank_top20):
    title = title_mapping.get(book_id, "Unknown Title")
    title_display = (title[:37] + "...") if len(title) > 40 else title
    
    genre = genre_mapping.get(book_id, "Unknown")
    genre_display = (genre[:17] + "...") if len(genre) > 20 else genre
    
    rating_info = book_rating_mapping.get(book_id, {"avg_rating": 0.0})
    avg_rating = rating_info["avg_rating"]
    
    # Print formatted row
    print(f"{i+1:^4} | {title_display:<40} | {genre_display:<20} | {avg_rating:^10.2f} | {pagerank_score:.6f}")

print(f"\nTop 20 book IDs from custom PageRank: {custom_pagerank_ids}")


Top 20 Books by Custom PageRank (converged in 35 iterations):
------------------------------------------------------------
 #   |                  Title                   |        Genre         | Avg Rating |  PageRank 
----------------------------------------------------------------------------------------------------
 1   | harry potter and the sorcerer's stone    | ['Juvenile Fiction'] |    4.69    | 0.001195
 2   | blink: the power of thinking without ... | ['Business & Econ... |    3.70    | 0.000876
 3   | the catcher in the rye                   | ['Young Adult Fic... |    3.92    | 0.000876
 4   | five people you meet in heaven           | ['Fiction']          |    4.16    | 0.000833
 5   | john adams                               | ['Electronic books'] |    4.68    | 0.000818
 6   | night                                    | ['Juvenile Fiction'] |    4.55    | 0.000758
 7   | the great gatsby                         | ['Fiction']          |    4.15    | 0.000757
 8   | guns, 

# **PHASE FOUR (4.1): USING SPARK RDD FOR PAGERNAK**

In [53]:
data_3 = build_book_graph(final_data, threshold=2, purpose="pagerank_rdd")

In [67]:
def pagerank_rdd(edges_rdd, nodes_count, damping_factor=0.85, max_iter=100, tolerance=1e-6, silent=False):

    
    # Input validation
    if not edges_rdd:
        raise ValueError("Edges RDD cannot be empty")
    if nodes_count <= 0:
        raise ValueError(f"Nodes count must be positive, got {nodes_count}")
    if not (0 <= damping_factor <= 1):
        raise ValueError(f"Damping factor must be between 0 and 1, got {damping_factor}")
    if max_iter <= 0:
        raise ValueError(f"Max iterations must be positive, got {max_iter}")
    if tolerance <= 0:
        raise ValueError(f"Tolerance must be positive, got {tolerance}")
    
    if not silent:
        print(f"Starting PageRank RDD with {nodes_count:,} nodes")
    
    if not silent:
        print("Building adjacency list...")
    adjacency_rdd = edges_rdd.groupByKey().mapValues(list).cache()
    if not silent:
        print("adjacency list built and cached")
    
    # Identify all nodes in the graph
    if not silent:
        print("Identifying all nodes in the graph...")
    all_nodes = edges_rdd.flatMap(lambda x: [x[0], x[1]]).distinct().collect()
    
    teleport_prob = (1 - damping_factor) / len(all_nodes)
    if not silent:
        print(f"Teleportation probability set: {teleport_prob:.8f}")
    
    ranks = {node: 1.0 / len(all_nodes) for node in all_nodes}
    initial_sum = sum(ranks.values())
    if not silent:
        print(f"Initial ranks set (sum={initial_sum:.8f})")
        print(f"Starting PageRank iterations...")
        print("-" * 60)
    
    for iteration in range(max_iter):
        old_ranks = ranks.copy()
        
        ranks_bc = edges_rdd.context.broadcast(ranks)
        
        contributions = adjacency_rdd.flatMap(
            lambda node_neighbors: [
                (neighbor, damping_factor * ranks_bc.value[node_neighbors[0]] / len(node_neighbors[1]))
                for neighbor in node_neighbors[1]
            ]
        ).reduceByKey(lambda a, b: a + b).collectAsMap()
        
        ranks = {node: teleport_prob + contributions.get(node, 0) for node in all_nodes}
        
        diff = sum((ranks[node] - old_ranks[node])**2 for node in all_nodes)**0.5 # L2 norm
        
        total_sum = sum(ranks.values())
        
        if not silent:
            print(f"Iteration {iteration + 1:3d}: L2 diff={diff:.6f}, sum={total_sum:.8f}")
        
        ranks_bc.unpersist()
        
        if diff < tolerance:
            if not silent:
                print("-" * 60)
                print(f"Converged after {iteration + 1} iterations")
            break
    
    if not silent:
        if iteration + 1 == max_iter:
            print("-" * 60)
            print(f"Reached maximum iterations ({max_iter}) without full convergence")
        
        print(f"Final rank sum: {sum(ranks.values()):.8f}")
        
        # Final validation
        final_sum = sum(ranks.values())
        if abs(final_sum - 1.0) > 1e-6:
            print(f"Warning: Final sum ({final_sum:.8f}) deviates from 1.0")
        else:
            print(f"Rank sum validation passed")
    
    adjacency_rdd.unpersist()
    
    # Return sorted ranks and the number of iterations performed
    return sorted(ranks.items(), key=lambda x: -x[1]), (iteration + 1)

carefull, this takes longer than the python version to run.

In [68]:
# Extract the edges_rdd and nodes_count from your graph
edges_rdd = data_3["edges_rdd"]
nodes_count = data_3["nodes_count"]

# Check partitioning
print(f"edges_rdd partitions: {edges_rdd.getNumPartitions()}")
print(f"edges_rdd is cached: {edges_rdd.is_cached}")

edges_rdd_fixed = edges_rdd.repartition(8).cache()
edges_rdd_fixed.count()  # Force caching

# Run PageRank with your function
ranks, iterations = pagerank_rdd(edges_rdd_fixed, nodes_count)


edges_rdd partitions: 400
edges_rdd is cached: False
Starting PageRank RDD with 47,488 nodes
Building adjacency list...
adjacency list built and cached
Identifying all nodes in the graph...
Teleportation probability set: 0.00000316
Initial ranks set (sum=1.00000000)
Starting PageRank iterations...
------------------------------------------------------------
Iteration   1: L2 diff=0.010299, sum=1.00000000
Iteration   2: L2 diff=0.003460, sum=1.00000000
Iteration   3: L2 diff=0.000818, sum=1.00000000
Iteration   4: L2 diff=0.000437, sum=1.00000000
Iteration   5: L2 diff=0.000266, sum=1.00000000
Iteration   6: L2 diff=0.000181, sum=1.00000000
Iteration   7: L2 diff=0.000130, sum=1.00000000
Iteration   8: L2 diff=0.000101, sum=1.00000000
Iteration   9: L2 diff=0.000079, sum=1.00000000
Iteration  10: L2 diff=0.000065, sum=1.00000000
Iteration  11: L2 diff=0.000053, sum=1.00000000
Iteration  12: L2 diff=0.000044, sum=1.00000000
Iteration  13: L2 diff=0.000037, sum=1.00000000
Iteration  14: L

In [55]:
# TOP 20 Books by RDD PageRank implementation
rdd_iterations = iterations
rdd_pagerank_top20 = ranks[:20]
rdd_pagerank_ids = [book_id for book_id, score in rdd_pagerank_top20]

# Create a formatted table with all required information
print(f"\nTop 20 Books by RDD PageRank (converged in {rdd_iterations} iterations):")
print("-" * 60)
print(f"{'#':^4} | {'Title':^40} | {'Genre':^20} | {'Avg Rating':^10} | {'PageRank':^10}")
print("-" * 100)

for i, (book_id, pagerank_score) in enumerate(rdd_pagerank_top20):
    title = title_mapping.get(book_id, "Unknown Title")
    title_display = (title[:37] + "...") if len(title) > 40 else title
    
    genre = genre_mapping.get(book_id, "Unknown")
    genre_display = (genre[:17] + "...") if len(genre) > 20 else genre
    
    rating_info = book_rating_mapping.get(book_id, {"avg_rating": 0.0})
    avg_rating = rating_info["avg_rating"]
    
    # Print formatted row
    print(f"{i+1:^4} | {title_display:<40} | {genre_display:<20} | {avg_rating:^10.2f} | {pagerank_score:.6f}")

print(f"\nTop 20 book IDs from RDD PageRank: {rdd_pagerank_ids}")


Top 20 Books by RDD PageRank (converged in 35 iterations):
------------------------------------------------------------
 #   |                  Title                   |        Genre         | Avg Rating |  PageRank 
----------------------------------------------------------------------------------------------------
 1   | harry potter and the sorcerer's stone    | ['Juvenile Fiction'] |    4.69    | 0.001195
 2   | blink: the power of thinking without ... | ['Business & Econ... |    3.70    | 0.000876
 3   | the catcher in the rye                   | ['Young Adult Fic... |    3.92    | 0.000876
 4   | five people you meet in heaven           | ['Fiction']          |    4.16    | 0.000833
 5   | john adams                               | ['Electronic books'] |    4.68    | 0.000818
 6   | night                                    | ['Juvenile Fiction'] |    4.55    | 0.000758
 7   | the great gatsby                         | ['Fiction']          |    4.15    | 0.000757
 8   | guns, ger

## COMPARING THE RESULTS BETWEEN THREE LISTS <br>
both in content (book titles) and their rankings.

In [58]:
# are they even similar?
common_all = set(top_20_ids) & set(custom_pagerank_ids) & set(rdd_pagerank_ids)
common_builtin_python = set(top_20_ids) & set(custom_pagerank_ids)
common_builtin_rdd = set(top_20_ids) & set(rdd_pagerank_ids)
common_python_rdd = set(custom_pagerank_ids) & set(rdd_pagerank_ids)

# Print overlap statistics
print(f"Common books in all three lists: {len(common_all)}/20")
print(f"Common books between Built-in and Python: {len(common_builtin_python)}/20")
print(f"Common books between Built-in and RDD: {len(common_builtin_rdd)}/20")
print(f"Common books between Python and RDD: {len(common_python_rdd)}/20")

# Check if the top books match across implementations
print("\nTop 5 books comparison:")
for i in range(5):
    if i < len(top_20_ids) and i < len(custom_pagerank_ids) and i < len(rdd_pagerank_ids):
        print(f"Position {i+1}: Built-in={top_20_ids[i]}, Python={custom_pagerank_ids[i]}, RDD={rdd_pagerank_ids[i]}")
        match = (top_20_ids[i] == custom_pagerank_ids[i] == rdd_pagerank_ids[i])
        print(f"  Match: {'yes' if match else 'no'}")

Common books in all three lists: 20/20
Common books between Built-in and Python: 20/20
Common books between Built-in and RDD: 20/20
Common books between Python and RDD: 20/20

Top 5 books comparison:
Position 1: Built-in=65953, Python=65953, RDD=65953
  Match: yes
Position 2: Built-in=22785, Python=22785, RDD=22785
  Match: yes
Position 3: Built-in=147537, Python=147537, RDD=147537
  Match: yes
Position 4: Built-in=55057, Python=55057, RDD=55057
  Match: yes
Position 5: Built-in=80380, Python=80380, RDD=80380
  Match: yes


The only difference is 2,3 of positions. Neglegable since top 20 books are the same.

# **PHASE FIVE: TOPIC SENSITIVE PAGERANK**

For this one too we will be using pure python and rdd version.
The only diffrence is that here we dont teleport to ANY place, Instead we travel to destinations that we are intrested in.

In [59]:
def pagerank_python_topic_sensitive(pages, links, genre_mapping, target_genres=None, 
                                   beta=0.85, max_iter=100, tol=1e-6, silent=False):

    
    # Input validation
    if not pages:
        raise ValueError("Pages list cannot be empty")
    if not links:
        raise ValueError("Links list cannot be empty")
    if not isinstance(genre_mapping, dict):
        raise ValueError("Genre mapping must be a dictionary")
    if not (0 <= beta <= 1):
        raise ValueError(f"Beta must be between 0 and 1, got {beta}")
    if max_iter <= 0:
        raise ValueError(f"Max iterations must be positive, got {max_iter}")
    if tol <= 0:
        raise ValueError(f"Tolerance must be positive, got {tol}")
    
    N = len(pages)
    if not silent:
        print(f"Starting Topic-Sensitive PageRank with {N:,} pages")
    
    ranks = {p: 1.0 / N for p in pages}
    initial_sum = sum(ranks.values())
    if not silent:
        print(f"Initialized ranks (sum={initial_sum:.8f})")

    adjacency = defaultdict(list)
    for src, dst in links:
        adjacency[src].append(dst)
    
    nodes_with_outlinks = len(adjacency)
    total_edges = len(links)
    if not silent:
        print(f"Built adjacency list ({nodes_with_outlinks:,} nodes with outlinks, {total_edges:,} total edges)")
    
    if target_genres is None:
        # Original behavior: uniform teleportation to all pages
        teleport_prob = (1 - beta) / N
        teleport_probs = {p: teleport_prob for p in pages}
        if not silent:
            print(f"Using uniform teleportation (original PageRank behavior)")
    else:
        topic_pages = [p for p in pages if any(str(g) in str(genre_mapping.get(p, "")) for g in target_genres)]
        
        if not topic_pages:
            # going back to uniform if no target pages found
            teleport_prob = (1 - beta) / N
            teleport_probs = {p: teleport_prob for p in pages}
            if not silent:
                print(f"Warning: No pages found for target genres {target_genres}. Using uniform teleportation.")
        else:
            topic_teleport_prob = (1 - beta) / len(topic_pages)
            teleport_probs = {p: topic_teleport_prob if p in topic_pages else 0.0 for p in pages}
            if not silent:
                print(f"Topic-sensitive teleportation: {len(topic_pages):,} pages in {target_genres}")

    if not silent:
        print(f"Starting PageRank iterations...")
        print("-" * 60)

    for iteration in range(max_iter):
        new_ranks = {p: teleport_probs[p] for p in pages}

        for src in pages:
            neighbors = adjacency.get(src, [])
            share = ranks[src] / len(neighbors)
            for dst in neighbors:
                new_ranks[dst] += beta * share

        diff = (sum((new_ranks[p] - ranks[p])**2 for p in pages))**0.5  # L2 norm
        rank_sum = sum(new_ranks.values())
        
        if not silent:
            print(f"Iteration {iteration + 1:3d}: L2 diff={diff:.6f}, sum={rank_sum:.8f}")
        
        ranks = new_ranks
        if diff < tol:
            break

    if not silent:
        print("-" * 60)
        print(f"Converged after {iteration + 1} iterations")
        
        # Final validation
        final_sum = sum(ranks.values())
        if abs(final_sum - 1.0) > 1e-6:
            print(f"Warning: Final sum ({final_sum:.8f}) deviates from 1.0")
        else:
            print(f"Rank sum validation passed")

    return sorted(ranks.items(), key=lambda x: -x[1]), iteration + 1

In [61]:
# lets choose a genre
result_two_fiction = pagerank_python_topic_sensitive(data_2['pages'], data_2['links'],genre_mapping, target_genres=['Fiction'], beta=0.85, max_iter=100, tol=1e-6, silent=False)

Starting Topic-Sensitive PageRank with 47,488 pages
Initialized ranks (sum=1.00000000)
Built adjacency list (47,488 nodes with outlinks, 4,084,494 total edges)
Topic-sensitive teleportation: 13,194 pages in ['Fiction']
Starting PageRank iterations...
------------------------------------------------------------
Iteration   1: L2 diff=0.010422, sum=1.00000000
Iteration   2: L2 diff=0.003976, sum=1.00000000
Iteration   3: L2 diff=0.001157, sum=1.00000000
Iteration   4: L2 diff=0.000737, sum=1.00000000
Iteration   5: L2 diff=0.000490, sum=1.00000000
Iteration   6: L2 diff=0.000361, sum=1.00000000
Iteration   7: L2 diff=0.000267, sum=1.00000000
Iteration   8: L2 diff=0.000213, sum=1.00000000
Iteration   9: L2 diff=0.000165, sum=1.00000000
Iteration  10: L2 diff=0.000137, sum=1.00000000
Iteration  11: L2 diff=0.000109, sum=1.00000000
Iteration  12: L2 diff=0.000093, sum=1.00000000
Iteration  13: L2 diff=0.000075, sum=1.00000000
Iteration  14: L2 diff=0.000065, sum=1.00000000
Iteration  15: L

In [62]:
# Extract the top 20 Fiction books from topic-sensitive PageRank
fiction_pagerank_sorted, iterations = result_two_fiction
fiction_pagerank_top20 = fiction_pagerank_sorted[:20]

# Create a formatted table showing the Fiction-focused PageRank results
print(f"\nTop 20 Fiction Books (Topic-Sensitive PageRank, converged in {iterations} iterations):")
print("-" * 60)
print(f"{'#':^4} | {'Title':^40} | {'Genre':^20} | {'Avg Rating':^10} | {'PageRank':^10}")
print("-" * 100)

for i, (book_id, pagerank_score) in enumerate(fiction_pagerank_top20):
    title = title_mapping.get(book_id, "Unknown Title")
    title_display = (title[:37] + "...") if len(title) > 40 else title
    
    genre = genre_mapping.get(book_id, "Unknown")
    genre_display = (genre[:17] + "...") if len(genre) > 20 else genre
    
    rating_info = book_rating_mapping.get(book_id, {"avg_rating": 0.0})
    avg_rating = rating_info["avg_rating"]
    
    print(f"{i+1:^4} | {title_display:<40} | {genre_display:<20} | {avg_rating:^10.2f} | {pagerank_score:.6f}")


Top 20 Fiction Books (Topic-Sensitive PageRank, converged in 40 iterations):
------------------------------------------------------------
 #   |                  Title                   |        Genre         | Avg Rating |  PageRank 
----------------------------------------------------------------------------------------------------
 1   | harry potter and the sorcerer's stone    | ['Juvenile Fiction'] |    4.69    | 0.001366
 2   | five people you meet in heaven           | ['Fiction']          |    4.16    | 0.000905
 3   | the catcher in the rye                   | ['Young Adult Fic... |    3.92    | 0.000893
 4   | the hobbit                               | ['Juvenile Fiction'] |    4.68    | 0.000892
 5   | the hobbit or there and back again       | ['Juvenile Fiction'] |    4.68    | 0.000891
 6   | the hobbitt, or there and back again;... | ['Fiction']          |    4.68    | 0.000887
 7   | the hobbit there and back again          | ['Adventure stori... |    4.68    | 0.00088

In [66]:
def pagerank_rdd_topic_sensitive(edges_rdd, nodes_count, genre_mapping, target_genres=None,
                                damping_factor=0.85, max_iter=100, tolerance=1e-6, silent=False):

    # Input validation 
    if not edges_rdd:
        raise ValueError("Edges RDD cannot be empty")
    if nodes_count <= 0:
        raise ValueError(f"Nodes count must be positive, got {nodes_count}")
    if not isinstance(genre_mapping, dict):
        raise ValueError("Genre mapping must be a dictionary")
    if not (0 <= damping_factor <= 1):
        raise ValueError(f"Damping factor must be between 0 and 1, got {damping_factor}")
    if max_iter <= 0:
        raise ValueError(f"Max iterations must be positive, got {max_iter}")
    if tolerance <= 0:
        raise ValueError(f"Tolerance must be positive, got {tolerance}")
    
    if not silent:
        print(f"Starting Topic-Sensitive PageRank RDD with {nodes_count:,} nodes")
    
    if not silent:
        print("Building adjacency list...")
    adjacency_rdd = edges_rdd.groupByKey().mapValues(list).cache()
    if not silent:
        print("Adjacency list built and cached")
    
    if not silent:
        print("Identifying all nodes in the graph...")
    all_nodes = edges_rdd.flatMap(lambda x: [x[0], x[1]]).distinct().collect()
    
    if target_genres is None:
        teleport_prob = (1 - damping_factor) / len(all_nodes)
        teleport_probs = {node: teleport_prob for node in all_nodes}
        if not silent:
            print(f"Using uniform teleportation (original PageRank behavior)")
            print(f"Teleportation probability set: {teleport_prob:.8f}")
    else:
        topic_nodes = [n for n in all_nodes if any(str(g) in str(genre_mapping.get(n, "")) for g in target_genres)]
        
        if not topic_nodes:
            
            teleport_prob = (1 - damping_factor) / len(all_nodes)
            teleport_probs = {node: teleport_prob for node in all_nodes}
            if not silent:
                print(f"Warning: No nodes found for target genres {target_genres}. Using uniform teleportation.")
                print(f"Teleportation probability set: {teleport_prob:.8f}")
        else:
            topic_teleport_prob = (1 - damping_factor) / len(topic_nodes)
            teleport_probs = {n: topic_teleport_prob if n in topic_nodes else 0.0 for n in all_nodes}
            if not silent:
                print(f"Topic-sensitive teleportation: {len(topic_nodes):,} nodes in {target_genres}")
                print(f"Topic teleportation probability: {topic_teleport_prob:.8f}")
    
    ranks = {node: 1.0 / len(all_nodes) for node in all_nodes}
    initial_sum = sum(ranks.values())
    if not silent:
        print(f"Initial ranks set (sum={initial_sum:.8f})")
        print(f"Starting PageRank iterations...")
        print("-" * 60)
    
    for iteration in range(max_iter):
        old_ranks = ranks.copy()
        
        ranks_bc = edges_rdd.context.broadcast(ranks)
        
        contributions = adjacency_rdd.flatMap(
            lambda node_neighbors: [
                (neighbor, damping_factor * ranks_bc.value[node_neighbors[0]] / len(node_neighbors[1]))
                for neighbor in node_neighbors[1]
            ]
        ).reduceByKey(lambda a, b: a + b).collectAsMap()
        
        ranks = {node: teleport_probs[node] + contributions.get(node, 0) for node in all_nodes}
        
        diff = sum((ranks[node] - old_ranks[node])**2 for node in all_nodes)**0.5  # L2 norm
        total_sum = sum(ranks.values())
        
        if not silent:
            print(f"Iteration {iteration + 1:3d}: L2 diff={diff:.6f}, sum={total_sum:.8f}")
        
        ranks_bc.unpersist()
        
        if diff < tolerance:
            if not silent:
                print("-" * 60)
                print(f"Converged after {iteration + 1} iterations")
            break
    
    if not silent:
        if iteration + 1 == max_iter:
            print("-" * 60)
            print(f"Reached maximum iterations ({max_iter}) without full convergence")
        
        print(f"Final rank sum: {sum(ranks.values()):.8f}")
        
        # Final validation
        final_sum = sum(ranks.values())
        if abs(final_sum - 1.0) > 1e-6:
            print(f"Warning: Final sum ({final_sum:.8f}) deviates from 1.0")
        else:
            print(f"Rank sum validation passed")
    
    adjacency_rdd.unpersist()
    
    # sorted ranks and the number of iterations performed
    return sorted(ranks.items(), key=lambda x: -x[1]), (iteration + 1)

In [64]:
ranks_topic_sensitive, iterations = pagerank_rdd_topic_sensitive(edges_rdd_fixed, nodes_count, genre_mapping, target_genres=["Fiction"], damping_factor=0.85, max_iter=100, tolerance=1e-6, silent=False)

Starting Topic-Sensitive PageRank RDD with 47,488 nodes
Building adjacency list...
Adjacency list built and cached
Identifying all nodes in the graph...
Topic-sensitive teleportation: 13,194 nodes in ['Fiction']
Topic teleportation probability: 0.00001137
Initial ranks set (sum=1.00000000)
Starting PageRank iterations...
------------------------------------------------------------
Iteration   1: L2 diff=0.010422, sum=1.00000000
Iteration   2: L2 diff=0.003976, sum=1.00000000
Iteration   3: L2 diff=0.001157, sum=1.00000000
Iteration   4: L2 diff=0.000737, sum=1.00000000
Iteration   5: L2 diff=0.000490, sum=1.00000000
Iteration   6: L2 diff=0.000361, sum=1.00000000
Iteration   7: L2 diff=0.000267, sum=1.00000000
Iteration   8: L2 diff=0.000213, sum=1.00000000
Iteration   9: L2 diff=0.000165, sum=1.00000000
Iteration  10: L2 diff=0.000137, sum=1.00000000
Iteration  11: L2 diff=0.000109, sum=1.00000000
Iteration  12: L2 diff=0.000093, sum=1.00000000
Iteration  13: L2 diff=0.000075, sum=1.0

In [65]:
# Extract top 20 Fiction books from the RDD-based topic-sensitive PageRank
rdd_fiction_top20 = ranks_topic_sensitive[:20]

# Create a formatted table showing the Fiction-focused PageRank results (RDD version)
print(f"\nTop 20 Fiction Books (RDD Topic-Sensitive PageRank, converged in {iterations} iterations):")
print("-" * 60)
print(f"{'#':^4} | {'Title':^40} | {'Genre':^20} | {'Avg Rating':^10} | {'PageRank':^10}")
print("-" * 100)

for i, (book_id, pagerank_score) in enumerate(rdd_fiction_top20):
    title = title_mapping.get(book_id, "Unknown Title")
    title_display = (title[:37] + "...") if len(title) > 40 else title
    
    genre = genre_mapping.get(book_id, "Unknown")
    genre_display = (genre[:17] + "...") if len(genre) > 20 else genre
    
    rating_info = book_rating_mapping.get(book_id, {"avg_rating": 0.0})
    avg_rating = rating_info["avg_rating"]
    
    print(f"{i+1:^4} | {title_display:<40} | {genre_display:<20} | {avg_rating:^10.2f} | {pagerank_score:.6f}")


Top 20 Fiction Books (RDD Topic-Sensitive PageRank, converged in 40 iterations):
------------------------------------------------------------
 #   |                  Title                   |        Genre         | Avg Rating |  PageRank 
----------------------------------------------------------------------------------------------------
 1   | harry potter and the sorcerer's stone    | ['Juvenile Fiction'] |    4.69    | 0.001366
 2   | five people you meet in heaven           | ['Fiction']          |    4.16    | 0.000905
 3   | the catcher in the rye                   | ['Young Adult Fic... |    3.92    | 0.000893
 4   | the hobbit                               | ['Juvenile Fiction'] |    4.68    | 0.000892
 5   | the hobbit or there and back again       | ['Juvenile Fiction'] |    4.68    | 0.000891
 6   | the hobbitt, or there and back again;... | ['Fiction']          |    4.68    | 0.000887
 7   | the hobbit there and back again          | ['Adventure stori... |    4.68    | 0.0

You can use the above functions for the classic pagerank too: <br>

classic_ranks, iterations = pagerank_rdd_topic_sensitive(<br>
&nbsp;&nbsp;&nbsp;&nbsp;    edges_rdd_fixed, <br>
&nbsp;&nbsp;&nbsp;&nbsp;    nodes_count, <br>
&nbsp;&nbsp;&nbsp;&nbsp;    genre_mapping, <br>
&nbsp;&nbsp;&nbsp;&nbsp;    target_genres=None,  # This makes it behave like classic PageRank<br>
&nbsp;&nbsp;&nbsp;&nbsp;    damping_factor=0.85, <br>
&nbsp;&nbsp;&nbsp;&nbsp;    max_iter=100, <br>
&nbsp;&nbsp;&nbsp;&nbsp;    tolerance=1e-6, <br>
&nbsp;&nbsp;&nbsp;&nbsp;    silent=False<br>
)<br>

You can use the above functions for the classic pagerank too: <br>

classic_ranks, iterations = pagerank_python_topic_sensitive(<br>
&nbsp;&nbsp;&nbsp;&nbsp;data_2['pages'], <br>
&nbsp;&nbsp;&nbsp;&nbsp;data_2['links'],<br>
&nbsp;&nbsp;&nbsp;&nbsp;genre_mapping, <br>
&nbsp;&nbsp;&nbsp;&nbsp;target_genres=None,  # This makes it behave like classic PageRank<br>
&nbsp;&nbsp;&nbsp;&nbsp;beta=0.85, <br>
&nbsp;&nbsp;&nbsp;&nbsp;max_iter=100, <br>
&nbsp;&nbsp;&nbsp;&nbsp;tol=1e-6, <br>
&nbsp;&nbsp;&nbsp;&nbsp;silent=False<br>
)<br>