In [1]:
from pyspark.sql import functions as F
from pyspark.sql import DataFrameNaFunctions as DFna
from pyspark.sql.functions import udf, col, when
import matplotlib.pyplot as plt
import pyspark as ps
import os


spark = ps.sql.SparkSession.builder \
            .master("local[4]") \
            .appName("building recommender") \
            .getOrCreate()
            
sc = spark.sparkContext

In [2]:
# read movies CSV
movies_df = spark.read.csv('data/movies/movies.csv',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep=",",           # char for separation
                         inferSchema=True)  # do we infer schema or not ?
movies_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [3]:
print("line count: {}".format(movies_df.count()))

line count: 9125


In [4]:
# read ratings CSV
ratings_df = spark.read.csv('data/movies/ratings.csv',
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep=",",           # char for separation
                         inferSchema=True)  # do we infer schema or not ?
ratings_df.printSchema()



root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [5]:
ratings = ratings_df.rdd

numRatings = ratings.count()
numUsers = ratings.map(lambda r: r[0]).distinct().count()
numMovies = ratings.map(lambda r: r[1]).distinct().count()

print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)

Got 100004 ratings from 671 users on 9066 movies.


In [6]:
movies_counts = ratings_df.groupBy(col("movieId")).agg(F.count(col("rating")))
movies_counts.show()

+-------+-------------+
|movieId|count(rating)|
+-------+-------------+
|   1580|          190|
|   2659|            3|
|   3794|            5|
|   3175|           65|
|    471|           49|
|   1088|           53|
|   1342|           17|
|   1645|           60|
|   2366|           23|
|   6620|           17|
|   8638|           17|
|  96488|            4|
| 160563|            2|
|   7982|            3|
|   1238|           17|
|   1959|           30|
|    463|            7|
|   2122|           11|
|   1591|           15|
|   5518|            1|
+-------+-------------+
only showing top 20 rows



Show's .5 increment ratings being chosen with less frequency than whole number values. This is an indicator that perhaps people don't know they can rate in increments of .5. Perhaps this is more indicative of a interface problem with the website rather than the users actual opinion of the movie

In [7]:
# ratingsRDD = ratings_df.rdd.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
#                                      rating=float(p[2])))


OK let's get something working!
First I will split the data in to a training, validation and test set

In [8]:
ratings_df.take(3)

[Row(userId=1, movieId=31, rating=2.5, timestamp=1260759144),
 Row(userId=1, movieId=1029, rating=3.0, timestamp=1260759179),
 Row(userId=1, movieId=1061, rating=3.0, timestamp=1260759182)]

In [9]:
training_RDD, validation_RDD, test_RDD = ratings_df.randomSplit([.6, .2, .2], seed=0L)
# training_RDD = training_RDD.rdd.cache()
# validation_for_predict_RDD = validation_RDD.rdd.map(lambda x: (x[0], x[1])).cache()
# test_for_predict_RDD = test_RDD.rdd.map(lambda x: (x[0], x[1])).cache()
training_RDD

DataFrame[userId: int, movieId: int, rating: double, timestamp: int]

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.sql import Row
import numpy as np
import math

In [11]:
seed = 5L
iterations = 10
regularization_parameter = 0.1
ranks = range(4, 12)
errors = []
err = 0
tolerance = 0.02

In [12]:
min_error = float('inf')
best_rank = -1
best_iteration = -1

def nan_as_0(x):
    if math.isnan(x):
        return 0
    else:
        return x

for rank in ranks:
    als = ALS(maxIter=iterations, regParam=regularization_parameter, rank=rank, userCol="userId", itemCol="movieId", ratingCol="rating")
    model = als.fit(training_RDD)
    predictions = model.transform(validation_RDD)
#     predictions = predictions.withColumn("prediction", nan_as_0(col("prediction")))
#     new_predictions = DFna(predictions).fill(2.5, ['prediction'])
    new_predictions = predictions.filter(col('prediction') != np.nan)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
    rmse = evaluator.evaluate(new_predictions)
    errors.append(rmse)

    print 'For rank %s the RMSE is %s' % (rank, rmse)
    if rmse < min_error:
        min_error = rmse
        best_rank = rank
print 'The best model was trained with rank %s' % best_rank



For rank 4 the RMSE is 0.948992716752
For rank 5 the RMSE is 0.948618374438
For rank 6 the RMSE is 0.945806714537
For rank 7 the RMSE is 0.953022789405
For rank 8 the RMSE is 0.946677108696
For rank 9 the RMSE is 0.956961948065
For rank 10 the RMSE is 0.952181780456
For rank 11 the RMSE is 0.950707813636
The best model was trained with rank 6


In [13]:
# Now we will build a CrossValidator to evaluate over a range of hyper parameters
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# The Estimator we will be using is ALS
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 2 values for als.regParam and 8 values for als.rank,
# this grid will have 2 x 8 = 16 parameter settings for CrossValidator to choose from.

als = ALS(maxIter=iterations, regParam=regularization_parameter, rank=rank, userCol="userId", itemCol="movieId", ratingCol="rating")
paramGrid = ParamGridBuilder() \
    .addGrid(als.regParam, [0.1, 0.01]) \
    .addGrid(als.rank, range(4, 12)) \
    .build()
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)
cvModel = crossval.fit(training_RDD)

In [14]:
predictions = cvModel.transform(test_RDD)
new_predictions = predictions.filter(col('prediction') != np.nan)
rmse = evaluator.evaluate(new_predictions)
print "the rmse for optimal grid parameters with cross validation is: {}".format(rmse)


the rmse for optimal grid parameters with cross validation is: 0.950506349727


In [None]:
final_als = ALS(maxIter=10, regParam=0.1, rank=6, userCol="userId", itemCol="movieId", ratingCol="rating")
final_model = final_als.fit(training_RDD)
final_pred = final_model.transform(test_RDD)
final_pred = final_pred.filter(col('prediction') != np.nan)
rmse = evaluator.evaluate(final_pred)

In [None]:
rmse