In [1]:
import numpy as np
import pandas as pd
from datetime import datetime

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

In [98]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, MinHashLSH

In [5]:
spark = SparkSession.builder\
    .master("local[*]")\
    .config("spark.executor.memory", "20g")\
    .config("spark.driver.memory", "10g")\
    .config("spark.memory.offHeap.enabled",True)\
    .config("spark.memory.offHeap.size","8g")\
    .appName("Book Recommendations").getOrCreate()

# Amazon Books

In [6]:
df_meta = spark.read.json("../data/Amazon/meta_Books.json")

In [8]:
df_meta.count()

2934949

In [6]:
df_meta.show()

+--------------------+--------------------+----------+--------------------+--------------------+----+--------------------+-------+-------+---+--------+---------------+--------+-------+--------------------+------------+-----+-----+--------------------+
|            also_buy|           also_view|      asin|               brand|            category|date|         description|details|feature|fit|imageURL|imageURLHighRes|main_cat|  price|                rank|similar_item|tech1|tech2|               title|
+--------------------+--------------------+----------+--------------------+--------------------+----+--------------------+-------+-------+---+--------+---------------+--------+-------+--------------------+------------+-----+-----+--------------------+
|[0669009075, B000...|[0019777701, B000...|0000092878|        Keith Graham|                  []|    |[It is a biology ...|   null|     []|   |      []|             []|   Books| $39.94|1,349,781 in Books (|            |     |     |Biology Gods L

In [27]:
df_meta.printSchema()

root
 |-- also_buy: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- also_view: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- date: string (nullable = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- details: struct (nullable = true)
 |    |-- \n    Item Weight: \n    : string (nullable = true)
 |    |-- \n    Package Dimensions: \n    : string (nullable = true)
 |    |-- \n    Product Dimensions: \n    : string (nullable = true)
 |    |--  Date first listed on Amazon:: string (nullable = true)
 |    |-- 3.5" and 5.25" disks:: string (nullable = true)
 |    |-- 3.5" disk:: string (nullable = true)
 |    |-- 5.25" disk:: string (nullable = true)
 |    |-- ASIN:: string (nullable = true)
 |    |-- ASIN: : string (nullable

In [7]:
df = spark.read.json("../data/Amazon/Books.json")

In [None]:
df.show()

+----------+-----+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------------+--------------+--------+----+
|      asin|image|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|               style|             summary|unixReviewTime|verified|vote|
+----------+-----+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------------+--------------+--------+----+
|0001713353| null|    5.0|This book is a wi...|08 12, 2005|A1C6M8LCIX4M6M|            June Bug|{null,  Paperback...| Children's favorite|    1123804800|   false|null|
|0001713353| null|    5.0|The King, the Mic...|03 30, 2005|A1REUF3A1YCPHM|         TW Ervin II|{null,  Hardcover...|A story children ...|    1112140800|   false|null|
|0001713353| null|    5.0|My daughter got h...| 04 4, 2004| A1YRBRK2XM5D5|   Rebecca L. Menner|{null,  Hardcover...|          Third copy|    1081036800|   false|   5

In [8]:
# Lựa chọn các cột cần merge
df_meta_selected = df_meta.select('title', 'asin')

# Thực hiện merge bằng cách join DataFrame
df = df.join(df_meta_selected, on=['asin'], how='left')

In [9]:
# Kiểm tra giá trị null trong DataFrame
null_counts = df.select([f.sum(f.col(column).isNull().cast("int")).alias(column) for column in df.columns])

# Hiển thị kết quả
null_counts.show()

+----+--------+-------+----------+----------+----------+------------+-------+-------+--------------+--------+--------+-----+
|asin|   image|overall|reviewText|reviewTime|reviewerID|reviewerName|  style|summary|unixReviewTime|verified|    vote|title|
+----+--------+-------+----------+----------+----------+------------+-------+-------+--------------+--------+--------+-----+
|   0|51158841|      0|     13834|         0|         0|        1842|1787079|  13922|             0|       0|40882790| 2590|
+----+--------+-------+----------+----------+----------+------------+-------+-------+--------------+--------+--------+-----+



In [9]:
df = df.drop('image','style','verified','vote')

In [None]:
df.count()

51343361

In [10]:
df = df.dropna(subset=["title"])

In [None]:
df.count()

51340771

In [11]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [12]:
from pyspark.sql.functions import to_date, dayofmonth, month, year

# Chuyển đổi cột "reviewTime" sang định dạng ngày tháng
df = df.withColumn("reviewTime", to_date(df["reviewTime"], "MM dd, yyyy"))

# # Tạo các cột "day", "month", và "year" từ cột "reviewTime"
# df = df.withColumn("day", dayofmonth(df["reviewTime"]))
# df = df.withColumn("month", month(df["reviewTime"]))
# df = df.withColumn("year", year(df["reviewTime"]))

In [None]:
# xem phân bố các giá trị ratings
value_counts = df.groupBy("overall").agg(f.count("*").alias("count"))
value_counts.show()

+-------+--------+
|overall|   count|
+-------+--------+
|    1.0| 2089704|
|    4.0| 9562873|
|    3.0| 3837521|
|    2.0| 1851747|
|    5.0|33998918|
|    0.0|       8|
+-------+--------+



In [13]:
# xoá các dòng overall = 0.0 gây mất cân bằng
df = df.filter(df.overall != 0)

In [15]:
value_counts = df.groupBy("overall").agg(f.count("*").alias("count"))
value_counts.show()

+-------+--------+
|overall|   count|
+-------+--------+
|    1.0| 2089704|
|    4.0| 9562873|
|    3.0| 3837521|
|    2.0| 1851747|
|    5.0|33998918|
+-------+--------+



In [14]:
df.show()

+----------+-------+--------------------+----------+--------------+--------------------+--------------------+--------------+--------------------+---+-----+----+
|      asin|overall|          reviewText|reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|               title|day|month|year|
+----------+-------+--------------------+----------+--------------+--------------------+--------------------+--------------+--------------------+---+-----+----+
|0001714422|    5.0|               Nice.|2017-11-08|A1LTMOQIYR3UTJ|       Debra B. Fish|          Five Stars|    1510099200|'C' Is for Clown ...|  8|   11|2017|
|0001714422|    5.0|Very well done my...|2016-11-11|A1QOGGM5FCNIFO|     Amazon Customer|          Five Stars|    1478822400|'C' Is for Clown ...| 11|   11|2016|
|0001714422|    5.0|Unfortunately, no...|2016-01-20| ABO1XPSEK0Y57|     Philip A. Smith|          Five Stars|    1453248000|'C' Is for Clown ...| 20|    1|2016|
|0001714422|    5.0|I bought this 

# CONTENT-BASED

In [None]:
df_meta.createOrReplaceTempView("books")
df.createOrReplaceTempView("ratings")

query = "SELECT ratings.asin, reviewerID, overall, books.title, category \
        FROM ratings left join books on ratings.asin=books.asin"
books_ratings = spark.sql(query)
books_ratings = books_ratings.filter("title is not null")

In [None]:
books_ratings.show()

+----------+--------------+-------+--------------------+--------------------+
|      asin|    reviewerID|overall|               title|            category|
+----------+--------------+-------+--------------------+--------------------+
|0001048775|A2M4YJ7ANBGYKD|    2.0|Measure for Measu...|[Books, Literatur...|
|0001714422| AGG9C66TOLJZB|    5.0|'C' Is for Clown ...|                  []|
|0001714422|A2L4G3DWHN86N1|    4.0|'C' Is for Clown ...|                  []|
|0001714422|A2U1HRXXPZV60W|    5.0|'C' Is for Clown ...|                  []|
|0001714422| ABO1XPSEK0Y57|    5.0|'C' Is for Clown ...|                  []|
|0001714422|A1QOGGM5FCNIFO|    5.0|'C' Is for Clown ...|                  []|
|0001714422|A1PQVNPJY4BC6R|    5.0|'C' Is for Clown ...|                  []|
|0001714422|A1LTMOQIYR3UTJ|    5.0|'C' Is for Clown ...|                  []|
|0001714422|A3SRPFVPZQV05X|    5.0|'C' Is for Clown ...|                  []|
|0001842439|A38YT94H3EGA1L|    5.0| Family at Ditlabeng|[Books, 

In [None]:
books_ratings.createOrReplaceTempView("bookRates")
query = "SELECT asin as bookID, title, avg(overall) as average \
        FROM bookRates \
        GROUP BY bookID, title "
mean_rates = spark.sql(query)

In [None]:
mean_rates.show()

+----------+--------------------+-----------------+
|    bookID|               title|          average|
+----------+--------------------+-----------------+
|0000095699|Everyday Writer, ...|              3.0|
|0001048775|Measure for Measu...|              2.0|
|0001203088|Hilda Boswell's O...|              5.0|
|0001380877|Best Christmas Bo...|              5.0|
|0001384163|Mog and Me (Mog t...|4.666666666666667|
|0001384198|The Little Engine...|4.335112059765208|
|0001714422|'C' Is for Clown ...|            4.875|
|0001720252|Dr.Seuss's ABC (B...|              2.0|
|0001720279|Hooray for Diffen...|4.779220779220779|
|0001720295|Lion, the Witch a...|4.660907127429805|
|0001821121|Paddington Bear (...|              4.5|
|0001842439| Family at Ditlabeng|              5.0|
|0001843834|Gulpilil's Storie...|              5.0|
|0001850164| The Rainbow Serpent|              3.5|
|0001857169|A Treasury of Narnia|              5.0|
|0001931113|       Slice of Snow|              3.0|
|0001932160|

In [None]:
explode_cat = df_meta.select(f.col("asin").alias("bookID"),f.explode("category").alias("genre"))

In [None]:
explode_cat.filter(f.col('genre').isNull()).count()

0

In [None]:
explode_cat.show()

+----------+--------------------+
|    bookID|               genre|
+----------+--------------------+
|000047715X|               Books|
|000047715X|New, Used & Renta...|
|000047715X|Medicine & Health...|
|0000004545|               Books|
|0000004545|  Arts & Photography|
|0000004545|               Music|
|0000013765|               Books|
|0000013765|  Arts & Photography|
|0000013765|               Music|
|0000555010|               Books|
|0000555010|New, Used & Renta...|
|0000555010|Medicine & Health...|
|0000477141|               Books|
|0000477141|       Medical Books|
|0000477141|            Medicine|
|0000230022|               Books|
|0000230022|New, Used & Renta...|
|0000230022|  Business & Finance|
|0000038504|               Books|
|0000038504|Education & Teaching|
+----------+--------------------+
only showing top 20 rows



In [None]:
cat_table = explode_cat.crosstab('bookID', 'genre')

In [None]:
cat_table.show()

+------------+----------+----------------------------+---------------------+---------------------------+---------------------+-----------+-------------------------------------------------------------+-------------------------+----------------------------------+----------------------------+------------------+-----------------+-------------------------------------+-------+---------------------+------------+---------------------+------------------------------+-------+----------+-------+-------------------------------------------------------+--------------------------------------------------------------------------------------------+-------------------------------------------+--------------+-----+------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------

In [None]:
db = books_ratings.select(f.col('asin').alias('bookID'),'category')

In [None]:

pipeModel = Pipeline(stages=[
        HashingTF(inputCol="category", outputCol="vectors"),
        MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=10)
    ]).fit(db)

db_hashed = pipeModel.transform(db)

db_matches = pipeModel.stages[-1].approxSimilarityJoin(db_hashed, db_hashed, 0.9)



In [None]:
#show all matches (including duplicates)
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).show()

#show non-duplicate matches
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).filter('movie_id_A < movie_id_B').show()

# COLABORATIVE FILTER

In [14]:
reviewer = df.select('reviewerID','reviewerName').distinct()

In [15]:
reviewer = reviewer.withColumn('newreviewerID', f.monotonically_increasing_id())

In [16]:
reviewer.show()

+--------------+------------------+-------------+
|    reviewerID|      reviewerName|newreviewerID|
+--------------+------------------+-------------+
|A2T7HQQJ3FP9UO|ReadingintheGarden|            0|
|A3PLFWSYCLLQ6S|             sarah|            1|
| AE0DRVSZV9KTI|  RevengeOfThePiff|            2|
| AAIMUROK4XDND|         Sparelead|            3|
| AY0ZFOR0DER5M|              Otac|            4|
|A3QSYJ4NH05RWY|   Amazon Customer|            5|
|A1Y9F2P34K5GL8|            George|            6|
|A3F4P07ULDIQEF|   Amazon Customer|            7|
|A1GYQ0CNZJJ1TT|               n/a|            8|
|A3PAEZJDS0EHKK|        Moshe Amir|            9|
| A7O0Q0KMXUGZG|        Gregory S.|           10|
|A27AWGIJVW0NST|       TJ Fretwell|           11|
|A2NPLH7APMYVBT|             MARTY|           12|
|A2RRP1N82PL7TA|     Daniele Manno|           13|
|A2X4MTE0W8DF6V|        Serious Ed|           14|
|A25GDJ31WLG4U1|           Jenny Q|           15|
|A1N5CRXNUDGH5Q|      P. Stevenson|           16|


In [17]:
df_reviewer_selected = reviewer.select('reviewerID','newreviewerID')

# Thực hiện merge bằng cách join DataFrame
df = df.join(df_reviewer_selected, on=['reviewerID'], how='left')

In [18]:
df2 = df.select('asin', 'newreviewerID', 'overall')

In [19]:
df2 = df2.withColumn("asin", f.col("asin").cast("int"))\
        .withColumn("newreviewerID", f.col("newreviewerID").cast("int"))

In [20]:
df2 = df2.withColumnRenamed("asin","itemID")\
        .withColumnRenamed("newreviewerID","userID")\
        .withColumnRenamed("overall","rating")

In [21]:
df2.printSchema()

root
 |-- itemID: integer (nullable = true)
 |-- userID: integer (nullable = true)
 |-- rating: double (nullable = true)



In [25]:
df2.count()

52426535

In [22]:
# drop null itemID
df2 = df2.dropna(subset=["itemID"])

In [28]:
df2.count()

45580808

In [23]:
training, testing = df2.randomSplit([0.8, 0.2], seed = 42)
training.cache()
testing.cache()

DataFrame[itemID: int, userID: int, rating: double]

## Train ALS model

In [24]:
als = ALS(userCol="userID", itemCol="itemID", ratingCol="rating")


In [25]:
model = als.fit(training)

In [28]:
# Tạo ra top 10 bộ phim khuyến nghị cho mỗi một user
userRecs = model.recommendForAllUsers(10)

In [29]:
# Tạo ra top 10 user khuyến nghị cho mỗi một bộ phim
movieRecs = model.recommendForAllItems(10)

In [30]:
userRecs.printSchema()

root
 |-- userID: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemID: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



### Testing

In [31]:
userID = 433

getuser = userRecs.filter(userRecs.userID == userID).select(f.col('recommendations'))

In [32]:
kq = getuser.rdd.collect()[0][0]

In [54]:
from pyspark.sql import Row
kq_df = spark.createDataFrame(kq)

In [56]:
kq_df.show()

+----------+-----------------+
|    itemID|           rating|
+----------+-----------------+
| 533119871|5.849311351776123|
|1609836189|5.841963291168213|
| 816040141|5.835705280303955|
|1456422804|5.815028190612793|
|1782357556|5.806621551513672|
|1502316439|5.791800022125244|
|1495402681|5.790798187255859|
|1561704075|5.783150672912598|
| 877280592|5.781488418579102|
| 756609631|5.761760234832764|
+----------+-----------------+



In [57]:
df_book_selected = df_meta.select(f.col("asin").alias("itemID"),'title')

kq_df = kq_df.join(df_book_selected, on=['itemID'], how='left')

In [58]:
kq_df.show()

+----------+-----------------+--------------------+
|    itemID|           rating|               title|
+----------+-----------------+--------------------+
|1495402681|5.790798187255859|Detour: road to s...|
|1456422804|5.815028190612793|Winter Foods: Rec...|
|1502316439|5.791800022125244|History of Gustav...|
|1561704075|5.783150672912598|The Zone: Enter t...|
| 877280592|5.781488418579102|          Hatha Yoga|
| 533119871|5.849311351776123|Oh, God, Where Ar...|
|1782357556|5.806621551513672|Three Little Pigs...|
|1609836189|5.841963291168213|Concrete Manual B...|
| 756609631|5.761760234832764|Arts and Crafts (...|
| 816040141|5.835705280303955|Dorothy Einon's L...|
+----------+-----------------+--------------------+



## Đánh giá

In [26]:
predictions = model.transform(testing)

In [27]:
predictions.show()

+-------+------+------+----------+
| itemID|userID|rating|prediction|
+-------+------+------+----------+
|1939777| 10148|   5.0| 4.3585916|
|1720392|  2940|   5.0|  4.601542|
|1384198|  8562|   2.0|  4.467003|
|1386107| 14435|   4.0|  3.556973|
|1048767|  8036|   4.0| 4.3793654|
|1048767| 22403|   5.0|  4.232718|
|1048767| 27397|   1.0| 4.1155477|
|1048767|   532|   5.0| 4.1740484|
|1048767|  1213|   4.0|  4.282963|
|1048767| 27637|   1.0| 4.2621603|
|1050230|  3607|   5.0|  4.269848|
|1050230| 22403|   5.0|  4.280637|
|1921517|  4794|   5.0| 4.7259817|
|1944959| 14043|   5.0|       NaN|
|1720279|  8549|   5.0|  4.666018|
|1720295|  9976|   5.0|  4.681894|
|1381733| 17502|   5.0| 4.6347003|
|1381733| 40130|   1.0|  4.369783|
|1381733| 34317|   1.0| 4.6545115|
|1844423| 24166|   4.0| 4.5115223|
+-------+------+------+----------+
only showing top 20 rows



In [33]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [59]:
rmse = evaluator.evaluate(predictions.na.drop())

print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.0281764968531164


## Tìm siêu tham số

In [35]:
param_grid = (\
    ParamGridBuilder()\
    .addGrid(als.rank,[5, 10])\
    .addGrid(als.maxIter,[20])\
    .addGrid(als.regParam,[0.05, 0.1])\
    .build()
)

In [36]:
crossval = CrossValidator(estimator=als,\
                          estimatorParamMaps=param_grid,\
                          evaluator=evaluator,\
                          numFolds=5)


In [37]:
cv_model = crossval.fit(training)
best_model = cv_model.bestModel

In [50]:
best_model

ALSModel: uid=ALS_c4230bd410d9, rank=5

In [52]:
print("Best model parameters:")
print("  rank:", best_model.rank)
print("  maxIter:", best_model._java_obj.parent().getMaxIter())
print("  regParam:", best_model._java_obj.parent().getRegParam())

Best model parameters:
  rank: 5
  maxIter: 20
  regParam: 0.05


In [61]:
def ALSRecommendation(reviewerID, n=5 ):
    userRecs = best_model.recommendForAllUsers(n)
    getuser = userRecs.filter(userRecs.userID == reviewerID).select(f.col('recommendations'))
    kq_df = spark.createDataFrame(getuser.rdd.collect()[0][0])

    df_book_selected = df_meta.select(f.col("asin").alias("itemID"),'title')

    kq_df = kq_df.join(df_book_selected, on=['itemID'], how='left')
    return kq_df.show()

In [62]:
ALSRecommendation(165)

+----------+-----------------+--------------------+
|    itemID|           rating|               title|
+----------+-----------------+--------------------+
| 844226254|7.863208293914795|Threads &amp; Tie...|
|1520167911| 8.01206111907959|The Manipulative ...|
| 805038183|8.117659568786621|Calendarbears: A ...|
|1566702011|7.997121334075928|Chemical Sensitiv...|
|1530116767|7.895768165588379|Toxic: Dos mundos...|
+----------+-----------------+--------------------+

