In [None]:
# Must run before the pyspark imports in the following cells.
# NOTE(review): findspark.init() is expected to locate the local Spark
# installation and make the pyspark package importable -- confirm against
# the findspark documentation for the installed version.
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Start (or reuse) the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName('Recommendation_system')
    .getOrCreate()
)

In [None]:
# Load the raw review records from the JSON file into a Spark DataFrame
data = spark.read.json(path='data/Musical_Instruments_5.json')

In [None]:
# Peek at the first five raw reviews (long cell values are truncated)
data.show(n=5, truncate=True)

+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|      asin| helpful|overall|          reviewText| reviewTime|    reviewerID|        reviewerName|             summary|unixReviewTime|
+----------+--------+-------+--------------------+-----------+--------------+--------------------+--------------------+--------------+
|1384719342|  [0, 0]|    5.0|Not much to write...|02 28, 2014|A2IBPI20UZIR0U|cassandra tu "Yea...|                good|    1393545600|
|1384719342|[13, 14]|    5.0|The product does ...|03 16, 2013|A14VAT5EAX3D9S|                Jake|                Jake|    1363392000|
|1384719342|  [1, 1]|    5.0|The primary job o...|08 28, 2013|A195EZSQDW3E21|Rick Bennette "Ri...|It Does The Job Well|    1377648000|
|1384719342|  [0, 0]|    5.0|Nice windscreen p...|02 14, 2014|A2C00NNG1ZQQG2|RustyBill "Sunday...|GOOD WINDSCREEN F...|    1392336000|
|1384719342|  [0, 0]|    5.0|This pop filter i...|02 21

In [None]:
# Keep only the columns used by the recommender: product, rating, reviewer
data_sub = data.select('asin', 'overall', 'reviewerID')

In [None]:
# Total number of rows (one per review/rating) in the reduced DataFrame
data_sub.count()

10261

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import isnan, when, count, col

In [None]:
# Preview the three retained columns
data_sub.show(n=5, truncate=True)

+----------+-------+--------------+
|      asin|overall|    reviewerID|
+----------+-------+--------------+
|1384719342|    5.0|A2IBPI20UZIR0U|
|1384719342|    5.0|A14VAT5EAX3D9S|
|1384719342|    5.0|A195EZSQDW3E21|
|1384719342|    5.0|A2C00NNG1ZQQG2|
|1384719342|    5.0| A94QU4C90B1AX|
+----------+-------+--------------+
only showing top 5 rows



In [None]:
# Per-column count of null values, transposed so each column becomes a row
data_sub.select(*[
    count(when(col(name).isNull(), name)).alias(name)
    for name in data_sub.columns
]).toPandas().transpose()

Unnamed: 0,0
asin,0
overall,0
reviewerID,0


In [None]:
# Total number of ratings, plus the number of distinct products and reviewers
numerator = data_sub.count()
products = data_sub.select('asin').distinct().count()
users = data_sub.select('reviewerID').distinct().count()

In [None]:
# Shown in this order: total ratings, distinct reviewers, distinct products
display(numerator, users, products)

10261

1429

900

In [None]:
# Size of the full user x product rating matrix, i.e. the number of ratings
# there would be if every user had rated every product
denominator = products * users
denominator

1286100

In [None]:
# Sparsity: the share of user/product cells that hold no rating
sparsity = 1 - (float(numerator) / denominator)
print('Sparsity: ', sparsity)

Sparsity:  0.992021615737501


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [None]:
# Encode the string identifiers as numeric indices; the ALS model below is
# configured with these *_idx columns as its user and item columns.
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# One indexer per identifier column
indexer = StringIndexer(inputCol='asin', outputCol='asin_idx')
indexer1 = StringIndexer(inputCol='reviewerID', outputCol='reviewerID_idx')

# Products: learn the asin -> index mapping and append it as 'asin_idx'
indexer_model = indexer.fit(data_sub)
data_indexed = indexer_model.transform(data_sub)

# Reviewers: same procedure on the already product-indexed frame,
# appending 'reviewerID_idx'
indexer1_model = indexer1.fit(data_indexed)
data_indexed = indexer1_model.transform(data_indexed)

In [None]:
# Preview the frame with the two new numeric index columns
data_indexed.show(n=5, truncate=True)

+----------+-------+--------------+--------+--------------+
|      asin|overall|    reviewerID|asin_idx|reviewerID_idx|
+----------+-------+--------------+--------+--------------+
|1384719342|    5.0|A2IBPI20UZIR0U|   781.0|          72.0|
|1384719342|    5.0|A14VAT5EAX3D9S|   781.0|         359.0|
|1384719342|    5.0|A195EZSQDW3E21|   781.0|         436.0|
|1384719342|    5.0|A2C00NNG1ZQQG2|   781.0|        1216.0|
|1384719342|    5.0| A94QU4C90B1AX|   781.0|        1137.0|
+----------+-------+--------------+--------+--------------+
only showing top 5 rows



In [None]:
# Confirm that indexing introduced no nulls: per-column null counts,
# transposed so each column becomes a row
data_indexed.select(*[
    count(when(col(name).isNull(), name)).alias(name)
    for name in data_indexed.columns
]).toPandas().transpose()

Unnamed: 0,0
asin,0
overall,0
reviewerID,0
asin_idx,0
reviewerID_idx,0


In [None]:
# Smaller dataset, so hold out 20% of the ratings for testing (80/20 split).
# The fixed seed keeps the split -- and therefore every metric computed from
# it below -- the same across kernel restarts; without a seed each run trains
# and evaluates on a different random partition.
training, test = data_indexed.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Create the ALS (alternating least squares) recommender and fit it on the
# training split.
#   - userCol / itemCol are the numeric StringIndexer outputs, not the raw IDs
#   - coldStartStrategy='drop' removes predictions for users/items that were
#     not seen during training, so evaluation metrics are not NaN
#   - nonnegative=True constrains the learned factors to be non-negative
#   - seed fixes the random factor initialisation so that repeated runs give
#     the same model (and the same RMSE / recommendations)
als = ALS(maxIter=5,
          regParam=0.09,
          rank=25,
          userCol='reviewerID_idx',
          itemCol='asin_idx',
          ratingCol='overall',
          coldStartStrategy='drop',
          nonnegative=True,
          seed=42)
model = als.fit(training)

In [None]:
# Score the held-out test split with the fitted model; this adds a
# 'prediction' column. The RMSE itself is computed in a later cell.
predictions = model.transform(test)

In [None]:
# Compare actual ratings ('overall') with the model's predictions
predictions.select(
    'asin_idx', 'reviewerID_idx', 'overall', 'prediction'
).show(n=5)

+--------+--------------+-------+----------+
|asin_idx|reviewerID_idx|overall|prediction|
+--------+--------------+-------+----------+
|   148.0|        1161.0|    5.0| 3.8815749|
|   148.0|         482.0|    5.0|  3.950369|
|   463.0|        1165.0|    4.0| 4.1057177|
|   471.0|         264.0|    4.0| 3.4301744|
|   833.0|         692.0|    5.0|    4.4717|
+--------+--------------+-------+----------+
only showing top 5 rows



In [None]:
# Root-mean-square error between the actual ratings and the predictions
evaluator = RegressionEvaluator(labelCol='overall',
                                predictionCol='prediction',
                                metricName='rmse')
rmse = evaluator.evaluate(predictions)
print('Root-mean-square error = {}'.format(rmse))

Root-mean-square error = 1.2016281746300879


In [None]:
# An RMSE of ~1.2 means the predicted ratings are typically off by about 1.2 rating points

## Providing Recommendations: for all users

In [None]:
# For every user, fetch the 20 products with the highest predicted rating
user_recs = model.recommendForAllUsers(numItems=20)

In [None]:
# Print the recommendation rows of the first five users, each followed by
# two blank lines
for user in user_recs.take(5):
    print(user, end='\n\n\n')

Row(reviewerID_idx=471, recommendations=[Row(asin_idx=603, rating=6.210137844085693), Row(asin_idx=359, rating=6.096579074859619), Row(asin_idx=773, rating=6.070028305053711), Row(asin_idx=707, rating=6.059390068054199), Row(asin_idx=90, rating=6.018151760101318), Row(asin_idx=738, rating=6.003474235534668), Row(asin_idx=328, rating=6.000361442565918), Row(asin_idx=491, rating=5.979040145874023), Row(asin_idx=609, rating=5.973596572875977), Row(asin_idx=698, rating=5.967898368835449), Row(asin_idx=865, rating=5.929290294647217), Row(asin_idx=416, rating=5.923426151275635), Row(asin_idx=368, rating=5.902251243591309), Row(asin_idx=345, rating=5.901122093200684), Row(asin_idx=731, rating=5.877236843109131), Row(asin_idx=689, rating=5.862932205200195), Row(asin_idx=788, rating=5.843459606170654), Row(asin_idx=162, rating=5.841752529144287), Row(asin_idx=581, rating=5.839955806732178), Row(asin_idx=852, rating=5.807944297790527)])


Row(reviewerID_idx=1342, recommendations=[Row(asin_idx=85

## Converting back to string form

In [None]:
# Flatten the per-user recommendation lists into a long pandas table with one
# row per (user, recommended product).
import pandas as pd
# Top 10 recommendations per user, collected to the driver as a pandas frame
# with columns 'reviewerID_idx' and 'recommendations' (a list per user).
recs = model.recommendForAllUsers(10).toPandas()
# Steps of the chain below:
#   1. apply(pd.Series) spreads each user's list over one column per slot
#   2. merge on the index re-attaches 'reviewerID_idx' to those slot columns
#   3. the original list column is dropped
#   4. melt turns the slot columns into rows (one recommendation per row)
#   5. the slot label ('variable') is dropped, as are rows with no recommendation
nrecs = recs.recommendations.apply(pd.Series) \
            .merge(recs, right_index = True, left_index = True) \
            .drop(['recommendations'], axis = 1) \
            .melt(id_vars = ['reviewerID_idx'], value_name = 'recommendation') \
            .drop('variable', axis = 1) \
            .dropna()
nrecs = nrecs.sort_values('reviewerID_idx')
# Split every recommendation into its fields and put the user index alongside.
# NOTE(review): assumes each recommendation is an (asin_idx, rating) pair, as
# the printed recommendForAllUsers rows suggest -- the column names assigned
# below rely on that order.
nrecs = pd.concat([nrecs['recommendation'].apply(pd.Series),
                   nrecs['reviewerID_idx']], axis = 1)
nrecs.columns = [
    'ProductID_index',
    'Rating', 
    'UserID_index'
]

In [None]:
# Translate the numeric indices in the recommendations back to the original
# reviewer and product IDs, then collect each reviewer's recommendations
# into a single list.

# Lookup tables: numeric index -> original string ID
md = data_indexed.select(['reviewerID', 'reviewerID_idx',
                          'asin', 'asin_idx'])
md = md.toPandas()
dict1 = dict(zip(md['reviewerID_idx'], md['reviewerID']))
dict2 = dict(zip(md['asin_idx'], md['asin']))

# Attach the original IDs to every recommendation row
nrecs['reviewerID'] = nrecs['UserID_index'].map(dict1)
nrecs['asin'] = nrecs['ProductID_index'].map(dict2)
nrecs = nrecs.sort_values('reviewerID').reset_index(drop=True)

# .copy() makes `new` an independent frame; assigning a column to a plain
# slice of `nrecs` is what raised pandas' SettingWithCopyWarning.
new = nrecs[['reviewerID', 'asin', 'Rating']].copy()
new['recommendations'] = list(zip(new.asin, new.Rating))

# One row per reviewer, holding a list of (asin, predicted rating) tuples
res = new[['reviewerID', 'recommendations']]
res_new = res['recommendations'].groupby([res.reviewerID])\
                                .apply(list).reset_index()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  # This is added back by InteractiveShellApp.init_path()


In [None]:
# Final table: one row per reviewer with their list of (asin, rating) recommendations
res_new

Unnamed: 0,reviewerID,recommendations
0,A00625243BI8W1SSZNLMD,"[(B002GHBZ4U, 6.518843650817871), (B009S814U0,..."
1,A10044ECXDUVKS,"[(B000RY68PA, 5.182676792144775), (B000RYPN38,..."
2,A102MU6ZC9H1N6,"[(B0002D0B4K, 5.885778427124023), (B00AHEWBM4,..."
3,A109JTUZXO61UY,"[(B003AJMPW4, 6.1314873695373535), (B003S3S0DU..."
4,A109ME7C09HM2M,"[(B0002D0B4K, 5.673584938049316), (B002GHBZ4U,..."
...,...,...
1424,AZJPNK73JF3XP,"[(B000EPVXWU, 5.508548259735107), (B000RY68PA,..."
1425,AZMHABTPXVLG3,"[(B0002E4Z8M, 3.9464001655578613), (B0002D0B4K..."
1426,AZMIKIG4BB6BZ,"[(B002N4GBLI, 5.653383731842041), (B001RNH8YA,..."
1427,AZPDO6FLSMLFP,"[(B008GS3XLQ, 5.188046455383301), (B0002E4Z8M,..."
