In [26]:
import os
import sys

# os.environ['PYSPARK_PYTHON'] = sys.executable
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [1]:
import findspark
findspark.init()

In [2]:
# import libraries
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from datetime import datetime
from pyspark.sql import Row
import pyspark.sql.functions as f

import pyspark.sql.types as t

import pyspark.ml.feature as feat

import numpy as np
import pandas as pd

# Pipeline
from pyspark.ml import Pipeline, PipelineModel

# Machine learning
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# About Dataset
Use the information:
-  product ID, 
-  User ID, 
- Rating

in dataset Shopee: Products_ThoiTrangNam build a recommended sysytem

**Then make recommendations to some specific users**

In [3]:
SparkContext.setSystemProperty(key='spark.hadoop.dfs.client.use.datanode.hostname',value='true')

In [4]:
sc =SparkContext()

In [5]:
sc.setLogLevel("ERROR")

In [6]:
spark = SparkSession(sc)

In [7]:
df = spark.read.csv(path='../DATA/clean_details.csv',inferSchema=True, header=True)

In [7]:
#sc.cancelAllJobs()

# Overview

In [8]:
df.show(10)
df.printSchema()
print(f'There are {df.count()} rows and {len(df.columns)} columns in dataframe')

+----------+-------+------+
|product_id|user_id|rating|
+----------+-------+------+
|       190|      1|     5|
|       190|      2|     5|
|       190|      3|     5|
|       190|      4|     5|
|       190|      5|     5|
|       190|      6|     5|
|       190|      7|     5|
|       190|      8|     5|
|       190|      9|     5|
|       190|     10|     5|
+----------+-------+------+
only showing top 10 rows

root
 |-- product_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)

There are 1024482 rows and 3 columns in dataframe


## check null

In [9]:
df.select([f.count(f.when(f.isnan(c), c)).alias(c) for c in df.columns]).show()

+----------+-------+------+
|product_id|user_id|rating|
+----------+-------+------+
|         0|      0|     0|
+----------+-------+------+



In [10]:
df.select([f.count(f.when(f.isnull(c), c)).alias(c) for c in df.columns]).show()

+----------+-------+------+
|product_id|user_id|rating|
+----------+-------+------+
|         0|      0|     0|
+----------+-------+------+



## Descirble data

In [11]:
cat_col = df.columns
cat_col

['product_id', 'user_id', 'rating']

## Category data

### Check distinct values

In [12]:
df.select([f.count_distinct(c).alias(c) for c in df.columns]).show()

+----------+-------+------+
|product_id|user_id|rating|
+----------+-------+------+
|     31267| 650636|     5|
+----------+-------+------+



### Check top5 disticnt value count of each category columns in df 

In [13]:
for col in cat_col:
    (df.groupby(col).count()
     .withColumn('Normalize',(f.col('count')/df.count()))
     .orderBy(f.col('count').desc())
     .show(5,vertical=False,truncate=False))

+----------+-----+---------------------+
|product_id|count|Normalize            |
+----------+-----+---------------------+
|1731      |412  |4.021544546414676E-4 |
|177       |395  |3.8556070287228085E-4|
|231       |391  |3.816562906912957E-4 |
|17194     |389  |3.797040846008031E-4 |
|11131     |387  |3.7775187851031057E-4|
+----------+-----+---------------------+
only showing top 5 rows

+-------+-----+---------------------+
|user_id|count|Normalize            |
+-------+-----+---------------------+
|199    |19615|0.019146261232505794 |
|159    |2585 |0.002523226371961635 |
|831    |2541 |0.002480277837970799 |
|324    |2415 |0.0023572888542697677|
|860    |2088 |0.0020381031584742336|
+-------+-----+---------------------+
only showing top 5 rows

+------+------+--------------------+
|rating|count |Normalize           |
+------+------+--------------------+
|5     |777662|0.759078246372313   |
|4     |118212|0.11538709318465332 |
|3     |63051 |0.061544273105823236|
|1     |41447 |0.

In [14]:
df = df.withColumnRenamed(existing='user_id',new='usr_id')

# Preprocessing

In [15]:
df.show(10)

+----------+------+------+
|product_id|usr_id|rating|
+----------+------+------+
|       190|     1|     5|
|       190|     2|     5|
|       190|     3|     5|
|       190|     4|     5|
|       190|     5|     5|
|       190|     6|     5|
|       190|     7|     5|
|       190|     8|     5|
|       190|     9|     5|
|       190|    10|     5|
+----------+------+------+
only showing top 10 rows



In [16]:
#strindex = feat.StringIndexer(inputCols=['usr_id','product_id'],outputCols=['user_id','prd_id']).fit(df)

In [17]:
#final_df = strindex.transform(df)
final_df = df
del df
final_df.show(5, truncate=True)

+----------+------+------+
|product_id|usr_id|rating|
+----------+------+------+
|       190|     1|     5|
|       190|     2|     5|
|       190|     3|     5|
|       190|     4|     5|
|       190|     5|     5|
+----------+------+------+
only showing top 5 rows



In [22]:
print("Number of rows: ",final_df.count())
final_df.printSchema()

Number of rows:  1024482
root
 |-- product_id: integer (nullable = true)
 |-- usr_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



In [19]:
final_df.select([f.count(f.when(f.isnan(c), c)).alias(c) for c in final_df.columns]).show()

+----------+------+------+
|product_id|usr_id|rating|
+----------+------+------+
|         0|     0|     0|
+----------+------+------+



In [20]:
final_df.select([f.count(f.when(f.isnull(c), c)).alias(c) for c in final_df.columns]).show()

+----------+------+------+
|product_id|usr_id|rating|
+----------+------+------+
|         0|     0|     0|
+----------+------+------+



# Split Data

In [23]:
train, test = final_df.randomSplit([0.8,0.2],seed=42)

# Model

In [24]:
als = ALS(maxIter=10,
          regParam=0.01, 
          userCol='usr_id',
          itemCol='product_id', 
          ratingCol='rating', 
          coldStartStrategy='drop',
          rank=10,
          nonnegative=True)
model_t = als.fit(train)

## Evaluation

In [40]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)

In [41]:
#inspect
predictions.show(5)

                                                                                

+----------+-------+------+----------+
|product_id|user_id|rating|prediction|
+----------+-------+------+----------+
|      2142|     40|   5.0| 3.9790876|
|     25517|     40|   5.0| 3.8241038|
|    232473|     40|   5.0|  5.035574|
|     11317|     50|   5.0|  4.661109|
|     13623|     57|   5.0|  5.002939|
+----------+-------+------+----------+
only showing top 5 rows



In [42]:
#calcuate rmse score
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

                                                                                

In [43]:
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.1477614413474846


The model can predict new overall with error nearly 1.0 

In [44]:
# save model to disk
#model.save("recommendation_model_amazon_toys_games")

### Providing Recommendations: for all users

In [45]:
# get 5 recommendations which have highest rating.
user_recs = model.recommendForAllUsers(5)

In [46]:
user_recs.show(5, truncate=False)

[Stage 277:>                                                        (0 + 1) / 1]

+-------+------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                       |
+-------+------------------------------------------------------------------------------------------------------+
|12     |[{19344, 6.25057}, {173345, 6.2220078}, {172416, 6.1911573}, {235012, 6.1029043}, {172944, 6.0931697}]|
|26     |[{12334, 6.008749}, {172944, 6.0044107}, {19344, 5.9870224}, {173345, 5.9641447}, {235012, 5.909465}] |
|28     |[{173345, 6.349544}, {172416, 6.240093}, {173325, 6.196763}, {231384, 6.1837873}, {151305, 6.1657777}]|
|31     |[{10489, 6.274107}, {111747, 6.2604423}, {12382, 6.2467313}, {173710, 6.0998383}, {174879, 6.099676}] |
|34     |[{12741, 6.9398265}, {10490, 6.916534}, {173345, 6.9019046}, {16155, 6.8827195}, {174436, 6.8670235}] |
+-------+---------------------------------------------------------------------------------------

                                                                                

In [35]:
#user_recs.count()

## Save data to file

In [49]:
df_product = df_product.select('product_id','product_name_new')

In [36]:
#get reviewerID for reviewerID_idx
df_reviewer_reviewer_id = data_indexed.select('reviewerID_idx', 'reviewerID').distinct()

In [37]:
#get product_id for asin_idx
df_product_idx = data_indexed.select('asin_idx', 'asin').distinct()

In [51]:
data_indexed.show(3)

+----------+-------+------+
|product_id|user_id|rating|
+----------+-------+------+
|       190|      1|   5.0|
|       190|      2|   5.0|
|       190|      3|   5.0|
+----------+-------+------+
only showing top 3 rows



In [29]:
#combine dataframe with reviewerID
#new_user_recs = user_recs.join(df_product, on=['product_id'], how='left')

In [39]:
#inspect
new_user_recs.show(3, truncate=False)

23/08/04 14:54:47 WARN DAGScheduler: Broadcasting large task binary with size 1327.2 KiB
23/08/04 14:55:05 WARN DAGScheduler: Broadcasting large task binary with size 1324.1 KiB


+--------------+-----------------------------------------------------------------------------------------------+--------------+
|reviewerID_idx|recommendations                                                                                |reviewerID    |
+--------------+-----------------------------------------------------------------------------------------------+--------------+
|0             |[{9847, 6.2357802}, {6836, 5.9811497}, {9254, 5.9066997}, {7533, 5.8986154}, {7735, 5.8954325}]|AJGU56YG8G1DQ |
|1             |[{6696, 5.283102}, {4135, 5.2031755}, {4560, 5.169352}, {7244, 5.1690936}, {8672, 5.165695}]   |A1M8AYAL3L8ACP|
|2             |[{10415, 5.114091}, {2700, 5.0675197}, {7010, 5.048225}, {9254, 5.0290294}, {7735, 5.0211015}] |A1II2ZRPKZAQQD|
+--------------+-----------------------------------------------------------------------------------------------+--------------+
only showing top 3 rows



In [40]:
# Save to disk
new_user_recs.write.parquet('reviews_Toys_and_Games_5_User.parquet', mode='overwrite')
df_asin_asin_idx.write.parquet('reviews_Toys_and_Games_5_Product.parquet', mode='overwrite')

23/08/04 14:55:06 WARN DAGScheduler: Broadcasting large task binary with size 1327.2 KiB
23/08/04 14:55:23 WARN DAGScheduler: Broadcasting large task binary with size 1525.3 KiB
                                                                                

## Then make recommendations to some users: 
A3GJPLCZCDXXG6, A34U85WY8ZWBPV, A2VIY2TL6QPYLG

In [41]:
def make_recommendation(reviewerID):
    #create a dictionary for user recommendation
    find_user_rec = new_user_recs.filter(new_user_recs['reviewerID'] == reviewerID)
    user = find_user_rec.first()
    lst = []
    for row in user['recommendations']:
        row_f = df_asin_asin_idx.filter(df_asin_asin_idx.asin_idx == row['asin_idx'])
        row_f_first = row_f.first()
        lst.append((row['asin_idx'], row_f_first['asin'], row['rating']))
    dic_user_rec = {'reviewerID' : user.reviewerID, 'recommendations' :lst}
    return dic_user_rec

In [42]:
def make_result_dataframe(dic_user_rec, rating=4):
    #adjust rating parameter for filter higher rating
    #creating dataframe recommendations for new users
    import pandas as pd
    df_result = pd.DataFrame(dic_user_rec['recommendations'], columns=['asin_idx', 'productID', 'rating'])
    df_result['reviewerID'] = dic_user_rec['reviewerID']
    column_order = ['reviewerID',  'productID', 'rating', 'asin_idx']
    df_result = df_result.reindex(columns=column_order)
    df_result.drop('asin_idx', axis=1, inplace=True)
    result = df_result[df_result['rating']>rating]
    return result

In [43]:
#recommendation for each users
for user in ['A3GJPLCZCDXXG6', 'A34U85WY8ZWBPV', 'A2VIY2TL6QPYLG']:
    user_recom = make_recommendation(user)
    result = make_result_dataframe(user_recom)
    print("Recommendation for: ", user)
    display(result)    

23/08/04 14:55:28 WARN DAGScheduler: Broadcasting large task binary with size 1327.2 KiB
23/08/04 14:55:45 WARN DAGScheduler: Broadcasting large task binary with size 1323.3 KiB
[Stage 369:>                                                        (0 + 1) / 1]

Recommendation for:  A3GJPLCZCDXXG6


                                                                                

Unnamed: 0,reviewerID,productID,rating
0,A3GJPLCZCDXXG6,B00D3Y18WO,5.634512
1,A3GJPLCZCDXXG6,B0007DI63S,5.598936
2,A3GJPLCZCDXXG6,B008L264Q8,5.564395
3,A3GJPLCZCDXXG6,B004Y3U90K,5.558105
4,A3GJPLCZCDXXG6,B00GHPHRI0,5.535957


23/08/04 14:55:51 WARN DAGScheduler: Broadcasting large task binary with size 1327.2 KiB
23/08/04 14:56:08 WARN DAGScheduler: Broadcasting large task binary with size 1323.3 KiB
                                                                                

Recommendation for:  A34U85WY8ZWBPV


Unnamed: 0,reviewerID,productID,rating
0,A34U85WY8ZWBPV,B00BK3FA8I,5.089292
1,A34U85WY8ZWBPV,B0093HPI9E,5.066402
2,A34U85WY8ZWBPV,B0007DI63S,5.065325
3,A34U85WY8ZWBPV,B00739W6VM,5.064379
4,A34U85WY8ZWBPV,B0007XIZ0M,5.021018


23/08/04 14:56:12 WARN DAGScheduler: Broadcasting large task binary with size 1327.2 KiB
23/08/04 14:56:29 WARN DAGScheduler: Broadcasting large task binary with size 1323.3 KiB
[Stage 483:>                                                        (0 + 1) / 1]

Recommendation for:  A2VIY2TL6QPYLG


                                                                                

Unnamed: 0,reviewerID,productID,rating
0,A2VIY2TL6QPYLG,B00BY7YIPG,4.860012
1,A2VIY2TL6QPYLG,B009B7F6CA,4.828602
2,A2VIY2TL6QPYLG,B00ERZGLT8,4.795135
3,A2VIY2TL6QPYLG,B004SGOSI2,4.794385
4,A2VIY2TL6QPYLG,B00ERZVZZS,4.739208


#### Summary
* A3GJPLCZCDXXG6 user might likes these productIDs: B009Y943F6, B009Y94H1G, B007CB7X1E, B00D0GYMTG, B00ERZVZZS

* A34U85WY8ZWBPV user might likes these productIDs: B00D3Y18WO, B004Y3U90K, B003AVWOMS, B00AZQ2SYU, B000BX4TTO

* A2VIY2TL6QPYLG user might likes these productIDs: B00A0GNOVQ, B00D95E30G, B001L123JS, B000PC62EQ, B00CFELU30 	

## Or reading the saved file to get input data => recommendation

In [44]:
# Read the Parquet file into a new DataFrame
new_user_recs = spark.read.parquet('reviews_Toys_and_Games_5_User.parquet')

In [45]:
new_user_recs.printSchema()

root
 |-- reviewerID_idx: integer (nullable = true)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- asin_idx: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)
 |-- reviewerID: string (nullable = true)



In [46]:
new_user_recs.show(2)

+--------------+--------------------+--------------+
|reviewerID_idx|     recommendations|    reviewerID|
+--------------+--------------------+--------------+
|             0|[{9847, 6.2357802...| AJGU56YG8G1DQ|
|             1|[{6696, 5.283102}...|A1M8AYAL3L8ACP|
+--------------+--------------------+--------------+
only showing top 2 rows



In [47]:
df_asin_asin_idx = spark.read.parquet('reviews_Toys_and_Games_5_Product.parquet')

In [48]:
df_asin_asin_idx.printSchema()

root
 |-- asin_idx: double (nullable = true)
 |-- asin: string (nullable = true)



In [49]:
df_asin_asin_idx.show(2)

+--------+----------+
|asin_idx|      asin|
+--------+----------+
|  9630.0|1603800689|
|    19.0|B00000K3BR|
+--------+----------+
only showing top 2 rows



In [50]:
#recommendation for each users
for user in ['A3GJPLCZCDXXG6', 'A34U85WY8ZWBPV', 'A2VIY2TL6QPYLG']:
    user_recom = make_recommendation(user)
    result = make_result_dataframe(user_recom)
    print("Recommendation for: ", user)
    display(result)    

Recommendation for:  A3GJPLCZCDXXG6


Unnamed: 0,reviewerID,productID,rating
0,A3GJPLCZCDXXG6,B00D3Y18WO,5.634512
1,A3GJPLCZCDXXG6,B0007DI63S,5.598936
2,A3GJPLCZCDXXG6,B008L264Q8,5.564395
3,A3GJPLCZCDXXG6,B004Y3U90K,5.558105
4,A3GJPLCZCDXXG6,B00GHPHRI0,5.535957


Recommendation for:  A34U85WY8ZWBPV


Unnamed: 0,reviewerID,productID,rating
0,A34U85WY8ZWBPV,B00BK3FA8I,5.089292
1,A34U85WY8ZWBPV,B0093HPI9E,5.066402
2,A34U85WY8ZWBPV,B0007DI63S,5.065325
3,A34U85WY8ZWBPV,B00739W6VM,5.064379
4,A34U85WY8ZWBPV,B0007XIZ0M,5.021018


Recommendation for:  A2VIY2TL6QPYLG


Unnamed: 0,reviewerID,productID,rating
0,A2VIY2TL6QPYLG,B00BY7YIPG,4.860012
1,A2VIY2TL6QPYLG,B009B7F6CA,4.828602
2,A2VIY2TL6QPYLG,B00ERZGLT8,4.795135
3,A2VIY2TL6QPYLG,B004SGOSI2,4.794385
4,A2VIY2TL6QPYLG,B00ERZVZZS,4.739208
