In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import functions as F

In [2]:
# Khởi tạo Spark session
spark = SparkSession.builder.appName('ALSExample').getOrCreate()

In [3]:
# Đọc dữ liệu từ file CSV
review_data = pd.read_csv(r"C:\Users\anhn2\Documents\DJANGO\DA\TIKI\comments_data.csv")
# Chuyển đổi pandas DataFrame thành Spark DataFrame
df = spark.createDataFrame(review_data)
# Chia dữ liệu thành train và test (80%-20%)
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [4]:
# Khởi tạo mô hình ALS
als = ALS(
    userCol="user_id", 
    itemCol="product_id", 
    ratingCol="rating", 
    nonnegative=True, 
    coldStartStrategy="drop"
)
# Huấn luyện mô hình ALS
model = als.fit(train_data)

# Dự đoán trên tập test
predictions = model.transform(test_data)

In [5]:
# Đánh giá mô hình với RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

RMSE: 1.3545765709904707


In [6]:
# Tuning tham số bằng CrossValidator và GridSearch
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 50]) \
    .addGrid(als.maxIter, [10, 20]) \
    .addGrid(als.regParam, [0.01, 0.1]) \
    .build()

In [7]:
# CrossValidator
evaluator = RegressionEvaluator(
    metricName="rmse", 
    labelCol="rating", 
    predictionCol="prediction"
)

crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5
)

# Fit mô hình với cross-validation
cv_model = crossval.fit(train_data)

In [8]:
# Lấy các kết quả của Cross-Validation và hiển thị
results = pd.DataFrame(cv_model.avgMetrics, columns=["RMSE"])
param_combinations = []

for params in cv_model.getEstimatorParamMaps():
    param_dict = {param.name: params[param] for param in params}
    param_combinations.append(param_dict)

param_combinations_df = pd.DataFrame(param_combinations)

# Kết hợp các tham số và kết quả RMSE
final_results = pd.concat([param_combinations_df, results], axis=1)
final_results

Unnamed: 0,rank,maxIter,regParam,RMSE
0,10,10,0.01,2.27067
1,10,10,0.1,1.44813
2,10,20,0.01,1.871057
3,10,20,0.1,1.221431
4,20,10,0.01,1.824959
5,20,10,0.1,1.442916
6,20,20,0.01,1.600707
7,20,20,0.1,1.239311
8,50,10,0.01,1.532266
9,50,10,0.1,1.415782


In [9]:

# Dự đoán với mô hình tốt nhất từ CrossValidator
best_model = cv_model.bestModel
cv_predictions = best_model.transform(test_data)

# Đánh giá mô hình tốt nhất với RMSE
cv_rmse = evaluator.evaluate(cv_predictions)
print(f"RMSE from CrossValidator: {cv_rmse}")


RMSE from CrossValidator: 1.1100242372643303


In [10]:
from pyspark.sql import functions as F
from pyspark.ml.recommendation import ALSModel

def get_top_k_recommendations(predictions, k=10):
    # Lấy Top-K sản phẩm gợi ý cho mỗi người dùng
    top_k = predictions.groupBy("user_id") \
        .agg(F.collect_list("product_id").alias("product_ids"),
             F.collect_list("prediction").alias("predictions")) \
        .withColumn("top_k", F.expr("slice(predictions, 1, {})".format(k))) \
        .withColumn("top_k_ids", F.expr("slice(product_ids, 1, {})".format(k))) \
        .select("user_id", "top_k", "top_k_ids")
    
    return top_k

def precision_recall_at_k(predictions, test_data, k=10, threshold=3.5):
    # Lấy Top-K gợi ý
    top_k_recommendations = get_top_k_recommendations(predictions, k)
    
    # Join predictions với test_data để so sánh rating thực tế với prediction
    correct_recommendations = top_k_recommendations.join(predictions, on="user_id", how="left")
    
    # Xử lý cột actual và predicted
    correct_recommendations = correct_recommendations.withColumn(
        "actual", F.when(correct_recommendations["rating"] >= threshold, 1).otherwise(0)
    )
    
    correct_recommendations = correct_recommendations.withColumn(
        "predicted", F.when(correct_recommendations["prediction"] >= threshold, 1).otherwise(0)
    )
    
    # Tính Precision và Recall
    tp = correct_recommendations.filter((correct_recommendations["actual"] == 1) & (correct_recommendations["predicted"] == 1)).count()
    fp = correct_recommendations.filter((correct_recommendations["actual"] == 0) & (correct_recommendations["predicted"] == 1)).count()
    fn = correct_recommendations.filter((correct_recommendations["actual"] == 1) & (correct_recommendations["predicted"] == 0)).count()
    
    precision = tp / (tp + fp) if tp + fp != 0 else 0
    recall = tp / (tp + fn) if tp + fn != 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if precision + recall != 0 else 0
    
    return precision, recall, f1_score

# Tính toán Precision, Recall và F1-Score
precision, recall, f1_score = precision_recall_at_k(predictions, test_data, k=10)

# In kết quả
print(f"Precision@10: {precision:.4f}")
print(f"Recall@10: {recall:.4f}")
print(f"F1-Score@10: {f1_score:.4f}")


Precision@10: 0.9756
Recall@10: 0.7742
F1-Score@10: 0.8633


In [12]:
import time

# Lấy mô hình tốt nhất từ CrossValidator
best_model_params = best_model.extractParamMap()

# Khởi tạo lại ALS với các tham số tốt nhất
als_best = ALS(
    userCol="user_id",
    itemCol="product_id",
    ratingCol="rating",
    nonnegative=True,
    coldStartStrategy="drop",
    rank=10,
    maxIter=20,
    regParam=0.1
)

# Đo thời gian bắt đầu
training_start_time = time.time()

# Huấn luyện lại mô hình với tham số tốt nhất
final_model = als_best.fit(train_data)

# Đo thời gian kết thúc
training_end_time = time.time()

# Tính thời gian huấn luyện
training_time = training_end_time - training_start_time
print(f"Thời gian huấn luyện mô hình với tham số tốt nhất: {training_time:.2f} giây")


Thời gian huấn luyện mô hình với tham số tốt nhất: 12.41 giây
