In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, explode

In [None]:
SparkContext.setSystemProperty('spark.executor.memory','12g')
sc = SparkContext(master='local', appName='Recommendation')

In [None]:
spark = SparkSession(sc)

In [None]:
# data1 = spark.read.csv('Cung cap HV/Product_new.csv',header=True,inferSchema=True)
data = spark.read.csv('Cung cap HV/Review_new.csv',header=True,inferSchema=True)

In [None]:
data.show(5, truncate=True)

In [None]:
data_sub = data.select(['product_id','rating','customer_id'])

In [None]:
data.count()

In [None]:
data_sub.printSchema()

In [None]:
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import isnan, when, count, col, udf

In [None]:
data_sub = data_sub.withColumn('rating', data_sub["rating"].cast(DoubleType()))
data_sub = data_sub.withColumn('customer_id', data_sub["customer_id"].cast(IntegerType()))
data_sub = data_sub.withColumn('product_id', data_sub["product_id"].cast(IntegerType()))

In [None]:
data_sub.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           data_sub.columns]).toPandas().T

In [None]:
data_sub = data_sub.na.drop(how='any')

In [None]:
data_sub.count()

In [None]:
data_sub.select([count(when(col(c).isNull(), c)).alias(c) for c in 
           data_sub.columns]).toPandas().T

In [None]:
# Distinct users and products
users = data_sub.select("customer_id").distinct().count()
products = data_sub.select("product_id").distinct().count()
numerator = data_sub.count()

In [None]:
display(numerator, users, products)

In [None]:
# Number of rating matrix could contain if no empty cells
denominator = users * products
denominator

In [None]:
sparsity = 1- (numerator * 1.0/ denominator)
print("sparsity:"), sparsity

### Feature Transformation

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [None]:
indexer = StringIndexer(inputCol='product_id',
                        outputCol='product_id_idx')
data_indexed = indexer.fit(data_sub).transform(data_sub)

indexer1 = StringIndexer(inputCol='customer_id',
                        outputCol='customer_id_idx')
data_indexed = indexer1.fit(data_indexed).transform(data_indexed)

In [None]:
data_indexed.show(5, truncate=True)

### Train model

In [None]:
# Chia dữ liệu train_test
# Smaller dataset so we will use 0.8/0.2
(training, test)= data_indexed.randomSplit([0.8,0.2])

In [None]:
# xây dựng model
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

als = ALS(maxIter=10,
          regParam=0.09,
          rank=25,
          userCol="customer_id_idx",
          itemCol="product_id_idx",
          ratingCol="rating",
          coldStartStrategy="drop",
          nonnegative=True)
model = als.fit(training)
# rank cho to hơn vì phim nhiều và user nhiều

In [None]:
data_indexed.select("rating").describe().show()

### Evaluate model

In [None]:
predictions = model.transform(test)

In [None]:
predictions.show(5)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
evaluator = RegressionEvaluator(metricName='rmse',
                                labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean square error = "+ str(rmse))

### Tunning parameter

In [None]:
ALSExplicit = ALS( implicitPrefs=False, userCol="customer_id_idx", itemCol="product_id_idx", ratingCol="rating",
          coldStartStrategy="drop")

defaultModel = ALSExplicit.fit(training)

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

In [None]:
paramMapExplicit = ParamGridBuilder() \
                    .addGrid(ALSExplicit.rank, [30, 40,50]) \
                    .addGrid(ALSExplicit.maxIter, [5,10,15,20]) \
                    .addGrid(ALSExplicit.regParam, [0.1,0.01,0.001]) \
                    .addGrid(ALSExplicit.alpha, [2.0,3.0]) \
                    .build()

In [None]:
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

In [None]:
CVALSExplicit = CrossValidator(estimator=ALSExplicit,
                            estimatorParamMaps=paramMapExplicit,
                            evaluator=evaluatorR,
                           numFolds=5)


CVModelEXplicit = CVALSExplicit.fit(training)

In [None]:
predictions_t = CVModelEXplicit.transform(test)

In [None]:
rmse_t = evaluator.evaluate(predictions_t)
print("Root-mean square error = "+ str(rmse_t))

In [30]:
als_t = ALS(maxIter=10,
          regParam=0.1,
          rank = 30,
          userCol="customer_id_idx",
          itemCol="product_id_idx",
          ratingCol="rating",
          coldStartStrategy="drop",
          nonnegative=True)
model_t = als_t.fit(training)

In [31]:
predictions_t = model_t.transform(test)

In [32]:
rmse_t = evaluator.evaluate(predictions_t)
print("Root-mean square error = "+ str(rmse_t))

Root-mean square error = 1.5686073972761045


Nhận xét: Tốt hơn vì rmse nhỏ hơn nên sẽ sử dụng model này

### Make recommendations to all users

In [33]:
# Get 5 recommendations which have highest rating
user_recs = model_t.recommendForAllUsers(5)

In [34]:
user_recs.show(10,truncate=False)

+---------------+-----------------------------------------------------------------------------------------------+
|customer_id_idx|recommendations                                                                                |
+---------------+-----------------------------------------------------------------------------------------------+
|148            |[{2620, 7.9994826}, {3315, 7.9726677}, {3519, 7.7538385}, {3691, 7.7381005}, {2794, 7.6134644}]|
|463            |[{3519, 7.3069124}, {2815, 7.2635875}, {2620, 7.053291}, {3237, 7.0232787}, {3814, 7.020816}]  |
|471            |[{3315, 6.713652}, {3571, 6.674263}, {2702, 6.5250473}, {3755, 6.468761}, {3125, 6.464198}]    |
|496            |[{2620, 8.170576}, {3315, 8.141822}, {3125, 8.076595}, {3755, 7.953744}, {3691, 7.834151}]     |
|833            |[{3247, 7.4129047}, {2929, 7.2292795}, {2493, 7.2191615}, {3836, 7.2082977}, {2620, 7.096886}] |
|1088           |[{3315, 7.5833273}, {2620, 7.395566}, {2730, 7.2758083}, {3755, 7.23207

In [35]:
user_recs.printSchema()

root
 |-- customer_id_idx: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id_idx: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [36]:
for user in user_recs.head(3):
    print(user)
    print("\n")

Row(customer_id_idx=148, recommendations=[Row(product_id_idx=2620, rating=7.99948263168335), Row(product_id_idx=3315, rating=7.972667694091797), Row(product_id_idx=3519, rating=7.753838539123535), Row(product_id_idx=3691, rating=7.738100528717041), Row(product_id_idx=2794, rating=7.61346435546875)])


Row(customer_id_idx=463, recommendations=[Row(product_id_idx=3519, rating=7.306912422180176), Row(product_id_idx=2815, rating=7.263587474822998), Row(product_id_idx=2620, rating=7.053290843963623), Row(product_id_idx=3237, rating=7.023278713226318), Row(product_id_idx=3814, rating=7.020815849304199)])


Row(customer_id_idx=471, recommendations=[Row(product_id_idx=3315, rating=6.71365213394165), Row(product_id_idx=3571, rating=6.674263000488281), Row(product_id_idx=2702, rating=6.525047302246094), Row(product_id_idx=3755, rating=6.468760967254639), Row(product_id_idx=3125, rating=6.464198112487793)])




### Save to file

In [37]:
from time import time

In [38]:
t0 = time()

In [39]:
df_reviewer_reviewer_id = data_indexed.select('customer_id_idx', 'customer_id').distinct()

In [40]:
df_reviewer_reviewer_id.count()

251125

In [41]:
df_reviewer_reviewer_id.show(5)

+---------------+-----------+
|customer_id_idx|customer_id|
+---------------+-----------+
|        36902.0|   16822674|
|       213936.0|    6885375|
|        88043.0|   12483858|
|        12474.0|   11690857|
|        21554.0|    7859125|
+---------------+-----------+
only showing top 5 rows



In [42]:
df_asin_asin_id = data_indexed.select('product_id_idx', 'product_id').distinct()

In [43]:
df_asin_asin_id.count()

4214

In [44]:
df_asin_asin_id.show(5)

+--------------+----------+
|product_id_idx|product_id|
+--------------+----------+
|        1600.0|  11251743|
|        2463.0|  13678777|
|         785.0|  14934073|
|        1885.0|  20393324|
|        2893.0|  25693431|
+--------------+----------+
only showing top 5 rows



In [45]:
new_user_recs = user_recs.join(df_reviewer_reviewer_id, on=['customer_id_idx'], how="left")

In [46]:
new_user_recs.show(10, truncate=False)

+---------------+-----------------------------------------------------------------------------------------------+-----------+
|customer_id_idx|recommendations                                                                                |customer_id|
+---------------+-----------------------------------------------------------------------------------------------+-----------+
|148            |[{2620, 7.9994826}, {3315, 7.9726677}, {3519, 7.7538385}, {3691, 7.7381005}, {2794, 7.6134644}]|6206297    |
|463            |[{3519, 7.3069124}, {2815, 7.2635875}, {2620, 7.053291}, {3237, 7.0232787}, {3814, 7.020816}]  |5640291    |
|471            |[{3315, 6.713652}, {3571, 6.674263}, {2702, 6.5250473}, {3755, 6.468761}, {3125, 6.464198}]    |6189621    |
|496            |[{2620, 8.170576}, {3315, 8.141822}, {3125, 8.076595}, {3755, 7.953744}, {3691, 7.834151}]     |79470      |
|833            |[{3247, 7.4129047}, {2929, 7.2292795}, {2493, 7.2191615}, {3836, 7.2082977}, {2620, 7.096886}] |32127

In [47]:
new_user_recs.count()

211461

### Save to disk

In [80]:
new_user_recs.write.parquet('Recommendation_U.parquet', mode='overwrite')
df_asin_asin_id.write.parquet('Recommendation_P.parquet', mode='overwrite')

In [81]:
time_duration = time() - 10
print(time_duration)

1646196568.558788


### Make recommendations to a particular user

In [48]:
customer_id = "5917275"
find_user_rec = new_user_recs.filter(new_user_recs['customer_id'] == customer_id)
user = find_user_rec.first()

lst=[]
for row in user['recommendations']:
    row_f = df_asin_asin_id.filter(df_asin_asin_id.product_id_idx == row['product_id_idx'])
    row_f_first = row_f.first()
    lst.append((row['product_id_idx'], row_f_first['product_id'], row['rating']))
dic_user_rec = {'customer_id' : user.customer_id, 'recommendations':lst}

In [49]:
dic_user_rec

{'customer_id': 5917275,
 'recommendations': [(3825, 29248443, 6.970757484436035),
  (2654, 21317020, 6.568686008453369),
  (3713, 41556669, 6.545156478881836),
  (3323, 74274442, 6.535182476043701),
  (3230, 931458, 6.503948211669922)]}

### Đọc 2 file đã lưu để lấy dữ liệu đầu vào => Đề xuất

In [50]:
new_user_recs = spark.read.parquet('Recommendation_U.parquet')

In [51]:
new_user_recs.printSchema()

root
 |-- customer_id_idx: integer (nullable = true)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- product_id_idx: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)
 |-- customer_id: integer (nullable = true)



In [52]:
new_user_recs.show(2)

+---------------+--------------------+-----------+
|customer_id_idx|     recommendations|customer_id|
+---------------+--------------------+-----------+
|            168|[{2135, 8.469116}...|     113348|
|            205|[{3854, 7.032599}...|     673210|
+---------------+--------------------+-----------+
only showing top 2 rows



In [53]:
df_asin_asin_id = spark.read.parquet('Recommendation_P.parquet')

In [54]:
df_asin_asin_id.show(2)

+--------------+----------+
|product_id_idx|product_id|
+--------------+----------+
|         537.0|  14089823|
|        1028.0|  14313154|
+--------------+----------+
only showing top 2 rows



In [57]:
customer_id = "5917275"
find_user_rec = new_user_recs.filter(new_user_recs['customer_id'] == customer_id)
find_user_rec.show(truncate=False)

+---------------+---------------------------------------------------------------------------------------------+-----------+
|customer_id_idx|recommendations                                                                              |customer_id|
+---------------+---------------------------------------------------------------------------------------------+-----------+
|1238           |[{2929, 6.8932276}, {4014, 6.579216}, {3592, 6.5225677}, {3125, 6.416591}, {3281, 6.3698936}]|5917275    |
+---------------+---------------------------------------------------------------------------------------------+-----------+



In [59]:
result = ''
for user in find_user_rec.collect():
    lst=[]
    for row in user['recommendations']:
        print(row)
        row_f = df_asin_asin_id.filter(df_asin_asin_id.product_id_idx == row['product_id_idx'])
        row_f_first = row_f.first()
        lst.append((row['product_id_idx'], row_f_first['product_id'], row['rating']))
    dic_user_rec = {'customer_id' : user.customer_id, 'recommendations':lst}
    result = dic_user_rec

Row(product_id_idx=2929, rating=6.893227577209473)
Row(product_id_idx=4014, rating=6.579216003417969)
Row(product_id_idx=3592, rating=6.5225677490234375)
Row(product_id_idx=3125, rating=6.416591167449951)
Row(product_id_idx=3281, rating=6.369893550872803)
