# Movie recommendation using ALS in Spark

The goal of this project is to use Alternating Least Squares (ALS) in Spark to recommend movies.

# Set-up

## Google Drive

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Dependencies

In [2]:
from math import sqrt
import pandas as pd

In [3]:
# Create a spark context and session
%cd
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!tar -xzf '/content/drive/MyDrive/Colab Notebooks/spark-3.0.1-bin-hadoop2.7.tgz'
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/root/spark-3.0.1-bin-hadoop2.7"
%cd /content

import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession.builder.getOrCreate()
print(sc)
print(spark)

/root
/content
<SparkContext master=local[*] appName=pyspark-shell>
<pyspark.sql.session.SparkSession object at 0x79ce8ebc2aa0>


# Local model tuning
**Key observation:**
* The optimal hyperparameter configuration is found via a cross-validated grid search using a subset of the available data.

## Training/test data prep

**Key observations:**
* The dataset of movie ratings consists of four columns separated by `::`. The first column contains `userId`s, the second `movieId`s, the third the `rating` of the user for that movie and the last column is a `timestamp`.

* The file is first read into an RDD, which is then converted into a DataFrame.
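
The line format described above can be illustrated without Spark. The following is a minimal sketch in plain Python; the `Rating` type and the `parse_rating` helper are hypothetical names introduced here for illustration and are not part of the notebook.

```python
from typing import NamedTuple


class Rating(NamedTuple):
    """One parsed rating record (hypothetical helper type)."""
    userId: int
    movieId: int
    rating: float
    timestamp: int


def parse_rating(line: str) -> Rating:
    """Split a 'userId::movieId::rating::timestamp' line and cast each field."""
    user_id, movie_id, rating, timestamp = line.strip().split("::")
    return Rating(int(user_id), int(movie_id), float(rating), int(timestamp))


# First line of the sample file
parsed = parse_rating("0::2::3::1424380312")
print(parsed)
```

The cells below apply the same split-and-cast logic, but distributed over an RDD via `map`.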

In [4]:
!wc "/content/drive/MyDrive/Colab Notebooks/sample_movielens_ratings.txt"

 1501  1501 32363 /content/drive/MyDrive/Colab Notebooks/sample_movielens_ratings.txt


In [5]:
!head -n 5 "/content/drive/MyDrive/Colab Notebooks/sample_movielens_ratings.txt"

0::2::3::1424380312
0::3::1::1424380312
0::5::2::1424380312
0::9::4::1424380312
0::11::1::1424380312


In [6]:
# create RDD
records = spark.read.text("/content/drive/MyDrive/Colab Notebooks/sample_movielens_ratings.txt").rdd
records.take(1)

[Row(value='0::2::3::1424380312')]

In [7]:
# split columns at '::'
elements = records.map(lambda row: row.value.split('::'))
elements.take(1)

[['0', '2', '3', '1424380312']]

In [8]:
# create DataFrame via RDD of row objects
from pyspark.sql import Row
row_RDD = elements.map(lambda row: Row(userId=int(row[0]), movieId=int(row[1]), rating=float(row[2]), timestamp=int(row[3])))
ratings_DF = spark.createDataFrame(row_RDD)
ratings_DF.createOrReplaceTempView('ratings_DF') # register temporary view enabling SQL queries

In [9]:
# train/test split (the held-out part is stored in valid_DF)
(training_DF, valid_DF) = ratings_DF.randomSplit([0.8, 0.2])
print("Training data:")
training_DF.show(5) # show() prints the table itself and returns None
print(training_DF.count())
print("\nTest data:")
valid_DF.show(5)
print(valid_DF.count())

Training data:
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      2|   3.0|1424380312|
|     0|      3|   1.0|1424380312|
|     0|      5|   2.0|1424380312|
|     0|      9|   4.0|1424380312|
|     0|     12|   2.0|1424380312|
+------+-------+------+----------+
only showing top 5 rows

1227

Test data:
+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|     11|   1.0|1424380312|
|     0|     15|   1.0|1424380312|
|     0|     21|   1.0|1424380312|
|     0|     26|   3.0|1424380312|
|     0|     30|   1.0|1424380312|
+------+-------+------+----------+
only showing top 5 rows

274


## Baseline

**Key observation:**
* The mean of all ratings is the naive baseline predictor.

* Performance is estimated using RMSE.
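
To make the baseline concrete: every held-out rating is predicted as the same mean value, and the RMSE is the square root of the mean squared difference between that constant and the actual ratings. The sketch below shows the calculation in plain Python on made-up toy ratings; the function name `mean_baseline_rmse` is an assumption for illustration only.

```python
from math import sqrt


def mean_baseline_rmse(reference_ratings, eval_ratings):
    """RMSE of predicting the mean of reference_ratings for every entry of eval_ratings."""
    mean_rating = sum(reference_ratings) / len(reference_ratings)
    squared_errors = [(r - mean_rating) ** 2 for r in eval_ratings]
    return sqrt(sum(squared_errors) / len(squared_errors))


# Toy ratings, made up for illustration only
reference = [3.0, 1.0, 2.0, 4.0]  # mean = 2.5
held_out = [1.0, 3.0]             # errors against the mean: -1.5 and 0.5
toy_rmse = mean_baseline_rmse(reference, held_out)
print(toy_rmse)  # sqrt((2.25 + 0.25) / 2) = sqrt(1.25), roughly 1.118
```

The cells below perform the same two steps with Spark SQL: one query for the mean, then one for the average squared error.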

In [10]:
# Calculate mean rating across all rows of ratings_DF
# Note: the view covers the full dataset, so the validation rows are included in the mean
SQL_query = 'SELECT AVG(rating) FROM ratings_DF'
row_baseline = spark.sql(SQL_query).collect()[0] # query returns a DataFrame with a single row
baseline = row_baseline['avg(rating)'] # access the column by name
print(baseline)

1.7741505662891406


In [11]:
# Calculate squared error for each row in the validation set
se_RDD = valid_DF.rdd.map(lambda row: Row(se=pow(row['rating']-baseline, 2)))
se_DF = spark.createDataFrame(se_RDD)
se_DF.createOrReplaceTempView('se_DF') # register temporary view enabling SQL queries
se_DF.show(5)

+------------------+
|                se|
+------------------+
| 0.599309099285797|
| 0.599309099285797|
| 0.599309099285797|
|1.5027068341292347|
| 0.599309099285797|
+------------------+
only showing top 5 rows



In [12]:
# Calculate RMSE
SQL_query = 'SELECT AVG(se) FROM se_DF'
row_avg = spark.sql(SQL_query).collect()[0] # query returns a DataFrame with a single row
MSE = row_avg['avg(se)'] # access the column by name
print(f"Baseline RMSE: {sqrt(MSE)}")

Baseline RMSE: 1.257853894594803


## Hyperparameter tuning

**Key observations:**

* An [ALS estimator](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.recommendation.ALS.html#pyspark.ml.recommendation.ALS) is created, and 3-fold cross-validation is used to assess different values of `rank` and `regParam`.
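
A rough sense of the cost of this search can be had by enumerating the grid. The sketch below uses plain Python with the candidate values from the cell that follows; it only counts configurations and does not reproduce how `ParamGridBuilder` represents them internally.

```python
from itertools import product

# Candidate values and fold count used in this notebook
ranks = [1, 3, 10]
reg_params = [0.01, 0.03, 0.1, 0.3, 1]
num_folds = 3

# Every (rank, regParam) pair is one configuration
grid = [{"rank": r, "regParam": reg} for r, reg in product(ranks, reg_params)]
total_fits = len(grid) * num_folds  # one model per configuration and fold

print(f"{len(grid)} configurations, {total_fits} cross-validation fits")
```

After the search, `CrossValidator` refits the best configuration once more on the whole training set, so the total number of ALS fits is one higher than the count above.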

In [13]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# ALS estimator
als = ALS(maxIter=5, rank=5, regParam=0.1, userCol="userId", itemCol="movieId", ratingCol="rating")

# parameter grid
# Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.ParamGridBuilder.html#pyspark.ml.tuning.ParamGridBuilder
param_grid = ParamGridBuilder().addGrid(als.rank, [1, 3, 10]).addGrid(als.regParam, [0.01, 0.03, 0.1, 0.3, 1]).build()

# regression evaluator
# Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.RegressionEvaluator.html#pyspark.ml.evaluation.RegressionEvaluator
reg_eval = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# k-fold cross validation
# Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html
k=3
kfold_cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=reg_eval, numFolds=k)
cvModel = kfold_cv.fit(training_DF)

In [14]:
# DataFrame showing grid search results
# Note: Parameter maps and metrics are local Python lists
cv_output = list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))
cv_output = pd.DataFrame(cv_output, columns=['parameter combination', 'rmse'])
cv_output.head()

| | parameter combination | rmse |
|---|---|---|
| 0 | `{ALS_f305fa9818c1__rank: 1, ALS_f305fa9818c1__...` | 1.279742 |
| 1 | `{ALS_f305fa9818c1__rank: 1, ALS_f305fa9818c1__...` | 1.276164 |
| 2 | `{ALS_f305fa9818c1__rank: 1, ALS_f305fa9818c1__...` | 1.267773 |
| 3 | `{ALS_f305fa9818c1__rank: 1, ALS_f305fa9818c1__...` | 1.269761 |
| 4 | `{ALS_f305fa9818c1__rank: 1, ALS_f305fa9818c1__...` | 1.522569 |


In [15]:
# Optimal configuration
print(f"Lowest RMSE: {cv_output.iloc[cv_output['rmse'].idxmin(), 1]}")
print('Configuration: ')
display(cv_output.iloc[cv_output['rmse'].idxmin(), 0])

Lowest RMSE: 1.21292077429191
Configuration: 


{Param(parent='ALS_f305fa9818c1', name='rank', doc='rank of the factorization'): 10,
 Param(parent='ALS_f305fa9818c1', name='regParam', doc='regularization parameter (>= 0).'): 0.3}
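
The parameter maps are keyed by `Param` objects, which is why the table above truncates them. One possible way to make the result easier to read is to key each map by the parameter's `name` instead. The sketch below assumes only that each key has a `name` attribute; it uses a hypothetical stand-in `Param` tuple and made-up metric values so that it runs without a Spark session.

```python
from collections import namedtuple

# Stand-in for pyspark.ml.param.Param, which exposes a `name` attribute.
# Hypothetical: used here only so the sketch runs without Spark.
Param = namedtuple("Param", ["parent", "name"])


def flatten_param_map(param_map):
    """Turn {Param: value} into {parameter name: value}."""
    return {param.name: value for param, value in param_map.items()}


def best_configuration(param_maps, avg_metrics):
    """Return (params, metric) with the lowest metric, assuming lower is better (as for RMSE)."""
    best_params, best_metric = min(zip(param_maps, avg_metrics), key=lambda pair: pair[1])
    return flatten_param_map(best_params), best_metric


# Toy inputs shaped like cvModel.getEstimatorParamMaps() and cvModel.avgMetrics;
# the metric values are made up.
rank, reg = Param("ALS_demo", "rank"), Param("ALS_demo", "regParam")
param_maps = [{rank: 1, reg: 0.1}, {rank: 10, reg: 0.3}, {rank: 10, reg: 1.0}]
avg_metrics = [1.27, 1.21, 1.52]

best_params, best_metric = best_configuration(param_maps, avg_metrics)
print(best_params, best_metric)
```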

## Evaluation

**Key observations:**

* Performance of the optimal model (rank: 10, regParam: 0.3) is assessed with the validation set.

* It performs better than the baseline model: RMSE 1.19 versus a baseline RMSE of 1.26.

In [16]:
valid_pred = cvModel.transform(valid_DF)
rmse = reg_eval.evaluate(valid_pred)
print(f"RMSE: {rmse}")

RMSE: 1.1850722905696298
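
The size of the improvement over the baseline can be quantified from the two RMSE values printed in this notebook. This is a small arithmetic sketch; because the split is random, unseeded and small, the numbers should be read as indicative rather than exact.

```python
baseline_rmse = 1.257853894594803  # printed by the baseline cell
als_rmse = 1.1850722905696298      # printed by the evaluation cell

absolute_gain = baseline_rmse - als_rmse
relative_gain = absolute_gain / baseline_rmse

print(f"Absolute improvement: {absolute_gain:.4f}")
print(f"Relative improvement: {relative_gain:.1%}")
```

On this split the tuned ALS model reduces the RMSE by roughly 6% relative to always predicting the mean rating.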


# Cloud training
**Key observation:**
* The final model will be trained using all available data. This will be achieved using the Google Cloud platform.

## Data
**Key observation:**
* The full dataset is a comma-separated CSV file with a header row, so the data prep step is adjusted accordingly.

In [106]:
!wc "/content/drive/MyDrive/Colab Notebooks/ratings.csv"

 100837  100837 2483723 /content/drive/MyDrive/Colab Notebooks/ratings.csv


In [105]:
!head -n 5 "/content/drive/MyDrive/Colab Notebooks/ratings.csv"

userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
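
Compared with the `::`-separated sample, this file has a header row and comma delimiters. The sketch below shows the adjusted parsing on the lines printed above, using Python's standard `csv` module rather than Spark, purely to illustrate the difference; the training script itself reads the file with `spark.read.csv`.

```python
import csv
import io

# First lines of ratings.csv as printed by `head` above
sample = """userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
"""

reader = csv.DictReader(io.StringIO(sample))  # the first row is consumed as the header
ratings = [
    {
        "userId": int(row["userId"]),
        "movieId": int(row["movieId"]),
        "rating": float(row["rating"]),
        "timestamp": int(row["timestamp"]),
    }
    for row in reader
]
print(len(ratings), ratings[-1])
```

As in this sketch, every field arrives as a string when no schema is given, so the explicit casts to `int` and `float` are still needed.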


## Training script

In [None]:
%%writefile als_model.py
import sys
from math import sqrt
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

INPUT_PATH = sys.argv[1] # parse file path from command line

if __name__ == '__main__':

    # Create SparkContext and SparkSession
    sc = SparkContext(appName="spark-submit demo")
    spark = SparkSession.builder.getOrCreate()

    # 1. Training/test data prep
    rows = spark.read.csv(INPUT_PATH, header=True).rdd
    row_RDD = rows.map(lambda row: Row(userId=int(row[0]), movieId=int(row[1]), rating=float(row[2]), timestamp=int(row[3])))
    ratings_DF = spark.createDataFrame(row_RDD)
    ratings_DF.createOrReplaceTempView('ratings_DF') # register temporary view enabling SQL queries
    (training_DF, test_DF) = ratings_DF.randomSplit([0.8, 0.2])

    # 2. Baseline
    # Calculate mean rating across all rows of ratings_DF (the full dataset, including the test split)
    SQL_query = 'SELECT AVG(rating) FROM ratings_DF'
    row_baseline = spark.sql(SQL_query).first() # query returns a DataFrame with a single row
    baseline = row_baseline['avg(rating)'] # access the column by name

    # Calculate squared error for each row in the test set
    se_RDD = test_DF.rdd.map(lambda row: Row(se=pow(row['rating']-baseline, 2)))
    se_DF = spark.createDataFrame(se_RDD)
    se_DF.createOrReplaceTempView('se_DF') # register temporary view enabling SQL queries

    # Calculate RMSE
    SQL_query = 'SELECT AVG(se) FROM se_DF'
    row_avg = spark.sql(SQL_query).first() # query returns a DataFrame with a single row
    MSE = row_avg['avg(se)'] # access the column by name
    print(f"Baseline RMSE: {sqrt(MSE)}")

    # 3. Hyperparameter tuning
    # Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.ParamGridBuilder.html#pyspark.ml.tuning.ParamGridBuilder
    # Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.RegressionEvaluator.html#pyspark.ml.evaluation.RegressionEvaluator
    # Source: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html

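    # coldStartStrategy="drop" discards NaN predictions for users or movies unseen during fitting, which would otherwise make the RMSE NaN
    als = ALS(maxIter=5, rank=5, regParam=0.1, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop") # ALS estimator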
    param_grid = ParamGridBuilder().addGrid(als.rank, [1, 3, 10]).addGrid(als.regParam, [0.01, 0.03, 0.1, 0.3, 1]).build() # parameter grid
    reg_eval = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") # regression evaluator
    k=3
    kfold_cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=reg_eval, numFolds=k) # k-fold cross validation
    cvModel = kfold_cv.fit(training_DF)

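    # 4. Evaluation
    param_map = list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)) # (parameter map, mean CV RMSE) pairs
    param_min = min(param_map, key=lambda x: x[1]) # pair with the lowest RMSE
    print(f"Optimal configuration: {param_min}")
    test_pred = cvModel.transform(test_DF) # predictions from the best model, refit on the full training split
    rmse = reg_eval.evaluate(test_pred)
    print(f"RMSE: {rmse}")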

    # Shutdown Spark context
    sc.stop()

## Google Cloud

### Authentication

In [None]:
import sys
if 'google.colab' in sys.modules:
    from google.colab import auth
    auth.authenticate_user()

### Project configuration

In [None]:
PROJECT = 'bd-lab-6-test' # Project ID
!gcloud config set project $PROJECT
REGION = 'us-central1'
!gcloud config set compute/region $REGION
!gcloud config set dataproc/region $REGION
!echo "gcloud config list"
!gcloud config list # show some information

### Data upload

In [None]:
# Create storage bucket
BUCKET = 'gs://{}-storage'.format(PROJECT)
!gsutil mb $BUCKET
print(BUCKET)

In [None]:
# Copy data to storage bucket
FILEPATH = "/content/drive/MyDrive/Colab Notebooks/ratings.csv"
!gsutil -m cp "$FILEPATH" $BUCKET
BUCKET_PATH = BUCKET+'/ratings.csv'
print(BUCKET_PATH)

### Cluster creation
Note:
* Creating the cluster can take a while. You can stop the execution of this cell and the process will still continue in the cloud. Have a look at the Cloud console to see your cluster being provisioned.

In [None]:
CLUSTER = '{}-cluster'.format(PROJECT)
!gcloud dataproc clusters create $CLUSTER \
    --image-version 1.4-ubuntu18 --single-node \
    --master-machine-type n1-standard-2 \
    --master-boot-disk-type pd-ssd --master-boot-disk-size 100 \
    --max-idle 3600s

### Job submission

In [None]:
!gcloud dataproc jobs submit pyspark --cluster $CLUSTER ./als_model.py -- $BUCKET_PATH
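
Everything after the bare `--` is passed to the script as command-line arguments, which is how `als_model.py` receives the bucket path in `sys.argv[1]`. If the argument is missing, the script fails with an `IndexError`. As a hedged alternative, the sketch below parses the same single argument with `argparse`; the `parse_args` helper and the bucket name are hypothetical and not part of the submitted script.

```python
import argparse


def parse_args(argv):
    """Parse the driver arguments that follow `--` on the gcloud command line."""
    parser = argparse.ArgumentParser(description="Train an ALS model on a ratings CSV")
    parser.add_argument("input_path", help="location of ratings.csv, e.g. a gs:// URI")
    return parser.parse_args(argv)


# Hypothetical bucket name, for illustration only
args = parse_args(["gs://example-project-storage/ratings.csv"])
print(args.input_path)
```

Inside the script this would be called as `parse_args(sys.argv[1:])`, so a missing path produces a usage message instead of a traceback.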