In [1]:
from google.colab import files

In [2]:
!pip -q install kaggle pyspark pyarrow

In [3]:
files.upload()

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{\n  "username": "vadimsokol",\n  "key": "KGAT_1943f2f81f04e39172723dd31c2ac3c0"\n}\n'}

In [4]:
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json


In [5]:
!kaggle datasets list | head


ref                                                                 title                                                     size  lastUpdated                 downloadCount  voteCount  usabilityRating  
------------------------------------------------------------------  --------------------------------------------------  ----------  --------------------------  -------------  ---------  ---------------  
saidaminsaidaxmadov/chocolate-sales                                 Chocolate Sales                                         468320  2026-01-04 14:23:35.490000          10551        172  1.0              
jayjoshi37/customer-subscription-churn-and-usage-patterns           Customer Subscription Churn and Usage Patterns           34246  2026-01-27 13:53:52.857000            655         23  1.0              
vishardmehta/indian-engineering-college-placement-dataset           Indian Engineering College Placement Dataset            137603  2026-01-24 15:23:40.150000           1622         44

In [None]:
DATASET = "mohamedbakhet/amazon-books-reviews"

!mkdir -p /content/data
!kaggle datasets download -d {DATASET} -p /content/data --unzip

Dataset URL: https://www.kaggle.com/datasets/mohamedbakhet/amazon-books-reviews
License(s): CC0-1.0


In [7]:
!ls -lah /content/data


total 2.9G
drwxr-xr-x 2 root root 4.0K Feb  1 11:32 .
drwxr-xr-x 1 root root 4.0K Feb  1 11:31 ..
-rw-r--r-- 1 root root 173M Feb  1 11:32 books_data.csv
-rw-r--r-- 1 root root 2.7G Feb  1 11:32 Books_rating.csv


In [8]:
DATA_DIR = "/content/data"
RATINGS_CSV = f"{DATA_DIR}/Books_rating.csv"
BOOKS_CSV = f"{DATA_DIR}/books_data.csv"


USE_FULL_DATA = False # just to save runtime
# 5 percent sample,
# TODO: try 10% later
SAMPLE_FRAC = 0.05
SEED = 12345




In [9]:
# Start the SPark session

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("amd-recsys-books")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")


In [10]:
# Load book ratings into Spark:
from pyspark.sql import functions as F

ratings = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(RATINGS_CSV)
)

ratings.printSchema()
ratings.show(5, truncate=False)


root
 |-- Id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- User_id: string (nullable = true)
 |-- profileName: string (nullable = true)
 |-- review/helpfulness: string (nullable = true)
 |-- review/score: string (nullable = true)
 |-- review/time: string (nullable = true)
 |-- review/summary: string (nullable = true)
 |-- review/text: string (nullable = true)

+----------+------------------------------+-----+--------------+--------------------------------------+------------------+------------+-----------+-----------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [11]:
# check the columns to understand what's inside

cols = {c.lower(): c for c in ratings.columns}
cols

{'id': 'Id',
 'title': 'Title',
 'price': 'Price',
 'user_id': 'User_id',
 'profilename': 'profileName',
 'review/helpfulness': 'review/helpfulness',
 'review/score': 'review/score',
 'review/time': 'review/time',
 'review/summary': 'review/summary',
 'review/text': 'review/text'}

In [16]:
# Normalize and select core columns
from pyspark.sql import functions as functions

score_string = functions.regexp_extract(functions.col("review/score"), r"([0-9]+(\.[0-9]+)?)", 1)

ratings_0 = (
    ratings
    .select(
        functions.col("User_id").alias("user_id"),
        functions.col("Id").alias("book_id"),
        # safe numeric extraction for odd strings
        functions.when(score_string != "", score_string.cast("double")).otherwise(F.lit(None).cast("double")).alias("rating"),
        functions.col("Title").alias("title"),
    )
    .dropna(subset=["user_id", "book_id", "rating"])
)

ratings_0.select("rating").describe().show()
ratings_0.show(3, truncate=False)


+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|           2433202|
|   mean|2029.6013891366192|
| stddev|1580342.9802411501|
|    min|               0.0|
|    max|        1.295568E9|
+-------+------------------+

+--------------+----------+------+------------------------------+
|user_id       |book_id   |rating|title                         |
+--------------+----------+------+------------------------------+
|AVCGYZL8FQQTD |1882931173|4.0   |Its Only Art If Its Well Hung!|
|A30TK6U7DNS82R|0826414346|5.0   |Dr. Seuss: American Icon      |
|A3UH4UZ4RSVO82|0826414346|5.0   |Dr. Seuss: American Icon      |
+--------------+----------+------+------------------------------+
only showing top 3 rows


In [17]:
# clear bad formed data for ratings, first got 10^8 score

# keep only realistic star ratings
ratings_1 = ratings_0.filter((F.col("rating") >= 1.0) & (F.col("rating") <= 5.0))

ratings_1.select("rating").describe().show()


+-------+-----------------+
|summary|           rating|
+-------+-----------------+
|  count|          2426292|
|   mean|4.222532572336718|
| stddev|1.183760589211074|
|    min|              1.0|
|    max|              5.0|
+-------+-----------------+



In [18]:
ratings.select("review/score").show(5, truncate=False)
ratings.select("review/helpfulness").show(5, truncate=False)


+------------+
|review/score|
+------------+
|4.0         |
|5.0         |
|5.0         |
|4.0         |
|4.0         |
+------------+
only showing top 5 rows
+------------------+
|review/helpfulness|
+------------------+
|7/7               |
|10/10             |
|10/11             |
|7/7               |
|3/3               |
+------------------+
only showing top 5 rows


In [45]:
# Sample and reduce sparcity, for speed
USE_FULL_DATA = False
SAMPLE_FRAC = 0.20
SEED = 12345

ratings = ratings_1
if not USE_FULL_DATA:
    ratings = ratings.sample(False, SAMPLE_FRAC, seed=SEED)

MIN_USER_RATINGS = 5
MIN_BOOK_RATINGS = 5

user_count = ratings.groupBy("user_id").count().withColumnRenamed("count", "user_count")
book_count = ratings.groupBy("book_id").count().withColumnRenamed("count", "book_count")

ratings = (
    ratings.join(user_count, "user_id")
     .join(book_count, "book_id")
     .filter((functions.col("user_count") >= MIN_USER_RATINGS) & (functions.col("book_count") >= MIN_BOOK_RATINGS))
     .select("user_id", "book_id", "rating", "title")
)

print("rows:", ratings.count())
print("users:", ratings.select("user_id").distinct().count())
print("books:", ratings.select("book_id").distinct().count())

ratings.cache()


rows: 85603
users: 9795
books: 13660


DataFrame[user_id: string, book_id: string, rating: double, title: string]

In [46]:
from pyspark.sql import functions as F

ratings.groupBy("book_id").count().orderBy(F.col("count").desc()).show(20)
ratings.groupBy("user_id").count().orderBy(F.col("count").desc()).show(20)


+----------+-----+
|   book_id|count|
+----------+-----+
|0141804459|  178|
|B000GQG7D2|  171|
|B000C1X8JC|  168|
|B000NDSX6C|  168|
|0786135034|  167|
|B000F6H01Q|  165|
|B000ILIJE0|  164|
|B000Q032UY|  164|
|B000GQG5MA|  164|
|B000GDLGSG|  164|
|1844560333|  161|
|0451513967|  159|
|B000NWQXBA|  159|
|0435126075|  158|
|1901768945|  157|
|8188280046|  157|
|B000EVI8O0|  155|
|1566190932|  150|
|B000H9R1Q0|  148|
|B000PC54NG|  145|
+----------+-----+
only showing top 20 rows
+--------------+-----+
|       user_id|count|
+--------------+-----+
|A1D2C0WDCSHUWZ|  568|
|   AFVQZQ8PW0L|  290|
|A14OJS0VWMOSWO|  266|
|A20EEWWSFMZ1PN|  212|
|A1X8VZWTOG8IS6|  211|
| AHD101501WCN1|  184|
|A1K1JW1C5CUSUZ|  179|
|A1EKTLUL24HDG8|  145|
|A1G37DFO8MQW0M|  143|
|A1N1YEMTI9DJ86|  139|
|A3QVAKVRAH657N|  136|
| AHXAPVSHPJ6OJ|  132|
|A2F6N60Z96CAJI|  131|
|A1L43KWWR05PCS|  129|
|A1T17LMQABMBN5|  117|
|A319KYEIAZ3SON|  113|
|A3M174IC0VXOS2|  111|
| AJQ1S39GZBKUG|  109|
|A3LKWMM12AF0PU|  108|
|A28WJUJF6D2U

In [47]:
# Baseline recommender by popularity
# Check it to compare later

MIN_REVIEWS_FOR_POPULARITY = 10

popular = (
    ratings.groupBy("book_id", "title")
     .agg(functions.count("*").alias("n"), functions.avg("rating").alias("average_rating"))
     .filter(functions.col("n") >= MIN_REVIEWS_FOR_POPULARITY)
     .orderBy(functions.col("average_rating").desc(), functions.col("n").desc())
)

popular.show(20, truncate=False)


+----------+-----------------------------------------------------------------------------------------+---+-----------------+
|book_id   |title                                                                                    |n  |average_rating   |
+----------+-----------------------------------------------------------------------------------------+---+-----------------+
|B000OZRZ90|Milton's Paradise Lost                                                                   |12 |5.0              |
|1854596209|Hamlet (The Shakespeare Folios)                                                          |11 |5.0              |
|B000JWHH32|The Jungle Book (Companion Library)                                                      |11 |5.0              |
|B0007G64NO|The Screwtape letters & Screwtape proposes a toast (Time reading program special edition)|10 |5.0              |
|B0007J8XJ4|The varieties of religious experience: A study in human nature (Gifford lectures)        |10 |5.0              |


In [48]:
# Train test split
train_df, test_df = ratings.randomSplit([0.8, 0.2], seed=SEED)

train_df = train_df.cache()
test_df  = test_df.cache()

print("train:", train_df.count(), "test:", test_df.count())


train: 68323 test: 17280


Item-item colaborative filtering

In [49]:
# build user to list of pairs item, rating from train data

# resilient distr dataset
train_rdd = train_df.select("user_id", "book_id", "rating").rdd

user_items = (
    train_rdd
    .map(lambda row: (row["user_id"], (row["book_id"], float(row["rating"]))))
    .groupByKey()
    .mapValues(list)
)


In [50]:
# Generate item pairs per user and get stats aggregation

import itertools
import math
# safety cap for many reviews
MAX_ITEMS_PER_USER = 50

def make_pairs(items):
    # items are list of (book_id, rating)
    if len(items) > MAX_ITEMS_PER_USER:
        items = items[:MAX_ITEMS_PER_USER]  # deterministic cap, todo: improve
    out = []
    for (i, ri), (j, rj) in itertools.combinations(items, 2):
        if i == j:
            continue
        a, b = (i, j) if i < j else (j, i)
        ra, rb = (ri, rj) if i < j else (rj, ri)
        out.append(((a, b), (ra*rb, ra*ra, rb*rb, 1)))
    return out

pair_stats = (
    user_items
    .flatMap(lambda x: make_pairs(x[1]))
    .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1], x[2]+y[2], x[3]+y[3]))
)


In [51]:
# Compute cosine similarity

MIN_COMMON_USERS = 2

similarities = (
    pair_stats
    .filter(lambda kv: kv[1][3] >= MIN_COMMON_USERS)
    .mapValues(lambda v: (
        v[0] / (math.sqrt(v[1]) * math.sqrt(v[2]) + 1e-12),  # cosine similarity
        v[3]  # common users
    ))
)

print("number of item pairs with common users >= min common users:", similarities.count())
print("sample similarity:", similarities.take(5))


number of item pairs with common users >= min common users: 22493
sample similarity: [(('1420926810', '1570020981'), (0.9374252720097401, 2)), (('B00069X44Y', 'B00086KNY4'), (0.9999999999999798, 2)), (('0792717848', 'B00005QTHG'), (0.8531145054348057, 3)), (('0520050894', 'B000GTEU8I'), (0.9999999999999908, 5)), (('B000JQXNSQ', 'B000NNOTXI'), (0.890870806374732, 3))]


In [52]:
# KNN per item, top

K = 30

neighbors = similarities.flatMap(lambda kv: [
    (kv[0][0], (kv[0][1], kv[1][0])),  # i -> (j, sim)
    (kv[0][1], (kv[0][0], kv[1][0]))   # j -> (i, sim)
])

def topk(xs, k=K):
    xs = sorted(xs, key=lambda t: t[1], reverse=True)
    return xs[:k]

item_neighbors = neighbors.groupByKey().mapValues(lambda xs: topk(list(xs), K))

# collect is ok at this sample size
item_neighbors_map = dict(item_neighbors.collect())
print("items with neighbor lists:", len(item_neighbors_map))


items with neighbor lists: 4647


In [53]:
# Broadcast!!

broadcast_neighbours = spark.sparkContext.broadcast(item_neighbors_map)

In [67]:
train_ui = (
    train_df.groupBy("user_id", "book_id")
            .agg(functions.avg("rating").alias("rating"))
)

# Broadcast user history from train
user_history_map = dict(
    train_ui.rdd
    .map(lambda r: (r["user_id"], (r["book_id"], float(r["rating"]))))
    .groupByKey()
    .mapValues(list)
    .collect()
)

broadcast_history = spark.sparkContext.broadcast(user_history_map)
print("users in history:", len(user_history_map))


users in history: 9725


In [120]:
import math

def predict_one(user_id, target_item):
    history = broadcast_history.value.get(user_id, [])
    neighbours = broadcast_neighbours.value.get(target_item, [])
    if not history or not neighbours:
        return None

    history_map = dict(history)

    num = 0.0
    den = 0.0
    for j, sim in neighbours:
        if j in history_map:
            num += sim * history_map[j]
            den += abs(sim)

    if den < 1e-9:
        return None

    prediction = num / den
    return max(1.0, min(5.0, prediction))

predicted_test_true = (
    test_df.select("user_id","book_id","rating").rdd
    .map(lambda r: (predict_one(r["user_id"], r["book_id"]), float(r["rating"])))
    .filter(lambda x: x[0] is not None)
)

mse = predicted_test_true.map(lambda x: (x[0] - x[1])**2).mean()
rmse = math.sqrt(mse)

predicted_count = predicted_test_true.count()
test_count = test_df.count()
coverage = predicted_count / test_count

print("RMSE:", rmse)
print("Coverage:", coverage)
print("Predicted test points:", predicted_count, "/", test_count)


RMSE: 0.4215375035604698
Coverage: 0.37471064814814814
Predicted test points: 6475 / 17280


COverage improved significantly.
Now Baseline RMSE

In [121]:
import math
from pyspark.sql import functions as functions

global_mean = train_df.agg(functions.avg("rating")).first()[0]
baseline_mse = test_df.select("rating").rdd.map(lambda r: (float(r[0]) - global_mean)**2).mean()
baseline_rmse = math.sqrt(baseline_mse)

print("Global mean:", global_mean)
print("Baseline RMSE (global mean):", baseline_rmse)


Global mean: 4.269396835619045
Baseline RMSE (global mean): 1.037404870258155


In [122]:
# Top N recommendations for a user
book_titles = dict(
    ratings.select("book_id", "title")
           .dropna(subset=["title"])
           .dropDuplicates(["book_id"])
           .rdd.map(lambda r: (r["book_id"], r["title"]))
           .collect()
)

print("book titles len:", len(book_titles))


book titles len: 13659


In [123]:
def recommend_for_user(user_id, N=10):
    history = broadcast_history.value.get(user_id, [])
    # cold start
    if not history:
        return []

    seen = set(i for i, _ in history)
    # item -> (num, den, contribs)
    scores = {}

    for i, r_ui in history:
        for j, sim in broadcast_neighbours.value.get(i, []):
            if j in seen:
                continue
            num, den, c = scores.get(j, (0.0, 0.0, 0))
            num += sim * r_ui
            den += abs(sim)
            c += 1
            scores[j] = (num, den, c)

    predictions = []
    for j, (num, den, c) in scores.items():
        if den > 1e-9 and c >= 2:  # small filter makes output nicer
            prediction = max(1.0, min(5.0, num / den))
            predictions.append((j, prediction, c))

    predictions.sort(key=lambda x: (x[1], x[2]), reverse=True)
    return predictions[:N]


In [124]:
# Let's pick one of the users and ptint top rated

def top_rated_unique(user_id, n=10):
    history = broadcast_history.value.get(user_id, [])
    # sort by rating, then unique by book_id
    history = sorted(history, key=lambda x: x[1], reverse=True)
    seen = set()
    out = []
    for bid, r in history:
        if bid in seen:
            continue
        seen.add(bid)
        out.append((bid, r))
        if len(out) >= n:
            break
    return out

USER = "A1D2C0WDCSHUWZ"
for bid, r in top_rated_unique(USER, 10):
    print(f"{r:.1f}  {bid}  -  {book_titles.get(bid, '<no title>')}")


5.0  0020554303  -  House of Mirth
5.0  0060765461  -  The Lion, the Witch and the Wardrobe Movie Tie-in Edition (The Chronicles of Narnia)
5.0  0075543893  -  Little Women
5.0  0140351310  -  Jane Eyre: Complete and Unabridged (Puffin Classics)
5.0  0140860428  -  Jane Eyre (Penguin Classics)
5.0  0140860436  -  The Age of Innocence (Classic, 20th-Century, Audio)
5.0  0141804459  -  Pride & Prejudice (Penguin Classics)
5.0  0141804564  -  Persuasion (Penguin Audio Classics)
5.0  0141804572  -  Sense & Sensibility Cds (Penguin Classics)
5.0  0192503561  -  Persuasion (World's Classics)


In [125]:
# SO, he is hitting 5/5 often
# Let's check what user liked


recommendations = recommend_for_user(USER, 10)
for bid, prediction, c in recommendations:
    print(f"{prediction:.3f} (from {c} neighbour)  {bid}  -  {book_titles.get(bid, '<no title>')}")


5.000 (from 23 neighbour)  B000G1TD92  -  Persuasion
5.000 (from 18 neighbour)  1576463451  -  Persuasion
5.000 (from 15 neighbour)  0748608370  -  Treasure Island
5.000 (from 13 neighbour)  1578400317  -  Treasure Island (Classic Illustrated)
5.000 (from 13 neighbour)  1556909330  -  Dune
5.000 (from 12 neighbour)  1559350334  -  The Lord of the Rings: The Fellowship of the Ring (BBC Audio Collection)
5.000 (from 11 neighbour)  0808510258  -  Harper Lee's To Kill a Mockingbird (Barron's Book Notes)
5.000 (from 11 neighbour)  0192817728  -  The Secret Garden (Worlds Classics)
5.000 (from 11 neighbour)  B000N28H2I  -  The Two Towers
5.000 (from 9 neighbour)  0681994940  -  Great Expectations


In [126]:
# Let's check abother user

USER = "A14OJS0VWMOSWO"
for bid, r in top_rated_unique(USER, 40):
    print(f"{r:.1f}  {bid}  -  {book_titles.get(bid, '<no title>')}")

recommendations = recommend_for_user(USER, 40)
for bid, prediction, c in recommendations:
    print(f"{prediction:.3f} (from {c} neighbour)  {bid}  -  {book_titles.get(bid, '<no title>')}")



5.0  0066212448  -  Wittgenstein's Poker: The Story of a Ten-Minute Argument Between Two Great Philosophers
5.0  0140298401  -  Forever Liesl: A Memoir of The Sound of Music
5.0  0156949601  -  The Waves
5.0  0192817728  -  The Secret Garden (Worlds Classics)
5.0  0192818201  -  THE WAVES (WORLD'S CLASSICS)
5.0  0201722089  -  The Photoshop 6 WOW! Book
5.0  0307165205  -  The Children's Bible
5.0  0307280721  -  Eldest (Inheritance, Book 2)
5.0  0316785644  -  The Fig Eater
5.0  0333908937  -  ABC Murders
5.0  0340283947  -  Alice in Wonderland
5.0  0340283955  -  Alice in Wonderland
5.0  0345283937  -  Safire's Political Dictionary
5.0  0345325699  -  Working
5.0  0374336857  -  Jitterbug Jam (New York Times Best Illustrated Books (Awards))
5.0  0375414126  -  John James Audubon: The Making of an American
5.0  0375831908  -  Little Golden Book Collection: Farm Tales (Little Golden Book Treasury)
5.0  0375831916  -  Traction Man Is Here! (Boston Globe-Horn Book Awards (Awards))
5.0  03

In [133]:
# recommend for user and donâ€™t clip at 5.0 for display
def recommend_for_user(user_id, N=10):
    history = broadcast_history.value.get(user_id, [])
    if not history:
        return []

    seen = set(i for i, _ in history)
    scores = {}

    for i, r_ui in history:
        for j, sim in broadcast_neighbours.value.get(i, []):
            if j in seen:
                continue
            num, den, c = scores.get(j, (0.0, 0.0, 0))
            num += sim * r_ui
            den += abs(sim)
            c += 1
            scores[j] = (num, den, c)

    predictions = []
    for j, (num, den, c) in scores.items():
        if den > 1e-9 and c >= 2:
            raw = num / den
            clipped = max(1.0, min(5.0, raw))
            predictions.append((j, raw, clipped, c))

    # sort by clipped then contribs
    predictions.sort(key=lambda x: (x[2], x[3]), reverse=True)
    return predictions[:N]

recommendations = recommend_for_user(USER, 200)
for bid, raw, clipped, c in recommendations:
    print(f"{clipped:.3f} (raw {raw:.3f}, from {c} neigh)  {bid}  -  {book_titles.get(bid, '<no title>')}")


5.000 (raw 5.000, from 5 neigh)  0460872702  -  Great Gatsby (Everyman)
5.000 (raw 5.000, from 4 neigh)  140505347X  -  Alice's Adventures in Wonderland
5.000 (raw 5.000, from 3 neigh)  B0006AGHH4  -  Uncle Tom's cabin;: Or, Life among the lowly
5.000 (raw 5.000, from 3 neigh)  B0006AQ6MA  -  Ulysses,
5.000 (raw 5.000, from 3 neigh)  0140351310  -  Jane Eyre: Complete and Unabridged (Puffin Classics)
5.000 (raw 5.000, from 3 neigh)  0517124203  -  Alice's Adventures in Wonderland
5.000 (raw 5.000, from 3 neigh)  0747553688  -  Alice's Adventures in Wonderland
5.000 (raw 5.000, from 3 neigh)  0904724719  -  Alice in Wonderland (Tell tales)
5.000 (raw 5.000, from 3 neigh)  0786261080  -  Alice's Adventures in Wonderland
5.000 (raw 5.000, from 3 neigh)  068983375X  -  Alice's Adventures in Wonderland (Aladdin Classics)
5.000 (raw 5.000, from 3 neigh)  0848809904  -  Lost World
5.000 (raw 5.000, from 3 neigh)  B000MUFLHU  -  The Wonderful Wizard Of Oz
5.000 (raw 5.000, from 3 neigh)  B000H

In [134]:
# Mean centered item item
from pyspark.sql import functions as F

user_mean_map = dict(
    train_df.groupBy("user_id")
            .agg(F.avg("rating").alias("mu"))
            .rdd.map(lambda r: (r["user_id"], float(r["mu"])))
            .collect()
)
bum = spark.sparkContext.broadcast(user_mean_map)

print("users with mean:", len(user_mean_map))


users with mean: 9725


In [135]:
# Mean centered predictor

def predict_one_centered(user_id, target_item):
    history = broadcast_history.value.get(user_id, [])
    neighbours = broadcast_neighbours.value.get(target_item, [])
    if not history or not neighbours:
        return None

    mu = bum.value.get(user_id, 4.0)
    history_map = dict(history)

    num = 0.0
    den = 0.0
    contribution = 0

    for j, sim in neighbours:
        if j in history_map:
            num += sim * (history_map[j] - mu)   # deviation from user mean
            den += abs(sim)
            contribution += 1

    if den < 1e-9:
        return None

    prediction = mu + num / den
    prediction = max(1.0, min(5.0, prediction))
    return prediction, contribution


In [136]:
# recompute RMSE and coverage with mean centered predictor
import math

predicted_true = (
    test_df.select("user_id","book_id","rating").rdd
    .map(lambda r: (predict_one_centered(r["user_id"], r["book_id"]), float(r["rating"])))
    .filter(lambda x: x[0] is not None)
    .map(lambda x: (x[0][0], x[1]))  # keep only pred, true
)

mse = predicted_true.map(lambda x: (x[0] - x[1])**2).mean()
rmse_centered = math.sqrt(mse)

predicted_count = predicted_true.count()
test_count = test_df.count()
coverage_centered = predicted_count / test_count

print("Centered RMSE:", rmse_centered)
print("Centered Coverage:", coverage_centered)
print("Predicted:", predicted_count, "/", test_count)


Centered RMSE: 0.4215375035604698
Centered Coverage: 0.37471064814814814
Predicted: 6475 / 17280


In [137]:
# Mean centered top N recommendations
def recommend_for_user_centered(user_id, N=10, min_contrib=2):
    history = broadcast_history.value.get(user_id, [])
    if not history:
        return []

    mu = bum.value.get(user_id, 4.0)
    seen = set(i for i, _ in history)
    history_map = dict(history)

    scores = {}  # item -> (num, den, contrib)
    for i, r_ui in history:
        for j, sim in broadcast_neighbours.value.get(i, []):
            if j in seen:
                continue
            num, den, c = scores.get(j, (0.0, 0.0, 0))
            num += sim * (history_map[i] - mu)   # use deviation
            den += abs(sim)
            c += 1
            scores[j] = (num, den, c)

    predictions = []
    for j, (num, den, c) in scores.items():
        if den > 1e-9 and c >= min_contrib:
            prediction = mu + num / den
            prediction = max(1.0, min(5.0, prediction))
            predictions.append((j, prediction, c))

    predictions.sort(key=lambda x: (x[1], x[2]), reverse=True)
    return predictions[:N]


In [138]:
USER = "A3M174IC0VXOS2"
recommendations = recommend_for_user_centered(USER, 100)

for bid, prediction, c in recommendations:
    print(f"{prediction:.3f} (from {c} neigh)  {bid}  -  {book_titles.get(bid, '<no title>')}")


5.000 (from 3 neigh)  0736605010  -  Wuthering Heights
5.000 (from 3 neigh)  B000P4Q3JS  -  Wuthering Heights (The Franklin Library)
5.000 (from 3 neigh)  0435126083  -  Wuthering Heights (New Windmill)
5.000 (from 3 neigh)  1590862899  -  The Scarlet Letter
5.000 (from 3 neigh)  B000F6H01Q  -  Pride and Prejudice
5.000 (from 3 neigh)  157646346X  -  Persuasion
5.000 (from 3 neigh)  B000NPAT6W  -  Narrative of the Life of Frederick Douglass, An American Slave. Written by Himself
5.000 (from 3 neigh)  B000L3AP9C  -  A Tree Grows in Brooklyn
5.000 (from 3 neigh)  B0006BV6RY  -  Wuthering Heights (College classics in English)
5.000 (from 3 neigh)  B0006DXUU8  -  A Christmas carol: In prose
5.000 (from 3 neigh)  0742623157  -  A Christmas Carol, in Prose: Being a Ghost Story of Christmas (Collected Works of Charles Dickens)
5.000 (from 3 neigh)  0721417299  -  Christmas Carol (Ladybird Classics)
5.000 (from 3 neigh)  B000056MLJ  -  A Christmas Carol [One Voice Recordings Edition]
5.000 (fr

In [139]:
def explain_recommendation(user_id, target_item, top_m=3):
    history = broadcast_history.value.get(user_id, [])
    mu = bum.value.get(user_id, 4.0)
    history_map = dict(history)

    reasons = []
    for j, sim in broadcast_neighbours.value.get(target_item, []):
        if j in history_map:
            reasons.append((j, sim, history_map[j], sim * (history_map[j] - mu)))
    reasons.sort(key=lambda x: abs(x[3]), reverse=True)
    return reasons[:top_m]

USER = "A3M174IC0VXOS2"
recommendations = recommend_for_user_centered(USER, 10)

for bid, prediction, c in recommendations:
    print(f"\n{prediction:.3f} (from {c} neigh)  {book_titles.get(bid, bid)}")
    for j, sim, rj, contrib in explain_recommendation(USER, bid, top_m=3):
        print(f"  because: {book_titles.get(j, j)} | sim={sim:.3f}, your_rating={rj:.1f}")



5.000 (from 3 neigh)  Wuthering Heights
  because: Wuthering Heights | sim=1.000, your_rating=5.0
  because: Wuthering Heights (Riverside editions) | sim=1.000, your_rating=5.0

5.000 (from 3 neigh)  Wuthering Heights (The Franklin Library)
  because: Wuthering Heights (Riverside editions) | sim=1.000, your_rating=5.0
  because: The book of three | sim=1.000, your_rating=5.0

5.000 (from 3 neigh)  Wuthering Heights (New Windmill)
  because: Wuthering Heights | sim=1.000, your_rating=5.0
  because: Wuthering Heights (Riverside editions) | sim=1.000, your_rating=5.0

5.000 (from 3 neigh)  The Scarlet Letter
  because: Wuthering Heights (Riverside editions) | sim=1.000, your_rating=5.0
  because: Wuthering Heights | sim=1.000, your_rating=5.0

5.000 (from 3 neigh)  Pride and Prejudice
  because: Wuthering Heights (Riverside editions) | sim=1.000, your_rating=5.0
  because: The book of three | sim=1.000, your_rating=5.0

5.000 (from 3 neigh)  Persuasion
  because: My Life in France | sim=

In [140]:
from pyspark.sql import functions as F

train_df.groupBy("rating").count().orderBy("rating").show()


+------+-----+
|rating|count|
+------+-----+
|   1.0| 2195|
|   2.0| 2997|
|   3.0| 7411|
|   4.0|17324|
|   5.0|38396|
+------+-----+



In [141]:
USER = "A3M174IC0VXOS2"
print("user mean:", bum.value.get(USER))


user mean: 4.736263736263736


In [142]:
def recommend_for_user_ranked(user_id, N=10, min_contrib=2, support_bonus=0.02):
    hist = broadcast_history.value.get(user_id, [])
    if not hist:
        return []

    mu = bum.value.get(user_id, 4.0)
    seen = set(i for i, _ in hist)
    hist_map = dict(hist)

    scores = {}  # item -> (num, den, contrib)
    for i, r_ui in hist:
        for j, sim in broadcast_neighbours.value.get(i, []):
            if j in seen:
                continue
            num, den, c = scores.get(j, (0.0, 0.0, 0))
            num += sim * (hist_map[i] - mu)
            den += abs(sim)
            c += 1
            scores[j] = (num, den, c)

    out = []
    for j, (num, den, c) in scores.items():
        if den > 1e-9 and c >= min_contrib:
            pred = mu + num / den
            pred = max(1.0, min(5.0, pred))
            rank_score = pred + support_bonus * c   # tie-break
            out.append((j, pred, c, rank_score))

    out.sort(key=lambda x: x[3], reverse=True)
    return out[:N]


In [143]:
USER = "A3M174IC0VXOS2"
recommendations = recommend_for_user_ranked(USER, N=15)

for bid, prediction, c, score in recommendations:
    print(f"score={score:.3f} pred={prediction:.3f} (c={c})  {book_titles.get(bid, bid)}")


score=5.060 pred=5.000 (c=3)  Wuthering Heights
score=5.060 pred=5.000 (c=3)  Wuthering Heights (The Franklin Library)
score=5.060 pred=5.000 (c=3)  Wuthering Heights (New Windmill)
score=5.060 pred=5.000 (c=3)  The Scarlet Letter
score=5.060 pred=5.000 (c=3)  Pride and Prejudice
score=5.060 pred=5.000 (c=3)  Persuasion
score=5.060 pred=5.000 (c=3)  Narrative of the Life of Frederick Douglass, An American Slave. Written by Himself
score=5.060 pred=5.000 (c=3)  A Tree Grows in Brooklyn
score=5.060 pred=5.000 (c=3)  Wuthering Heights (College classics in English)
score=5.060 pred=5.000 (c=3)  A Christmas carol: In prose
score=5.060 pred=5.000 (c=3)  A Christmas Carol, in Prose: Being a Ghost Story of Christmas (Collected Works of Charles Dickens)
score=5.060 pred=5.000 (c=3)  Christmas Carol (Ladybird Classics)
score=5.060 pred=5.000 (c=3)  A Christmas Carol [One Voice Recordings Edition]
score=5.060 pred=5.000 (c=3)  A Christmas Carol (Enriched Classics (Pocket))
score=5.060 pred=5.000 