In [1]:
import os
import sys

# os.environ['PYSPARK_PYTHON'] = sys.executable
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
import findspark
findspark.init()

In [3]:
# import libraries
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from datetime import datetime
from pyspark.sql import Row
import pyspark.sql.functions as f

import pyspark.sql.types as t

import pyspark.ml.feature as feat

import numpy as np
import pandas as pd

# Pipeline
from pyspark.ml import Pipeline, PipelineModel

# Machine learning
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# About Dataset
Use the information:
-  product ID, 
-  User ID, 
- Rating

in dataset Shopee: Products_ThoiTrangNam build a recommended sysytem

**Then make recommendations to some specific users**

In [4]:
SparkContext.setSystemProperty(key='spark.hadoop.dfs.client.use.datanode.hostname',value='true')

In [5]:
sc =SparkContext()

In [6]:
sc.setLogLevel("ERROR")

In [7]:
spark = SparkSession(sc)

In [8]:
df = spark.read.csv(path='../DATA/clean_details.csv',inferSchema=True, header=True)

In [9]:
#sc.cancelAllJobs()

# Overview

In [10]:
df.show(10)
df.printSchema()
print(f'There are {df.count()} rows and {len(df.columns)} columns in dataframe')

+----------+-------+------+
|product_id|user_id|rating|
+----------+-------+------+
|       190|      2|     5|
|       190|      5|     5|
|       190|      6|     5|
|       190|      8|     5|
|       190|      9|     5|
|       190|     10|     5|
|       190|     12|     5|
|       190|     13|     5|
|       190|     18|     5|
|       190|     26|     5|
+----------+-------+------+
only showing top 10 rows

root
 |-- product_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)

There are 486351 rows and 3 columns in dataframe


## check null

In [11]:
df.select([f.count(f.when(f.isnan(c), c)).alias(c) for c in df.columns]).show()

+----------+-------+------+
|product_id|user_id|rating|
+----------+-------+------+
|         0|      0|     0|
+----------+-------+------+



In [12]:
df.select([f.count(f.when(f.isnull(c), c)).alias(c) for c in df.columns]).show()

+----------+-------+------+
|product_id|user_id|rating|
+----------+-------+------+
|         0|      0|     0|
+----------+-------+------+



## Descirble data

In [13]:
cat_col = df.columns
cat_col

['product_id', 'user_id', 'rating']

### Category data

#### Check distinct values

In [14]:
df.select([f.count_distinct(c).alias(c) for c in df.columns]).show()

+----------+-------+------+
|product_id|user_id|rating|
+----------+-------+------+
|     27730| 113516|     5|
+----------+-------+------+



#### Check top5 disticnt value count of each category columns in df 

In [15]:
for col in cat_col:
    (df.groupby(col).count()
     .withColumn('Normalize',(f.col('count')/df.count()))
     .orderBy(f.col('count').desc())
     .show(5,vertical=False,truncate=False))

+----------+-----+--------------------+
|product_id|count|Normalize           |
+----------+-----+--------------------+
|2114      |270  |5.551546105590407E-4|
|216       |270  |5.551546105590407E-4|
|212       |270  |5.551546105590407E-4|
|21122     |270  |5.551546105590407E-4|
|21132     |270  |5.551546105590407E-4|
+----------+-----+--------------------+
only showing top 5 rows

+-------+-----+---------------------+
|user_id|count|Normalize            |
+-------+-----+---------------------+
|199    |19549|0.040195249932661804 |
|159    |2581 |0.0053068668513069775|
|831    |2533 |0.005208172698318704 |
|324    |2412 |0.004959381187660763 |
|860    |2080 |0.004276746629491869 |
+-------+-----+---------------------+
only showing top 5 rows

+------+------+--------------------+
|rating|count |Normalize           |
+------+------+--------------------+
|5     |380211|0.7817625542046793  |
|4     |50113 |0.10303875184794521 |
|3     |26327 |0.05413168678588098 |
|1     |18677 |0.038402306

In [17]:
df = df.withColumnRenamed(existing='user_id',new='usr_id')

- Because my target to recommend the product with high quality, that why reason I only choose rating from 3 to 5 (DID in EDA2)

In [17]:
df.show(5)

+----------+------+------+
|product_id|usr_id|rating|
+----------+------+------+
|       190|     1|     5|
|       190|     2|     5|
|       190|     3|     5|
|       190|     4|     5|
|       190|     5|     5|
+----------+------+------+
only showing top 5 rows



#### Drop some product with low rating and drop user with one time buy

##### PRD

In [24]:
group_prd = df.groupby('product_id').agg(f.mean('rating').alias('avg_rating'), f.count('usr_id').alias('count_user'))
group_prd.show(5)

+----------+-----------------+----------+
|product_id|       avg_rating|count_user|
+----------+-----------------+----------+
|      1959|4.746031746031746|        63|
|     19204|              5.0|         4|
|     19530|4.857142857142857|         7|
|     19553|              5.0|         2|
|     20135|4.898305084745763|        59|
+----------+-----------------+----------+
only showing top 5 rows



In [28]:
group_prd_gt_3 = group_prd.filter(f.col('avg_rating')>=3)
group_prd_gt_3.show(5)

+----------+-----------------+----------+
|product_id|       avg_rating|count_user|
+----------+-----------------+----------+
|      1959|4.746031746031746|        63|
|     19204|              5.0|         4|
|     19530|4.857142857142857|         7|
|     19553|              5.0|         2|
|     20135|4.898305084745763|        59|
+----------+-----------------+----------+
only showing top 5 rows



In [29]:
list_prd = group_prd_gt_3.select('product_id').rdd.flatMap(lambda x: x).collect()
len(list_prd)

30961

In [30]:
list_prd[:5]

[1959, 19204, 19530, 19553, 20135]

##### User Group

In [25]:
group_user = df.groupby('usr_id').agg(f.mean('rating').alias('avg_rating'), f.count('product_id').alias('count_prd'))
group_user.show(5)

+------+------------------+---------+
|usr_id|        avg_rating|count_prd|
+------+------------------+---------+
|   148|               3.5|        2|
|   463|3.6666666666666665|        3|
|   471|               3.0|        1|
|   496|3.8461538461538463|       26|
|   833|               4.0|        1|
+------+------------------+---------+
only showing top 5 rows



In [34]:
group_user_gt_5 = group_user.filter(f.col('count_prd')>=2)
print('Total customer:', group_user_gt_5.count())
group_user_gt_5.show(5)

Total customer: 113654
+------+------------------+---------+
|usr_id|        avg_rating|count_prd|
+------+------------------+---------+
|   148|               3.5|        2|
|   463|3.6666666666666665|        3|
|   496|3.8461538461538463|       26|
|  1238|               4.2|        5|
|  1645| 4.777777777777778|        9|
+------+------------------+---------+
only showing top 5 rows



In [35]:
list_user = group_user_gt_5.select('usr_id').rdd.flatMap(lambda x: x).collect()
print("Total user:",len(list_user))
list_user[:5]

Total user: 113654


[148, 463, 496, 1238, 1645]

# Preprocessing

In [18]:
final_df = df
final_df.show()

+----------+------+------+
|product_id|usr_id|rating|
+----------+------+------+
|       190|     2|     5|
|       190|     5|     5|
|       190|     6|     5|
|       190|     8|     5|
|       190|     9|     5|
|       190|    10|     5|
|       190|    12|     5|
|       190|    13|     5|
|       190|    18|     5|
|       190|    26|     5|
|       190|    28|     5|
|       190|     6|     5|
|       190|    30|     5|
|       190|    31|     5|
|       190|    32|     5|
|       190|    33|     5|
|       190|    34|     5|
|       190|    35|     5|
|       190|    37|     5|
|       190|    38|     5|
+----------+------+------+
only showing top 20 rows



In [21]:
final_df.select([f.count(f.when(f.isnan(c), c)).alias(c) for c in final_df.columns]).show()

+----------+------+------+
|product_id|usr_id|rating|
+----------+------+------+
|         0|     0|     0|
+----------+------+------+



In [22]:
final_df.select([f.count(f.when(f.isnull(c), c)).alias(c) for c in final_df.columns]).show()

+----------+------+------+
|product_id|usr_id|rating|
+----------+------+------+
|         0|     0|     0|
+----------+------+------+



# Split Data

In [23]:
train, test = final_df.randomSplit([0.8,0.2],seed=42)

# Model

In [26]:
_maxIter = 10
_regParam = 0.01
_rank = 20

als = ALS(maxIter=_maxIter,
          regParam=_regParam, 
          userCol='usr_id',
          itemCol='product_id', 
          ratingCol='rating', 
          coldStartStrategy='drop',
          rank=_rank,
          nonnegative=True)
model_t = als.fit(train)

## Evaluation

In [27]:
# Evaluate the model by computing the RMSE on the test data
predictions = model_t.transform(test)

In [28]:
#inspect
predictions.show(5)

+----------+------+------+----------+
|product_id|usr_id|rating|prediction|
+----------+------+------+----------+
|       148|    31|     1| 1.2646215|
|      1088|   108|     5|  4.226724|
|     18800|   368|     3|  4.162443|
|    233044|   481|     5|  4.608573|
|    174229|   513|     5|  4.815551|
+----------+------+------+----------+
only showing top 5 rows



In [29]:
#calcuate rmse score
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

In [30]:
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.2459594940220193


The model can predict new overall with error nearly 1.0 

## Build Pipeline

In [31]:
pipeline = Pipeline().setStages([als])
p_als_model = pipeline.fit(train)

In [45]:
# save
# models_path =' ../DATA/Pyspark/'
# p_als_model.save(models_path + "/als_model")

In [35]:
# load
# p_als_model = PipelineModel.load(models_path + "/xgb_model.model"

In [36]:
#save model to disk
# model_t.save("../DATA/Pyspark/recommendation_model_amazon_toys_games")

## Providing Recommendations: for all users

In [31]:
# get 5 recommendations which have highest rating.
user_recs = model_t.recommendForAllUsers(5)

In [32]:
user_recs.show(5, truncate=False)

+------+------------------------------------------------------------------------------------------------------+
|usr_id|recommendations                                                                                       |
+------+------------------------------------------------------------------------------------------------------+
|31    |[{10552, 9.198219}, {10529, 9.198219}, {10448, 9.149739}, {172598, 8.324952}, {233614, 8.269331}]     |
|34    |[{15747, 7.086456}, {233427, 6.9390316}, {172324, 6.9051256}, {21889, 6.8773227}, {232486, 6.8213263}]|
|53    |[{10837, 9.332895}, {234040, 8.957514}, {233635, 8.778513}, {233512, 8.778402}, {234261, 8.777884}]   |
|65    |[{25839, 7.963831}, {111273, 7.796167}, {19570, 7.796167}, {131264, 7.6429644}, {161877, 7.6077466}]  |
|108   |[{151198, 9.4677515}, {234516, 8.9777}, {131382, 8.807213}, {16321, 8.759116}, {11262, 8.60579}]      |
+------+------------------------------------------------------------------------------------------------

In [33]:
user_recs.select('usr_id').count()

110083

In [88]:
result = user_recs.select(f.col('usr_id'),
                       f.explode(f.col('recommendations')))

result = (result.withColumn('product_id', 
                               result.col.getField('product_id'))
             .withColumn('rating',  
                         result.col.getField('rating'))
            )
result.show()

+------+-------------------+----------+---------+
|usr_id|                col|product_id|   rating|
+------+-------------------+----------+---------+
|    31|  {10552, 9.198219}|     10552| 9.198219|
|    31|  {10529, 9.198219}|     10529| 9.198219|
|    31|  {10448, 9.149739}|     10448| 9.149739|
|    31| {172598, 8.324952}|    172598| 8.324952|
|    31| {233614, 8.269331}|    233614| 8.269331|
|    34|  {15747, 7.086456}|     15747| 7.086456|
|    34|{233427, 6.9390316}|    233427|6.9390316|
|    34|{172324, 6.9051256}|    172324|6.9051256|
|    34| {21889, 6.8773227}|     21889|6.8773227|
|    34|{232486, 6.8213263}|    232486|6.8213263|
|    53|  {10837, 9.332895}|     10837| 9.332895|
|    53| {234040, 8.957514}|    234040| 8.957514|
|    53| {233635, 8.778513}|    233635| 8.778513|
|    53| {233512, 8.778402}|    233512| 8.778402|
|    53| {234261, 8.777884}|    234261| 8.777884|
|    65|  {25839, 7.963831}|     25839| 7.963831|
|    65| {111273, 7.796167}|    111273| 7.796167|


In [89]:
result_pd = result.select(['usr_id','product_id','rating']).toPandas()
result_pd.head()

Unnamed: 0,usr_id,product_id,rating
0,31,10552,9.198219
1,31,10529,9.198219
2,31,10448,9.149739
3,31,172598,8.324952
4,31,233614,8.269331


In [92]:
for row in result_pd.itertuples(index=False):
    print(row.product_id)
    break

10552


In [94]:
user_id = 31
#create a dictionary for user recommendation
fil_1 = result_pd['usr_id'] == user_id
find_user_rec = result_pd[fil_1]
recom = []
for row in find_user_rec.itertuples(index=False):
    # Access columns using row.column_name
    dict_prd = {'product_id':row.product_id, 'rating':row.rating}
    recom.append(dict_prd)
    #print(row.usr_id, row.product_id, row.rating)

recom_user_dict = {'user_id':user_id,'recom': recom}
recom_user_dict

{'user_id': 31,
 'recom': [{'product_id': 10552, 'rating': 9.198219299316406},
  {'product_id': 10529, 'rating': 9.198219299316406},
  {'product_id': 10448, 'rating': 9.149739265441895},
  {'product_id': 172598, 'rating': 8.324952125549316},
  {'product_id': 233614, 'rating': 8.269330978393555}]}

## Save data to file

In [46]:
#final_df.show()

In [36]:
# Save to disk
#models_path ='../DATA/Pyspark/'
#user_recs.write.parquet('Top_5_User_CF.parquet', mode='overwrite')
#df_asin_asin_idx.write.parquet('reviews_Toys_and_Games_5_Product.parquet', mode='overwrite')

# Define Function for Recommendation

In [96]:
def make_recommendation(usr_id,result_: pd.DataFrame):
    #create a dictionary for user recommendation
    try:
        #create a dictionary for user recommendation
        fil_1 = result_['usr_id'] == user_id
        find_user_rec = result_[fil_1]
        recom = []
        for row in find_user_rec.itertuples(index=False):
            # Access columns using row.column_name
            dict_prd = {'product_id':row.product_id, 'rating':row.rating}
            recom.append(dict_prd)
            #print(row.usr_id, row.product_id, row.rating)
        
        recom_user_dict = {'user_id':user_id,'recom': recom}
        
        return recom_user_dict
    except Exception as e:
        no_value = 'There is no recommendation for this user'
        print(no_value)
        return None

In [97]:
test_dict = make_recommendation(usr_id=31,result_=result_pd)
test_dict

{'user_id': 31,
 'recom': [{'product_id': 10552, 'rating': 9.198219299316406},
  {'product_id': 10529, 'rating': 9.198219299316406},
  {'product_id': 10448, 'rating': 9.149739265441895},
  {'product_id': 172598, 'rating': 8.324952125549316},
  {'product_id': 233614, 'rating': 8.269330978393555}]}

In [59]:
# test = pd.DataFrame(data=test_dict)
# # Expand the column containing lists
# expanded = test['recom'].apply(pd.Series)
# user_recom_df = pd.concat([test.drop(columns=['recom']), expanded], axis=1)
# user_recom_df

In [98]:
def make_result_dataframe(dic_user_rec):
    #adjust rating parameter for filter higher rating
    test = pd.DataFrame(data=dic_user_rec)
    # Expand the column containing lists
    expanded = test['recom'].apply(pd.Series)
    user_recom_df = pd.concat([test.drop(columns=['recom']), expanded], axis=1)
    user_recom_df
    return user_recom_df

In [99]:
make_result_dataframe(dic_user_rec=test_dict)

Unnamed: 0,user_id,product_id,rating
0,31,10552.0,9.198219
1,31,10529.0,9.198219
2,31,10448.0,9.149739
3,31,172598.0,8.324952
4,31,233614.0,8.269331


# Then make recommendations to some users: 

## Get random user

In [100]:
# Get list user ID
full_detail_df = pd.read_csv('../DATA/clean_details.csv')
user_id_unique = full_detail_df['user_id'].unique()

# Get sample user id: 10 values
sample_user = np.random.choice(user_id_unique, size=10, replace=False)
print("Random sample:", sample_user)

Random sample: [  7078  38482  57980 134459 258142 215636 327459 516546 118457 384149]


In [103]:
sample_user[0]

7078

In [80]:
#make_recommendation(usr_id=102823,result=result)

In [67]:
#final_df.select(f.col("usr_id").alias("random_urs_id")).sample(False, 0.05, seed=0).collect()

In [106]:
#recommendation for each users
for user in sample_user:
    user_id = int(user)
    user_recom = make_recommendation(usr_id=user_id,result_=result_pd)
    if isinstance(user_recom,dict):
        result = make_result_dataframe(dic_user_rec=user_recom)
        print("Recommendation for UserID: ", user_id)
        display(result)

Recommendation for UserID:  7078


Unnamed: 0,user_id,product_id,rating
0,7078,11717.0,5.79473
1,7078,232486.0,5.738311
2,7078,21889.0,5.683164
3,7078,172726.0,5.642112
4,7078,19337.0,5.628994


Recommendation for UserID:  38482


Unnamed: 0,user_id,product_id,rating
0,38482,16178.0,7.30928
1,38482,19337.0,7.278183
2,38482,234284.0,7.232375
3,38482,21889.0,7.180694
4,38482,11717.0,7.153756


Recommendation for UserID:  57980


Unnamed: 0,user_id,product_id,rating
0,57980,232100.0,5.686516
1,57980,13725.0,5.64147
2,57980,231872.0,5.535635
3,57980,13756.0,5.37499
4,57980,19337.0,5.345009


Recommendation for UserID:  134459


Unnamed: 0,user_id,product_id,rating
0,134459,181117.0,4.999285
1,134459,141335.0,4.981279
2,134459,21889.0,4.952895
3,134459,15747.0,4.866946
4,134459,11717.0,4.855806


Recommendation for UserID:  258142


Unnamed: 0,user_id,product_id,rating
0,258142,19337.0,6.560414
1,258142,172726.0,6.506807
2,258142,234284.0,6.433818
3,258142,21889.0,6.409804
4,258142,12331.0,6.397433


Recommendation for UserID:  215636


Unnamed: 0,user_id,product_id,rating
0,215636,11717.0,6.035505
1,215636,21889.0,5.976181
2,215636,12331.0,5.926832
3,215636,232486.0,5.905541
4,215636,19337.0,5.903114


Recommendation for UserID:  327459


Unnamed: 0,user_id,product_id,rating
0,327459,13573.0,4.997901
1,327459,13628.0,4.417516
2,327459,232233.0,4.353266
3,327459,17777.0,4.291866
4,327459,21889.0,4.272508


Recommendation for UserID:  516546


Unnamed: 0,user_id,product_id,rating
0,516546,16280.0,6.77478
1,516546,25569.0,6.627365
2,516546,172324.0,6.624763
3,516546,15747.0,6.616024
4,516546,10166.0,6.61134


Recommendation for UserID:  118457


Unnamed: 0,user_id,product_id,rating
0,118457,21889.0,1.699261
1,118457,11717.0,1.697416
2,118457,172726.0,1.687926
3,118457,15747.0,1.684354
4,118457,171429.0,1.683898


Recommendation for UserID:  384149


Unnamed: 0,user_id,product_id,rating
0,384149,233427.0,7.150984
1,384149,25552.0,7.15098
2,384149,232232.0,7.117294
3,384149,234284.0,7.078147
4,384149,21889.0,7.074231


## Save Data Recommedation:

In [107]:
result_pd.to_csv('../DATA/Pyspark/rcm_user_ALS.csv',index=False)