In [1]:
import os

import findspark

# Locate the Spark installation. Prefer the SPARK_HOME environment variable so
# the notebook also runs on other machines; fall back to the original local
# install path when the variable is not set.
findspark.init(os.environ.get('SPARK_HOME', '/Users/ying/spark-2.3.2-bin-hadoop2.7/'))

In [2]:
import warnings
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline, Transformer

In [3]:
# Do not show warnings
warnings.filterwarnings('ignore')

In [4]:
class DropNAPredictions(Transformer):
    '''Drop rows whose prediction is missing. Used as the last pipeline stage.

    Predictions are NA for users or movies that were not seen during
    training; such rows must be removed before an evaluator scores the
    output. Only the ``prediction`` column is inspected, so rows that happen
    to have missing values in other columns are kept.
    '''
    def _transform(self, predictedDF):
        '''Return ``predictedDF`` without NA predictions, with ``prediction`` cast to double.

        Parameters
        ----------
        predictedDF : DataFrame
            Output of the preceding stage; must contain a ``prediction`` column.

        Returns
        -------
        DataFrame
            The same columns, minus the rows whose ``prediction`` is null/NaN.
        '''
        # Restrict the NA check to the prediction column; a bare dropna()
        # would also discard rows with a missing value in any other column.
        nonNullDF = predictedDF.dropna(subset=['prediction'])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

In [5]:
# Create the SparkSession (or reuse one that already exists); it is the main
# entry point for DataFrame and SQL functionality.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [6]:
# Load the ratings subset from CSV; the first row holds the column names and
# the column types are inferred from the data.
ratings = spark.read.csv(
    '../data/subset/rating_subset_600.csv',
    header=True,
    inferSchema=True,
    sep=',',
)

In [7]:
# Show the column names and the types inferred while reading the CSV.
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



#### Hyperparameter tuning with grid search

In [8]:
# Base ALS estimator. The column names match the ratings schema; the seed is
# fixed for reproducibility. rank, maxIter and regParam are set by the grid.
als = ALS(
    seed=42,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
)

In [9]:
# Search space: 3 ranks x 2 iteration counts x 2 regularisation strengths.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(als.rank, [6, 8, 10])
grid_builder.addGrid(als.maxIter, [10, 15])
grid_builder.addGrid(als.regParam, [0.1, 0.2])
als_paramgrid = grid_builder.build()

In [10]:
# Use RMSE between the true `rating` and the model's `prediction` as the
# metric for determining the best model.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)

In [11]:
# Define the pipeline: ALS followed by the NA-dropping step.
# The prediction is NA for users or movies that are not used for training,
# so those rows need to be removed for evaluation purposes.
pipeline = Pipeline(stages=[
    als,
    DropNAPredictions(),
])

In [12]:
# Set up 3-fold cross-validation of the pipeline over every entry of the
# parameter grid, scored by the evaluator; the seed is fixed for reproducibility.
als_cv = CrossValidator(
    estimator=pipeline,
    evaluator=evaluator,
    estimatorParamMaps=als_paramgrid,
    seed=42,
    numFolds=3,
)

In [13]:
# Run the cross-validated grid search on the ratings DataFrame.
als_cv_model = als_cv.fit(ratings)

In [14]:
# Retrieve the best model. The cross-validated estimator is the pipeline, so
# this is the fitted pipeline; its first stage is the ALS model.
als_best = als_cv_model.bestModel

In [15]:
# Report the winning hyperparameters without reaching into the private
# `_java_obj` handle: the cross-validator stores one average metric per grid
# entry, in grid order, and RMSE is minimised, so the arg-min of `avgMetrics`
# indexes the parameter map that produced the best model.
best_param_map = als_paramgrid[int(np.argmin(als_cv_model.avgMetrics))]
print('rank:', best_param_map[als.rank])
print('maxIter:', best_param_map[als.maxIter])
print('regParam:', best_param_map[als.regParam])

rank: 6
maxIter: 10
regParam: 0.2


In [16]:
# Average RMSE of the best model: the smallest of the cross-validation
# average metrics, one of which is recorded per parameter combination.
min(als_cv_model.avgMetrics)

0.9152209797711998