In [1]:
# import libraries

from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder



In [2]:
# make a SparkSession object

spark = (SparkSession
         .builder
         .appName("MoviesALS")
         .config("spark.driver.host", "localhost")
         .getOrCreate())

In [3]:
# read json file

movie_ratings = spark.read.json('data/ratings.json')

In [4]:
movie_ratings.dtypes

[('movie_id', 'bigint'),
 ('rating', 'bigint'),
 ('timestamp', 'double'),
 ('user_id', 'bigint')]

In [5]:
movie_ratings.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- rating: long (nullable = true)
 |-- timestamp: double (nullable = true)
 |-- user_id: long (nullable = true)



In [6]:
# cast to Pandas dataframe

movies_df = movie_ratings.select('*').toPandas()

In [7]:
#movies_df.head()

In [8]:
movies_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 719949 entries, 0 to 719948
Data columns (total 4 columns):
movie_id     719949 non-null int64
rating       719949 non-null int64
timestamp    719949 non-null float64
user_id      719949 non-null int64
dtypes: float64(1), int64(3)
memory usage: 22.0 MB


In [9]:
#check what are different years in the timstamp column

pd.to_datetime(movies_df.timestamp.astype(int)).dt.year.value_counts()

1970    719949
Name: timestamp, dtype: int64

In [10]:
#check years

import datetime
date = datetime.datetime.fromtimestamp(movies_df.timestamp[7777])
date

datetime.datetime(2000, 4, 28, 8, 8, 46)

In [11]:
movie_ratings = movie_ratings.drop('timestamp')

In [12]:
#make a split of data
(training, test) = movie_ratings.randomSplit([.8, .2])

In [13]:
#create ALS instance

als = ALS(maxIter=10,
          rank=10,
          userCol='user_id',
          itemCol='movie_id',
          ratingCol='rating')

In [14]:
#fit the model

model = als.fit(training)

In [15]:
# evaluate the model

predictions = model.transform(test)
predictions.persist()

DataFrame[movie_id: bigint, rating: bigint, user_id: bigint, prediction: float]

In [17]:
predictions.show(5)

+--------+------+-------+----------+
|movie_id|rating|user_id|prediction|
+--------+------+-------+----------+
|     148|     3|   4784| 2.8403747|
|     463|     3|   4858| 2.3031218|
|     463|     4|   4277|  3.228796|
|     463|     2|   5306| 2.4464169|
|     463|     3|   2210| 3.0743353|
+--------+------+-------+----------+
only showing top 5 rows



In [18]:
pred_df = predictions.select('*').toPandas()

def user_average(user, df):
    """Return average score for user"""
    user_df = df[df['user_id'] == user]
    average = user_df['prediction'].mean()
    if np.isnan(average):
        return 3
    else:
        return average

def compute_user_average_if_null(row):
    """Check if value is null, if so, replace with user average"""
    if np.isnan(row['prediction']):
        return user_average(row['user_id'], pred_df)
    else:
        return row['prediction']    

for i, row in pred_df[pred_df['prediction'].isna()].iterrows():
    pred_df.loc[i, 'prediction'] = compute_user_average_if_null(row)    

predictions = spark.createDataFrame(pred_df)

In [19]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating',
                               predictionCol='prediction')

In [20]:
rmse = evaluator.evaluate(predictions)

In [21]:
print(rmse)

0.8743097890338952


In [22]:
#create a parameter grid
params_score = {}

params = (ParamGridBuilder()
          .addGrid(als.regParam, [1, 0.01, 0.001, 0.1])
          .addGrid(als.maxIter, [5, 10, 20])
          .addGrid(als.rank, [4, 10, 50])).build()

# instantiating CV estimator

cv = CrossValidator(estimator=als, estimatorParamMaps=params, evaluator=evaluator, parallelism=4)

best_model = cv.fit(movie_ratings)

In [None]:
als_model = best_model.bestModel

In [None]:
als_model.save('als_model')

In [None]:
#model2 = ALSModel.load('als_model')

#model2_df = model2.itemFactors.toPandas()

#model2_df.index = model2_df.id

#model2_uf = model2.userFactors.toPandas()

#model2_uf.index = model2_uf.id

#prediction_test = np.dot(model2_df.loc[17, 'features'], model2_uf.loc[646, 'features'])

#prediction_test