# DATA 612 FINAL PROJECT
Amazon Product Recommender Model Using Reviews

* Farhod Ibragimov
* Gillian McGovern

## Objective

Create an offline Amazon product (specifically Amazon appliances) recommender model using user ratings and reviews.

## Data Sources

Source: https://amazon-reviews-2023.github.io/

This is a large-scale Amazon Reviews dataset, collected in 2023 by McAuley Lab, and it includes rich features such as:

*   User Reviews (ratings, text, helpfulness votes, etc.);
*   Item Metadata (descriptions, price, raw image, etc.);
*   Links (user-item / bought together graphs).


User review structure can be found [here](https://amazon-reviews-2023.github.io/#for-user-reviews) and item metadata structure can be found [here](https://amazon-reviews-2023.github.io/#for-item-metadata).

We will be specifically looking at the Appliances category of products, which includes:

* 1.8M Users
* 94.3K Appliances
* 2.1M Ratings/Reviews

The original data is in JSON format.

## Read in the Data

In [26]:
# Import packages and declare global variables

import os
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import floor, round, monotonically_increasing_id, col
from pyspark.sql import SparkSession
import pyarrow.parquet as pq
import s3fs


# CONFIG
REVIEW_PATH = r"Appliances.jsonl"
META_PATH   = r"meta_Appliances.jsonl"
OUT_DIR     = r"output"
CHUNK_SIZE  = 200_000   # tune based on your machine's RAM
SEED        = 42
TRAIN_PARQUET_PATH = r"output/train.parquet"
TEST_PARQUET_PATH = r"output/test.parquet"
FULL_REVIEW_PATH = r"output/full_review.parquet"
EMB_OUT_PATH       = r"output/embeddings/item_embeddings.parquet"
SAMPLE_PER    = 5
BATCH_SIZE    = 64

# S3 variables
BUCKET = os.getenv('S3_BUCKET', 'farhodibr')
PREFIX = "notebook-data"
S3_BASE = f"s3://{BUCKET}/{PREFIX}"

os.makedirs(OUT_DIR, exist_ok=True)
fs = s3fs.S3FileSystem()

In [3]:
# Write test, train, and valid parquet files locally within folder
# Uncomment if files are needed

#
# os.makedirs(OUT_DIR, exist_ok=True)
#
# #LOAD METADATA
# meta_pd = pd.read_json(
#     META_PATH,
#     lines=True
# )[["parent_asin", "average_rating", "rating_number"]]
#
# writers = {"train": None, "valid": None, "test": None}
#
# # STREAM, FILTER, SPLIT, AND WRITE
# for chunk in pd.read_json(
#     REVIEW_PATH,
#     lines=True,
#     chunksize=CHUNK_SIZE
# ):
#     # 1) keep needed cols + timestamp
#     chunk = chunk[["user_id", "parent_asin", "rating", "timestamp", "text"]]
#
#     # 2) filter to years 2021–2023
#     dt = pd.to_datetime(chunk["timestamp"], unit="ms")
#     mask_year = dt.dt.year.between(2021, 2023)
#     chunk = chunk.loc[mask_year]
#     if chunk.empty:
#         continue
#
#     # 3) assign random float for splitting
#     rng = np.random.RandomState(SEED)
#     chunk["_rand"] = rng.rand(len(chunk))
#
#     # 4) merge metadata
#     chunk = chunk.merge(meta_pd, on="parent_asin", how="left")
#
#     # 5) define split masks
#     masks = {
#         "train": chunk["_rand"] <  0.8,
#         "valid": (chunk["_rand"] >= 0.8) & (chunk["_rand"] < 0.9),
#         "test":  chunk["_rand"] >= 0.9
#     }
#
#     # 6) write each split to its Parquet
#     for split, m in masks.items():
#         sub = chunk.loc[m, [
#             "user_id",
#             "parent_asin",
#             "rating",
#             "text",
#             "average_rating",
#             "rating_number"
#         ]]
#         if sub.empty:
#             continue
#         tbl = pa.Table.from_pandas(sub, preserve_index=False)
#         path = os.path.join(OUT_DIR, f"{split}.parquet")
#         if writers[split] is None:
#             writers[split] = pq.ParquetWriter(path, schema=tbl.schema)
#         writers[split].write_table(tbl)
#
# # close Parquet writers
# for w in writers.values():
#     if w:
#         w.close()
#
# print("Finished writing filtered splits to:", OUT_DIR)
# print("  •", os.path.join(OUT_DIR, "train.parquet"))
# print("  •", os.path.join(OUT_DIR, "valid.parquet"))
# print("  •", os.path.join(OUT_DIR, "test.parquet"))


In [4]:
# for split in ("train", "valid", "test"):
#     path = rf"output/{split}.parquet"
#     pf = pq.ParquetFile(path)
#     print(f"{split.capitalize()} split: {pf.metadata.num_rows} reviews")


Train split: 616463 reviews
Valid split: 77195 reviews
Test split: 76002 reviews


In [21]:
spark = SparkSession.builder \
    .appName("CheckRatingRange") \
    .config("spark.driver.memory","4g") \
    .getOrCreate()

# Load just the rating column from the train split
# train = spark.read.parquet(
#     r"output/train.parquet"
# )

# Reading from s3
train_pd = pd.read_parquet(f"{S3_BASE}/{TRAIN_PARQUET_PATH}")
train = spark.createDataFrame(train_pd)

# 1) Preview the data
train.show(5)

# 2) min & max
train.selectExpr("min(rating) AS min_rating", "max(rating) AS max_rating") \
     .show()

# 3) basic summary (mean, stddev, etc.)
train.describe("rating").show()

# 4) breakdown by rating value
train.groupBy("rating").count().orderBy("rating").show(5, truncate=False)

spark.stop()


25/07/16 16:59:29 WARN TaskSetManager: Stage 84 contains a task of very large size (19238 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------------------+-----------+------+--------------------+--------------+-------------+
|             user_id|parent_asin|rating|                text|average_rating|rating_number|
+--------------------+-----------+------+--------------------+--------------+-------------+
|AHWWLSPCJMALVHDDV...| B07DD37QPZ|     5|Little on the thi...|           4.4|         3186|
|AEUH4EH6XHROLT7UZ...| B099ZKQJHK|     5|After buying this...|           4.1|          506|
|AHCV2CNCOCG6WECDR...| B001TH7H0O|     2|Not the best quality|           4.3|        11035|
|AFUOYIZBU3MTBOLYK...| B085C6C7WH|     2|The company respo...|           3.8|          223|
|AHPUT3ITXCHQJO7OM...| B09CBF2XCF|     4|Love little kitch...|           4.5|        15159|
+--------------------+-----------+------+--------------------+--------------+-------------+
only showing top 5 rows


25/07/16 16:59:31 WARN TaskSetManager: Stage 85 contains a task of very large size (19238 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+----------+----------+
|min_rating|max_rating|
+----------+----------+
|         1|         5|
+----------+----------+



25/07/16 16:59:34 WARN TaskSetManager: Stage 88 contains a task of very large size (19238 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-------+-----------------+
|summary|           rating|
+-------+-----------------+
|  count|           616463|
|   mean|4.079331606276451|
| stddev|1.495287136111409|
|    min|                1|
|    max|                5|
+-------+-----------------+



25/07/16 16:59:36 WARN TaskSetManager: Stage 91 contains a task of very large size (19238 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+------+------+
|rating|count |
+------+------+
|1     |92213 |
|2     |26887 |
|3     |32717 |
|4     |52611 |
|5     |412035|
+------+------+



## BERT Content Based Recommender Model

### Create BERT Embeddings

In [6]:
# import os
# import pandas as pd
# import torch
# from sentence_transformers import SentenceTransformer
# from pyspark.sql import SparkSession
# import pyarrow as pa
# import pyarrow.parquet as pq
#
# DEVICE        = "cuda" if torch.cuda.is_available() else "cpu"
# print(f"Using device: {DEVICE}")
#
# os.makedirs(os.path.dirname(EMB_OUT), exist_ok=True)
#
# spark = SparkSession.builder \
#     .appName("ItemBERTEmbeddings") \
#     .config("spark.driver.memory", "16g") \
#     .getOrCreate()
#
# df = spark.read.parquet(TRAIN_PARQUET_PATH).select("parent_asin", "text")
# pdf = df.toPandas()
# spark.stop()
#
# model = SentenceTransformer("all-MiniLM-L6-v2", device=DEVICE)
#
# writer = None
# schema = None
#
# for pid, group in pdf.groupby("parent_asin", sort=False):
#     texts = group["text"].sample(
#         n=min(len(group), SAMPLE_PER),
#         random_state=42
#     ).tolist()
#
#     embs = model.encode(
#         texts,
#         batch_size=BATCH_SIZE,
#         show_progress_bar=False,
#         convert_to_numpy=True
#     )
#     mean_emb = embs.mean(axis=0)
#
#     data = {"parent_asin": [pid]}
#     for i, v in enumerate(mean_emb):
#         data[f"emb_{i}"] = [float(v)]
#     table = pa.Table.from_pydict(data)
#
#     if writer is None:
#         schema = table.schema
#         writer = pq.ParquetWriter(EMB_OUT, schema=schema)
#
#
#     writer.write_table(table)
#
#
# if writer:
#     writer.close()
#
# print("Wrote item embeddings to:", EMB_OUT)


### Create Content Based Model Using BERT Embeddings

In [7]:
# import pandas as pd
# import numpy as np
# from sklearn.neighbors import NearestNeighbors
#
# # ─── CONFIG ──────────────────────────────────────────────────────────────────
# TOP_K      = 5
#
# # ─── 1) Load embeddings and metadata ─────────────────────────────────────────
# df_emb = pd.read_parquet(EMB_PATH)
# df_meta = pd.read_json(META_PATH, lines=True)[["parent_asin", "title"]]
# df_meta = df_meta.rename(columns={"parent_asin": "item_id", "title": "product_title"})
#
# # ─── 2) Merge to get titles alongside embeddings ─────────────────────────────
# df = df_emb.rename(columns={"parent_asin": "item_id"}).merge(df_meta, on="item_id", how="left")
#
# # ─── 3) Fit Nearest Neighbors on embedding vectors ────────────────────────────
# X = df.filter(regex="^emb_").values
# item_ids = df["item_id"].values
# titles   = df["product_title"].values
#
# nn = NearestNeighbors(n_neighbors=TOP_K+1, metric="cosine")
# nn.fit(X)
#
# # ─── 4) Recommendation function returning titles ─────────────────────────────
# def recommend_titles(item_id: str, top_k: int = TOP_K):
#     if item_id not in item_ids:
#         raise ValueError(f"Item ID {item_id} not found.")
#     idx = np.where(item_ids == item_id)[0][0]
#     distances, indices = nn.kneighbors([X[idx]], n_neighbors=top_k+1)
#     rec_idxs = indices[0][1:]
#     return titles[rec_idxs].tolist()
#
# # ─── 5) Show for a sample item ───────────────────────────────────────────────
# sample_id = item_ids[:5]
# print("Sample item:")
# print(" - ID:   ", sample_id)
# print(" - Title:", titles[0])
# print("\nTop 5 similar items by title:")
# for rank, pt in enumerate(recommend_titles(sample_id), start=1):
#     print(f"{rank}. {pt}")


## Collaborative Filtering Spark Model (ALS)

In [8]:
# # Can keep this commented out bc we already ran this
# # Parquets should be present in GitHub repo

# review_json_chunks = pd.read_json(REVIEW_PATH, lines=True, chunksize=CHUNK_SIZE)
#
# writer = None
# schema = None
# # Iterate through the chunks and process each DataFrame
# all_dfs = []
# for i, chunk_df in enumerate(review_json_chunks):
#     print(f"Processing Chunk {i+1}, Shape: {chunk_df.shape}")
#     chunk_df = chunk_df[["user_id", "parent_asin", "rating"]]
#
#     table = pa.Table.from_pandas(chunk_df, preserve_index=False)
#
#     if writer is None:
#         schema = table.schema
#         writer = pq.ParquetWriter(r"output/full_review.parquet", schema=schema)
#
#
#     writer.write_table(table)
#
#
# if writer:
#     writer.close()
#
# print("Completed writing review parquet file")

In [27]:
spark = SparkSession.builder \
    .appName("CheckRatingRange") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Load the training and test data
# als_full_df = spark.read.parquet(r"output/full_review.parquet")

# read parquets from s3
als_full_pd = pd.read_parquet(f"{S3_BASE}/{FULL_REVIEW_PATH}")
als_full_df = spark.createDataFrame(als_full_pd)
als_full_df_cached = als_full_df.cache()
del als_full_pd

In [28]:
# ALS data preparation - ALS good for sparse data

users = als_full_df_cached.select("user_id").distinct()
# We want to prevent the same data/id to be used in different partitions, so just force use 1 partition for this
users = users.coalesce(1)
users = users.withColumn("userIntId", monotonically_increasing_id()).persist() # Use persist to keep these values the same

products = als_full_df_cached.select("parent_asin").distinct()
products = products.coalesce(1)
products = products.withColumn("productIntId", monotonically_increasing_id()).persist()

als_df_int_ids = als_full_df_cached.join(users, "user_id", "left").join(products, "parent_asin", "left")
display(als_df_int_ids.show(5))

25/07/16 17:07:07 WARN TaskSetManager: Stage 0 contains a task of very large size (10053 KiB). The maximum recommended task size is 1000 KiB.
25/07/16 17:07:12 WARN TaskSetManager: Stage 1 contains a task of very large size (10053 KiB). The maximum recommended task size is 1000 KiB.
25/07/16 17:07:13 WARN TaskSetManager: Stage 2 contains a task of very large size (10053 KiB). The maximum recommended task size is 1000 KiB.
25/07/16 17:07:16 WARN TaskSetManager: Stage 5 contains a task of very large size (10053 KiB). The maximum recommended task size is 1000 KiB.
25/07/16 17:07:18 WARN TaskSetManager: Stage 8 contains a task of very large size (10053 KiB). The maximum recommended task size is 1000 KiB.
[Stage 16:>                                                         (0 + 1) / 1]

+-----------+--------------------+------+---------+------------+
|parent_asin|             user_id|rating|userIntId|productIntId|
+-----------+--------------------+------+---------+------------+
| B08D6KFGW7|AEUJN55YZH6HCUFUP...|     5|     5187|       60769|
| B08R896G15|AF4A4E4B53V37NKW3...|     1|   153629|       78293|
| B07BGHC4TD|AFJFAWK2477YBZZRL...|     5|    21878|       77321|
| B09YC8YCV6|AGD2MDSKRK6NY5EAH...|     5|   116233|       13620|
| B07633SRDK|AGD2MDSKRK6NY5EAH...|     5|   116233|       74929|
+-----------+--------------------+------+---------+------------+
only showing top 5 rows


                                                                                

None

In [29]:
# Split the ratings into training and test data
als_df_final = als_df_int_ids.select(col("userIntId").alias("userId"), col("productIntId").alias("productId"), col("rating"), col('parent_asin'))
als_df_final_cached = als_df_final.cache()

(training_data, test_data) = als_df_final_cached.randomSplit([0.7, 0.3], seed=42)
test_data_cached = test_data.cache()
training_data_cached = training_data.cache()

In [30]:
# Create model without any hyperparameter tuning

# Set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(userCol="userId", itemCol="productId", ratingCol="rating", rank = 10, maxIter = 15, regParam = .1,
          coldStartStrategy="drop", nonnegative = True, implicitPrefs = False)

# Fit the model to the training_data
model = als.fit(training_data_cached)

# Generate predictions on the test_data
test_predictions = model.transform(test_data_cached)

# Preview the predictions result
display(test_predictions.show(10))

spark.stop()

25/07/16 17:09:06 WARN TaskSetManager: Stage 17 contains a task of very large size (10053 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+------+---------+------+-----------+----------+
|userId|productId|rating|parent_asin|prediction|
+------+---------+------+-----------+----------+
|    28|     4748|     4| B0963ZXRM6|  3.393458|
|    28|     5684|     5| B0B9Z7PKW5| 3.5533638|
|    28|    70604|     4| B093FM7GJD|  2.520297|
|   148|     3331|     5| B00DM8J15C| 3.2995844|
|   148|    21264|     5| B00ECV2MRC| 2.8162053|
|   155|    70608|     1| B0014DZ2YG| 3.4768877|
|   183|    53764|     4| B01N0TQ0OH| 3.2370622|
|   211|    10331|     5| B001ICYB2M|  4.537841|
|   385|    36806|     4| B00T3JMVY2| 2.3790338|
|   496|      139|     4| B098NG86D7| 2.1709495|
+------+---------+------+-----------+----------+
only showing top 10 rows


None

### Hyperparameter Tuning

In [36]:
# TO DO: hyperparameter tuning

# # Hyperparameter Tuning
#
# # Use pyspark grid search
# param_grid = ParamGridBuilder() \
#            .addGrid(als.rank, [10, 50, 75, 100]) \
#            .addGrid(als.maxIter, [10]) \
#            .addGrid(als.regParam, [.05, .1, .15]) \
#            .build()
#
# # Create RMSE evaluator
# evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
#
# # Use cross validation
# cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5, collectSubModels=True)
#
# # Checkpoint the training data to truncate its lineage.
# # This is a lazy operation, it will be triggered by the .fit() call.
# training_data_chkp = training_data_cached.checkpoint()
#
# # Fit the cross validator on the CHECKPOINTED DataFrame.
# model = cv.fit(training_data_chkp)
#
# # Best model
# best_model = model.bestModel
#
# # Average RMSE for each model
# avg_rmse_models = model.avgMetrics
#
# display(f"{len(param_grid)} models tested")