# Câu 5: Recommendation - Amazon - Office Products (1.0 điểm)

## only have one laptop/computer:
- Use the information "reviewerID", "asin" (ProductID), and “overall” (users’ ratings for each product) in dataset reviews_Office_Products_5.json.gz to build a model to predict overalls for products that have not been selected 
by users. 
- Then make recommendations to some users: A3CJ7MHAS9IMAM, A3LGT6UZL99IW1, A21KNRUAA5RK5E

Read more information here: 
https://cseweb.ucsd.edu/~jmcauley/datasets/amazon/links.html

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName('Recommendation_system').getOrCreate()
spark

In [4]:
data = spark.read.json("reviews_Office_Products_5.json.gz")

In [5]:
data.show(5,truncate=True)

+----------+-------+-------+--------------------+-----------+--------------+-------------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|       reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+-------------------+--------------------+--------------+
|B00000JBLH| [3, 4]|    5.0|I bought my first...| 09 3, 2004|A32T2H8150OJLU|                ARH|A solid performer...|    1094169600|
|B00000JBLH| [7, 9]|    5.0|WHY THIS BELATED ...|12 15, 2007|A3MAFS04ZABRGO|   Let it Be "Alan"|Price of GOLD is ...|    1197676800|
|B00000JBLH| [3, 3]|    2.0|I have an HP 48GX...| 01 1, 2011|A1F1A0QQP2XVH5|             Mark B|Good functionalit...|    1293840000|
|B00000JBLH| [7, 8]|    5.0|I've started doin...|04 19, 2006| A49R5DBXXQDE5|       R. D Johnson|One of the last o...|    1145404800|
|B00000JBLH| [0, 0]|    5.0|For simple calcul...| 08 4, 2013|A2XRMQA6

In [6]:
data_sub = data.select(['asin', 'overall', 'reviewerID'])

In [7]:
data_sub.count()

53258

In [8]:
data_sub.show(5, truncate=True)

+----------+-------+--------------+
|      asin|overall|    reviewerID|
+----------+-------+--------------+
|B00000JBLH|    5.0|A32T2H8150OJLU|
|B00000JBLH|    5.0|A3MAFS04ZABRGO|
|B00000JBLH|    2.0|A1F1A0QQP2XVH5|
|B00000JBLH|    5.0| A49R5DBXXQDE5|
|B00000JBLH|    5.0|A2XRMQA6PJ5ZJ8|
+----------+-------+--------------+
only showing top 5 rows



In [9]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import isnan, when, count, col, udf

In [10]:
data_sub.printSchema()

root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewerID: string (nullable = true)



In [11]:
data_sub.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           data_sub.columns]).toPandas().T

Unnamed: 0,0
asin,0
overall,0
reviewerID,0


In [12]:
# Distinct users and movies
users = data_sub.select("reviewerID").distinct().count()
products = data_sub.select("asin").distinct().count()
numerator = data_sub.count()

In [13]:
display(numerator, users, products)

53258

4905

2420

In [14]:
# Number of ratings matrix could contain if no empty cells
denominator = users * products
denominator

11870100

In [15]:
#Calculating sparsity
sparsity = 1 - (numerator*1.0 / denominator)
print ("Sparsity: "), sparsity

Sparsity: 


(None, 0.9955132644206873)

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [17]:
# Converting String to index
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [18]:
# Create an indexer
indexer = StringIndexer(inputCol='asin', 
                        outputCol='asin_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(data_sub)

# Indexer creates a new column with numeric index values
data_indexed = indexer_model.transform(data_sub)

# Repeat the process for the other categorical feature
indexer1 = StringIndexer(inputCol='reviewerID', 
                         outputCol='reviewerID_idx')
indexer1_model = indexer1.fit(data_indexed)
data_indexed = indexer1_model.transform(data_indexed)

In [19]:
data_indexed.show(5, truncate=True)

+----------+-------+--------------+--------+--------------+
|      asin|overall|    reviewerID|asin_idx|reviewerID_idx|
+----------+-------+--------------+--------+--------------+
|B00000JBLH|    5.0|A32T2H8150OJLU|  1444.0|         286.0|
|B00000JBLH|    5.0|A3MAFS04ZABRGO|  1444.0|        4499.0|
|B00000JBLH|    2.0|A1F1A0QQP2XVH5|  1444.0|         211.0|
|B00000JBLH|    5.0| A49R5DBXXQDE5|  1444.0|        4604.0|
|B00000JBLH|    5.0|A2XRMQA6PJ5ZJ8|  1444.0|        1330.0|
+----------+-------+--------------+--------+--------------+
only showing top 5 rows



In [20]:
data_indexed.printSchema()

root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- asin_idx: double (nullable = false)
 |-- reviewerID_idx: double (nullable = false)



In [21]:
data_indexed.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           data_indexed.columns]).toPandas().T

Unnamed: 0,0
asin,0
overall,0
reviewerID,0
asin_idx,0
reviewerID_idx,0


In [22]:
# Smaller dataset so we will use 0.8 / 0.2
(training, test) = data_indexed.randomSplit([0.8, 0.2])

In [23]:
# Creating ALS model and fitting data
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from time import time

In [24]:
als = ALS(maxIter=10, 
          regParam=0.09,           
          rank = 25,
          userCol="reviewerID_idx", 
          itemCol="asin_idx", 
          ratingCol="overall", 
          coldStartStrategy="drop",
          nonnegative=True)
model = als.fit(training)

In [25]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)

In [26]:
predictions.show(3)

+----------+-------+--------------+--------+--------------+----------+
|      asin|overall|    reviewerID|asin_idx|reviewerID_idx|prediction|
+----------+-------+--------------+--------+--------------+----------+
|B00004Z5SM|    5.0|A2HPVNZZF15W93|   111.0|         148.0|  5.378322|
|B0000721Z3|    3.0| AZL1JK2B50D2H|   769.0|        4900.0| 3.1529493|
|B0000C1XHY|    4.0| AHROSV6WJOEGY|   947.0|        1238.0|  1.557443|
+----------+-------+--------------+--------+--------------+----------+
only showing top 3 rows



In [27]:
evaluator = RegressionEvaluator(metricName="rmse", 
                                labelCol="overall",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

In [28]:
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.982135679243817


In [29]:
# save hdfs, nho xoa file da co hoac doi ten file
model.save("Rec_Amazon_OfficeProduct_model")

### Providing Recommendations: for all users

In [30]:
# get 5 recommendations which have highest rating.
user_recs = model.recommendForAllUsers(5) 

In [31]:
user_recs.show(5, truncate=False)

+--------------+---------------------------------------------------------------------------------------------+
|reviewerID_idx|recommendations                                                                              |
+--------------+---------------------------------------------------------------------------------------------+
|12            |[{2247, 5.835313}, {793, 5.79998}, {1991, 5.796389}, {2226, 5.756344}, {2396, 5.745772}]     |
|22            |[{1984, 5.538006}, {2396, 5.51104}, {1911, 5.406266}, {1991, 5.3912053}, {1421, 5.3553834}]  |
|26            |[{2247, 6.4652405}, {1984, 6.4511175}, {1985, 6.136538}, {1421, 6.129053}, {2199, 6.118365}] |
|27            |[{1538, 5.489496}, {1697, 5.460419}, {1415, 5.3401423}, {793, 5.3184586}, {1985, 5.2797318}] |
|28            |[{1984, 5.662718}, {1985, 5.4777637}, {1421, 5.473986}, {2247, 5.4734344}, {2396, 5.4222794}]|
+--------------+---------------------------------------------------------------------------------------------+
o

In [32]:
user_recs.count()

4905

In [33]:
for user in user_recs.head(5):
    print(user)
    print("\n")

Row(reviewerID_idx=12, recommendations=[Row(asin_idx=2247, rating=5.835312843322754), Row(asin_idx=793, rating=5.799980163574219), Row(asin_idx=1991, rating=5.796389102935791), Row(asin_idx=2226, rating=5.756343841552734), Row(asin_idx=2396, rating=5.745771884918213)])


Row(reviewerID_idx=22, recommendations=[Row(asin_idx=1984, rating=5.538005828857422), Row(asin_idx=2396, rating=5.511040210723877), Row(asin_idx=1911, rating=5.406266212463379), Row(asin_idx=1991, rating=5.391205310821533), Row(asin_idx=1421, rating=5.355383396148682)])


Row(reviewerID_idx=26, recommendations=[Row(asin_idx=2247, rating=6.465240478515625), Row(asin_idx=1984, rating=6.451117515563965), Row(asin_idx=1985, rating=6.136538028717041), Row(asin_idx=1421, rating=6.129053115844727), Row(asin_idx=2199, rating=6.1183648109436035)])


Row(reviewerID_idx=27, recommendations=[Row(asin_idx=1538, rating=5.489496231079102), Row(asin_idx=1697, rating=5.460419178009033), Row(asin_idx=1415, rating=5.340142250061035), Row

### Converting back to string form

In [34]:
df_reviewer_reviewer_id = data_indexed.select('reviewerID_idx', 'reviewerID').distinct()

In [35]:
df_reviewer_reviewer_id.count()

4905

In [36]:
df_reviewer_reviewer_id.show(5)

+--------------+--------------+
|reviewerID_idx|    reviewerID|
+--------------+--------------+
|         103.0|A2JXCOUGYS5Y4J|
|         305.0|A1T61QP7QHYBRQ|
|        1228.0|  AC2278WPK3EU|
|        4887.0| AYI9RQ4YYM9W1|
|         579.0|A343C98QJO0JBE|
+--------------+--------------+
only showing top 5 rows



In [37]:
df_asin_asin_idx = data_indexed.select('asin_idx', 'asin').distinct()

In [38]:
df_asin_asin_idx.count()

2420

In [39]:
df_asin_asin_idx.show(5)

+--------+----------+
|asin_idx|      asin|
+--------+----------+
|  1613.0|B00006I62X|
|  1631.0|B0001DBHNA|
|  1364.0|B0013CIJSO|
|   394.0|B0013CM584|
|   473.0|B001CSMJJE|
+--------+----------+
only showing top 5 rows



In [40]:
new_user_recs = user_recs.join(df_reviewer_reviewer_id, on=['reviewerID_idx'], how='left')

In [41]:
new_user_recs.show(5, truncate=False)

+--------------+---------------------------------------------------------------------------------------------+--------------+
|reviewerID_idx|recommendations                                                                              |reviewerID    |
+--------------+---------------------------------------------------------------------------------------------+--------------+
|12            |[{2247, 5.835313}, {793, 5.79998}, {1991, 5.796389}, {2226, 5.756344}, {2396, 5.745772}]     |A376OJHLE6SU9Q|
|22            |[{1984, 5.538006}, {2396, 5.51104}, {1911, 5.406266}, {1991, 5.3912053}, {1421, 5.3553834}]  |A13U975DFXBU44|
|26            |[{2247, 6.4652405}, {1984, 6.4511175}, {1985, 6.136538}, {1421, 6.129053}, {2199, 6.118365}] |A1X3ESYZ79H59E|
|27            |[{1538, 5.489496}, {1697, 5.460419}, {1415, 5.3401423}, {793, 5.3184586}, {1985, 5.2797318}] |A38BE06WWL20AY|
|28            |[{1984, 5.662718}, {1985, 5.4777637}, {1421, 5.473986}, {2247, 5.4734344}, {2396, 5.4222794}]|A3NHUQ33

In [42]:
new_user_recs.count()

4905

In [43]:
# Save to hdfs
new_user_recs.write.parquet('new_user_recs', mode='overwrite')
df_asin_asin_idx.write.parquet('asin_asin_idx', mode='overwrite')

In [44]:
new_user_recs.toPandas().to_csv('new_user_recommendations.csv', index=False)

In [45]:
df_asin_asin_idx.toPandas().to_csv('df_asin_asin_idx.csv', index=False)

In [46]:
from pyspark.sql.functions import explode, col

In [47]:
# Explode the recommendations column to get each recommendation as a separate row
exploded_df = new_user_recs.select("reviewerID", explode("recommendations").alias("exploded_rec"))

# Join with df_asin_asin_idx to replace asin_idx with asin
df_result = exploded_df.join(df_asin_asin_idx, exploded_df["exploded_rec.asin_idx"] == df_asin_asin_idx["asin_idx"], "left") \
                       .select("reviewerID", col("asin").alias("recommendations"), "exploded_rec.rating")

In [48]:
df_result.show(truncate=False)

+--------------+---------------+---------+
|reviewerID    |recommendations|rating   |
+--------------+---------------+---------+
|A376OJHLE6SU9Q|B001HA5PDE     |5.835313 |
|A376OJHLE6SU9Q|B00006JNJR     |5.79998  |
|A376OJHLE6SU9Q|B0041WISRK     |5.796389 |
|A376OJHLE6SU9Q|B001B0ASBO     |5.756344 |
|A376OJHLE6SU9Q|B008664QXW     |5.745772 |
|A13U975DFXBU44|B003XOXUK2     |5.538006 |
|A13U975DFXBU44|B008664QXW     |5.51104  |
|A13U975DFXBU44|B001554FBE     |5.406266 |
|A13U975DFXBU44|B0041WISRK     |5.3912053|
|A13U975DFXBU44|B006LNU00K     |5.3553834|
|A1X3ESYZ79H59E|B001HA5PDE     |6.4652405|
|A1X3ESYZ79H59E|B003XOXUK2     |6.4511175|
|A1X3ESYZ79H59E|B003XR480U     |6.136538 |
|A1X3ESYZ79H59E|B006LNU00K     |6.129053 |
|A1X3ESYZ79H59E|B000XHY15A     |6.118365 |
|A38BE06WWL20AY|B001PMGB2I     |5.489496 |
|A38BE06WWL20AY|B0017LLYZG     |5.460419 |
|A38BE06WWL20AY|B004ISGG7Q     |5.3401423|
|A38BE06WWL20AY|B00006JNJR     |5.3184586|
|A38BE06WWL20AY|B003XR480U     |5.2797318|
+----------

In [49]:
df_result.toPandas().to_csv('amazon_office_product_recommendations.csv', header=True, index=False)

## Make recommendations to some users: 

A3CJ7MHAS9IMAM, A3LGT6UZL99IW1, A21KNRUAA5RK5E

In [50]:
def get_recommendations(reviewerID):
    find_user_rec = new_user_recs.filter(new_user_recs['reviewerID'] == reviewerID)
    user = find_user_rec.first()
    lst = []
    
    for row in user['recommendations']:
        row_f = df_asin_asin_idx.filter(df_asin_asin_idx['asin_idx'] == row['asin_idx'])
        row_f_first = row_f.first()
        lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
    
    dic_user_rec = {'reviewerID': user['reviewerID'], 'recommendations': lst}
    
    return dic_user_rec



In [51]:
# Recommendations for the specified users
users_to_recommend = ['A3CJ7MHAS9IMAM', 'A3LGT6UZL99IW1', 'A21KNRUAA5RK5E']

In [52]:
for user_id in users_to_recommend:
    print("List of products that ", user_id, " rated")
    test.filter(test['reviewerID']==user_id).sort("overall", ascending = False).show()

List of products that  A3CJ7MHAS9IMAM  rated


+----------+-------+--------------+--------+--------------+
|      asin|overall|    reviewerID|asin_idx|reviewerID_idx|
+----------+-------+--------------+--------+--------------+
|B005X9VZ70|    5.0|A3CJ7MHAS9IMAM|    70.0|         414.0|
|B008587M7I|    5.0|A3CJ7MHAS9IMAM|   136.0|         414.0|
|B00D3EWHU6|    4.0|A3CJ7MHAS9IMAM|   145.0|         414.0|
|B009QYH32O|    3.0|A3CJ7MHAS9IMAM|   428.0|         414.0|
+----------+-------+--------------+--------+--------------+

List of products that  A3LGT6UZL99IW1  rated
+----------+-------+--------------+--------+--------------+
|      asin|overall|    reviewerID|asin_idx|reviewerID_idx|
+----------+-------+--------------+--------+--------------+
|B002M7W098|    5.0|A3LGT6UZL99IW1|   254.0|         185.0|
|B002NU5ND4|    5.0|A3LGT6UZL99IW1|    26.0|         185.0|
|B0039N7ELS|    5.0|A3LGT6UZL99IW1|     1.0|         185.0|
|B004VJSG3E|    5.0|A3LGT6UZL99IW1|    95.0|         185.0|
|B007XPBW1G|    5.0|A3LGT6UZL99IW1|    71.0|         1

In [53]:
all_recommendations = []
for user_id in users_to_recommend:
    print('Recommendation for: ', user_id)
    recommendations = get_recommendations(user_id)
    print(recommendations)
    all_recommendations.append(recommendations)

Recommendation for:  A3CJ7MHAS9IMAM
{'reviewerID': 'A3CJ7MHAS9IMAM', 'recommendations': [(1984, 'B003XOXUK2', 5.433505058288574), (2247, 'B001HA5PDE', 5.37818717956543), (1941, 'B001PMHUSW', 5.343048572540283), (2396, 'B008664QXW', 5.324159622192383), (2052, 'B00000K3T0', 5.320980548858643)]}
Recommendation for:  A3LGT6UZL99IW1
{'reviewerID': 'A3LGT6UZL99IW1', 'recommendations': [(1984, 'B003XOXUK2', 6.233952045440674), (2396, 'B008664QXW', 6.065934658050537), (2374, 'B0056DIYBS', 6.022498607635498), (1699, 'B001AKX59C', 5.999139308929443), (2090, 'B00006JNJJ', 5.978412628173828)]}
Recommendation for:  A21KNRUAA5RK5E
{'reviewerID': 'A21KNRUAA5RK5E', 'recommendations': [(1984, 'B003XOXUK2', 4.99719762802124), (1022, 'B00358RIRC', 4.747041702270508), (1072, 'B0011G47PQ', 4.70930290222168), (1554, 'B002NS0ZCK', 4.6887102127075195), (2304, 'B0035JJJ96', 4.619287490844727)]}


In [54]:
df_recommendations = spark.createDataFrame(all_recommendations).select('reviewerID', 'recommendations')

In [55]:
df_recommendations.show( truncate=False)

+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|reviewerID    |recommendations                                                                                                                                                                                    |
+--------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|A3CJ7MHAS9IMAM|[{1984, B003XOXUK2, 5.433505058288574}, {2247, B001HA5PDE, 5.37818717956543}, {1941, B001PMHUSW, 5.343048572540283}, {2396, B008664QXW, 5.324159622192383}, {2052, B00000K3T0, 5.320980548858643}] |
|A3LGT6UZL99IW1|[{1984, B003XOXUK2, 6.233952045440674}, {2396, B008664QXW, 6.065934658050537}, {2374, B0056DIYBS, 6.022498607635498}, {1699, B001AKX

In [56]:
df_recommendations.toPandas().to_csv('new_recommendations.csv', index=False)