In [28]:
# # Khởi tạo Spark Session
# from pyspark.sql import SparkSession

# spark = SparkSession.builder \
#     .appName("BookRecommendationSystem") \
#     .config("spark.driver.memory", "4g") \
#     .config("spark.executor.memory", "4g") \
#     .config("spark.sql.shuffle.partitions", "10") \
#     .config("spark.default.parallelism", "10") \
#     .getOrCreate()

# # Hiển thị thông tin phiên bản Spark
# print(f"Spark version: {spark.version}")

# # Tải dữ liệu từ HDFS

# # Đường dẫn đến dữ liệu trên HDFS
# books_file = "hdfs://namenode:9000/user/hadoop/book_recommendation/books_data.csv"
# ratings_file = "hdfs://namenode:9000/user/hadoop/book_recommendation/Books_rating.csv"

# # Đọc dữ liệu
# try:
#     books_df = spark.read.csv(books_file, header=True, inferSchema=True)
#     ratings_df = spark.read.csv(ratings_file, header=True, inferSchema=True)
#     print("Đã tải dữ liệu từ HDFS thành công!")
# except Exception as e:
#     print(f"Lỗi khi tải dữ liệu từ HDFS: {str(e)}")
#     # Dự phòng: Tải từ local nếu HDFS không hoạt động
#     books_df = spark.read.csv("/path/to/local/books_data.csv", header=True, inferSchema=True)
#     ratings_df = spark.read.csv("/path/to/local/Books_rating.csv", header=True, inferSchema=True)

# Khởi tạo Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, count, isnull, lit
from pyspark.sql.types import FloatType, StringType
from pyspark.ml.feature import StringIndexer

spark = SparkSession.builder \
    .appName("BookRecommendationSystem") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "10") \
    .config("spark.default.parallelism", "10") \
    .getOrCreate()

print(f"Spark version: {spark.version}")

# Đường dẫn đến dữ liệu trên HDFS
books_file = "hdfs://namenode:9000/user/hadoop/book_recommendation/books_data.csv"
ratings_file = "hdfs://namenode:9000/user/hadoop/book_recommendation/Books_rating.csv"

# Tải dữ liệu
book_rating = spark.read.csv(
    ratings_file,
    header=True,
    inferSchema=True
)

book_data = spark.read.csv(
    books_file,
    header=True,
    inferSchema=True
)

# Kiểm tra số lượng bản ghi ban đầu
print(f"Số lượng bản ghi đánh giá ban đầu: {book_rating.count()}")
print(f"Số lượng bản ghi sách ban đầu: {book_data.count()}")

# Kiểm tra giá trị null/rỗng
print("\nKiểm tra giá trị null/rỗng trong dữ liệu đánh giá:")
book_rating.select([count(when(isnull(c) | isnan(c) | (col(c) == ""), c)).alias(c) for c in book_rating.columns]).show()

print("\nKiểm tra giá trị null/rỗng trong dữ liệu sách:")
book_data.select([count(when(isnull(c) | isnan(c) | (col(c) == ""), c)).alias(c) for c in book_data.columns]).show()

# Đổi tên cột và ép kiểu dữ liệu
ratings_raw = book_rating.select(
    col("User_id").alias("UserID"),
    col("Id").alias("BookID"),
    col("title"),
    col("review/score").cast(FloatType()).alias("Rating")
)

books_raw = book_data.select("title", "categories")

# Kiểm tra phạm vi của Rating
print("\nThống kê về Rating:")
ratings_raw.select("Rating").summary().show()

# Loại bỏ các giá trị không hợp lệ
ratings_clean = ratings_raw.filter(
    # Loại bỏ null
    col("UserID").isNotNull() & 
    col("BookID").isNotNull() & 
    col("Rating").isNotNull() &
    # Loại bỏ chuỗi rỗng
    (col("UserID") != "") &
    (col("BookID") != "") &
    # Đảm bảo rating trong phạm vi hợp lệ (1-5)
    (col("Rating") >= 1) & 
    (col("Rating") <= 5)
)

books_clean = books_raw.filter(
    col("title").isNotNull() &
    (col("title") != "")
)

# Kiểm tra số lượng bản ghi sau khi lọc
print(f"Số lượng bản ghi đánh giá sau khi lọc dữ liệu không hợp lệ: {ratings_clean.count()}")
print(f"Số lượng bản ghi sách sau khi lọc: {books_clean.count()}")

# Lấy mẫu nếu dữ liệu quá lớn (tránh lỗi bộ nhớ)
sample_ratio = 0.2  # Có thể điều chỉnh
ratings_sampled = ratings_clean.sample(fraction=sample_ratio, seed=42)
print(f"Số lượng bản ghi đánh giá sau khi lấy mẫu: {ratings_sampled.count()}")

# Kiểm tra số lượng người dùng và sách độc nhất
distinct_users = ratings_sampled.select("UserID").distinct().count()
distinct_books = ratings_sampled.select("BookID").distinct().count()
print(f"Số lượng người dùng độc nhất: {distinct_users}")
print(f"Số lượng sách độc nhất: {distinct_books}")

# Chuyển đổi ID thành chỉ số sử dụng StringIndexer
userIndexer = StringIndexer(
    inputCol="UserID", 
    outputCol="userIndex", 
    handleInvalid="skip"
)

bookIndexer = StringIndexer(
    inputCol="BookID", 
    outputCol="bookIndex", 
    handleInvalid="skip"
)

# Áp dụng StringIndexer
user_indexer_model = userIndexer.fit(ratings_sampled)
ratings_indexed = user_indexer_model.transform(ratings_sampled)

book_indexer_model = bookIndexer.fit(ratings_indexed)
ratings_indexed = book_indexer_model.transform(ratings_indexed)

# Loại bỏ các giá trị null sau khi indexing (đề phòng)
ratings_final = ratings_indexed.na.drop()

print(f"Số lượng bản ghi đánh giá cuối cùng: {ratings_final.count()}")

# Lưu labels để dùng sau
user_labels = user_indexer_model.labels
book_labels = book_indexer_model.labels

# Tạo ánh xạ từ index sang ID gốc (để sử dụng sau này)
user_index_to_id = {i: label for i, label in enumerate(user_labels)}
book_index_to_id = {i: label for i, label in enumerate(book_labels)}

# Hiển thị dữ liệu đã xử lý
print("\nDữ liệu đánh giá sau khi xử lý:")
ratings_final.select("UserID", "userIndex", "BookID", "bookIndex", "Rating").show(5)

# Phân chia dữ liệu thành tập huấn luyện và tập kiểm thử
train, test = ratings_final.randomSplit([0.8, 0.2], seed=42)
print(f"Số lượng mẫu huấn luyện: {train.count()}")
print(f"Số lượng mẫu kiểm thử: {test.count()}")

Spark version: 3.5.0
Số lượng bản ghi đánh giá ban đầu: 3000000
Số lượng bản ghi sách ban đầu: 212404

Kiểm tra giá trị null/rỗng trong dữ liệu đánh giá:
+---+-----+-------+-------+-----------+------------------+------------+-----------+--------------+-----------+
| Id|Title|  Price|User_id|profileName|review/helpfulness|review/score|review/time|review/summary|review/text|
+---+-----+-------+-------+-----------+------------------+------------+-----------+--------------+-----------+
|  0|  208|2517579| 562250|     562250|               367|         130|         27|            65|         43|
+---+-----+-------+-------+-----------+------------------+------------+-----------+--------------+-----------+


Kiểm tra giá trị null/rỗng trong dữ liệu sách:
+-----+-----------+-------+-----+-----------+---------+-------------+--------+----------+------------+
|Title|description|authors|image|previewLink|publisher|publishedDate|infoLink|categories|ratingsCount|
+-----+-----------+-------+-----+---

In [29]:


from pyspark.ml.recommendation import ALS

als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="userIndex",
    itemCol="bookIndex",
    ratingCol="Rating",
    coldStartStrategy="drop",  # Quan trọng để xử lý các giá trị NaN
    nonnegative=True
)

# Huấn luyện mô hình
model = als.fit(train)

hdfs_model_path = "hdfs://namenode:9000/user/hadoop/book_recommendation/models/als_model"

try:
    # Lưu mô hình
    model.write().overwrite().save(hdfs_model_path)
    print(f"Mô hình đã được lưu thành công tại: {hdfs_model_path}")
except Exception as e:
    print(f"Lỗi khi lưu mô hình: {str(e)}")
    
    # Nếu không thể lưu vào HDFS, thử lưu vào thư mục local
    try:
        local_model_path = "/home/jovyan/work/notebooks/als_model"
        model.write().overwrite().save(local_model_path)
        print(f"Mô hình đã được lưu thành công vào thư mục local: {local_model_path}")
    except Exception as e2:
        print(f"Lỗi khi lưu mô hình vào thư mục local: {str(e2)}")

# Dự đoán
predictions = model.transform(test)

# Đánh giá mô hình
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="Rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"Root Mean Square Error (RMSE) = {rmse:.4f}")

Mô hình đã được lưu thành công tại: hdfs://namenode:9000/user/hadoop/book_recommendation/models/als_model
Root Mean Square Error (RMSE) = 1.4173


In [30]:
# Đánh giá mô hình
# Dự đoán đánh giá trên tập kiểm thử
predictions = model.transform(test)
predictions.show(5)

# Tính toán RMSE (Root Mean Square Error)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="Rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"Root Mean Square Error (RMSE) trên tập kiểm thử = {rmse:.4f}")

+--------------+----------+--------------------+------+---------+---------+----------+
|        UserID|    BookID|               title|Rating|userIndex|bookIndex|prediction|
+--------------+----------+--------------------+------+---------+---------+----------+
|A2F6N60Z96CAJI|B0007PC4W0|I'll Be Watching You|   4.0|     22.0|  13934.0| 2.9670706|
|A2F6N60Z96CAJI|B000N761GW|       Halfway House|   5.0|     22.0|  11989.0|  3.805781|
|A2F6N60Z96CAJI|1423319621|The Saboteurs (Me...|   5.0|     22.0|   4150.0| 1.8978461|
|A2F6N60Z96CAJI|0670030554|Ready to Roll: A ...|   5.0|     22.0|  19282.0| 4.1872807|
|A2F6N60Z96CAJI|B000QBF896|           Samaritan|   5.0|     22.0|   7112.0|  3.782685|
+--------------+----------+--------------------+------+---------+---------+----------+
only showing top 5 rows

Root Mean Square Error (RMSE) trên tập kiểm thử = 1.4173


In [33]:
# from pyspark.sql.functions import explode, split, col, count, desc, avg, array, lit, udf, row_number
# from pyspark.sql.types import ArrayType, StringType, FloatType, StructType, StructField
# from pyspark.sql.window import Window

# # Kiểm tra dữ liệu books trước khi xử lý
# print("Số lượng bản ghi sách:", books.count())
# print("Số lượng bản ghi đánh giá:", ratings.count())

# # Kiểm tra cấu trúc dữ liệu sách
# print("Kiểm tra cấu trúc dữ liệu sách:")
# books.printSchema()

# # Kiểm tra dữ liệu mẫu
# print("Mẫu dữ liệu sách:")
# books.show(5, truncate=False)

# # Tạo một hàm để trích xuất danh sách thể loại từ cột categories
# @udf(returnType=ArrayType(StringType()))
# def extract_categories(categories_str):
#     if categories_str is None or categories_str == "":
#         return []
#     # Xử lý nhiều trường hợp phân cách
#     separators = [',', '|', ';']
#     for sep in separators:
#         if sep in categories_str:
#             return [cat.strip() for cat in categories_str.split(sep) if cat.strip()]
#     # Nếu không có dấu phân cách, coi như một thể loại duy nhất
#     return [categories_str.strip()] if categories_str.strip() else []

# # Áp dụng UDF để trích xuất danh sách thể loại
# try:
#     books_with_categories = books.withColumn(
#         "category_list", 
#         extract_categories(col("categories"))
#     )
    
#     # Kiểm tra kết quả
#     print("Mẫu sách với danh sách thể loại:")
#     books_with_categories.select("title", "categories", "category_list").show(5, truncate=False)
    
#     # Kiểm tra số lượng bản ghi có danh sách thể loại rỗng
#     empty_categories = books_with_categories.filter(size(col("category_list")) == 0).count()
#     print(f"Số lượng sách có danh sách thể loại rỗng: {empty_categories}")
    
# except Exception as e:
#     print(f"Lỗi khi trích xuất thể loại: {str(e)}")
    
#     # Nếu có lỗi, thử phương pháp đơn giản hơn
#     print("Thử phương pháp thay thế...")
#     books_with_categories = books.withColumn(
#         "category_list", 
#         array(col("categories"))  # Tạo mảng từ một phần tử duy nhất
#     ).filter(col("categories").isNotNull() & (col("categories") != ""))
    
#     print("Mẫu sách với danh sách thể loại (phương pháp thay thế):")
#     books_with_categories.select("title", "categories", "category_list").show(5, truncate=False)

# # Kết hợp thông tin đánh giá với thể loại sách
# try:
#     # Kiểm tra các cột join
#     print("Kiểm tra các cột join:")
#     print("Cột 'title' trong ratings:", "title" in ratings.columns)
#     print("Cột 'title' trong books_with_categories:", "title" in books_with_categories.columns)
    
#     ratings_with_info = ratings.join(
#         books_with_categories,
#         ratings.title == books_with_categories.title,
#         "left"
#     )
    
#     # Kiểm tra kết quả join
#     matched_count = ratings_with_info.filter(col("category_list").isNotNull()).count()
#     total_count = ratings_with_info.count()
#     print(f"Tỷ lệ đánh giá có thông tin thể loại: {matched_count}/{total_count} ({matched_count/total_count*100:.2f}%)")
    
# except Exception as e:
#     print(f"Lỗi khi join dữ liệu: {str(e)}")
#     # Kiểm tra các giá trị trùng lặp có thể gây lỗi khi join
#     print("Kiểm tra giá trị trùng lặp trong cột title của books:")
#     books_title_counts = books.groupBy("title").count().filter(col("count") > 1)
#     print("Số lượng tiêu đề sách trùng lặp:", books_title_counts.count())
#     if books_title_counts.count() > 0:
#         print("Các tiêu đề trùng lặp (top 5):")
#         books_title_counts.orderBy(desc("count")).show(5, truncate=False)
    
#     # Thử join với distinct titles
#     books_distinct = books_with_categories.dropDuplicates(["title"])
#     ratings_with_info = ratings.join(
#         books_distinct,
#         ratings.title == books_distinct.title,
#         "left"
#     )

# # Phân rã danh sách thể loại thành các hàng riêng biệt
# try:
#     from pyspark.sql.functions import size
    
#     # Lọc bỏ các đánh giá không có thông tin thể loại
#     ratings_with_categories = ratings_with_info.filter(
#         col("category_list").isNotNull() & 
#         (size(col("category_list")) > 0)
#     )
    
#     print(f"Số lượng đánh giá có thông tin thể loại: {ratings_with_categories.count()}")
    
#     # Phân rã danh sách
#     ratings_exploded = ratings_with_categories.withColumn(
#         "category", 
#         explode(col("category_list"))
#     ).filter(col("category").isNotNull() & (col("category") != ""))
    
#     # Kiểm tra kết quả
#     print("Số lượng hàng sau khi explode:", ratings_exploded.count())
#     print("Mẫu dữ liệu sau khi explode:")
#     ratings_exploded.select("UserID", "BookID", "title", "Rating", "category").show(5, truncate=False)
    
# except Exception as e:
#     print(f"Lỗi khi explode thể loại: {str(e)}")
#     # Kiểm tra cấu trúc của category_list
#     print("Kiểm tra cấu trúc category_list:")
#     ratings_with_info.select("category_list").distinct().limit(5).show(truncate=False)

# # Tính điểm trung bình và số lượng đánh giá theo người dùng và thể loại
# try:
#     user_category_stats = ratings_exploded.groupBy("UserID", "category").agg(
#         avg("Rating").alias("avg_rating"),
#         count("Rating").alias("rating_count")
#     )
    
#     # Kiểm tra kết quả
#     print("Thống kê người dùng và thể loại:")
#     user_category_stats.orderBy(desc("rating_count")).show(5)
    
#     # Tạo điểm kết hợp
#     user_favorite_categories = user_category_stats.withColumn(
#         "score", 
#         col("avg_rating") * col("rating_count")
#     )
    
#     # Xếp hạng thể loại cho mỗi người dùng
#     window_spec = Window.partitionBy("UserID").orderBy(desc("score"))
    
#     # Sử dụng try-except để phát hiện lỗi từ row_number
#     try:
#         user_top_categories = user_favorite_categories.withColumn(
#             "rank", 
#             row_number().over(window_spec)
#         ).filter(col("rank") <= 3)
        
#         # Kiểm tra kết quả
#         print("Thể loại yêu thích của người dùng:")
#         user_top_categories.orderBy("UserID", "rank").show(15, truncate=False)
#     except Exception as e:
#         print(f"Lỗi khi sử dụng row_number: {str(e)}")
        
#         # Phương pháp thay thế: sắp xếp thủ công
#         from pyspark.sql.functions import rank
#         user_top_categories = user_favorite_categories.withColumn(
#             "rank", 
#             rank().over(window_spec)
#         ).filter(col("rank") <= 3)
        
#         print("Thể loại yêu thích (phương pháp thay thế):")
#         user_top_categories.orderBy("UserID", "rank").show(15, truncate=False)
    
# except Exception as e:
#     print(f"Lỗi khi tính thống kê: {str(e)}")

# # Lấy danh sách người dùng mẫu để đề xuất
# try:
#     sample_users = ratings.select("UserID").distinct().limit(5)
#     sample_users_list = [row.UserID for row in sample_users.collect()]
    
#     print(f"Danh sách người dùng mẫu: {sample_users_list}")
    
#     # Đề xuất sách cho người dùng dựa trên thể loại yêu thích
#     recommendations = []
    
#     for user_id in sample_users_list:
#         print(f"\nĐề xuất sách cho người dùng {user_id}:")
        
#         # Lấy top thể loại yêu thích của người dùng
#         favorite_categories = user_top_categories.filter(
#             col("UserID") == user_id
#         ).orderBy("rank").select("category").collect()
        
#         if favorite_categories:
#             favorite_categories_list = [row.category for row in favorite_categories]
#             print(f"Thể loại yêu thích: {', '.join(favorite_categories_list)}")
            
#             # Lấy sách đã được đánh giá bởi người dùng
#             rated_books = ratings.filter(col("UserID") == user_id).select("BookID").rdd.flatMap(lambda x: [x.BookID]).collect()
            
#             # Tìm sách theo từng thể loại
#             for category in favorite_categories_list:
#                 # Tìm sách thuộc thể loại này với rating cao mà người dùng chưa đánh giá
#                 category_books = ratings_exploded.filter(
#                     (col("category") == category) & 
#                     (~col("BookID").isin(rated_books))
#                 ).groupBy("title", "BookID").agg(
#                     avg("Rating").alias("avg_rating"),
#                     count("Rating").alias("rating_count")
#                 ).filter(col("rating_count") >= 3)  # Giảm yêu cầu về số lượng đánh giá
                
#                 # Sắp xếp và lấy top 3
#                 category_recs = category_books.orderBy(desc("avg_rating"), desc("rating_count")).limit(3)
                
#                 # Thu thập kết quả
#                 category_books_list = category_recs.collect()
                
#                 if category_books_list:
#                     print(f"\nSách thuộc thể loại '{category}':")
#                     for i, book in enumerate(category_books_list, 1):
#                         print(f"{i}. '{book.title}' - Đánh giá trung bình: {book.avg_rating:.2f} ({book.rating_count} lượt)")
#                         recommendations.append((
#                             user_id, 
#                             book.BookID, 
#                             book.title, 
#                             category, 
#                             float(book.avg_rating)
#                         ))
#                 else:
#                     print(f"Không tìm thấy sách phù hợp thuộc thể loại '{category}'")
#         else:
#             print("Không thể xác định thể loại yêu thích.")
    
#     # Tạo DataFrame từ danh sách đề xuất
#     if recommendations:
#         schema = StructType([
#             StructField("UserID", StringType(), False),
#             StructField("BookID", StringType(), False),
#             StructField("Title", StringType(), False),
#             StructField("Category", StringType(), False),
#             StructField("AverageRating", FloatType(), False)
#         ])
        
#         recommendations_df = spark.createDataFrame(recommendations, schema)
        
#         print("\nBảng đề xuất sách theo thể loại:")
#         recommendations_df.show(truncate=False)
        
#         # Lưu kết quả đề xuất
#         try:
#             recommendations_df.write.csv(
#                 "hdfs://namenode:9000/user/hadoop/book_recommendation/category_recommendations", 
#                 header=True, 
#                 mode="overwrite"
#             )
#             print("Đã lưu kết quả đề xuất thành công!")
#         except Exception as e:
#             print(f"Lỗi khi lưu kết quả: {str(e)}")
#     else:
#         print("Không có đề xuất nào được tạo ra.")
    
# except Exception as e:
#     print(f"Lỗi trong quá trình tạo đề xuất: {str(e)}")

In [35]:
# # Hiển thị thông tin chi tiết về các sách được gợi ý
# # Join với dataframe sách để lấy thông tin chi tiết
# def get_book_info(book_id):
#     return books_df.filter(col("book_id") == book_id).first()

# # Hiển thị các gợi ý cùng với thông tin sách
# for user_rec in user_recs.collect():
#     user_id = user_rec.user_id
#     print(f"\nGợi ý cho người dùng ID {user_id}:")
    
#     # Lấy danh sách các sách đã được gợi ý
#     recommendations = user_rec.recommendations
    
#     for i, rec in enumerate(recommendations, 1):
#         book_id = rec.book_id
#         predicted_rating = rec.rating
        
#         # Tìm thông tin sách
#         book_info = get_book_info(book_id)
#         if book_info:
#             title = book_info["title"] if "title" in book_info else "Unknown"
#             author = book_info["author"] if "author" in book_info else "Unknown"
#             print(f"{i}. '{title}' by {author} (ID: {book_id}, Predicted Rating: {predicted_rating:.2f})")
#         else:
#             print(f"{i}. Book ID: {book_id}, Predicted Rating: {predicted_rating:.2f}")

In [36]:
# # Đánh giá hiệu suất dựa trên Mean Average Precision (MAP)
# from pyspark.ml.evaluation import RankingEvaluator

# # Chuẩn bị dữ liệu cho đánh giá ranking
# # Tạo ground truth - các sách mà người dùng đã đánh giá cao (rating >= 4)
# ground_truth = ratings_clean_df.filter(col("rating") >= 4.0) \
#                             .groupBy("user_id") \
#                             .agg(expr("collect_list(book_id) as actual_items"))

# # Tạo danh sách các sách được gợi ý
# users = ratings_clean_df.select("user_id").distinct()
# recommendations = model.recommendForAllUsers(20)  # 20 gợi ý cho mỗi người dùng
# predictions_for_eval = recommendations.withColumn(
#     "recommended_items", 
#     expr("TRANSFORM(recommendations, x -> x.book_id)")
# )

# # Join dữ liệu ground truth và predictions
# pred_vs_actual = predictions_for_eval.join(
#     ground_truth, 
#     "user_id", 
#     "inner"
# ).select("user_id", "recommended_items", "actual_items")

# # Tính MAP
# rank_evaluator = RankingEvaluator(
#     predictionCol="recommended_items", 
#     labelCol="actual_items", 
#     metricName="meanAveragePrecision"
# )

# map_score = rank_evaluator.evaluate(pred_vs_actual)
# print(f"Mean Average Precision (MAP) = {map_score:.4f}")

In [37]:
# # Lưu mô hình để sử dụng sau này
# hdfs_model_path = "hdfs://namenode:9000/user/hadoop/book_recommendation/models/book_recommender_als"


# try:
#     model.save(model_path)
#     print(f"Đã lưu mô hình tại: {model_path}")
# except Exception as e:
#     print(f"Lỗi khi lưu mô hình: {str(e)}")
#     # Lưu trên HDFS
#     hdfs_model_path = "hdfs://localhost:9000/user/hadoop/book_recommendation/models/book_recommender_als"
#     model.save(hdfs_model_path)
#     print(f"Đã lưu mô hình trên HDFS tại: {hdfs_model_path}")

In [41]:
# Tạo chức năng gợi ý sách cho người dùng cụ thể
from pyspark.sql.functions import col, desc, expr

def recommend_books_for_user(user_id, top_n=10):
    """
    Tạo gợi ý sách cho một người dùng cụ thể
    
    Parameters:
    user_id: ID người dùng cần gợi ý
    top_n: Số lượng sách gợi ý tối đa
    
    Returns:
    DataFrame chứa sách được gợi ý
    """
    try:
        # Kiểm tra xem người dùng có tồn tại trong dữ liệu không
        if ratings.filter(col("UserID") == user_id).count() == 0:
            print(f"Không tìm thấy dữ liệu đánh giá cho người dùng {user_id}")
            return spark.createDataFrame([], "BookID STRING, predicted_rating FLOAT")
            
        # Lấy sách đã được đánh giá bởi người dùng
        rated_books = ratings.filter(col("UserID") == user_id) \
                         .select("BookID").rdd.flatMap(lambda x: [x.BookID]).collect()
        
        print(f"Người dùng {user_id} đã đánh giá {len(rated_books)} sách")
        
        # Tìm thể loại yêu thích của người dùng
        if 'user_top_categories' in globals():
            favorite_categories = user_top_categories.filter(col("UserID") == user_id) \
                                   .orderBy("rank").select("category").collect()
            
            if favorite_categories:
                favorite_category_list = [row.category for row in favorite_categories]
                print(f"Thể loại yêu thích: {', '.join(favorite_category_list)}")
                
                # Tạo gợi ý dựa trên thể loại yêu thích
                recommendations = []
                
                for category in favorite_category_list:
                    # Tìm sách thuộc thể loại này với rating cao mà người dùng chưa đánh giá
                    category_books = ratings_exploded.filter(
                        (col("category") == category) & 
                        (~col("BookID").isin(rated_books))
                    ).groupBy("BookID", "title").agg(
                        avg("Rating").alias("avg_rating"),
                        count("Rating").alias("rating_count")
                    ).filter(col("rating_count") >= 2)  # Sách có ít nhất 2 đánh giá
                    
                    # Sắp xếp và lấy top 3 sách cho mỗi thể loại
                    category_recs = category_books.orderBy(desc("avg_rating"), desc("rating_count")).limit(3)
                    recommendations.append(category_recs)
                
                if recommendations:
                    # Ghép các gợi ý từ các thể loại khác nhau
                    combined_recs = recommendations[0]
                    for i in range(1, len(recommendations)):
                        combined_recs = combined_recs.union(recommendations[i])
                    
                    # Sắp xếp lại và lấy top_n
                    final_recs = combined_recs.orderBy(desc("avg_rating"), desc("rating_count")).limit(top_n)
                    return final_recs
                else:
                    print("Không tìm thấy gợi ý phù hợp dựa trên thể loại")
            else:
                print(f"Không tìm thấy thể loại yêu thích cho người dùng {user_id}")
        else:
            print("Chưa phân tích thể loại yêu thích của người dùng")
            
        # Fallback: Tìm sách phổ biến mà người dùng chưa đánh giá
        print("Đang sử dụng phương pháp dự phòng (sách phổ biến)...")
        popular_books = ratings.filter(~col("BookID").isin(rated_books)) \
                       .groupBy("BookID", "title").agg(
                           avg("Rating").alias("avg_rating"),
                           count("Rating").alias("rating_count")
                       ).filter(col("rating_count") >= 5) \
                       .orderBy(desc("avg_rating"), desc("rating_count")) \
                       .limit(top_n)
        
        return popular_books
        
    except Exception as e:
        print(f"Lỗi khi tạo gợi ý: {str(e)}")
        # Trả về DataFrame trống nếu có lỗi
        return spark.createDataFrame([], "BookID STRING, title STRING, avg_rating FLOAT, rating_count INT")

# Lấy danh sách mẫu người dùng để thử nghiệm
sample_users = ratings.select("UserID").distinct().limit(5)
User_ids = [row.UserID for row in sample_users.collect()]

# Thử nghiệm hàm gợi ý
test_user_id = User_ids[0]  # Lấy một người dùng từ mẫu
print(f"Gợi ý sách cho người dùng {test_user_id}:")
recommendations_df = recommend_books_for_user(test_user_id, top_n=10)

# Hiển thị gợi ý
if recommendations_df.count() > 0:
    print("\nSách được gợi ý:")
    recommendations_df.show(truncate=False)
    
    # Hiển thị thông tin chi tiết về sách
    print("\nThông tin chi tiết:")
    for i, book in enumerate(recommendations_df.collect(), 1):
        book_id = book.BookID
        title = book.title
        avg_rating = book.avg_rating
        count = book.rating_count
        
        # Tìm thể loại của sách
        book_categories = books_with_categories.filter(col("title") == title) \
                         .select("category_list").collect()
        categories = []
        if book_categories and len(book_categories) > 0 and book_categories[0].category_list:
            categories = book_categories[0].category_list
            
        print(f"{i}. '{title}' (ID: {book_id})")
        print(f"   Đánh giá: {avg_rating:.2f}/5 ({count} lượt)")
        if categories:
            print(f"   Thể loại: {', '.join(categories)}")
        print()
else:
    print("Không tìm được gợi ý phù hợp")

# Thử nghiệm với các người dùng khác
print("\n=== Gợi ý cho tất cả người dùng mẫu ===")
for idx, user_id in enumerate(User_ids, 1):
    print(f"\n--- Người dùng {idx}: {user_id} ---")
    
    # Tạo gợi ý
    user_recs = recommend_books_for_user(user_id, top_n=5)
    
    # Hiển thị gợi ý
    if user_recs.count() > 0:
        print(f"Top 5 sách được gợi ý cho người dùng {user_id}:")
        user_recs.show(truncate=False)
    else:
        print(f"Không tìm được gợi ý phù hợp cho người dùng {user_id}")

AttributeError: 'NoneType' object has no attribute 'setCallSite'

In [None]:
# Gợi ý các sách phù hợp (item-item collaborative filtering)
def find_similar_books(book_id, top_n=10):
    """
    Tìm các sách tương tự với một cuốn sách cụ thể
    
    Parameters:
    -----------
    book_id : int
        ID của cuốn sách
    top_n : int
        Số lượng sách tương tự cần tìm
        
    Returns:
    --------
    DataFrame
        DataFrame chứa các sách tương tự và độ tương tự
    """
    # Lấy các yếu tố tiềm ẩn cho các sách
    item_factors = model.itemFactors
    
    # Tìm vector yếu tố cho sách được chỉ định
    book_factor = item_factors.filter(col("id") == book_id).select("features").first()
    
    if book_factor is None:
        print(f"Không tìm thấy thông tin cho sách có ID: {book_id}")
        return None
    
    # Tính khoảng cách cosine giữa cuốn sách này và tất cả các sách khác
    from pyspark.ml.functions import vector_to_array
    from pyspark.sql.functions import arrays_zip, expr, array
    
    # Tính toán khoảng cách cosine
    dot_product = expr("dotProduct(features, array_features)")
    norm_target = expr("sqrt(dotProduct(array_features, array_features))")
    norm_item = expr("sqrt(dotProduct(features, features))")
    
    target_features = book_factor.features
    
    similar_books = item_factors.filter(col("id") != book_id) \
                               .withColumn("array_features", array([lit(f) for f in target_features])) \
                               .withColumn("similarity", dot_product / (norm_target * norm_item)) \
                               .select("id", "similarity") \
                               .orderBy(desc("similarity")) \
                               .limit(top_n) \
                               .withColumnRenamed("id", "book_id")
    
    # Join với dữ liệu sách để lấy thông tin chi tiết
    result = similar_books.join(books_df, "book_id", "left")
    
    return result

# Thử nghiệm hàm tìm sách tương tự
# Lấy một ID sách ngẫu nhiên
sample_book_id = books_df.select("book_id").limit(1).first().book_id
print(f"\nTìm sách tương tự với sách có ID {sample_book_id}:")

similar_books_df = find_similar_books(sample_book_id, top_n=10)
if similar_books_df:
    sample_book = get_book_info(sample_book_id)
    if sample_book:
        print(f"Sách gốc: '{sample_book['title']}' by {sample_book['author']}")
    
    print("Các sách tương tự:")
    similar_books_df.show(truncate=False)

In [42]:
# Đóng SparkSession
spark.stop()
print("Đã đóng SparkSession")

Đã đóng SparkSession
