In [None]:
import sys
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,avg,count
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
spark=SparkSession.builder.getOrCreate()

baseDir = os.path.join('input')

ratingsFilename = os.path.join(baseDir,  'ratings.csv')
moviesFilename = os.path.join(baseDir, 'movies.csv')

In [44]:
rating = spark.read.option("header", True).option("delimiter", ",").csv(ratingsFilename)
movie = spark.read.option("header", True).option("delimiter", ",").csv(moviesFilename)

In [118]:
rating=rating.select(rating.userId.cast('int'),rating.movieId.cast('int'),rating.rating.cast('float'))
movie=movie.select(movie.movieId.cast('int'),movie.title)
avgRating=rating.groupBy(col("movieId")).agg(count("*").alias("count"),avg("rating").alias("avgRating"))
sortedAvgRating=avgRating.sort("avgRating", ascending=False)
avgResult=sortedAvgRating.join(movie,sortedAvgRating.movieId==movie.movieId).drop(movie.movieId)
Top10Result=avgResult.limit(10).select("title","count","avgRating").collect()
for row in Top10Result:
    print (row[0],row[1],row[2])

Knockin' on Heaven's Door (1997) 1 5.0
Friends with Kids (2011) 1 5.0
Maelström (2000) 1 5.0
The Earrings of Madame de... (1953) 2 5.0
House Bunny, The (2008) 1 5.0
Step Into Liquid (2002) 2 5.0
Ben X (2007) 1 5.0
Lamerica (1994) 1 5.0
Little Lord Fauntleroy (1936) 1 5.0
Caveman (1981) 1 5.0


In [119]:
training,validation,test=rating.randomSplit([0.7,0.2,0.1])
print ("Training size:",training.count())
print ("Validation size:",training.count())
print ("Test size:",training.count())

Training size: 70104
Validation size: 70104
Test size: 70104


In [121]:
ranks = [4, 8, 12]
error=[0,0,0]
bestRank=-1
minError=float('inf')
err=0
for paramRank in ranks:
    alsEstimator=ALS(maxIter=5,\
                        regParam=0.01,\
                         rank=paramRank,\
                        userCol="userId", itemCol="movieId", ratingCol="rating",\
                        coldStartStrategy="drop")
    alsTransformer=alsEstimator.fit(training)
    predictions=alsTransformer.transform(validation)
    evaluator = RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")
    rmse=evaluator.evaluate(predictions)
    error[err]=rmse
    err+=1
    if minError>rmse:
        bestRank=paramRank
        minError=rmse
    print("Model with rank %s has rmse: %s" % (paramRank,rmse))
print ("Best model has rank %s with rmse %s" %(bestRank,minError))

Model with rank 4 has rmse: 1.04203297968696
Model with rank 8 has rmse: 1.1171400173214416
Model with rank 12 has rmse: 1.190646055522231
Best model has rank 4 with rmse 1.04203297968696


In [None]:
alsEstimator=ALS(maxIter=5,\
                        regParam=0.01,\
                         rank=bestRank,\
                        userCol="userId", itemCol="movieId", ratingCol="rating",\
                        coldStartStrategy="drop")
alsTransformer=alsEstimator.fit(training)
predictions=alsTransformer.transform(test)
evaluator = RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")
rmse=evaluator.evaluate(predictions)