In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf

sc = SparkContext(master = 'local', appName = 'amazon grocery and gourment food')

spark = SparkSession(sc)

In [6]:
data = spark.read.csv("ratings_Grocery_and_Gourmet_Food.csv", header = False, inferSchema= True)

In [7]:
data.show(5)

+--------------+----------+---+----------+
|           _c0|       _c1|_c2|       _c3|
+--------------+----------+---+----------+
|A1ZQZ8RJS1XVTX|0657745316|5.0|1381449600|
|A31W38VGZAUUM4|0700026444|5.0|1354752000|
|A3I0AV0UJX5OH0|1403796890|1.0|1385942400|
|A3QAAOLIXKV383|1403796890|3.0|1307836800|
| AB1A5EGHHVA9M|141278509X|5.0|1332547200|
+--------------+----------+---+----------+
only showing top 5 rows



In [8]:
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: integer (nullable = true)



In [9]:
data.select('_c1').distinct().show(5)

+----------+
|       _c1|
+----------+
|B0000D1699|
|B0000D17HO|
|B0000D89E8|
|B0000D94PL|
|B0000DC3QJ|
+----------+
only showing top 5 rows



In [10]:
data.filter(data['_c0'] == 'A2BSUJYATHI7WW').show()

+--------------+----------+---+----------+
|           _c0|       _c1|_c2|       _c3|
+--------------+----------+---+----------+
|A2BSUJYATHI7WW|B002YRBALU|4.0|1395619200|
|A2BSUJYATHI7WW|B007GPARA0|4.0|1395619200|
+--------------+----------+---+----------+



In [11]:
data = data.select('_c0', '_c2', '_c1').withColumnRenamed('_c0', 'reviewerID').withColumnRenamed('_c2', 'overall').withColumnRenamed('_c1', 'asin')

In [12]:
data.show(5)

+--------------+-------+----------+
|    reviewerID|overall|      asin|
+--------------+-------+----------+
|A1ZQZ8RJS1XVTX|    5.0|0657745316|
|A31W38VGZAUUM4|    5.0|0700026444|
|A3I0AV0UJX5OH0|    1.0|1403796890|
|A3QAAOLIXKV383|    3.0|1403796890|
| AB1A5EGHHVA9M|    5.0|141278509X|
+--------------+-------+----------+
only showing top 5 rows



In [13]:
from pyspark.sql.functions import *

data.select([count(when(isnan(col), col)).alias(col) for col in data.columns]).show()

+----------+-------+----+
|reviewerID|overall|asin|
+----------+-------+----+
|         0|      0|   0|
+----------+-------+----+



In [14]:
data.select([count(when(isnull(col), col)).alias(col) for col in data.columns]).show()

+----------+-------+----+
|reviewerID|overall|asin|
+----------+-------+----+
|         0|      0|   0|
+----------+-------+----+



In [15]:
data.count() - data.distinct().count()

0

In [16]:
number_of_users = data.select('reviewerID').distinct().count()
number_of_products = data.select('asin').distinct().count()
number_of_ratings = data.count()

In [17]:
number_of_users

768438

In [18]:
number_of_products

166049

In [19]:
number_of_ratings

1297156

In [20]:
sparsity = 1 - (number_of_ratings/(number_of_users*number_of_products))
sparsity

0.9999898340700841

In [21]:
from pyspark.ml.feature import StringIndexer

indexer1 = StringIndexer(inputCol = 'asin', outputCol = 'asin_idx')
indexer2 = StringIndexer(inputCol = 'reviewerID', outputCol = 'reviewerID_idx')

data_indexed = indexer1.fit(data).transform(data)
data_indexed = indexer2.fit(data_indexed).transform(data_indexed)

In [22]:
data_indexed.show(5, False)

+--------------+-------+----------+--------+--------------+
|reviewerID    |overall|asin      |asin_idx|reviewerID_idx|
+--------------+-------+----------+--------+--------------+
|A1ZQZ8RJS1XVTX|5.0    |0657745316|122279.0|165912.0      |
|A31W38VGZAUUM4|5.0    |0700026444|157556.0|750032.0      |
|A3I0AV0UJX5OH0|1.0    |1403796890|88468.0 |201813.0      |
|A3QAAOLIXKV383|3.0    |1403796890|88468.0 |45441.0       |
|AB1A5EGHHVA9M |5.0    |141278509X|127327.0|758035.0      |
+--------------+-------+----------+--------+--------------+
only showing top 5 rows



In [38]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

train,test = data_indexed.randomSplit([0.8, 0.2])

als = ALS(maxIter = 10, regParam = 0.1, rank = 25,
         userCol = "reviewerID_idx",
         itemCol = "asin_idx",
         ratingCol = "overall", coldStartStrategy = "drop", nonnegative = True)
model = als.fit(train)

In [39]:
predictions = model.transform(test)

predictions.select('overall', 'prediction').show(5)

+-------+----------+
|overall|prediction|
+-------+----------+
|    5.0| 3.8570075|
|    5.0| 3.2644181|
|    5.0| 3.7489128|
|    2.0| 1.3912433|
|    4.0| 1.7047215|
+-------+----------+
only showing top 5 rows



In [40]:
evaluator = RegressionEvaluator(metricName = 'rmse',
                               labelCol = 'overall',
                               predictionCol = "prediction")
rmse = evaluator.evaluate(predictions)
rmse

1.7156368893410001

With maxiter = 20, I got the result which is 1.59

In [41]:
model.save('ALS_new')

In [3]:
from pyspark.ml.recommendation import ALSModel

In [4]:
model_new = ALSModel.load('ALS')

In [5]:
user_recs = model_new.recommendForAllUsers(10)

In [None]:
import pandas as pd
recs=model_new.recommendForAllUsers(10).toPandas()
nrecs=recs.recommendations.apply(pd.Series) \
            .merge(recs, right_index = True, left_index = True) \
            .drop(["recommendations"], axis = 1) \
            .melt(id_vars = ['reviewerID_idx'], value_name = "recommendation") \
            .drop("variable", axis = 1) \
            .dropna() 
nrecs=nrecs.sort_values('reviewerID_idx')
nrecs=pd.concat([nrecs['recommendation'].apply(pd.Series), nrecs['reviewerID_idx']], axis = 1)
nrecs.columns = [
        'ProductID_idx',
        'Rating',
        'UserID_idx'
       ]
md=transformed.select(transformed['reviewerID'],transformed['reviewerID_idx'],transformed['asin'],transformed['asin_idx'])
md=md.toPandas()
dict1 =dict(zip(md['reviewerID_idx'],md['reviewerID']))
dict2=dict(zip(md['asin_idx'],md['asin']))
nrecs['reviewerID']=nrecs['UserID_idx'].map(dict1)
nrecs['asin']=nrecs['ProductID_idx'].map(dict2)
nrecs=nrecs.sort_values('reviewerID')
nrecs.reset_index(drop=True, inplace=True)
new=nrecs[['reviewerID','asin','Rating']]
new['recommendations'] = list(zip(new.asin, new.Rating))
res=new[['reviewerID','recommendations']]  
res_new=res['recommendations'].groupby([res.reviewerID]).apply(list).reset_index()
print(res_new)

I trained this model 3 times and obtained the results with RMSE being 1.59, 1.63, 1.71 and make predictions.
However, it ran out of my resources when I tried to convert dataframe of recommendation for all users. So, I can not give the final results due to my weak computer. However, I have codes for the chance that we can build the system which have master computer and slave computers.

The above codes can give us the pandas dataframe which provides reviewerID and recommendations. Furthermore, recommendations column contains tuples of products and predicted ratings. 

# Predict for reviewer: A2BSUJYATHI7WW

# Predict for customer: A26LKBXTSIHQV2

# Predict for customer: A3ABZBEG3KZ0L

Based on the results of above code, we can find out tuples of ratings and related products