In [None]:
from pyspark.sql import SparkSession

In [None]:
# One Spark session for the whole notebook; getOrCreate() returns an existing
# session if one is already active.
spark = (
    SparkSession.builder
    .appName('Recommendation_system')
    .getOrCreate()
)

In [None]:
REVIEWS_PATH = "/FileStore/tables/Musical_Instruments_5.json"

# Raw review records; schema is inferred by Spark's JSON reader.
df = spark.read.json(REVIEWS_PATH)

In [None]:
# Keep only the columns the recommender uses:
#   asin       -> product id
#   overall    -> rating (ALS ratingCol below)
#   reviewerID -> user id
nd = df.select('asin', 'overall', 'reviewerID')

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [None]:
#ALS requires numeric (integer) user and item IDs, so the string asin and reviewerID columns must first be converted to index form.

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [None]:
# One StringIndexer per ID column: every column except the rating ('overall')
# gets a numeric "<name>_index" companion column for ALS.
# sorted() fixes the stage order; iterating a raw set of strings yields an order
# that depends on Python's per-process hash seed.
indexer = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in sorted(set(nd.columns) - {'overall'})
]

In [None]:
pipeline = Pipeline(stages=indexer)

# Fit the indexers on the full dataset (the split happens afterwards), so the
# train and test sets share a single ID -> index mapping.
indexer_model = pipeline.fit(nd)
transformed = indexer_model.transform(nd)

In [None]:
# 80/20 train/test split. A fixed seed makes the split (and therefore the RMSE
# reported below) reproducible across runs; without it every run differs.
SEED = 42
(train, test) = transformed.randomSplit([0.8, 0.2], seed=SEED)

In [None]:
# ALS matrix factorisation on the indexed user/item IDs.
# coldStartStrategy="drop" removes prediction rows whose user or item was not
# seen during training instead of emitting NaN for them.
als = ALS(
    userCol="reviewerID_index",
    itemCol="asin_index",
    ratingCol="overall",
    rank=25,
    maxIter=5,
    regParam=0.09,
    nonnegative=True,
    coldStartStrategy="drop",
)

model = als.fit(train)

In [None]:
# Score the held-out reviews (adds a 'prediction' column).
predictions = model.transform(dataset=test)

In [None]:
# Root-mean-squared error between the actual rating ('overall') and the
# model's 'prediction' column.
evaluator = RegressionEvaluator(
    labelCol="overall",
    predictionCol="prediction",
    metricName="rmse",
)

In [None]:
# Test-set RMSE; left as the last expression so the notebook displays it.
rmse = evaluator.evaluate(predictions)
rmse

In [None]:
# How many top-scoring items to recommend to each user.
N_RECS = 5
recs_to_each_user = model.recommendForAllUsers(numItems=N_RECS)

In [None]:
#recs_to_each_user.printSchema()

In [None]:
#Convert the StringIndexer indices back to the original string IDs

In [None]:
import pandas as pd

In [None]:
# Collect the per-user recommendations to the driver as a pandas DataFrame.
# Each row is one user; its 'recommendations' cell holds that user's list of
# recommended (item index, rating) Rows.
rec_df = recs_to_each_user.toPandas()

In [None]:
#type(rec_df)
#type(rec_df.recommendations[0]) 
#type(rec_df.recommendations[0][1])

In [None]:
# One row per (user, recommended item): explode each user's list of
# recommendation Rows into separate rows. Users with an empty list come out of
# explode as NaN and are dropped. explode is vectorised, unlike building a
# pd.Series per user and melting the result.
# Resulting columns: 'reviewerID_index', 'recommendation'.
nrecs = (
    rec_df
    .explode('recommendations')
    .rename(columns={'recommendations': 'recommendation'})
    .dropna()
    .reset_index(drop=True)
)

In [None]:
by_user = nrecs.sort_values(by='reviewerID_index')

# Split every recommendation Row into positional columns (0, 1) and keep the
# user index as the final column.
rec_fields = by_user['recommendation'].apply(pd.Series)
nrecs = pd.concat([rec_fields, by_user['reviewerID_index']], axis=1)

In [None]:
#nrecs.head(3)

In [None]:
# Name the positional columns: the two Row fields (item index, predicted
# rating) followed by the user index.
nrecs.columns = pd.Index(['ProductID_index', 'Rating', 'UserID_index'])

In [None]:
#nrecs.head(3)

In [None]:
# Original string IDs alongside their StringIndexer indices, used to build the
# index -> ID lookup tables.
md = transformed.select('reviewerID', 'reviewerID_index', 'asin', 'asin_index')

In [None]:
# Bring the ID/index lookup columns to the driver as pandas.
# Guarded so the cell can be re-run: after the first run `md` is already a
# pandas DataFrame, which has no .toPandas() and would raise AttributeError.
if not isinstance(md, pd.DataFrame):
    md = md.toPandas()

In [None]:
# Reverse lookups from StringIndexer output to the original string IDs:
#   dict1: reviewerID_index -> reviewerID
#   dict2: asin_index       -> asin
dict1 = {idx: user_id for idx, user_id in zip(md['reviewerID_index'], md['reviewerID'])}
dict2 = {idx: product_id for idx, product_id in zip(md['asin_index'], md['asin'])}

In [None]:
# Translate the numeric user/product indices back to their original string IDs.
nrecs = nrecs.assign(
    reviewerID=nrecs['UserID_index'].map(dict1),
    asin=nrecs['ProductID_index'].map(dict2),
)

In [None]:
#nrecs.head(5)

In [None]:
# Order rows by user and, within each user, by predicted rating (best first),
# so the per-user recommendation lists built later are ranked. Sorting on the
# user alone uses pandas' default unstable sort, which leaves the within-user
# order arbitrary.
nrecs = (
    nrecs
    .sort_values(['reviewerID', 'Rating'], ascending=[True, False])
    .reset_index(drop=True)
)

In [None]:
# Explicit copy: the next cell adds a column to `new`. Assigning into a plain
# column selection of `nrecs` triggers pandas' SettingWithCopyWarning.
new = nrecs[['reviewerID', 'asin', 'Rating']].copy()

In [None]:
# Pair every recommended product with its predicted rating as an (asin, rating) tuple.
new['recommendations'] = [(asin, rating) for asin, rating in zip(new['asin'], new['Rating'])]

In [None]:
#new.head(5)

In [None]:
res = new[['reviewerID', 'recommendations']]

# One row per user: the list of that user's (asin, rating) tuples, kept in the
# row order of `res`.
res_new = (
    res.groupby('reviewerID')['recommendations']
    .apply(list)
    .reset_index()
)

#res_new.head(10)