In [1]:
import json
import pandas as pd
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode


spark = SparkSession.builder.appName("ProcessingDataset")\
    .config("spark.kryoserializer.buffer.max", "512m") \
    .getOrCreate()
spark.conf.set('spark.sql.caseSensitive', True)

In [2]:
user_reviews= spark.read.parquet("Dataset/user_reviews.parquet")

In [3]:
user_reviews.show(5, False)

+------------+--------------------------------------------------------+------+------------+----------------------------+----------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+------------+-----------------+-------------------+
|review_id   |ti

In [4]:
user_reviews_demo = user_reviews.limit(50000)

In [5]:
user_reviews_demo.show(5, False)

+------------+--------------------------------------------------------+------+------------+----------------------------+----------+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+------------+-----------------+-------------------+
|review_id   |ti

In [6]:
metadata=spark.read.parquet("Dataset/metadata.parquet")
metadata.show(5, False)

+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+-------------+-------------------------+-------------------------------------------------------------------+-----+-----------+
|parent_asin|title                                                                                                                                                                                                   |average_rating|rating_number|main_category            |categories                                                         |price|store      |
+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+-------------+-------------------------+-------------------

In [7]:
full_data = user_reviews_demo.join(metadata, user_reviews_demo.parent_asin == metadata.parent_asin, "inner")

In [8]:
full_data_filter = full_data.select(
	"review_id",
	user_reviews_demo["title"].alias("title"),
	metadata["title"].alias("product_title"),
	"rating",
	"average_rating",
	"rating_number",
	user_reviews_demo["asin"].alias("asin"),
	metadata["parent_asin"].alias("parent_asin"),
	"user_id",
	"helpful_vote",
	"categories"
)
full_data_filter.show(5, False)

+------------+----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+--------------+-------------+----------+-----------+----------------------------+------------+---------------------------------------------------------------------+
|review_id   |title                                   |product_title                                                                                                                                                                                           |rating|average_rating|rating_number|asin      |parent_asin|user_id                     |helpful_vote|categories                                                           |
+------------+----------------------------------------+-----------------------------------------------------------------------------------------

In [9]:
full_data_filter.printSchema()

root
 |-- review_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- average_rating: double (nullable = true)
 |-- rating_number: long (nullable = true)
 |-- asin: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)



## Simple recommendation model


In [10]:
from pyspark.sql import functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec, VectorAssembler, StringIndexer
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline

# 1. Tiền xử lý dữ liệu
cleaned_data = full_data_filter.filter(
    (col("rating").isNotNull()) &
    (col("user_id").isNotNull()) &
    (col("asin").isNotNull())
)

# 2. Xử lý văn bản
tokenizer = Tokenizer(inputCol="title", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
word2vec = Word2Vec(vectorSize=100, minCount=5, inputCol="filtered_words", outputCol="title_vector")

# 3. Mã hóa user_id và asin
user_indexer = StringIndexer(inputCol="user_id", outputCol="user_index")
item_indexer = StringIndexer(inputCol="asin", outputCol="item_index")

# 4. Kết hợp các đặc trưng
assembler = VectorAssembler(
    inputCols=["title_vector", "rating_number", "helpful_vote"],
    outputCol="features"
)

# 5. Chuẩn bị dữ liệu
pipeline = Pipeline(stages=[
    tokenizer,
    remover,
    word2vec,
    user_indexer,
    item_indexer,
    assembler
])
processed_data = pipeline.fit(cleaned_data).transform(cleaned_data)

# 6. Chia tập train/test
(train, test) = processed_data.randomSplit([0.8, 0.2])

# 7. Xây dựng mô hình
gbt = GBTRegressor(featuresCol="features", labelCol="rating", maxIter=10)
model = gbt.fit(train)

# 8. Dự đoán và đánh giá
predictions = model.transform(test)
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
print(f"RMSE = {evaluator.evaluate(predictions)}")

RMSE = 1.2111761710415698


In [11]:
# Hiển thị 10 dòng đầu tiên của DataFrame predictions
predictions.select("user_id", "asin", "rating", "prediction").show(10)

+--------------------+----------+------+------------------+
|             user_id|      asin|rating|        prediction|
+--------------------+----------+------+------------------+
|AFZIKYRBTX3ETK3Y2...|B000TP2V3A|   5.0| 5.003063318598128|
|AHR7UVTO3OYYC4P4I...|B005HRZ3N0|   4.0|3.0628716956246946|
|AHJJUHFEN3OBRTTWY...|B00C0Z6KCA|   1.0| 3.166722358340764|
|AGD7HDPMJPUDRHKUJ...|B09PVCVQDP|   1.0|1.0494720107970563|
|AH7N3Q4O4P2QT3BCM...|B000HZFCT2|   5.0| 5.003063318598128|
|AEEB2COGEIWCUR2F7...|B00KSQHX1K|   5.0| 5.003063318598128|
|AH45BPEN63SXGJJVK...|B00HZWEB74|   5.0| 4.488732758582935|
|AHBP7E3NBVL34EWV3...|B07FFN4H9G|   5.0| 4.731403270558192|
|AGPKGKBWZTBCTUDHM...|B07WW24S58|   5.0| 4.507616603582671|
|AFM7J3OCFMHI4V23S...|B07RCXNW3K|   1.0|3.6515832793621192|
+--------------------+----------+------+------------------+
only showing top 10 rows



## Another model


In [12]:
from pyspark.sql.window import Window

window_spec = Window.partitionBy("asin")
data_with_label = full_data_filter.withColumn(
    "label", F.avg("rating").over(window_spec)
)

In [13]:
cleaned_data = data_with_label.dropna(subset=["asin", "title", "label","product_title","parent_asin"])

In [14]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

tokenizer = Tokenizer(inputCol="title", outputCol="words")
words_df = tokenizer.transform(cleaned_data)

remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_df = remover.transform(words_df)

In [15]:
from pyspark.ml.feature import Word2Vec

word2vec = Word2Vec(vectorSize=100, inputCol="filtered_words", outputCol="title_vector")
w2v_model = word2vec.fit(filtered_df)
title_vectors_df = w2v_model.transform(filtered_df)

In [16]:
from pyspark.ml.feature import StringIndexer

asin_indexer = StringIndexer(inputCol="asin", outputCol="asin_index", handleInvalid="keep")
parent_asin_indexer = StringIndexer(inputCol="parent_asin", outputCol="parent_asin_index", handleInvalid="keep")
indexed_df = asin_indexer.fit(title_vectors_df).transform(title_vectors_df)
indexed_df = parent_asin_indexer.fit(indexed_df).transform(indexed_df)

In [17]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["asin_index", "title_vector","parent_asin_index"],
    outputCol="features"
)
final_df = assembler.transform(indexed_df).select("features", "label", "asin","parent_asin")

In [18]:
train_df, test_df = final_df.randomSplit([0.8, 0.2], seed=42)

In [19]:
# Re-assemble features to exclude asin_index (categorical with too many values)
from sklearn.linear_model import SGDRegressor


assembler = VectorAssembler(
	inputCols=["title_vector"],
	outputCol="features"
)
final_df_no_cat = assembler.transform(title_vectors_df).select("features", "label", "asin","parent_asin")

# Split again for training/testing
train_df_no_cat, test_df_no_cat = final_df_no_cat.randomSplit([0.8, 0.2], seed=42)

# Convert Spark DataFrame to pandas DataFrame for sklearn
train_pd = train_df_no_cat.select("features", "label").toPandas()
X_train = train_pd["features"].apply(lambda v: v.toArray()).tolist()
y_train = train_pd["label"].values

sgd = SGDRegressor(max_iter=10)
model = sgd.fit(X_train, y_train)

In [20]:
# Convert Spark DataFrame to pandas DataFrame for sklearn prediction (use test_df_no_cat for correct feature size)
test_pd = test_df_no_cat.select("features", "label").toPandas()
X_test = test_pd["features"].apply(lambda v: v.toArray()).tolist()
y_test = test_pd["label"].values

# Predict using the trained SGDRegressor
y_pred = model.predict(X_test)

# Evaluate RMSE using sklearn
from sklearn.metrics import mean_squared_error
import numpy as np

rmse = np.sqrt(mean_squared_error(y_test, y_pred))
print(f"RMSE = {rmse}")

RMSE = 1.0057047165400845


In [21]:
predictions.printSchema()

root
 |-- review_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- average_rating: double (nullable = true)
 |-- rating_number: long (nullable = true)
 |-- asin: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title_vector: vector (nullable = true)
 |-- user_index: double (nullable = false)
 |-- item_index: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [22]:
# Giữ lại average_rating và title gốc từ dữ liệu ban đầu
predictions_alias = predictions.alias("pred")
cleaned_data_alias = cleaned_data.select("asin", "average_rating", "product_title", "parent_asin") \
    .dropDuplicates(["asin"]).alias("cd")

test_with_avg = predictions_alias.join(
    cleaned_data_alias,
    predictions_alias.asin == cleaned_data_alias.asin,
    how="inner"
)

# Hiển thị kết quả
test_with_avg.select(
    test_with_avg["pred.asin"].alias("asin"),
    test_with_avg["pred.product_title"].alias("product_title"),
    test_with_avg["pred.prediction"].alias("prediction"),
    test_with_avg["cd.average_rating"].alias("average_rating"),
    test_with_avg["cd.parent_asin"].alias("parent_asin")
).show()

+----------+--------------------+------------------+--------------+-----------+
|      asin|       product_title|        prediction|average_rating|parent_asin|
+----------+--------------------+------------------+--------------+-----------+
|0375869026|              Wonder| 5.003063318598128|           4.7| 0375869026|
|0375869026|              Wonder| 5.003063318598128|           4.7| 0375869026|
|0375869026|              Wonder| 4.395240822587136|           4.7| 0375869026|
|1529110947|When Breath Becom...| 4.786231492981767|           4.7| 1529110947|
|1529110947|When Breath Becom...| 4.054776196618482|           4.7| 1529110947|
|907843905X| Ship Simulator 2008| 3.509583553268774|           3.3| 907843905X|
|9629971372|Dotop Sony Playst...| 4.343345010216033|           4.4| 9629971372|
|9629971372|Dotop Sony Playst...|3.5856983020777635|           4.4| 9629971372|
|B00000I1BQ|Crash Bandicoot [...| 4.298879075070092|           3.8| B004HILZV4|
|B00000JLQU|Barbie Super Spor...| 4.7026

In [23]:
metadata.printSchema()

root
 |-- parent_asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- average_rating: double (nullable = true)
 |-- rating_number: long (nullable = true)
 |-- main_category: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- price: string (nullable = true)
 |-- store: string (nullable = true)



In [24]:
# Fit the asin_indexer if not already fitted (use title_vectors_df as in cell 17)
asin_indexer_model = asin_indexer.fit(title_vectors_df)

def predict_rating(asin_input):
    # Lấy title từ metadata
    title_row = metadata.filter(metadata.parent_asin == asin_input).select("title").first()
    if title_row is None:
        raise ValueError(f"ASIN {asin_input} not found in metadata.")
    title = title_row["title"]
    # Tạo DataFrame với asin và title
    df = spark.createDataFrame([(asin_input, title)], ["parent_asin", "title"])
    df = asin_indexer_model.transform(df)
    df = tokenizer.transform(df)
    df = remover.transform(df)
    df = w2v_model.transform(df)
    df = assembler.transform(df)
    # Convert Spark DataFrame to pandas and extract features as numpy array
    features_np = df.select("features").toPandas()["features"].apply(lambda v: v.toArray()).tolist()
    prediction = model.predict(features_np)[0]
    return prediction

# Dùng thử
predicted_rating = predict_rating("B004HD55V0")
print(f"Dự đoán rating cho B004HD55V0: {predicted_rating}")

Dự đoán rating cho B004HD55V0: 3.7608281946646858


In [25]:
from pyspark.sql.functions import lit

def recommend_top_n_for_user(user_id, n=10):
    # Lấy lịch sử các sản phẩm user đã đánh giá
    user_history = cleaned_data.filter(col("user_id") == user_id).select("asin").distinct()
    user_history_asins = [row.asin for row in user_history.collect()]

    # Lấy danh sách tất cả sản phẩm chưa đánh giá
    all_asins = cleaned_data.select("asin", "title", "product_title").distinct()
    unseen_products = all_asins.filter(~col("asin").isin(user_history_asins))

    # Chuẩn bị DataFrame cho dự đoán: mỗi sản phẩm unseen + user_id
    user_products = unseen_products.withColumn("user_id", lit(user_id))

    # Tiền xử lý giống train
    user_products = tokenizer.transform(user_products)
    user_products = remover.transform(user_products)
    user_products = w2v_model.transform(user_products)
    user_products = assembler.transform(user_products)

    # Dự đoán rating
    features_np = user_products.select("features").toPandas()["features"].apply(lambda v: v.toArray()).tolist()
    predicted_ratings = model.predict(features_np)

    # Gắn lại vào DataFrame
    user_products_pd = user_products.select("asin", "product_title").toPandas()
    user_products_pd["predicted_rating"] = predicted_ratings

    # Sắp xếp và lấy top n
    top_n = user_products_pd.sort_values("predicted_rating", ascending=False).head(n)

    # Hiển thị kết quả
    print(top_n[["asin", "product_title", "predicted_rating"]])
    return top_n

# Ví dụ sử dụng:
user_id = "AHP4ABT4A0UOKHKXCUT3JCFU623A"  # thay bằng user_id thực tế
recommend_top_n_for_user(user_id, n=10)

             asin                                      product_title  \
39276  B07W548HVT  Boowen Wireless Controller for PS4,Double Shoc...   
27654  B01N4JYY1H                     Xbox Wireless Controller – Red   
18631  B00IZ9T9QE  AKIBA'S TRIP: Undead & Undressed - PlayStation...   
42041  B08DJJ9WLW                         EA SPORTS UFC 4 - Xbox One   
13499  B008Y1XJGE          LEGO Batman 2: DC Super Heroes [Download]   
42681  B08JHQ4NBB  Sackboy: A Big Adventure Special Edition - Pla...   
128    B00000J97G                              Game Boy Color - Teal   
6643   B002BVQNMU              101-in-1 Party Megamix - Nintendo Wii   
12530  B007MZUFUG                                              Brave   
38411  B07SMBJVS8  Legend of Zelda Link's Awakening - Nintendo Sw...   

       predicted_rating  
39276          6.400832  
27654          5.021356  
18631          4.970993  
42041          4.970993  
13499          4.970993  
42681          4.970993  
128            4.970993  

Unnamed: 0,asin,product_title,predicted_rating
39276,B07W548HVT,"Boowen Wireless Controller for PS4,Double Shoc...",6.400832
27654,B01N4JYY1H,Xbox Wireless Controller – Red,5.021356
18631,B00IZ9T9QE,AKIBA'S TRIP: Undead & Undressed - PlayStation...,4.970993
42041,B08DJJ9WLW,EA SPORTS UFC 4 - Xbox One,4.970993
13499,B008Y1XJGE,LEGO Batman 2: DC Super Heroes [Download],4.970993
42681,B08JHQ4NBB,Sackboy: A Big Adventure Special Edition - Pla...,4.970993
128,B00000J97G,Game Boy Color - Teal,4.970993
6643,B002BVQNMU,101-in-1 Party Megamix - Nintendo Wii,4.970993
12530,B007MZUFUG,Brave,4.970993
38411,B07SMBJVS8,Legend of Zelda Link's Awakening - Nintendo Sw...,4.970993
