In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, explode

In [3]:
SparkContext.setSystemProperty('spark.executor.memory','12g')
sc = SparkContext(master='local', appName='Recommendation_VideoGame')

In [4]:
spark = SparkSession(sc)

In [5]:
data = spark.read.json('Du lieu cung cap/reviews_Video_Games_5.json.gz')

In [6]:
data.show(5, truncate=True)

+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|0700099867|[8, 12]|    1.0|Installing the ga...| 07 9, 2012|A2HD75EMZR8QLN|                 123|Pay to unlock con...|    1341792000|
|0700099867| [0, 0]|    4.0|If you like rally...|06 30, 2013|A3UR8NLLY1ZHCX|Alejandro Henao "...|     Good rally game|    1372550400|
|0700099867| [0, 0]|    1.0|1st shipment rece...|06 28, 2014|A1INA0F5CWW3J4|Amazon Shopper "M...|           Wrong key|    1403913600|
|0700099867|[7, 10]|    3.0|I got this versio...|09 14, 2011|A1DLMTOTHQ4AST|            ampgreen|awesome game, if ...|    1315958400|
|0700099867| [2, 2]|    4.0|I had Dirt 2 on X...|06 14, 2011|A

In [7]:
data_sub = data.select(['asin','overall','reviewerID'])

In [8]:
data_sub.count()

231780

In [9]:
data_sub.printSchema()

root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewerID: string (nullable = true)



In [10]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import isnan, when, count, col, udf

In [11]:
data_sub = data_sub.withColumn('overall', data_sub["overall"].cast(DoubleType()))

In [12]:
data_sub.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           data_sub.columns]).toPandas().T

Unnamed: 0,0
asin,0
overall,0
reviewerID,0


In [13]:
# Distinct users and movies
users = data.select("reviewerId").distinct().count()
products = data.select("asin").distinct().count()
numerator = data.count()

In [14]:
display(numerator, users, products)

231780

24303

10672

In [15]:
# Number of rating matrix could contain if no empty cells
denominator = users * products
denominator

259361616

In [16]:
sparsity = 1- (numerator * 1.0/ denominator)
print("sparsity:"), sparsity

sparsity:


(None, 0.9991063442479476)

### Chuẩn hóa dữ liệu, chuyển đổi dữ liệu

In [17]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [18]:
indexer = StringIndexer(inputCol='asin',
                        outputCol='asin_idx')
data_indexed = indexer.fit(data_sub).transform(data_sub)

indexer1 = StringIndexer(inputCol='reviewerID',
                        outputCol='reviewerID_idx')
data_indexed = indexer1.fit(data_indexed).transform(data_indexed)

In [19]:
data_indexed.show(5, truncate=True)

+----------+-------+--------------+--------+--------------+
|      asin|overall|    reviewerID|asin_idx|reviewerID_idx|
+----------+-------+--------------+--------+--------------+
|0700099867|    1.0|A2HD75EMZR8QLN|  2269.0|       14157.0|
|0700099867|    4.0|A3UR8NLLY1ZHCX|  2269.0|       22489.0|
|0700099867|    1.0|A1INA0F5CWW3J4|  2269.0|        7934.0|
|0700099867|    3.0|A1DLMTOTHQ4AST|  2269.0|        7852.0|
|0700099867|    4.0|A361M14PU2GUEG|  2269.0|         847.0|
+----------+-------+--------------+--------+--------------+
only showing top 5 rows



### Train model

In [20]:
# Chia dữ liệu train_test
# Smaller dataset so we will use 0.8/0.2
(training, test)= data_indexed.randomSplit([0.8,0.2])

In [21]:
# xây dựng model
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

als = ALS(maxIter=10,
          regParam=0.09,
          rank=25,
          userCol="reviewerID_idx",
          itemCol="asin_idx",
          ratingCol="overall",
          coldStartStrategy="drop",
          nonnegative=True)
model = als.fit(training)
# rank cho to hơn vì phim nhiều và user nhiều

In [22]:
data_indexed.select("overall").describe().show()

+-------+------------------+
|summary|           overall|
+-------+------------------+
|  count|            231780|
|   mean| 4.086396582966606|
| stddev|1.2023296087789057|
|    min|               1.0|
|    max|               5.0|
+-------+------------------+



### Đánh giá kết quả

In [23]:
predictions = model.transform(test)

In [24]:
predictions.show(5)

+----------+-------+--------------+--------+--------------+----------+
|      asin|overall|    reviewerID|asin_idx|reviewerID_idx|prediction|
+----------+-------+--------------+--------+--------------+----------+
|B00BMFIXT2|    4.0|A1VQBEW0G4IH1J|   148.0|       18652.0|  3.787307|
|B00BMFIXT2|    3.0|A2YJ1K8M0KPHMF|   148.0|          93.0| 3.7883072|
|B00BMFIXT2|    5.0|A19BG3PQ0DMYPF|   148.0|       17377.0|  4.573931|
|B00BMFIXT2|    5.0| A9PAC5VFTDQHF|   148.0|          94.0|  4.068368|
|B00BMFIXT2|    5.0|A369H7AU0HYUAT|   148.0|        4846.0|  4.987848|
+----------+-------+--------------+--------+--------------+----------+
only showing top 5 rows



In [25]:
from pyspark.ml.evaluation import RegressionEvaluator

In [26]:
evaluator = RegressionEvaluator(metricName='rmse',
                                labelCol="overall",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean square error = "+ str(rmse))

Root-mean square error = 1.2081450851042443


### Hiệu chỉnh tham số (tunning parameter)

In [27]:
als_t = ALS(maxIter=10,
          regParam=0.1,
          rank = 30,
          userCol="reviewerID_idx",
          itemCol="asin_idx",
          ratingCol="overall",
          coldStartStrategy="drop",
          nonnegative=True)
model_t = als_t.fit(training)

In [28]:
predictions_t = model_t.transform(test)

In [29]:
rmse_t = evaluator.evaluate(predictions_t)
print("Root-mean square error = "+ str(rmse_t))

Root-mean square error = 1.2002357025505224


Nhận xét: Tốt hơn vì rmse nhỏ hơn nên sẽ sử dụng model này

### Đưa ra đề xuất cho tất cả users

In [30]:
# Get 5 recommendations which have highest rating
user_recs = model_t.recommendForAllUsers(5)

In [31]:
user_recs.show(10,truncate=False)

+--------------+-----------------------------------------------------------------------------------------------+
|reviewerID_idx|recommendations                                                                                |
+--------------+-----------------------------------------------------------------------------------------------+
|1580          |[{6067, 5.973857}, {6582, 5.6582165}, {6190, 5.654786}, {5437, 5.6509037}, {4651, 5.6392984}]  |
|4900          |[{6067, 6.138893}, {8935, 5.8081117}, {5437, 5.7383227}, {10027, 5.6957746}, {10527, 5.668173}]|
|5300          |[{3811, 5.3455973}, {6067, 5.196928}, {1389, 5.1456923}, {4651, 5.0612917}, {3559, 5.059187}]  |
|6620          |[{3811, 5.568969}, {4651, 5.547037}, {6067, 5.518276}, {3964, 5.4003363}, {8956, 5.332127}]    |
|7240          |[{9095, 5.986202}, {8935, 5.8964634}, {7425, 5.892434}, {6024, 5.8868823}, {1884, 5.839867}]   |
|7340          |[{6067, 5.060079}, {6190, 5.0509815}, {5531, 4.8464823}, {5437, 4.8339524}, {827

In [32]:
user_recs.printSchema()

root
 |-- reviewerID_idx: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- asin_idx: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [33]:
for user in user_recs.head(3):
    print(user)
    print("\n")

Row(reviewerID_idx=1580, recommendations=[Row(asin_idx=6067, rating=5.9738569259643555), Row(asin_idx=6582, rating=5.65821647644043), Row(asin_idx=6190, rating=5.654786109924316), Row(asin_idx=5437, rating=5.650903701782227), Row(asin_idx=4651, rating=5.639298439025879)])


Row(reviewerID_idx=4900, recommendations=[Row(asin_idx=6067, rating=6.138893127441406), Row(asin_idx=8935, rating=5.808111667633057), Row(asin_idx=5437, rating=5.738322734832764), Row(asin_idx=10027, rating=5.695774555206299), Row(asin_idx=10527, rating=5.668172836303711)])


Row(reviewerID_idx=5300, recommendations=[Row(asin_idx=3811, rating=5.345597267150879), Row(asin_idx=6067, rating=5.196928024291992), Row(asin_idx=1389, rating=5.145692348480225), Row(asin_idx=4651, rating=5.061291694641113), Row(asin_idx=3559, rating=5.059186935424805)])




### Save to file

In [34]:
from time import time

In [35]:
t0 = time()

In [36]:
df_reviewer_reviewer_id = data_indexed.select('reviewerID_idx', 'reviewerID').distinct()

In [37]:
df_reviewer_reviewer_id.count()

24303

In [38]:
df_reviewer_reviewer_id.show(5)

+--------------+--------------+
|reviewerID_idx|    reviewerID|
+--------------+--------------+
|       20806.0|A2ZYJOZO6BPV6K|
|         735.0|A3TQTYD0D6AUO3|
|        2580.0|A2QVKLB1VT903K|
|        9117.0|A3OMBKL5EOHA36|
|        2945.0|A2NWQA506BES77|
+--------------+--------------+
only showing top 5 rows



In [39]:
df_asin_asin_id = data_indexed.select('asin_idx', 'asin').distinct()

In [40]:
df_asin_asin_id.count()

10672

In [41]:
df_asin_asin_id.show(5)

+--------+----------+
|asin_idx|      asin|
+--------+----------+
|   883.0|B000038IFX|
|  2005.0|B00005Q8J1|
|  4809.0|B00005YYFE|
|  3085.0|B00006F2ZR|
|  4821.0|B00007KUW5|
+--------+----------+
only showing top 5 rows



In [42]:
new_user_recs = user_recs.join(df_reviewer_reviewer_id, on=['reviewerID_idx'], how="left")

In [43]:
new_user_recs.show(10, truncate=False)

+--------------+----------------------------------------------------------------------------------------------+--------------+
|reviewerID_idx|recommendations                                                                               |reviewerID    |
+--------------+----------------------------------------------------------------------------------------------+--------------+
|299           |[{6067, 6.39593}, {8685, 6.2984905}, {9342, 6.2089605}, {1691, 6.1321745}, {1389, 6.080168}]  |A357A1TI51VT1S|
|305           |[{4651, 5.729057}, {6067, 5.4909596}, {6190, 5.4886765}, {6860, 5.4625597}, {8956, 5.4054456}]|A150XV12S9HAZ |
|496           |[{6067, 5.7689414}, {6582, 5.7520547}, {8956, 5.516962}, {3964, 5.5011563}, {8019, 5.4669366}]|A12WZTC4YJ8ZEC|
|558           |[{6067, 5.5897017}, {4651, 5.5648694}, {8956, 5.4224405}, {3964, 5.406923}, {5437, 5.3940496}]|A1Y5LUJZ8879PP|
|596           |[{6159, 5.5026884}, {3811, 5.458193}, {10527, 5.4280777}, {6580, 5.4014454}, {6067, 5.30044}] |

In [44]:
new_user_recs.count()

24302

### Save to disk

In [45]:
new_user_recs.write.parquet('VideoGames_U.parquet', mode='overwrite')
df_asin_asin_id.write.parquet('VideoGames_P.parquet', mode='overwrite')

In [46]:
time_duration = time() - 10
print(time_duration)

1642497314.6962535


### Đưa ra đề xuất cho một userid cụ thể

In [47]:
reviewerId = "A29KT7UP7DLM1J"
find_user_rec = new_user_recs.filter(new_user_recs['reviewerId'] == reviewerId)
user = find_user_rec.first()

lst=[]
for row in user['recommendations']:
    row_f = df_asin_asin_id.filter(df_asin_asin_id.asin_idx == row['asin_idx'])
    row_f_first = row_f.first()
    lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
dic_user_rec = {'reviewerID' : user.reviewerID, 'recommendations':lst}

In [48]:
dic_user_rec

{'reviewerID': 'A29KT7UP7DLM1J',
 'recommendations': [(4532, 'B000G7WGP4', 3.9754128456115723),
  (3811, 'B00ATST8HY', 3.94747257232666),
  (9084, 'B005GWU1M4', 3.875537872314453),
  (3964, 'B001J5SFC0', 3.854609489440918),
  (8095, 'B007TY84MG', 3.809636116027832)]}

### Đọc 2 file đã lưu để lấy dữ liệu đầu vào => Đề xuất

In [49]:
new_user_recs = spark.read.parquet('VideoGames_U.parquet')

In [50]:
new_user_recs.printSchema()

root
 |-- reviewerID_idx: integer (nullable = true)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- asin_idx: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)
 |-- reviewerID: string (nullable = true)



In [51]:
new_user_recs.show(2)

+--------------+--------------------+--------------+
|reviewerID_idx|     recommendations|    reviewerID|
+--------------+--------------------+--------------+
|           255|[{6067, 6.249302}...|A3KKM0T1KY42HA|
|           271|[{6067, 6.408162}...|A39CYUXTVGQSCF|
+--------------+--------------------+--------------+
only showing top 2 rows



In [52]:
df_asin_asin_id = spark.read.parquet('VideoGames_P.parquet')

In [53]:
df_asin_asin_id.show(2)

+--------+----------+
|asin_idx|      asin|
+--------+----------+
|   290.0|B00000F1GM|
|  8184.0|B00000JHPN|
+--------+----------+
only showing top 2 rows



In [54]:
# Recommendation for some users: A29KT7UP7DLM1J, A1WGVOVABHFDF3, A3DIS5O83SQJWW
reviewerId = "A29KT7UP7DLM1J"
find_user_rec = new_user_recs.filter(new_user_recs['reviewerId'] == reviewerId)
find_user_rec.show(truncate=False)

+--------------+----------------------------------------------------------------------------------------------+--------------+
|reviewerID_idx|recommendations                                                                               |reviewerID    |
+--------------+----------------------------------------------------------------------------------------------+--------------+
|4622          |[{4532, 3.9754128}, {3811, 3.9474726}, {9084, 3.8755379}, {3964, 3.8546095}, {8095, 3.809636}]|A29KT7UP7DLM1J|
+--------------+----------------------------------------------------------------------------------------------+--------------+



In [55]:
result = ''
for user in find_user_rec.collect():
    lst=[]
    for row in user['recommendations']:
        print(row)
        row_f = df_asin_asin_id.filter(df_asin_asin_id.asin_idx == row['asin_idx'])
        row_f_first = row_f.first()
        lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
    dic_user_rec = {'reviewerID' : user.reviewerID, 'recommendations':lst}
    result = dic_user_rec

Row(asin_idx=4532, rating=3.9754128456115723)
Row(asin_idx=3811, rating=3.94747257232666)
Row(asin_idx=9084, rating=3.875537872314453)
Row(asin_idx=3964, rating=3.854609489440918)
Row(asin_idx=8095, rating=3.809636116027832)


In [56]:
reviewerId = "A1WGVOVABHFDF3"
find_user_rec = new_user_recs.filter(new_user_recs['reviewerId'] == reviewerId)
find_user_rec.show(truncate=False)

+--------------+--------------------------------------------------------------------------------------------+--------------+
|reviewerID_idx|recommendations                                                                             |reviewerID    |
+--------------+--------------------------------------------------------------------------------------------+--------------+
|6623          |[{6067, 5.597731}, {1389, 5.45756}, {3811, 5.4493628}, {5114, 5.383317}, {10460, 5.3671975}]|A1WGVOVABHFDF3|
+--------------+--------------------------------------------------------------------------------------------+--------------+



In [57]:
reviewerId = "A3DIS5O83SQJWW"
find_user_rec = new_user_recs.filter(new_user_recs['reviewerId'] == reviewerId)
find_user_rec.show(truncate=False)

+--------------+---------------------------------------------------------------------------------------------+--------------+
|reviewerID_idx|recommendations                                                                              |reviewerID    |
+--------------+---------------------------------------------------------------------------------------------+--------------+
|780           |[{6067, 5.777022}, {6582, 5.5520196}, {6955, 5.4805064}, {8935, 5.436106}, {8869, 5.3869433}]|A3DIS5O83SQJWW|
+--------------+---------------------------------------------------------------------------------------------+--------------+

