## ALS Recommender system example

### 1. Spark context and global variables and functions

In [3]:
SEED = 42

import numpy as np
np.random.seed(SEED)

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# http://www.learnbymarketing.com/644/recsys-pyspark-als/
# https://github.com/narenkmanoharan/Movie-Recommender-System/blob/master/recommender.py
# https://www.codementor.io/jadianes/building-a-recommender-with-apache-spark-python-example-app-part1-du1083qbw

In [4]:
# Getting the SparkContext
sc = SparkContext()

# Initializing the SQLContext
sqlContext = SQLContext(sc)

# Initializing Spark Session
spark = SparkSession.builder.appName('recommender-system').getOrCreate()

In [5]:
FORMAT_STD_OUT = lambda x, w=12: \
    '[{message: <{width}}]'.format(message=x, width=w)

DATA_DIR = 'data/masters/movies/'

### 2. Data

#### 2.1. Loading the data

In [6]:
ratings_header = 'UserID::MovieID::Rating::Timestamp'.split('::')
ratings_df = sqlContext.read.csv(
    DATA_DIR + 'ratings.dat', header=False, sep=':', inferSchema=True
)

#### 2.2. Data preprocessing

In [7]:
# Extract correct columns (avoid those created by double ':')
correct_columns = np.array(ratings_df.columns)
correct_columns = correct_columns[range(0, len(ratings_df.columns), 2)]
ratings_df = ratings_df.select(correct_columns.tolist())

# Rename columns
assert len(ratings_header) == len(correct_columns)

for i in range(len(ratings_header)):
    ratings_df = \
        ratings_df.withColumnRenamed(correct_columns[i], ratings_header[i])
ratings_df.printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- MovieID: integer (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Timestamp: integer (nullable = true)



## 3. Model

#### 3.1. Train-test split

In [8]:
# Create test and train set
train, test = ratings_df.randomSplit([0.7, 0.3], seed=SEED)

#### 3.2. Creating the model

In [9]:
# Create ALS model
als = ALS(
    userCol='UserID',
    itemCol='MovieID',
    ratingCol='Rating',
    nonnegative=True,
    coldStartStrategy='drop',
    implicitPrefs=False
)
print(FORMAT_STD_OUT('ALS'), type(als))

# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.maxIter, [5]) \
            .addGrid(als.rank, [10, 50]) \
            .addGrid(als.regParam, [.01, .05, .15]) \
            .build()
print(FORMAT_STD_OUT('PARAM GRID'), len(param_grid))

# Define evaluator as RMSE and print evaluator
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='Rating',
    predictionCol='prediction'
)
print(FORMAT_STD_OUT('EVALUATOR'), evaluator)

# Build cross validator
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5
)
print(FORMAT_STD_OUT('CV'), cv)

[ALS         ] <class 'pyspark.ml.recommendation.ALS'>
[PARAM GRID  ] 6
[EVALUATOR   ] RegressionEvaluator_d4c4235750bb
[CV          ] CrossValidator_4fc4e84e9b2c


#### 3.3. Training the model

In [10]:
# Fit cross validator to the 'train' dataset
model = cv.fit(train)

# Extract best model from the cv model above
best_model = model.bestModel
print(FORMAT_STD_OUT('BEST MODEL'), best_model)

get_reg_param = lambda model: model._java_obj.parent().getRegParam()
get_max_iter  = lambda model: model._java_obj.parent().getMaxIter()

print('rank:', best_model.rank)
print('regParam:', get_reg_param(best_model))
print('maxIter:', get_max_iter(best_model))

[BEST MODEL  ] ALS_d5b51911b281
rank: 50
regParam: 0.05
maxIter: 5


#### 3.4. Evaluation

In [11]:
# View the predictions
test_predictions = best_model.transform(test)
test_predictions.show(10)

+------+-------+------+----------+----------+
|UserID|MovieID|Rating| Timestamp|prediction|
+------+-------+------+----------+----------+
|   673|    148|     5| 975620824| 3.0055335|
|  3184|    148|     4| 968708953| 2.6920252|
|  4784|    148|     3| 970000570| 2.6175513|
|   752|    148|     4|1029309135| 1.8079419|
|  3829|    148|     2| 965940170| 1.8995509|
|   424|    148|     4|1027003224|  3.089113|
|  3053|    148|     3| 970170090| 2.3488672|
|  3328|    463|     4| 967918151| 3.3068717|
|  5306|    463|     2| 961013160| 2.7964902|
|  4510|    463|     2| 966800044| 2.1049874|
+------+-------+------+----------+----------+
only showing top 10 rows



In [12]:
# Calculate and print the RMSE of the test_predictions
rmse = evaluator.evaluate(test_predictions)
print(FORMAT_STD_OUT('RMSE'), rmse)

[RMSE        ] 0.8685174366673392
