# Notebook for Collaborative Filtering with both ALS and NCF models for 1M rows

In this notebook, we implement ALS and NCF models for a movie recommendation system trained on 1M movie ratings. The MovieLens 1M dataset contains about 1 million movie ratings made by roughly 6,000 users on roughly 4,000 movies.

In [1]:
# Initialization
import os
import time
import datetime as dt

import warnings
warnings.filterwarnings("ignore", message="numpy.dtype size changed")

# spark sql imports
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import *

# spark ml imports
from pyspark.ml.recommendation import ALS
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# spark bigdl, analytics zoo imports
from zoo.models.recommendation import UserItemFeature
from zoo.models.recommendation import NeuralCF
from zoo.common.nncontext import init_nncontext
from bigdl.nn.criterion import *
from bigdl.optim.optimizer import *
from bigdl.dataset import movielens
from bigdl.util.common import *

# data science imports
import math
import numpy as np
import pandas as pd
from sklearn import metrics
from operator import itemgetter

In [2]:
# Base location of the input files (ratings.dat, movies.dat).
# Set the MOVIELENS_DATA_PATH environment variable to point somewhere else;
# when it is unset the original HDFS directory is used.
data_path = os.environ.get('MOVIELENS_DATA_PATH', 'hdfs:///user/andrew/')
# File names are appended to this string directly (data_path + "ratings.dat"),
# so guarantee the trailing separator.
if not data_path.endswith('/'):
    data_path += '/'

In [3]:
# Create the context through Analytics Zoo's init_nncontext, passing
# "NCF Example" as its argument. The returned object is used below as the
# Spark context (SQLContext(sc), sc.textFile(...)).
sc = init_nncontext("NCF Example")

## Data Preparation

In [4]:
sqlContext = SQLContext(sc)
# Row templates for the imported data. Each template declares exactly the
# fields that are parsed below: the trailing "timestamp" field of ratings.dat
# and the "genres" field of movies.dat are ignored.
Rating = Row("userId", "movieId", "rating")
Item = Row("movieId", "title")

# Load the ratings: lines are "::"-separated and the first three fields are
# parsed as (int userId, int movieId, float rating).
ratings = sc.textFile(data_path + "ratings.dat") \
    .map(lambda line: line.split("::")[0:3]) \
    .map(lambda fields: (int(fields[0]), int(fields[1]), float(fields[2]))) \
    .map(lambda r: Rating(*r))
ratings = sqlContext.createDataFrame(ratings)

# Load the movies: the first two "::"-separated fields are parsed as
# (int movieId, str title).
movies = sc.textFile(data_path + "movies.dat") \
    .map(lambda line: line.split("::")[0:2]) \
    .map(lambda fields: (int(fields[0]), fields[1])) \
    .map(lambda r: Item(*r))
movies = sqlContext.createDataFrame(movies)

In [5]:
# Split the ratings 80/20 into training and validation sets (seeded with 42)
split_weights = [0.8, 0.2]
ratings_train, ratings_val = ratings.randomSplit(split_weights, seed=42)
# Report the size of the training set, then preview its first three rows
print(ratings_train.count())
ratings_train.take(3)

800228


[Row(userId=1, movieId=1, rating=5.0),
 Row(userId=1, movieId=527, rating=5.0),
 Row(userId=1, movieId=531, rating=4.0)]

In [6]:
# Build the RDDs of Sample consumed by the NCF model.
# The train and val RDDs are used right away; the full one is used further down.
def build_sample(user_id, item_id, rating):
    """Wrap one (user, item, rating) triple in a UserItemFeature.

    The Sample's feature array is [user_id, item_id] and its label array
    is [rating].
    """
    features = np.array([user_id, item_id])
    label = np.array([rating])
    return UserItemFeature(user_id, item_id, Sample.from_ndarray(features, label))


def _pair_features(ratings_frame):
    """Map each row's first three values through build_sample."""
    return ratings_frame.rdd.map(lambda row: build_sample(row[0], row[1], row[2]))


def _samples(pair_feature_rdd):
    """Keep only the `sample` attribute of every element."""
    return pair_feature_rdd.map(lambda feature: feature.sample)


fullPairFeatureRdds = _pair_features(ratings)
trainPairFeatureRdds = _pair_features(ratings_train)
valPairFeatureRdds = _pair_features(ratings_val)

full_rdd = _samples(fullPairFeatureRdds)
train_rdd = _samples(trainPairFeatureRdds)
val_rdd = _samples(valPairFeatureRdds)

In [7]:
# Print how many Samples the training RDD holds, then preview the first
# three of them (features and labels).
print(train_rdd.count())
train_rdd.take(3)

800228


[Sample: features: [JTensor: storage: [1. 1.], shape: [2], float], labels: [JTensor: storage: [5.], shape: [1], float],
 Sample: features: [JTensor: storage: [  1. 527.], shape: [2], float], labels: [JTensor: storage: [5.], shape: [1], float],
 Sample: features: [JTensor: storage: [  1. 531.], shape: [2], float], labels: [JTensor: storage: [4.], shape: [1], float]]

## ALS and NCF Model Training and Validation on Training Data

### Train and evaluate the ALS model with training data

In [8]:
%%time
# Create the ALS models and set parameters
als = ALS(seed = 42, regParam = 0.1, maxIter = 15, rank = 12, 
          userCol = "userId", itemCol = "movieId", ratingCol = "rating")
# Using MAE for the scoring metric
evaluator = RegressionEvaluator(metricName="mae", labelCol="rating",
                                predictionCol="prediction")
# Train and evaluate the models - if training error is more than slightly less than validation error, the model has been overfit
als_model = als.fit(ratings_train)
print 'Training Error (MAE):', evaluator.evaluate(als_model.transform(ratings_train))
print 'Validation Error (MAE):', evaluator.evaluate(als_model.transform(ratings_val).fillna(0))

Training Error (MAE): 0.646367688753
Validation Error (MAE): 0.688705264902
CPU times: user 59.9 ms, sys: 7.73 ms, total: 67.7 ms
Wall time: 31 s


### Train and evaluate the NCF model with training data

In [9]:
%%time
# Set parameters for the NCF model.
# Batch size should be a multiple of the total number of cores in the Spark environment
# max_user_id and max_movie_id are used for matching ids to embedding values
batch_size = 2560
max_user_id = ratings.agg({'userId': 'max'}).collect()[0]['max(userId)']
max_movie_id = ratings.agg({'movieId': 'max'}).collect()[0]['max(movieId)']
# Set other parameters and initialize the model
ncf = NeuralCF(user_count=max_user_id, item_count=max_movie_id, class_num=5, hidden_layers=[20, 10], include_mf = False)

# Define the model optimizer
optimizer = Optimizer(
    model=ncf,
    training_rdd=train_rdd,
    criterion=ClassNLLCriterion(),
    end_trigger=MaxEpoch(10),
    batch_size=batch_size, # 16 executors, 16 cores each
    optim_method=Adam(learningrate=0.001))

# Set the validation method for the optimizer
optimizer.set_validation(
    batch_size=batch_size, # 16 executors, 16 cores each
    val_rdd=val_rdd,
    trigger=EveryEpoch(),
    val_method=[MAE(), Loss(ClassNLLCriterion())]
)

# Train the model
optimizer.optimize()

creating: createZooNeuralCF
creating: createClassNLLCriterion
creating: createMaxEpoch
creating: createAdam
creating: createDistriOptimizer
creating: createEveryEpoch
creating: createMAE
creating: createClassNLLCriterion
creating: createLoss
CPU times: user 25.6 ms, sys: 7.97 ms, total: 33.6 ms
Wall time: 2min 26s


In [10]:
%%time
# Evaluate the model using MAE as the scoring metric
train_res = ncf.evaluate(train_rdd, batch_size, [MAE()])
val_res = ncf.evaluate(val_rdd, batch_size, [MAE()])
# If training error is more than slightly less than validation error, the model has been overfit
print 'Training Error (MAE):', train_res[0]
print 'Validation Error (MAE):', val_res[0]

creating: createMAE
creating: createMAE
Training Error (MAE): Evaluated result: 0.658410191536, total_num: 626, method: MAE
Validation Error (MAE): Evaluated result: 0.684635400772, total_num: 157, method: MAE
CPU times: user 10.6 ms, sys: 1.72 ms, total: 12.3 ms
Wall time: 32.6 s


## ALS and NCF Model Training and Validation on the entire dataset

In [11]:
# Build a dense userId x movieId table holding every rating.
# Rows are userIds and columns are movieIds; any pair without an explicit rating is set to 0.
ratings_df = ratings.toPandas()
ratings_matrix = ratings_df.pivot(index='userId', columns='movieId', values='rating')
ratings_matrix = ratings_matrix.fillna(0)

In [12]:
# Transform the userId x movieId matrix back into three-column (long) format. (Will be much larger now)
# reset_index() exposes the userId index as a regular column on a temporary
# copy, so ratings_matrix itself is not modified by this cell.
# value_vars is omitted on purpose: melt then unpivots every column that is not
# an id column. (The earlier list(...).remove('userId') expression always
# evaluated to None, because list.remove returns None.)
ratings_df_2 = pd.melt(ratings_matrix.reset_index(), id_vars=['userId'],
                       var_name='movieId', value_name='rating')
ratings_df_2.shape

(22384240, 3)

In [13]:
# Keep only the userId, movieId pairs that have no rating yet (rating equal to 0)
ratings_blanks_df = ratings_df_2[ratings_df_2['rating'] == 0]
ratings_blanks_df.shape

(21384031, 3)

In [14]:
%%time
# Conver to spark dataframe
# This will be used by the ALS model for recommendations
ratings_blanks = sqlContext.createDataFrame(ratings_blanks_df)
del ratings_df, ratings_matrix, ratings_df_2, ratings_blanks_df

CPU times: user 10min 47s, sys: 7.44 s, total: 10min 55s
Wall time: 10min 53s


In [15]:
# Wrap every unrated userId, movieId pair with build_sample.
# The NCF model consumes this RDD when producing recommendations.
blankPairFeatureRdds = ratings_blanks.rdd.map(lambda row: build_sample(*row[0:3]))

In [16]:
%%time
# Set parameters and train the ALS on the full rated dataset
als = ALS(seed = 42, regParam = 0.1, maxIter = 15, rank = 12,
          userCol = "userId", itemCol = "movieId", ratingCol = "rating")
evaluator = RegressionEvaluator(metricName="mae", labelCol="rating",
                                predictionCol="prediction")
als_model = als.fit(ratings)
# Evaluate the model on the training set
print 'Model Error (MAE):', evaluator.evaluate(als_model.transform(ratings))

Model Error (MAE): 0.655533489375
CPU times: user 28.6 ms, sys: 10 ms, total: 38.6 ms
Wall time: 17 s


In [17]:
%%time
# Set parameters and train the NCF on the full rated dataset, keep the previously defined batch_size
max_user_id = ratings.agg({'userId': 'max'}).collect()[0]['max(userId)']
max_movie_id = ratings.agg({'movieId': 'max'}).collect()[0]['max(movieId)']
ncf = NeuralCF(user_count=max_user_id, item_count=max_movie_id, class_num=5, hidden_layers=[20, 10], include_mf = False)
# Define the optimizer
optimizer = Optimizer(
    model=ncf,
    training_rdd=full_rdd,
    criterion=ClassNLLCriterion(),
    end_trigger=MaxEpoch(10),
    batch_size=batch_size, # 16 executors, 16 cores each
    optim_method=Adam(learningrate=0.001))
# Train the model
optimizer.optimize()
# Evaluate the model on the training set
full_res = ncf.evaluate(full_rdd, batch_size, [MAE()])
print 'Model Error (MAE):', full_res[0]

creating: createZooNeuralCF
creating: createClassNLLCriterion
creating: createMaxEpoch
creating: createAdam
creating: createDistriOptimizer
creating: createMAE
Model Error (MAE): Evaluated result: 0.655768036842, total_num: 783, method: MAE
CPU times: user 28.3 ms, sys: 4.82 ms, total: 33.1 ms
Wall time: 2min 50s


### Predictions Comparison

Compare the predictions of ALS and NCF for one specific user. The user id is specified in the final two cells.

In [18]:
%%time
# Create recommendations for all users. 
# The NCF model allow the number of recommendations to be limited to a top set, in this case the top 10 recommendations.
als_pair_preds = als_model.transform(ratings_blanks)
ncf_pair_preds = ncf.recommend_for_user(blankPairFeatureRdds, 10).toDF()

CPU times: user 27 ms, sys: 4.11 ms, total: 31.1 ms
Wall time: 1min 32s


In [19]:
# Preview five rows of the ALS predictions for the unrated pairs
als_pair_preds.show(n=5)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   243|    148|   0.0| 2.6787767|
|   392|    148|   0.0|  2.493863|
|   540|    148|   0.0| 2.5721192|
|   623|    148|   0.0| 3.0742738|
|   737|    148|   0.0| 2.8385499|
+------+-------+------+----------+
only showing top 5 rows



In [20]:
# Preview five rows of the NCF recommendations
ncf_pair_preds.show(n=5)

+----------+-------+----------+------------------+-------+
|bigdl_type|item_id|prediction|       probability|user_id|
+----------+-------+----------+------------------+-------+
|     float|   2905|         5|0.7960795249610225|   3840|
|     float|    318|         5|0.7615469311867679|   3840|
|     float|   1262|         5|0.7510181537714825|   3840|
|     float|     53|         5|0.7488545115398759|   3840|
|     float|    745|         5|0.7425894443213458|   3840|
+----------+-------+----------+------------------+-------+
only showing top 5 rows



In [21]:
# Keep the user, movie and prediction columns and give both frames the same
# userId / movieId names, with a model-specific name for the prediction.
# A prediction is the predicted rating for that userId, movieId pair.
als_preds = als_pair_preds.select('userId', 'movieId', 'prediction') \
    .withColumnRenamed('prediction', 'als_pred')
ncf_preds_topN = ncf_pair_preds.select('user_id', 'item_id', 'prediction') \
    .withColumnRenamed('user_id', 'userId') \
    .withColumnRenamed('item_id', 'movieId') \
    .withColumnRenamed('prediction', 'ncf_pred')
del als_pair_preds
del ncf_pair_preds

In [22]:
# Reduce the ALS recommendations to the 10 best per user
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, row_number

# One window per userId, ordered from the highest predicted rating down
window = Window.partitionBy(als_preds['userId']).orderBy(als_preds['als_pred'].desc())
# Number the rows inside each window and keep positions 1 through 10
als_preds_topN = als_preds \
    .withColumn('row_number', row_number().over(window)) \
    .filter(col('row_number') <= 10)

In [23]:
# Left-join the movie information onto both recommendation sets so they are easier to read
als_preds_topN_labeled = als_preds_topN.join(movies, on='movieId', how='left')
ncf_preds_topN_labeled = ncf_preds_topN.join(movies, on='movieId', how='left')
# The unlabeled intermediates are no longer referenced
del window
del als_preds
del als_preds_topN
del ncf_preds_topN

In [24]:
# Trim each frame to the essential columns and order it by userId
als_final = als_preds_topN_labeled \
    .select('userId', 'movieId', 'als_pred', 'title') \
    .orderBy('userId')
ncf_final = ncf_preds_topN_labeled \
    .select('userId', 'movieId', 'ncf_pred', 'title') \
    .orderBy('userId')

In [25]:
# Preview ten rows of the labeled ALS recommendations
als_final.show(n=10)

+------+-------+---------+--------------------+
|userId|movieId| als_pred|               title|
+------+-------+---------+--------------------+
|     1|    318|4.5874867|Shawshank Redempt...|
|     1|   3233|  4.73503|Smashing Time (1967)|
|     1|    953|4.5261497|It's a Wonderful ...|
|     1|   2129| 4.416759|Saltmen of Tibet,...|
|     1|    593|4.4463463|Silence of the La...|
|     1|    904| 4.435828|  Rear Window (1954)|
|     1|    858|4.4980555|Godfather, The (1...|
|     1|    989| 4.475181|Schlafes Bruder (...|
|     1|   3517|4.5038576|   Bells, The (1926)|
|     1|    572|5.5205665|Foreign Student (...|
+------+-------+---------+--------------------+
only showing top 10 rows



In [26]:
# Preview ten rows of the labeled NCF recommendations
ncf_final.show(n=10)

+------+-------+--------+--------------------+
|userId|movieId|ncf_pred|               title|
+------+-------+--------+--------------------+
|     1|   2905|       5|      Sanjuro (1962)|
|     1|   1178|       5|Paths of Glory (1...|
|     1|    318|       5|Shawshank Redempt...|
|     1|   3233|       5|Smashing Time (1967)|
|     1|   3338|       5|For All Mankind (...|
|     1|   3382|       5|Song of Freedom (...|
|     1|    668|       5|Pather Panchali (...|
|     1|     53|       5|     Lamerica (1994)|
|     1|     50|       5|Usual Suspects, T...|
|     1|    787|       5|Gate of Heavenly ...|
+------+-------+--------+--------------------+
only showing top 10 rows



In [27]:
# Convert both recommendation frames to pandas so specific userIds can be viewed easily
als_final_df = als_final.toPandas()
ncf_final_df = ncf_final.toPandas()

In [28]:
# ALS recommendations for one user; change the id to inspect a different user
als_final_df[als_final_df['userId'] == 5000]

Unnamed: 0,userId,movieId,als_pred,title
49990,5000,2129,5.350278,"Saltmen of Tibet, The (1997)"
49991,5000,1234,4.95955,"Sting, The (1973)"
49992,5000,989,5.078969,Schlafes Bruder (Brother of Sleep) (1995)
49993,5000,811,5.015543,"Bewegte Mann, Der (1994)"
49994,5000,1250,5.008807,"Bridge on the River Kwai, The (1957)"
49995,5000,904,5.004963,Rear Window (1954)
49996,5000,3233,5.18007,Smashing Time (1967)
49997,5000,1207,4.975183,To Kill a Mockingbird (1962)
49998,5000,3172,5.149885,Ulysses (Ulisse) (1954)
49999,5000,572,6.142802,Foreign Student (1994)


In [29]:
# NCF recommendations for one user; change the id to inspect a different user
ncf_final_df[ncf_final_df['userId'] == 5000]

Unnamed: 0,userId,movieId,ncf_pred,title
49990,5000,3382,5,Song of Freedom (1936)
49991,5000,50,5,"Usual Suspects, The (1995)"
49992,5000,1002,5,Ed's Next Move (1996)
49993,5000,3233,5,Smashing Time (1967)
49994,5000,2503,5,"Apple, The (Sib) (1998)"
49995,5000,745,5,"Close Shave, A (1995)"
49996,5000,787,5,"Gate of Heavenly Peace, The (1995)"
49997,5000,1423,5,Hearts and Minds (1996)
49998,5000,2905,5,Sanjuro (1962)
49999,5000,53,5,Lamerica (1994)


In [30]:
# sc.stop()