## Business understanding

- Goal: Build a model that predicts the `overall` rating of products in order to select products to recommend to users.

## Set up the environment

In [1]:
import findspark
findspark.init()

In [2]:
import pandas as pd
import numpy as np

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession

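# Import only the functions used below; a wildcard import would shadow Python built-ins such as sum and max
from pyspark.sql.functions import col, count, when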
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [3]:
spark = SparkSession.builder.appName('customer_recommendation').getOrCreate()

## Load Dataset

In [4]:
data = spark.read.json('reviews_Office_Products_5.json')

In [5]:
data.show(5)

+----------+-------+-------+--------------------+-----------+--------------+-------------------+--------------------+--------------+
|      asin|helpful|overall|          reviewText| reviewTime|    reviewerID|       reviewerName|             summary|unixReviewTime|
+----------+-------+-------+--------------------+-----------+--------------+-------------------+--------------------+--------------+
|B00000JBLH| [3, 4]|    5.0|I bought my first...| 09 3, 2004|A32T2H8150OJLU|                ARH|A solid performer...|    1094169600|
|B00000JBLH| [7, 9]|    5.0|WHY THIS BELATED ...|12 15, 2007|A3MAFS04ZABRGO|   Let it Be "Alan"|Price of GOLD is ...|    1197676800|
|B00000JBLH| [3, 3]|    2.0|I have an HP 48GX...| 01 1, 2011|A1F1A0QQP2XVH5|             Mark B|Good functionalit...|    1293840000|
|B00000JBLH| [7, 8]|    5.0|I've started doin...|04 19, 2006| A49R5DBXXQDE5|       R. D Johnson|One of the last o...|    1145404800|
|B00000JBLH| [0, 0]|    5.0|For simple calcul...| 08 4, 2013|A2XRMQA6

## Data understanding

#### Data Collection
- The dataset contains Amazon product reviews from May 1996 to July 2014.

#### Data Description
- reviewerID - ID of the reviewer, e.g. A2SUAM1J3GNN3B
- asin - ID of the product, e.g. 0000013714
- reviewerName - name of the reviewer
- helpful - helpfulness rating of the review, e.g. 2/3
- reviewText - text of the review
- overall - rating of the product
- summary - summary of the review
- unixReviewTime - time of the review (unix time)
- reviewTime - time of the review (raw)

In [6]:
rating = data.select(['asin', 'overall', 'reviewerID'])

In [7]:
print('Total rows in dataset: {}'.format(rating.count()))

Total rows in dataset: 53258


In [8]:
rating.select([count(when(col(c).isNull(), c)).alias(c) for c in rating.columns]).toPandas().T

| column | null count |
|---|---|
| asin | 0 |
| overall | 0 |
| reviewerID | 0 |


There are no null values in this dataset.

In [9]:
users = rating.select('reviewerID').distinct().count()
products = rating.select('asin').distinct().count()
numerator = rating.count()

In [10]:
print('Total reviewers: {}'.format(users))
print('Total products: {}'.format(products))
print('Total ratings: {}'.format(numerator))

Total reviewers: 4905
Total products: 2420
Total ratings: 53258


In [11]:
denominator = users*products
denominator

11870100

In [12]:
sparsity = 1-(numerator*1.0/denominator)
print('Sparsity: {}'.format(sparsity))

Sparsity: 0.9955132644206873
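
The sparsity above is the share of user-product pairs that have no rating. A minimal sketch that reproduces it from the counts printed in this notebook (the variable names `n_users`, `n_products`, `n_ratings`, and `possible_pairs` are illustrative and do not appear in the original cells):

```python
# Counts taken from the notebook output above
n_users = 4905
n_products = 2420
n_ratings = 53258

# Every user-product pair is a potential rating
possible_pairs = n_users * n_products

# Sparsity = fraction of possible pairs with no observed rating
sparsity = 1 - n_ratings / possible_pairs
print('Possible pairs: {}'.format(possible_pairs))
print('Sparsity: {:.4%}'.format(sparsity))
```

Roughly 99.55% of the user-product matrix is empty, which is the kind of sparse input that matrix-factorization methods such as ALS are meant to handle.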

## Data Transformation

ALS requires numeric user and item IDs, so the string columns `asin` and `reviewerID` are converted to numeric indices with StringIndexer.

In [13]:
indexer = StringIndexer(inputCol = 'asin', outputCol = 'asin_idx')
indexer1 = StringIndexer(inputCol = 'reviewerID', outputCol = 'reviewID_idx')

In [14]:
pipeline = Pipeline(stages = [indexer, indexer1])

In [15]:
data_indexed = pipeline.fit(rating).transform(rating)

In [16]:
data_indexed.show(5)

+----------+-------+--------------+--------+------------+
|      asin|overall|    reviewerID|asin_idx|reviewID_idx|
+----------+-------+--------------+--------+------------+
|B00000JBLH|    5.0|A32T2H8150OJLU|  1444.0|       286.0|
|B00000JBLH|    5.0|A3MAFS04ZABRGO|  1444.0|      4499.0|
|B00000JBLH|    2.0|A1F1A0QQP2XVH5|  1444.0|       211.0|
|B00000JBLH|    5.0| A49R5DBXXQDE5|  1444.0|      4604.0|
|B00000JBLH|    5.0|A2XRMQA6PJ5ZJ8|  1444.0|      1330.0|
+----------+-------+--------------+--------+------------+
only showing top 5 rows



Both columns have been converted to numeric indices.
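
As a rough illustration of what StringIndexer does with its default ordering (`frequencyDesc`): the most frequent label receives index 0.0, the next most frequent 1.0, and so on. The pure-Python sketch below mimics that on a toy list. The helper `frequency_index` and the sample IDs are hypothetical, and breaking ties alphabetically is an assumption about recent Spark versions.

```python
from collections import Counter

def frequency_index(labels):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; ties are broken alphabetically (assumption)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Toy product IDs, not taken from the dataset
sample = ['P3', 'P1', 'P3', 'P2', 'P3', 'P1']
mapping = frequency_index(sample)
print(mapping)
```

The index carries no meaning beyond being a unique numeric key, which is all that ALS needs.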

In [17]:
data_indexed.select([count(when(col(c).isNull(), c)).alias(c) for c in data_indexed.columns]).toPandas().T

| column | null count |
|---|---|
| asin | 0 |
| overall | 0 |
| reviewerID | 0 |
| asin_idx | 0 |
| reviewID_idx | 0 |


There are no null values after the conversion.

## Build model - ALS

In [18]:
(training, test) = data_indexed.randomSplit([0.8, 0.2])

In [19]:
als = ALS(maxIter = 5, regParam = 0.09, rank = 25,
         userCol = 'reviewID_idx', itemCol = 'asin_idx', ratingCol = 'overall',
         coldStartStrategy = 'drop', nonnegative = True)
model = als.fit(training)

In [20]:
predictions = model.transform(test)

In [21]:
predictions.select(['asin_idx', 'reviewID_idx', 'overall', 'prediction']).show(5)

+--------+------------+-------+----------+
|asin_idx|reviewID_idx|overall|prediction|
+--------+------------+-------+----------+
|   148.0|      1959.0|    4.0| 3.4595585|
|   148.0|       580.0|    4.0| 4.0196123|
|   148.0|       685.0|    5.0| 3.9438353|
|   148.0|       168.0|    5.0| 3.8702369|
|   148.0|       852.0|    5.0|  4.239786|
+--------+------------+-------+----------+
only showing top 5 rows



In [22]:
evaluator = RegressionEvaluator(metricName = 'rmse', 
                               labelCol = 'overall',
                               predictionCol = 'prediction')
rmse = evaluator.evaluate(predictions)
print('Root mean square error = '+str(rmse))

Root mean square error = 1.0441154764794682


The RMSE is about 1.04 on a 1-to-5 rating scale, so predictions are typically off by about one star, which is acceptable for a baseline model.
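
To make the metric concrete, RMSE is the square root of the mean squared difference between `overall` and `prediction`. The sketch below applies that definition to the five rows shown in the predictions table above. It is only an illustration on a tiny sample, so its value differs from the full test-set RMSE; the names `rows` and `sample_rmse` are illustrative.

```python
import math

# (overall, prediction) pairs copied from the five rows shown above
rows = [
    (4.0, 3.4595585),
    (4.0, 4.0196123),
    (5.0, 3.9438353),
    (5.0, 3.8702369),
    (5.0, 4.239786),
]

# Mean of squared errors, then the square root
squared_errors = [(actual - predicted) ** 2 for actual, predicted in rows]
sample_rmse = math.sqrt(sum(squared_errors) / len(squared_errors))
print('RMSE on the 5 displayed rows = {:.4f}'.format(sample_rmse))
```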

In [23]:
user_recs = model.recommendForAllUsers(20)

In [24]:
for user in user_recs.head(5):
    print(user)
    print('\n')

Row(reviewID_idx=1580, recommendations=[Row(asin_idx=2243, rating=5.379284858703613), Row(asin_idx=1820, rating=4.989779472351074), Row(asin_idx=2001, rating=4.972285747528076), Row(asin_idx=1320, rating=4.971190929412842), Row(asin_idx=1985, rating=4.970786094665527), Row(asin_idx=1062, rating=4.949776649475098), Row(asin_idx=1030, rating=4.93527889251709), Row(asin_idx=2247, rating=4.914149761199951), Row(asin_idx=2141, rating=4.870896339416504), Row(asin_idx=1695, rating=4.861541748046875), Row(asin_idx=1899, rating=4.848489284515381), Row(asin_idx=2177, rating=4.817795753479004), Row(asin_idx=906, rating=4.793710708618164), Row(asin_idx=1309, rating=4.785304546356201), Row(asin_idx=2007, rating=4.7574968338012695), Row(asin_idx=1003, rating=4.6844892501831055), Row(asin_idx=2219, rating=4.652678489685059), Row(asin_idx=46, rating=4.622981548309326), Row(asin_idx=1217, rating=4.5997514724731445), Row(asin_idx=2333, rating=4.589977264404297)])


Row(reviewID_idx=4900, recommendations

### Create dataframe for result

In [25]:
recs = model.recommendForAllUsers(10).toPandas()
nrecs = recs.recommendations.apply(pd.Series)\
            .merge(recs, right_index = True, left_index = True)\
            .drop(['recommendations'], axis = 1)\
            .melt(id_vars = ['reviewID_idx'], value_name = 'recommendation')\
            .drop('variable', axis = 1)\
            .dropna()
nrecs = nrecs.sort_values('reviewID_idx')
nrecs = pd.concat([nrecs['recommendation'].apply(pd.Series),
                  nrecs['reviewID_idx']], axis = 1)
nrecs.columns = ['ProductID_index',
                'Rating',
                'UserID_index']

In [26]:
md = data_indexed.select(['reviewerID', 'reviewID_idx', 'asin', 'asin_idx'])
md = md.toPandas()
# Lookup tables from numeric index back to the original string IDs
dict1 = dict(zip(md['reviewID_idx'], md['reviewerID']))
dict2 = dict(zip(md['asin_idx'], md['asin']))
nrecs['reviewerID'] = nrecs['UserID_index'].map(dict1)
nrecs['asin'] = nrecs['ProductID_index'].map(dict2)
nrecs = nrecs.sort_values('reviewerID')
nrecs.reset_index(drop = True, inplace = True)
# .copy() makes `new` an independent DataFrame, which avoids pandas' SettingWithCopyWarning
new = nrecs[['reviewerID', 'asin', 'Rating']].copy()
new['recommendations'] = list(zip(new.asin, new.Rating))
res = new[['reviewerID', 'recommendations']]
# One row per reviewer holding a list of (asin, predicted rating) tuples
res_new = res['recommendations'].groupby([res.reviewerID])\
                                .apply(list).reset_index()
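
The reshaping in the two cells above boils down to two steps: translate each numeric index back to its original ID, then collect the `(asin, rating)` pairs per reviewer. A minimal pure-Python sketch of that logic on toy data follows. The IDs, ratings, and the names `user_lookup`, `item_lookup`, `flat_recs`, and `grouped` are hypothetical.

```python
from collections import defaultdict

# Hypothetical lookup tables: numeric index -> original string ID
user_lookup = {0.0: 'USER_A', 1.0: 'USER_B'}
item_lookup = {10.0: 'ITEM_X', 11.0: 'ITEM_Y'}

# Flattened recommendations: (user index, item index, predicted rating)
flat_recs = [
    (0.0, 10.0, 4.8),
    (0.0, 11.0, 4.5),
    (1.0, 11.0, 4.9),
]

# Group (asin, rating) tuples under each reviewer ID
grouped = defaultdict(list)
for user_idx, item_idx, rating in flat_recs:
    grouped[user_lookup[user_idx]].append((item_lookup[item_idx], rating))

for reviewer, recs in sorted(grouped.items()):
    print(reviewer, recs)
```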


### Make recommendations for specific users
Target users: A3CJ7MHAS9IMAM, A3LGT6UZL99IW1, A21KNRUAA5RK5E

In [27]:
res_new

| | reviewerID | recommendations |
|---|---|---|
| 0 | A00473363TJ8YSZ3YAGG9 | [(B003XR480U, 4.710568904876709), (B006OGBTWM,... |
| 1 | A04324702R5O7JRSQNVAH | [(B002YSPDJ4, 5.483543395996094), (B004JDI1I2,... |
| 2 | A0678500JUN3N7KUG4PS | [(B002YSPDJ4, 6.062443733215332), (B003XR480U,... |
| 3 | A100UD67AHFODS | [(B0049UCECE, 5.889972686767578), (B0076BXEVI,... |
| 4 | A100WO06OQR8BQ | [(B003XR480U, 6.8318562507629395), (B002JG10GG... |
| ... | ... | ... |
| 4900 | AZQJ85BTRUQV2 | [(B001A4QKMW, 6.2549285888671875), (B0050PNOUS... |
| 4901 | AZU2JDR2GVICQ | [(B003XR480U, 5.964960098266602), (B004JDI1I2,... |
| 4902 | AZYJE40XW6MFG | [(B000VKUXHY, 6.64094352722168), (B002IXKD9U, ... |
| 4903 | AZZ5ASC403N74 | [(B001NIIDI4, 6.230529308319092), (B0002T4064,... |


In [28]:
pd.set_option('display.max_colwidth', None)

In [60]:
def customer_recommendation(customer_id):
    recommend = res_new[res_new['reviewerID'] == customer_id]
    for i in recommend['recommendations']:
        print('List of product ID recommended:')
        for j in i:
            print(j[0])

Show the product IDs recommended for a specific user.
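
A variant that returns the product IDs instead of printing them can be easier to reuse and test. The sketch below assumes the recommendations are available as a plain dict keyed by `reviewerID` (for example built with `dict(zip(res_new.reviewerID, res_new.recommendations))`). The function name `recommended_products` and the sample data are hypothetical.

```python
def recommended_products(recs_by_user, customer_id):
    """Return recommended product IDs for one customer, or [] if unknown."""
    return [asin for asin, _rating in recs_by_user.get(customer_id, [])]

# Hypothetical data in the same (asin, rating) shape as res_new
recs_by_user = {
    'USER_A': [('ITEM_X', 4.8), ('ITEM_Y', 4.5)],
}

print(recommended_products(recs_by_user, 'USER_A'))
print(recommended_products(recs_by_user, 'UNKNOWN'))
```

Returning an empty list for an unknown ID is one possible design choice; raising an error would be a reasonable alternative if a missing user should not pass silently.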

In [61]:
customer_recommendation('A3CJ7MHAS9IMAM')

List of product ID recommended:
B0006HXQX0
B006LNU00K
B000VKUXHY
B001CD9RYY
B003XR480U
B002JG10GG
B0001XPC7C
B003ZWHTMW
B0076BXEVI
B004JDI1I2


In [62]:
customer_recommendation('A3LGT6UZL99IW1')

List of product ID recommended:
B002JG10GG
B001NIIDI4
B002R0DX0U
B004JDI1I2
B004GGMODU
B000U5HZ12
B004I2EE3K
B0076BXEVI
B003XR480U
B000VKUXHY


In [63]:
customer_recommendation('A21KNRUAA5RK5E')

List of product ID recommended:
B001167XXO
B0002ZQAZW
B003HD03IQ
B002BA5WK0
B004JDI1I2
B003D7NSPG
B004J2U12W
B0006OKKN2
B001A3XXV4
B002YSPDJ4
