<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px; height: 163px">
</div>

<img src="https://brookewenig.github.io/img/DL/movie-camera.png" style="float:right; height: 200px; margin: 10px; border: 1px solid #ddd; border-radius: 15px 15px 15px 15px; padding: 10px"/>

# Movie Recommendations

In the previous labs, we didn't need to do any data preprocessing. In this lab, we will preprocess the data with Spark and use the result as input to Horovod.

Here, we will use 1 million movie ratings from the [MovieLens stable benchmark rating dataset](http://grouplens.org/datasets/movielens/). We will start by building a benchmark model with ALS, and then see if we can beat that benchmark with a neural network!

Let's start by mounting our dataset if it is not already there.

In [3]:
%run "./Includes/Classroom Setup"

In [4]:
moviesDF = spark.read.parquet("dbfs:/mnt/training/movielens/movies.parquet/")
ratingsDF = spark.read.parquet("dbfs:/mnt/training/movielens/ratings.parquet/")

ratingsDF.cache()
moviesDF.cache()

ratingsCount = ratingsDF.count()
moviesCount = moviesDF.count()

print('There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount))

Let's take a quick look at some of the data in the two DataFrames.

In [6]:
display(moviesDF)

ID,title
1,Toy Story (1995)
2,Jumanji (1995)
3,Grumpier Old Men (1995)
4,Waiting to Exhale (1995)
5,Father of the Bride Part II (1995)
6,Heat (1995)
7,Sabrina (1995)
8,Tom and Huck (1995)
9,Sudden Death (1995)
10,GoldenEye (1995)


In [7]:
display(ratingsDF)

userId,movieId,rating
19,1,5.0
19,3,4.0
19,6,5.0
19,7,5.0
19,14,4.0
19,17,4.0
19,25,3.0
19,32,2.0
19,55,3.0
19,61,4.0


## Collaborative Filtering

Let's start by splitting our data into a training and test set.

In [9]:
seed=42
(trainingDF, testDF) = ratingsDF.randomSplit([0.8, 0.2], seed=seed)

print('Training: {0}, test: {1}'.format(trainingDF.count(), testDF.count()))

## ALS

![factorization](http://spark-mooc.github.io/web-assets/images/matrix_factorization.png)
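
As the figure suggests, ALS approximates the ratings matrix as the product of a user-factor matrix and an item-factor matrix, so a predicted rating is the dot product of one user's vector and one movie's vector. The following is a minimal sketch of that idea in plain Python. The factor values are invented for illustration, and the rank is 3 rather than the rank of 12 used in this lab.

```python
# Toy latent factors (made-up numbers, rank 3 for readability)
user_factors = {
    19: [0.9, 0.1, 0.4],
    42: [0.2, 0.8, 0.5],
}
item_factors = {
    1: [1.0, 0.5, 2.0],
    3: [0.5, 1.5, 1.0],
}

def predict_rating(user_id, movie_id):
    """Predicted rating = dot product of the user's and the movie's factor vectors."""
    u = user_factors[user_id]
    v = item_factors[movie_id]
    return sum(a * b for a, b in zip(u, v))

# Fill in every (user, movie) cell of the toy ratings matrix
for user_id in user_factors:
    for movie_id in item_factors:
        print(user_id, movie_id, round(predict_rating(user_id, movie_id), 2))
```

Fitting the model means choosing the factor values so that these dot products are close to the ratings that were actually observed.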

In [11]:
from pyspark.ml.recommendation import ALS

als = (ALS()
       .setUserCol("userId")
       .setItemCol("movieId")
       .setRatingCol("rating")
       .setPredictionCol("prediction")
       .setMaxIter(2)
       .setSeed(seed)
       .setRegParam(0.1)
       .setColdStartStrategy("drop")
       .setRank(12))

alsModel = als.fit(trainingDF)

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator

regEval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="mse")

predictedTestDF = alsModel.transform(testDF)

testMse = regEval.evaluate(predictedTestDF)

print('The model had an MSE on the test set of {0}'.format(testMse))
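
For reference, the `mse` metric is the average of the squared differences between predicted and actual ratings, and RMSE (the metric reported later by the TensorFlow model during evaluation) is its square root. A small sketch with hypothetical ratings and predictions, not values from this dataset:

```python
import math

def mean_squared_error(labels, predictions):
    """Average of squared differences between labels and predictions."""
    errors = [(p - y) ** 2 for y, p in zip(labels, predictions)]
    return sum(errors) / len(errors)

ratings = [5.0, 4.0, 3.0, 2.0]       # hypothetical true ratings
predictions = [4.5, 4.0, 2.0, 3.5]   # hypothetical model outputs

mse = mean_squared_error(ratings, predictions)
rmse = math.sqrt(mse)
print("MSE:", mse, "RMSE:", rmse)
```

Because the two metrics are on different scales, compare MSE with MSE (or RMSE with RMSE) when judging whether the neural network beats the ALS benchmark.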

## Deep Learning

In [14]:
userFactors = alsModel.userFactors.selectExpr("id as userId", "features as uFeatures")
itemFactors = alsModel.itemFactors.selectExpr("id as movieId", "features as iFeatures")
joinedTrainDF = trainingDF.join(itemFactors, on="movieId").join(userFactors, on="userId")
joinedTestDF = testDF.join(itemFactors, on="movieId").join(userFactors, on="userId")

In [15]:
display(joinedTrainDF)

In [16]:
from itertools import chain
from pyspark.sql.functions import *
from pyspark.sql.types import *

def concat_arrays(*args):
    return list(chain(*args))
    
concat_arrays_udf = udf(concat_arrays, ArrayType(FloatType()))

concatTrainDF = (joinedTrainDF
                 .select('userId', 'movieId', concat_arrays_udf(col("iFeatures"), col("uFeatures")).alias("features"),
                         col('rating').cast("float")))
concatTestDF = (joinedTestDF
                .select('userId', 'movieId', concat_arrays_udf(col("iFeatures"), col("uFeatures")).alias("features"), 
                        col('rating').cast("float")))

In [17]:
display(concatTrainDF.limit(10))
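
To see what the UDF does to a single row, here is the same `concat_arrays` function applied to plain Python lists. The factor values are placeholders; only the lengths matter. With an ALS rank of 12, the item and user vectors each have 12 entries, so the combined feature vector has 24, which is the width the model function reshapes to.

```python
from itertools import chain

def concat_arrays(*args):
    """Flatten any number of sequences into one list, preserving order."""
    return list(chain(*args))

rank = 12  # matches setRank(12) in the ALS cell
i_features = [0.1 * k for k in range(rank)]   # placeholder item factors
u_features = [-0.1 * k for k in range(rank)]  # placeholder user factors

# Item factors come first, then user factors, as in the select() above
features = concat_arrays(i_features, u_features)
print(len(features))
```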

## Model_fn

Unfortunately, Databricks' HorovodEstimator does not have a Keras API yet, and our engineering team does not recommend the [Keras model to TF Estimator](https://www.tensorflow.org/api_docs/python/tf/keras/estimator/model_to_estimator) conversion because it has not been fully tested with HorovodEstimator. Instead, you will need to write a TensorFlow model function directly.

But do not fear! All of the concepts you learned about Keras are directly applicable here (Keras is just a high-level wrapper with a TensorFlow backend). You now get to experience all of the joy (and agony) of the very low-level TensorFlow APIs Google designed.

Our tf.estimator-style `model_fn` ([see TensorFlow docs](https://www.tensorflow.org/api_docs/python/tf/estimator/Estimator)) works by:
1. Defining the model's network structure, then
2. Specifying the model's output on a single batch of data during training, eval, and prediction (inference) phases.
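
The two steps can be sketched without TensorFlow at all. The toy function below uses hypothetical mode names and returns plain dictionaries instead of an `EstimatorSpec`; it is only meant to show which outputs each phase needs, not how the real API behaves.

```python
# Plain-Python stand-ins for the three phases (hypothetical names, no TensorFlow)
TRAIN, EVAL, PREDICT = "train", "eval", "predict"

def toy_model_fn(features, labels, mode, params):
    # 1. Network structure: a weighted sum standing in for the dense layers
    predictions = [sum(w * x for w, x in zip(params["weights"], row)) for row in features]

    # 2a. PREDICT: only the model outputs are needed, labels are not available
    if mode == PREDICT:
        return {"predictions": predictions}

    # 2b. TRAIN and EVAL both need the loss (mean squared error here)
    loss = sum((p - y) ** 2 for p, y in zip(predictions, labels)) / len(labels)
    if mode == TRAIN:
        return {"loss": loss, "train_op": "placeholder for one optimizer step"}

    # 2c. EVAL: the loss plus any extra metrics to log
    return {"loss": loss, "metrics": {"rmse": loss ** 0.5}}

batch = [[1.0, 2.0], [3.0, 4.0]]
targets = [5.0, 11.0]
params = {"weights": [1.0, 2.0]}

print(toy_model_fn(batch, None, PREDICT, params))
print(toy_model_fn(batch, targets, TRAIN, params))
print(toy_model_fn(batch, targets, EVAL, params))
```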

**Note**: If you have a single-machine `model_fn`, you can prepare it for distributed training with a one-line code change. Simply wrap your optimizer in Horovod's `hvd.DistributedOptimizer`, as in the example below.
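
Conceptually, the distributed optimizer has each worker compute gradients on its own shard of the data and then averages those gradients across workers before applying the update. The simulation below is a rough sketch of that idea in plain Python with two pretend workers and made-up data; it does not use Horovod. It also mirrors the choice made in this notebook of multiplying the learning rate by the number of workers (`hvd.size()`).

```python
def gradient(w, shard):
    """Gradient of mean squared error for the model y = w * x on one shard of (x, y) pairs."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)

def allreduce_mean(values):
    """Stand-in for the cross-worker averaging step."""
    return sum(values) / len(values)

data = [(1.0, 2.0), (2.0, 4.5), (3.0, 5.5), (4.0, 8.5)]  # made-up (x, y) pairs
num_workers = 2                                           # pretend cluster size
shards = [data[i::num_workers] for i in range(num_workers)]

w = 0.0
base_learning_rate = 0.01
learning_rate = base_learning_rate * num_workers  # mirrors learning_rate * hvd.size()

for step in range(200):
    # Each worker computes a gradient on its own shard only
    local_grads = [gradient(w, shard) for shard in shards]
    # Every worker then applies the same averaged gradient
    w -= learning_rate * allreduce_mean(local_grads)

print(round(w, 3))
```

With equally sized shards, the averaged gradient equals the gradient over the combined data, so every worker ends up applying the same update as a single machine processing the larger batch.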

## Option 1: Make your Estimator

In [20]:
import tensorflow as tf
import horovod.tensorflow as hvd

tf.set_random_seed(seed=40)

def model_fn(features, labels, mode, params):
    features_with_shape = tf.reshape(features["features"], [-1, 24]) # 12 item factors + 12 user factors (ALS rank 12)
    
    hidden_layer1 = tf.layers.dense(inputs=features_with_shape, units=params["hidden_layer1"], activation=tf.nn.relu)
    hidden_layer2 = tf.layers.dense(inputs=hidden_layer1, units=params["hidden_layer2"], activation=tf.nn.relu)
    predictions = tf.squeeze(tf.layers.dense(inputs=hidden_layer2, units=1, activation=None), axis=-1)
    
    # If the estimator is running in PREDICT mode, we can stop building our model graph here and simply return
    # our model's inference outputs
    serving_key = tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY
    export_outputs = {serving_key: tf.estimator.export.PredictOutput({"predictions": predictions})}
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions, export_outputs=export_outputs)
      
    # Calculate Loss (for both TRAIN and EVAL modes)
    loss = tf.losses.mean_squared_error(labels, predictions)
    if mode == tf.estimator.ModeKeys.TRAIN:
        optimizer = tf.train.AdamOptimizer(learning_rate=params["learning_rate"] * hvd.size())
        optimizer = hvd.DistributedOptimizer(optimizer)
        
        train_op = optimizer.minimize(loss=loss, global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op,
                                          export_outputs=export_outputs)
    # If running in EVAL mode, add model evaluation metrics (RMSE) to our EstimatorSpec so that
    # they're logged when model evaluation runs
    eval_metric_ops = {"rmse": tf.metrics.root_mean_squared_error(labels=labels, predictions=predictions)}
    return tf.estimator.EstimatorSpec(
        mode=mode, loss=loss, eval_metric_ops=eval_metric_ops, export_outputs=export_outputs)

## Option 2: Premade estimators

Alternatively, you can build a premade estimator such as `tf.estimator.DNNRegressor` and reuse its `model_fn` inside your own.

In [22]:
import tensorflow as tf
import horovod.tensorflow as hvd

tf.set_random_seed(seed=40)

def model_fn(features, labels, mode, params, config):
    feat_cols = [tf.feature_column.numeric_column(key="features", shape=(24,))]
    regressor = tf.estimator.DNNRegressor(
      hidden_units=[params["hidden_layer1"], params["hidden_layer2"]],
      feature_columns=feat_cols,
      optimizer=hvd.DistributedOptimizer(tf.train.AdamOptimizer(params["learning_rate"] * hvd.size())))
    estimator_spec = regressor.model_fn(features, labels, mode, config)
    export_outputs = estimator_spec.export_outputs
    if export_outputs is not None:
      export_outputs[tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY] = export_outputs["predict"]
    return tf.estimator.EstimatorSpec(mode=mode, loss=estimator_spec.loss, train_op=estimator_spec.train_op,
                                      export_outputs=export_outputs, training_hooks=estimator_spec.training_hooks, predictions=estimator_spec.predictions)

Flag roughly 20% of the training rows as a validation set, then create a timestamped model directory.

In [24]:
import time

trainValDF = concatTrainDF.withColumn("isVal", when(rand() > 0.8, True).otherwise(False))

model_dir = "/tmp/horovodDemo/" + str(int(time.time()))
print(model_dir)
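
The `isVal` column works because `rand()` draws a uniform value in [0, 1) for each row, so `rand() > 0.8` is true for about 20% of the rows. A quick check of that proportion in plain Python follows; the row count and seed are arbitrary choices for the illustration.

```python
import random

rng = random.Random(42)  # fixed seed so the sketch is repeatable
num_rows = 10000         # arbitrary number of pretend rows

# One uniform draw per row; rows above the 0.8 threshold are flagged for validation
is_val = [rng.random() > 0.8 for _ in range(num_rows)]

val_fraction = sum(is_val) / len(is_val)
print(val_fraction)
```

Note that the `rand()` call in the cell above is unseeded, so the exact rows flagged for validation can differ between runs.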

## Launch model training

In [26]:
from sparkdl.estimators.horovod_estimator.estimator import HorovodEstimator

est = HorovodEstimator(modelFn=model_fn,
                       featureMapping={"features":"features"},
                       modelDir=model_dir,
                       labelCol="rating",
                       batchSize=128,
                       maxSteps=20000,
                       isValidationCol="isVal",  
                       modelFnParams={"hidden_layer1": 30, "hidden_layer2": 20, "learning_rate": 0.0001},
                       saveCheckpointsSecs=30)
transformer = est.fit(trainValDF)

In [27]:
predDF = transformer.transform(concatTestDF)
display(predDF.select("userId", "movieId", "predictions", "rating"))

In [28]:
from pyspark.sql.types import FloatType
def _pred(v):
  return float(v[0])

pred = udf(_pred, FloatType())
predDF = predDF.withColumn("prediction", pred(predDF.predictions))

In [29]:
testMse = regEval.evaluate(predDF)

print('The model had an MSE on the test set of {0}'.format(testMse))

&copy; 2018 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>