In [1]:
import pandas as pd
import numpy as np
import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/usr/bin/python3'

In [2]:
#movies = pd.read_csv("/home/helen/Documents/Codes/Recommender System/movielens/movie.csv")
#ratings = pd.read_csv("/home/helen/Documents/Codes/Recommender System/movielens/rating.csv")

In [3]:
#def fix_unicode(text):
#    return text.replace(u"\u2019", "'")
#ratings.drop(['timestamp'], axis = 1, inplace = True)

In [4]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [5]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
sc = SparkContext()
sqlContext = SQLContext(sc)

In [6]:
#rating = ratings.select('userId', 'movieId', 'rating')
# create train and test set
#rating = sc.textFile("/home/helen/Documents/Codes/Recommender System/movielens/rating.csv")
rating = sqlContext.read.format('csv').option('header', 'true').option('inferSchema', 'true').load('/home/helen/Documents/Codes/Recommender System/movielens/rating1.csv')

print(rating)

DataFrame[userId: int, movieId: int, rating: double, timestamp: timestamp]


In [7]:
train, test = rating.randomSplit([0.8, 0.2])

In [8]:
#create ALS model
als = ALS(userCol = "userId", itemCol = "movieId", ratingCol = "rating",
         coldStartStrategy = "drop", nonnegative = True)

In [9]:
#tune model using ParamGridBuilder
param_grid = ParamGridBuilder().addGrid(als.rank, [12,13,14])\
.addGrid(als.maxIter, [18,19,20]).addGrid(als.regParam, [.17, .18, .19]).build()

In [10]:
# define evaluator as RMSE
eva = RegressionEvaluator(metricName = "rmse", labelCol = "rating", predictionCol = "prediction")

In [11]:
# Build cross validation
tvs = TrainValidationSplit(
    estimator = als,
    estimatorParamMaps = param_grid,
    evaluator = eva,
    parallelism = 50)

In [12]:
#fit ALS model to training data
#tvs = sc.parallelize(rating)
print(train)
model = tvs.fit(train)

DataFrame[userId: int, movieId: int, rating: double, timestamp: timestamp]


In [13]:
print(model)

TrainValidationSplitModel_577f4cf3c544


In [14]:
best_model = model.bestModel # extract the best model using ParamGridBuilder

In [15]:
predictions = best_model.transform(test)
rmse = eva.evaluate(predictions)

In [16]:
print("Root Mean Square Error = "+ str(rmse))
print("--- Best Model---")
print("Rank: ", best_model.rank)
print("Max Iterator: ", best_model._java_obj.parent().getMaxIter())
print("Regression Parameter: ", best_model._java_obj.parent().getRegParam())

Root Mean Square Error = 0.8484364620056432
--- Best Model---
Rank:  14
Max Iterator:  20
Regression Parameter:  0.17


In [17]:
user_recs = best_model.recommendForAllUsers(10)

In [18]:
def get_recs_for_user(recs):
    recs = recs.select("recommendations.movieId", "recommendations.rating")
    movies = recs.select("movieId").toPandas().iloc[0, 0]
    ratings = recs.select("rating").toPandas().iloc[0, 0]
    ratings_matrix = pd.DataFrame(movies, columns = ["movieId"])
    ratings_matrix["ratings"] = ratings
    ratings_matrix_ps = sqlContext.createDataFrame(ratings_matrix)
    return ratings_matrix_ps

In [19]:
user_recs.count()

6039

In [20]:
recs = user_recs    
recs = recs.select("recommendations.movieId", "recommendations.rating")
movies = recs.select("movieId").toPandas().iloc[0, 0]
ratings = recs.select("rating").toPandas().iloc[0, 0]
ratings_matrix = pd.DataFrame(movies, columns = ["movieId"])
ratings_matrix["ratings"] = ratings
ratings_matrix_ps = sqlContext.createDataFrame(ratings_matrix)

In [21]:
ratings_matrix_ps.count()

10

In [22]:
display(predictions.sort("userId", "rating"))

DataFrame[userId: int, movieId: int, rating: double, timestamp: timestamp, prediction: float]

In [23]:

rec = get_recs_for_user(user_recs)
rec.show()

+-------+------------------+
|movieId|           ratings|
+-------+------------------+
|  59295|3.9272844791412354|
|  67138| 3.692906618118286|
|  94806| 3.549441337585449|
|   8422|3.4268205165863037|
|    873|3.4113528728485107|
|  73413| 3.330717086791992|
|  50703|  3.25600004196167|
|    857|3.2137033939361572|
|  70133|3.1945221424102783|
|  26791| 3.188664674758911|
+-------+------------------+

