# In [1]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Load the review lines and drop the header row (the row whose second field
# is the literal column name 'userID').
myrdd = (
    sc.textFile("/FileStore/tables/fine_foods_csv_comma_cleaned.csv")
    .filter(lambda line: line.split(',')[1] != 'userID')
)

# 70/30 random split into training and test lines.
train, test = myrdd.randomSplit([0.7, 0.3])


def _score(line):
    """Return the review score (field 6 of a comma-separated line) as a float."""
    return float(line.split(',')[6])


# Mean and (population) variance of the raw review scores on the test split.
avg = test.map(_score).mean()
var = test.map(lambda line: (_score(line) - avg) ** 2).mean()

# Largest value of a signed 32-bit integer; hashed ids are reduced modulo it.
intmax = 2**31 - 1


def _to_rating(line):
    """Turn one comma-separated review line into a ``Rating``.

    Rating expects integer ids, but the user and product ids are strings, so
    both are passed through ``hash`` and reduced modulo ``intmax``. The score
    is cast to float and divided by 5.
    NOTE(review): distinct ids can map to the same integer after the modulo.
    """
    fields = line.split(',')
    user = hash(fields[1]) % intmax
    product = hash(fields[0]) % intmax
    return Rating(user, product, float(fields[6]) / 5)


ratings_train = train.map(_to_rating)
ratings_test = test.map(_to_rating)

# ALS hyper-parameters -- both are important tuning parameters.
rank = 50
numIterations = 40

# Fit the matrix-factorization model on the training ratings.
model = ALS.train(ratings_train, rank=rank, iterations=numIterations)

print(model)

# Evaluate the model on the held-out test data.
testdata = ratings_test.map(lambda p: (p[0], p[1]))  # (userId, productId) pairs to score
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))  # ((userId, productId), predicted score)
# Pair every actual test rating with its prediction, keyed by (userId, productId).
# NOTE(review): the join keeps only pairs present on both sides, so MSE may
# cover fewer rows than the full test split -- confirm this is acceptable.
ratesAndPreds = ratings_test.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1]) ** 2).mean()
print("Mean Squared Error = " + str(MSE))

# Baseline = MSE of always predicting the mean test score, i.e. the population
# variance of the test ratings. It is taken from ratings_test so that it is in
# the same units as MSE: ratings_test holds score / 5, whereas the module-level
# `var` is computed on the raw scores and is therefore 25x larger, which would
# overstate the improvement.
baseline_mse = ratings_test.map(lambda r: r[2]).variance()

if baseline_mse > 0:
    # 1 - MSE / baseline is a fraction; scale by 100 to report a percentage.
    improvement_pct = 100 * (1 - MSE / baseline_mse)
    print("Our model performs ", improvement_pct, "percent better than predicting the mean review score")
else:
    # Zero (or undefined) variance: the ratio above would be meaningless.
    print("Cannot compare against the mean-score baseline: test ratings have no variance")

# The user with the most reviews, hereafter called 'user X'.
USER_X = 'A3OXHLG6DIBRW8'

# All rows of the dataset that were not written by user X.
not_most_frequent = myrdd.filter(lambda x: x.split(',')[1] != USER_X)

# Product ids that user X has already reviewed.
reviewed_by_x = myrdd.filter(lambda x: x.split(',')[1] == USER_X).map(lambda x: x.split(',')[0])

# Candidate products: reviewed by somebody else and NOT reviewed by user X.
# Dropping user X's rows alone is not enough, because a product reviewed by
# both user X and another user would still appear among the other users' rows.
products = not_most_frequent.map(lambda x: x.split(',')[0]).distinct().subtract(reviewed_by_x)

# (productid, hashed productid) pairs, to help recover the productid from its hash.
products2 = products.map(lambda x: [x, hash(x) % intmax])

# (hashed user X, hashed productid) pairs to score. The ids are hashed inside
# the lambda, the same way the training ratings were built.
products = products.map(lambda x: [hash(USER_X) % intmax, hash(x) % intmax])

preds = model.predictAll(products)

def get_max2(x, y):
    """Reducer: return whichever of two records has the larger value at index 2.

    Used to find the highest predicted score for user X. When both values
    are equal, the second argument ``y`` is returned.
    """
    return x if x[2] > y[2] else y
  
# Prediction with the highest score among user X's candidate products.
rec = preds.reduce(get_max2)

# Recover the original productid of the most recommended product from its hash.
recid = products2.filter(lambda x: x[1] == rec[1])

# The model was trained on score / 5, so multiply the predicted rating by 5 to
# report it on the original review scale. Spaces around the interpolated values
# keep the message readable.
print('The most frequent reviewer is predicted to give the product with product id '
      + str(recid.first()[0]) + ' a score of ' + str(rec[2] * 5))

