In [0]:
import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("Demo").getOrCreate()

This program takes a dataset consisted of 30 users rating 100 movies, with a total of 1500 rating samples. The purpose is to train a recommendation system with the alternating least squares algorithm and test which movies it would recommend to users 9 and 13.

In [0]:
movrdd= spark.sparkContext.textFile("/FileStore/tables/movies.csv", 4)
movrdd.collect()

In [0]:
movdf = spark.read \
  .format("csv") \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ',') \
  .option("path", "/FileStore/tables/movies.csv") \
  .load()

In [0]:
movdf.display()

movieId,rating,userId
2,3,0
3,1,0
5,2,0
9,4,0
11,1,0
12,2,0
15,1,0
17,1,0
19,1,0
21,1,0


In [0]:
usercntdf=movdf.groupBy('userID').count().sort(col('userID'))
usercntdf=usercntdf.withColumnRenamed('userID','userID_cnt')
usercntdf.display()

userID_cnt,count
0,49
1,49
2,46
3,48
4,55
5,49
6,57
7,54
8,49
9,53


The dataset has 3 columns, being the movieId, rating and userId. There are 1501 rows representing 1501 ratings by all users. 30 users and 100 movies are included.

The following analysis finds top 15 movies with the highest average ratings, top 15 movies with the highest number of ratings, top 10 users providing highest average ratings and top 10 users rated for the most times.

In [0]:
usersumdf=movdf.groupBy('userID').sum('rating').sort(col('userID'))
usersumdf=usersumdf.join(usercntdf,usersumdf.userID==usercntdf.userID_cnt).drop("userID_cnt").sort(col('userID'))
usersumdf.display()

userID,sum(rating),count
0,70,49
1,76,49
2,95,46
3,88,48
4,86,55
5,87,49
6,82,57
7,88,54
8,93,49
9,95,53


In [0]:
useravgdf=usersumdf.withColumn('User_avg', ( usersumdf['sum(rating)'] ) /( usersumdf['count'] )).sort(col('User_avg').desc())
useravgdf.show(10)
#10 users giving highest average ratings

In [0]:
#10 users giving the largest number of ratings
useravgdf.sort(col('count').desc()).show(10)

In [0]:
mvcntdf=movdf.groupBy('movieId').count().sort(col('movieId'))
mvcntdf=mvcntdf.withColumnRenamed('movieId','movieId_cnt')
mvcntdf.display()

movieId_cnt,count
0,16
1,13
2,19
3,13
4,17
5,13
6,20
7,16
8,7
9,16


In [0]:
mvsumdf=movdf.groupBy('movieId').sum('rating').sort(col('movieId'))
mvsumdf=mvsumdf.join(mvcntdf,mvsumdf.movieId==mvcntdf.movieId_cnt).drop("movieId_cnt").sort(col('movieId'))
mvsumdf.display()

movieId,sum(rating),count
0,20,16
1,18,13
2,40,19
3,17,13
4,30,17
5,20,13
6,29,20
7,30,16
8,13,7
9,23,16


In [0]:
mvavgdf=mvsumdf.withColumn('movie_avg', ( mvsumdf['sum(rating)'] ) /( mvsumdf['count'] ))
mvavgdf.sort(col('movie_avg').desc()).show(15)
#15 highest rated movies

In [0]:
#15 mostly rated movies
mvavgdf.sort(col('count').desc()).show(15)

In [0]:
mvavgdf.sort(col('movie_avg').asc()).show(10)

Based on above analysis: 
Top 15 movies with the highest average ratings: 32,90,30,94,23,49,18,29,52,53,62,92,46,68,87, 
Top 15 movies with the highest number of ratings: 6,29,51,22,50,94,55,68,2,15,85,36,86,88,45, 
Top 10 users providing highest average ratings: 11,26,22,23,2,17,8,24,12,3, 
Top 10 users rated for the most times: 6,14,22,11,12,4,7,9,24,23

Next, the ALS based recommender is trained and tuned on hyperparameters.

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, TrainValidationSplit

0.8:0.2 train-test ratio

In [0]:
(train, test) = movdf.randomSplit([0.8, 0.2])
train.show(5)

In [0]:
als = ALS(userCol= 'userId',itemCol= 'movieId',ratingCol='rating', coldStartStrategy = 'drop')
model=als.fit(train)

In [0]:
print('rank:',als._java_obj.getRank())
print('maxIter:',als._java_obj.getMaxIter())
print('regParam:',als._java_obj.getRegParam())
print('numItemBlocks:',als._java_obj.getNumItemBlocks())
print('numUserBlocks:',als._java_obj.getNumUserBlocks())


In [0]:
predictions = model.transform(test)
mse_eval = RegressionEvaluator(metricName= 'mse', labelCol= 'rating',predictionCol= 'prediction')
rmse_eval = RegressionEvaluator(metricName= 'rmse', labelCol= 'rating',predictionCol= 'prediction')
mae_eval= RegressionEvaluator(metricName= 'mae', labelCol= 'rating',predictionCol= 'prediction')
mse = mse_eval.evaluate(predictions)
rmse = rmse_eval.evaluate(predictions)
mae = mae_eval.evaluate(predictions)
print("mse:",mse)
print("rmse:",rmse)
print("mae:",mae)
predictions.show()

0.7:0.3 train-test ratio

In [0]:
(train, test) = movdf.randomSplit([0.7, 0.3])
train.show(5)
als = ALS(userCol= 'userId',itemCol= 'movieId',ratingCol='rating', coldStartStrategy = 'drop')
model=als.fit(train)

In [0]:
als = ALS(userCol= 'userId',itemCol= 'movieId',ratingCol='rating', coldStartStrategy = 'drop')
model=als.fit(train)

In [0]:
predictions = model.transform(test)
mse_eval = RegressionEvaluator(metricName= 'mse', labelCol= 'rating',predictionCol= 'prediction')
rmse_eval = RegressionEvaluator(metricName= 'rmse', labelCol= 'rating',predictionCol= 'prediction')
mae_eval= RegressionEvaluator(metricName= 'mae', labelCol= 'rating',predictionCol= 'prediction')
mse = mse_eval.evaluate(predictions)
rmse = rmse_eval.evaluate(predictions)
mae = mae_eval.evaluate(predictions)
print("mse:",mse)
print("rmse:",rmse)
print("mae:",mae)
predictions.show()

Parameter tunning: Firstly manually decide some of the parameters (rank and regParams) and fix them, for saving computing effort for the further grid searches.

In [0]:
import numpy as np
(train, test) = movdf.randomSplit([0.8, 0.2])
ranks=[8,10,12]
maxIters=[20]
regParams=[0.1,1,10]
numItemBlockss=[10]
numUserBlockss=[10]
bestparams=[]
bestnum=0
bestmodel=None
besterr=99
bestpred=None
count=-1
for arank in ranks:
    for ami in maxIters:
        for arp in regParams:
            for anib in numItemBlockss:
                for anub in numUserBlockss:
                    count+=1
                    myals = ALS(userCol= 'userId',itemCol= 'movieId',ratingCol='rating', coldStartStrategy = 'drop',
                               rank=arank,maxIter=ami,regParam=arp,numItemBlocks=anib,numUserBlocks=anub)
                    mymodel=myals.fit(train)
                    mypredictions = mymodel.transform(test)
                    myeval = RegressionEvaluator(metricName= 'rmse', labelCol= 'rating',predictionCol= 'prediction')
                    err = myeval.evaluate(mypredictions)
                    if err<besterr:
                        besterr=err
                        bestmodel=mymodel.copy()
                        bestpred=mypredictions
                        bestnum=count  
                        bestparams=[arank,ami,arp,anib,anub]
                    print(count)
print("rmse:",besterr)
print("best parameters [rank,maxIter,regParam,numItemBlocks,numUserBlockss]:",bestparams)

Parameter tunning for remaining parameters fixing the tuned parameters from previous steps;
train-test ratio 8:2, rmse

In [0]:
(train, test) = movdf.randomSplit([0.8, 0.2])
als = ALS(userCol= 'userId',itemCol= 'movieId',ratingCol='rating', coldStartStrategy = 'drop')
parameters=ParamGridBuilder()\
    .addGrid(als.rank,[12])\
    .addGrid(als.maxIter, [20,40])\
    .addGrid(als.regParam, [0.1])\
    .addGrid(als.numItemBlocks, [10,20])\
    .addGrid(als.numUserBlocks, [10,20])\
.build()
eval = RegressionEvaluator(metricName= 'rmse', labelCol= 'rating',predictionCol= 'prediction')
trainvs = TrainValidationSplit(estimator=als,estimatorParamMaps=parameters, evaluator=eval)
cv = CrossValidator(estimator =als,estimatorParamMaps =parameters, evaluator =eval,numFolds=3)
model = trainvs.fit(train)

In [0]:
predictions = model.transform(test)
rmse = eval.evaluate(predictions)
print("rmse:",rmse)
predictions.show()

Parameter tunning based on the roughly tuned parameters from previous steps;
train-test ratio 8:2, mae

In [0]:
(train, test) = movdf.randomSplit([0.8, 0.2])
als = ALS(userCol= 'userId',itemCol= 'movieId',ratingCol='rating', coldStartStrategy = 'drop')
parameters=ParamGridBuilder()\
    .addGrid(als.rank,[12])\
    .addGrid(als.maxIter, [20,40])\
    .addGrid(als.regParam, [0.1])\
    .addGrid(als.numItemBlocks, [10,20])\
    .addGrid(als.numUserBlocks, [10,20])\
.build()
eval = RegressionEvaluator(metricName= 'mae', labelCol= 'rating',predictionCol= 'prediction')
trainvs = TrainValidationSplit(estimator=als,estimatorParamMaps=parameters, evaluator=eval)
cv = CrossValidator(estimator =als,estimatorParamMaps =parameters, evaluator =eval,numFolds=3)
model2 = trainvs.fit(train)

In [0]:
predictions = model2.transform(test)
mae = eval.evaluate(predictions)
print("mae:",mae)
predictions.show()

Parameter tunning based on the roughly tuned parameters from previous steps;
train-test ratio 7:3, rmse

In [0]:
(train, test) = movdf.randomSplit([0.7, 0.3])
als = ALS(userCol= 'userId',itemCol= 'movieId',ratingCol='rating', coldStartStrategy = 'drop')
parameters=ParamGridBuilder()\
    .addGrid(als.rank,[12])\
    .addGrid(als.maxIter, [20,40])\
    .addGrid(als.regParam, [0.1])\
    .addGrid(als.numItemBlocks, [10,20])\
    .addGrid(als.numUserBlocks, [10,20])\
.build()
eval = RegressionEvaluator(metricName= 'rmse', labelCol= 'rating',predictionCol= 'prediction')
trainvs = TrainValidationSplit(estimator=als,estimatorParamMaps=parameters, evaluator=eval)
cv = CrossValidator(estimator =als,estimatorParamMaps =parameters, evaluator =eval,numFolds=3)
model = trainvs.fit(train)

In [0]:
predictions = model.transform(test)
rmse = eval.evaluate(predictions)
print("rmse:",rmse)
predictions.show()

Parameter tunning based on the roughly tuned parameters from previous steps;
train-test ratio 7:3, mae

In [0]:
(train, test) = movdf.randomSplit([0.7, 0.3])
als = ALS(userCol= 'userId',itemCol= 'movieId',ratingCol='rating', coldStartStrategy = 'drop')
parameters=ParamGridBuilder()\
    .addGrid(als.rank,[12])\
    .addGrid(als.maxIter, [20,40])\
    .addGrid(als.regParam, [0.1])\
    .addGrid(als.numItemBlocks, [10,20])\
    .addGrid(als.numUserBlocks, [10,20])\
.build()
eval = RegressionEvaluator(metricName= 'mae', labelCol= 'rating',predictionCol= 'prediction')
trainvs = TrainValidationSplit(estimator=als,estimatorParamMaps=parameters, evaluator=eval)
cv = CrossValidator(estimator =als,estimatorParamMaps =parameters, evaluator =eval,numFolds=3)
model2 = trainvs.fit(train)

In [0]:
predictions = model2.transform(test)
mae = eval.evaluate(predictions)
print("mae:",mae)
predictions.show()

Finally, we test out the best model from last part on users 9 and 13.

In [0]:
def missedInts(left,right,existInts):
    allints=list(range(left,right+1))
    retints=[]
    for aint in allints:
        if not aint in existInts:
            retints.append(aint)
    return retints
missedInts(0,10,[1,2,5,9])

In [0]:
import numpy as np
u9rated=movdf.where(movdf['userId']==9)
u9ratedmvs=u9rated.select('movieId').rdd.flatMap(lambda x: x).collect()
u9missedmvs=missedInts(0,99,u9ratedmvs)
nummissed=len(u9missedmvs)
missed2d=np.zeros([nummissed,2])
missed2d[:,0]=u9missedmvs
missed2d[:,1]=np.ones(nummissed)*9
missed2d=missed2d.astype(int)
columns=['movieId','userId']
u9misseddf= spark.createDataFrame(missed2d.tolist(), columns)
u9misseddf.show()

In [0]:
u9preds = model2.transform(u9misseddf)
u9preds=u9preds.sort(col('prediction').desc())
u9preds.display(10)

movieId,userId,prediction
62,9,3.0348744
46,9,2.968433
17,9,2.9584653
23,9,2.9104004
13,9,2.8551896
29,9,2.8470316
27,9,2.6728992
55,9,2.645823
65,9,2.6343143
48,9,2.56945


In [0]:
u9rec10=u9preds.select('movieId').rdd.flatMap(lambda x: x).collect()
print("top 10 movies recommended to user 9:")
u9rec10[0:10]

In [0]:
predictions = model.transform(test)
rmse = eval.evaluate(predictions)
print("rmse:",rmse)
predictions.show()

In [0]:
u13rated=movdf.where(movdf['userId']==13)
u13ratedmvs=u13rated.select('movieId').rdd.flatMap(lambda x: x).collect()
u13missedmvs=missedInts(0,99,u13ratedmvs)
nummissed=len(u13missedmvs)
missed2d=np.zeros([nummissed,2])
missed2d[:,0]=u13missedmvs
missed2d[:,1]=np.ones(nummissed)*13
missed2d=missed2d.astype(int)
columns=['movieId','userId']
u13misseddf= spark.createDataFrame(missed2d.tolist(), columns)
u13misseddf.show()

In [0]:
u13preds = model2.transform(u13misseddf)
u13preds=u13preds.sort(col('prediction').desc())
u13preds.display()

movieId,userId,prediction
30,13,2.3758736
2,13,2.3649511
41,13,2.269994
70,13,2.2300198
69,13,2.1825686
32,13,2.1633108
75,13,2.1183453
92,13,1.9785659
76,13,1.9566662
8,13,1.8824189


In [0]:
u13rec10=u13preds.select('movieId').rdd.flatMap(lambda x: x).collect()
print("top 10 movies recommended to user 13:")
u13rec10[0:10]