
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 1200px">
</div>

<img src="https://files.training.databricks.com/images/movie-camera.png" style="float:right; height: 200px; margin: 10px; border: 1px solid #ddd; border-radius: 15px 15px 15px 15px; padding: 10px"/>

# Movie Recommendations

In the previous labs, we didn't need to do any data preprocessing. In this lab, we will use the output of our Spark preprocessing steps as the input to Horovod.

Here, we will use 1 million movie ratings from the [MovieLens stable benchmark rating dataset](http://grouplens.org/datasets/movielens/). We will start by building a benchmark model with ALS, and then see if we can beat that benchmark with a neural network!

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) In this lesson you:<br>
- Combine the user and item factors learned by ALS and use them as input to a neural network
- Create a custom activation function (scaled sigmoid) to bound the output of a regression task
- Train a distributed neural network using Horovod

In [3]:
%run "./Includes/Classroom-Setup"

In [4]:
moviesDF = spark.read.parquet("dbfs:/mnt/training/movielens/movies.parquet/")
ratingsDF = spark.read.parquet("dbfs:/mnt/training/movielens/ratings.parquet/")

ratingsDF.cache()
moviesDF.cache()

ratingsCount = ratingsDF.count()
moviesCount = moviesDF.count()

print(f"There are {ratingsCount} ratings and {moviesCount} movies in the datasets")

Let's take a quick look at some of the data in the two DataFrames.

In [6]:
display(moviesDF)

In [7]:
display(ratingsDF)

What range of values do the ratings take?

In [9]:
display(ratingsDF.select("rating").describe())

Let's start by splitting our data into a training and test set.

In [11]:
seed = 42
(trainingDF, testDF) = ratingsDF.randomSplit([0.8, 0.2], seed=seed)

print(f"Training: {trainingDF.count()}, test: {testDF.count()}")

## Alternating Least Squares Method (ALS)

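ALS is a parallel algorithm for matrix factorization. Given the sparse matrix of users' ratings of movies, ALS tries to find two lower-dimensional matrices whose product approximates the original matrix (that is, it assumes the full rating matrix is approximately low rank). One matrix represents users and their latent factors, while the other represents movies and their latent factors. The method is called "alternating" because it holds one matrix fixed and solves a regularized least-squares problem for the other, then swaps, repeating for a set number of iterations (`setMaxIter` below).

Since our goal is to predict every user's rating of every movie, these two lower-dimensional matrices give us what we need: multiplying them fills in the missing entries of the original matrix.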

![factorization](https://files.training.databricks.com/images/matrix_factorization.png)
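To make the alternating idea concrete, here is a minimal NumPy sketch, assuming a small dense rating matrix `R` plus a boolean `mask` marking which entries were observed. It is not Spark's implementation: Spark's ALS is distributed, scales the regularization by each user's or item's number of ratings, and in this lab also constrains the factors to be nonnegative. The function name `als_sketch` and its parameters are illustrative only.

```python
import numpy as np

def als_sketch(R, mask, rank=2, reg=0.1, n_iters=20, seed=42):
    """Approximate R by U @ V.T using only the entries where mask is True."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))  # user latent factors
    V = rng.normal(scale=0.1, size=(n_items, rank))  # item latent factors
    ridge = reg * np.eye(rank)
    for _ in range(n_iters):
        # Step 1: hold V fixed and solve a ridge least-squares problem per user
        for u in range(n_users):
            Vo = V[mask[u]]
            U[u] = np.linalg.solve(Vo.T @ Vo + ridge, Vo.T @ R[u, mask[u]])
        # Step 2: hold U fixed and solve a ridge least-squares problem per item
        for i in range(n_items):
            Uo = U[mask[:, i]]
            V[i] = np.linalg.solve(Uo.T @ Uo + ridge, Uo.T @ R[mask[:, i], i])
    return U, V

# Tiny synthetic example: an exactly rank-2 "rating" matrix with ~20% of entries hidden
rng = np.random.default_rng(0)
R = rng.uniform(0.5, 1.5, (6, 2)) @ rng.uniform(0.5, 1.5, (5, 2)).T
mask = rng.random(R.shape) < 0.8
U, V = als_sketch(R, mask)
fit_mse = np.mean((U @ V.T - R)[mask] ** 2)
print(f"MSE on observed entries: {fit_mse:.4f}")
```

Each inner solve is an ordinary ridge regression, which is why every step is cheap and the per-user (or per-item) solves can run in parallel.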

Let's build and train our baseline ALS model.

In [14]:
from pyspark.ml.recommendation import ALS

als = (ALS()
       .setUserCol("userId")
       .setItemCol("movieId")
       .setRatingCol("rating")
       .setPredictionCol("prediction")
       .setMaxIter(3)
       .setSeed(seed)
       .setRegParam(0.1)
       .setColdStartStrategy("drop")
       .setRank(12)
       .setNonnegative(True))

alsModel = als.fit(trainingDF)

Evaluate the model by computing the mean squared error (MSE) of its predictions on the test data.

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

regEval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="mse")

predictedTestDF = alsModel.transform(testDF)

testMse = regEval.evaluate(predictedTestDF)

print(f"The ALS model had an MSE on the test set of {testMse}")
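For reference, MSE is the mean of the squared differences between `rating` and `prediction`. Because the ALS cell uses `setColdStartStrategy("drop")`, rows whose user or movie was not seen during training get NaN predictions and are dropped by `transform`, so the metric only covers the remaining rows. The small helper below is a hypothetical stand-in that mimics this with a NaN filter; it is not part of the Spark API.

```python
import numpy as np

def mse(y_true, y_pred):
    """Mean squared error, ignoring rows whose prediction is NaN."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    keep = ~np.isnan(y_pred)  # stand-in for rows that coldStartStrategy="drop" removes
    return float(np.mean((y_true[keep] - y_pred[keep]) ** 2))

# Hypothetical ratings and predictions; the third movie was unseen in training
example_mse = mse([4.0, 3.0, 5.0], [3.5, 3.0, float("nan")])
print(example_mse)
```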

## Deep Learning

Now let's take a deep learning approach to predicting the rating values.

Let's take the latent factors learned by ALS and include them as features! The following cell extracts the user and item (movie) factors from the trained ALS model and joins them to our train and test DataFrames. These are inner joins, so any test rows whose user or movie has no learned factors are dropped.

In [18]:
userFactors = alsModel.userFactors.selectExpr("id as userId", "features as uFeatures")
itemFactors = alsModel.itemFactors.selectExpr("id as movieId", "features as iFeatures")
joinedTrainDF = trainingDF.join(itemFactors, on="movieId").join(userFactors, on="userId")
joinedTestDF = testDF.join(itemFactors, on="movieId").join(userFactors, on="userId")
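Why are these factors useful features? In Spark's ALS, the predicted rating for a (user, movie) pair is the dot product of that user's factor vector and that movie's factor vector. The hypothetical rank-3 vectors below illustrate this; in the lab, `uFeatures` and `iFeatures` each have 12 entries (`setRank(12)`). Feeding both vectors to a neural network gives it the same information, but lets it learn a non-linear combination instead of a fixed dot product.

```python
import numpy as np

# Hypothetical rank-3 factor vectors for one user and one movie
u_features = np.array([0.9, 0.4, 1.1])
i_features = np.array([1.2, 0.5, 1.0])

# ALS scores a (user, movie) pair as the dot product of the two factor vectors
als_prediction = float(u_features @ i_features)
print(f"ALS-style prediction: {als_prediction:.2f}")
```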

In [19]:
display(joinedTrainDF)

We would like to use the `iFeatures` and `uFeatures` columns as input for our deep learning model. However, we need all our features to be in one column of our DataFrame.

The code below creates two new DataFrames, `concatTrainDF` and `concatTestDF`, with four columns: `userId`, `movieId`, `features` (the concatenated `iFeatures` and `uFeatures` arrays), and `rating`.

In [21]:
from itertools import chain
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, FloatType

def concat_arrays(*args):
    # Flatten the input arrays into one list (item factors first, then user factors)
    return list(chain(*args))

concat_arrays_udf = udf(concat_arrays, ArrayType(FloatType()))

concatTrainDF = joinedTrainDF.select("userId", "movieId", concat_arrays_udf(col("iFeatures"), col("uFeatures")).alias("features"), "rating")

concatTestDF = joinedTestDF.select("userId", "movieId", concat_arrays_udf(col("iFeatures"), col("uFeatures")).alias("features"), "rating")
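The UDF body is plain Python, so it can be checked locally before Spark runs it. With an ALS rank of 12, each row's `features` array holds 12 item factors followed by 12 user factors, 24 values in total, which is why the networks below use `input_shape=(24,)`. The factor values in this sketch are placeholders.

```python
from itertools import chain

def concat_arrays(*args):
    # Flatten any number of lists into one list, in argument order
    return list(chain(*args))

rank = 12                      # matches setRank(12) in the ALS cell
i_features = [0.1] * rank      # placeholder item factors
u_features = [0.2] * rank      # placeholder user factors
features = concat_arrays(i_features, u_features)
print(len(features))
```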

Check that your DataFrame has the correct columns.

In [23]:
display(concatTrainDF.limit(10))

### Petastorm

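Prepare the data for Petastorm, which lets us feed a Spark DataFrame into TensorFlow. `make_spark_converter` materializes the training DataFrame as Parquet files under the cache directory set by `SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF`, and the resulting converter can later produce a `tf.data.Dataset`. We write the test set directly to Parquet so we can load it with pandas for evaluation.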

In [25]:
from petastorm.spark import SparkDatasetConverter, make_spark_converter

# Clean up the directories if they exist
dbutils.fs.rm(working_dir, recurse=True)

# Define the paths
train_path = f"file://{working_dir}/ALS_train"
test_path =  f"file://{working_dir}/ALS_test"

# Create the required directories
dbutils.fs.mkdirs(train_path)
dbutils.fs.mkdirs(test_path)

# Cache the training DataFrame as Parquet so Petastorm can serve it as a TensorFlow dataset
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, train_path)
converter_train = make_spark_converter(concatTrainDF.repartition(8).selectExpr("features", "rating as label"))

# Write test out to Parquet
concatTestDF.repartition(8).selectExpr("features", "rating as label").write.mode("overwrite").parquet(test_path)

### Setup Model and HorovodRunner

We'll create two models: a baseline model that uses a linear activation function in its output layer, and one that uses a scaled [sigmoid](https://www.tensorflow.org/api_docs/python/tf/keras/backend/sigmoid) activation whose outputs are bounded between 0.5 and 5 (the range of the rating values).

In [27]:
import tensorflow as tf
tf.random.set_seed(42)
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import numpy as np
from tensorflow.keras import backend as K
from petastorm import TransformSpec

# Input size 24 = 12 item factors + 12 user factors (ALS rank 12)
def build_model():
  return Sequential([Dense(30, input_shape=(24,), activation="relu"),
                     Dense(20, activation="relu"),
                     Dense(1, activation="linear")])

def sigmoid_activation(x): # Scores range from 0.5 to 5
  return (K.sigmoid(x) * 4.5) + .5

def build_model_sigmoid():
  return Sequential([Dense(30, input_shape=(24,), activation="relu"),
                     Dense(20, activation="relu"),
                     Dense(1, activation=sigmoid_activation)])
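The scaled sigmoid is the same formula as `sigmoid_activation`, 0.5 + 4.5 * sigmoid(x). A NumPy version makes its bounds easy to check: large negative inputs approach 0.5, large positive inputs approach 5, and 0 maps to the midpoint 2.75. The name `scaled_sigmoid` and its `low`/`high` parameters are illustrative, not part of Keras.

```python
import numpy as np

def scaled_sigmoid(x, low=0.5, high=5.0):
    # sigmoid maps any real x into (0, 1); scaling and shifting maps it into (low, high)
    return low + (high - low) / (1.0 + np.exp(-x))

x = np.array([-20.0, 0.0, 20.0])
y = scaled_sigmoid(x)
print(y)
```

The linear output layer of the baseline model, by contrast, can predict values below 0.5 or above 5, which are impossible ratings.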

### TensorBoard
We will set up TensorBoard so we can visualize and debug our network. Take a look at the [dbutils.tensorboard.start](https://docs.microsoft.com/en-us/azure/databricks/applications/deep-learning/single-node-training/tensorflow#--tensorboard) function.

NOTE: TensorBoard will not show any data until we start training our model, because the logs are written during training.

In [29]:
log_dir = f"{working_dir}/tensorboard"
dbutils.fs.rm(log_dir, True)  # Remove logs from previous runs
dbutils.tensorboard.start(log_dir)

### HorovodRunner with Petastorm

In [31]:
from sparkdl import HorovodRunner
from tensorflow.keras import optimizers
from tensorflow.keras.callbacks import ReduceLROnPlateau
import horovod.tensorflow.keras as hvd

BATCH_SIZE = 64
NUM_EPOCHS = 1

def run_training_horovod_petastorm(model_type="default"):
  # Horovod: initialize Horovod.
  hvd.init()
  print(f"Rank is: {hvd.rank()}")
  print(f"Size is: {hvd.size()}")
  with converter_train.make_tf_dataset(batch_size=BATCH_SIZE, num_epochs=None, cur_shard=hvd.rank(), shard_count=hvd.size()) as train_dataset:
    dataset = train_dataset.map(lambda x: (x.features, x.label))  
    if model_type == "default":
      model = build_model()
    else:
      model = build_model_sigmoid()
    
    # Number of steps required to go through one epoch
    steps_per_epoch = len(converter_train) // (BATCH_SIZE*hvd.size())
    
    # Horovod: adjust learning rate based on number of CPUs/GPUs.
    optimizer = optimizers.Adam(learning_rate=0.001*hvd.size())

    # Horovod: add Horovod Distributed Optimizer.
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer, loss="mse", metrics=["mse"], experimental_run_tf_function=False)

    # Use the optimized FUSE Mount
    checkpoint_dir = f"{working_dir}/{model_type}_horovod_checkpoint_weights.ckpt"

    callbacks = [
      hvd.callbacks.BroadcastGlobalVariablesCallback(0),
      hvd.callbacks.MetricAverageCallback(),
      hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=5, verbose=1),
      ReduceLROnPlateau(monitor="loss", patience=10, verbose=1)
    ]

    # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
    if hvd.rank() == 0:
      callbacks.append(tf.keras.callbacks.ModelCheckpoint(checkpoint_dir))
      callbacks.append(tf.keras.callbacks.TensorBoard(f"{log_dir}/petastorm_{model_type}"))

    history = model.fit(dataset, callbacks=callbacks, steps_per_epoch=steps_per_epoch, epochs=NUM_EPOCHS)
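The two Horovod-specific numbers in this function are easy to sanity-check by hand. During each step, every one of the `hvd.size()` processes consumes one batch, so one pass over the data takes `num_rows // (BATCH_SIZE * size)` steps, and the learning rate is scaled linearly with the number of processes. The sketch below reproduces both expressions with a hypothetical row count and worker count; `horovod_schedule` is an illustrative helper, not a Horovod function. With `HorovodRunner(np=-1)` only one process runs, so `hvd.size()` is 1 and neither value is scaled.

```python
def horovod_schedule(num_rows, batch_size, num_workers, base_lr=0.001):
    # Every worker processes one batch per step, so an epoch needs
    # num_rows // (batch_size * num_workers) steps in total.
    steps_per_epoch = num_rows // (batch_size * num_workers)
    # Linear learning-rate scaling, as in optimizers.Adam(learning_rate=0.001*hvd.size())
    scaled_lr = base_lr * num_workers
    return steps_per_epoch, scaled_lr

# Hypothetical: 800,000 training rows, batch size 64, 4 Horovod processes
steps, lr = horovod_schedule(num_rows=800_000, batch_size=64, num_workers=4)
print(steps, lr)
```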

### Train and Evaluate Model with Linear Activation

In [33]:
hr = HorovodRunner(np=-1)  # np=-1 runs a single process on the driver (using all workers is very slow)
hr.run(run_training_horovod_petastorm, model_type="default")

Load the saved model and the test data.

In [35]:
import pandas as pd
from tensorflow.keras.models import load_model

model_type = "default"
model = load_model(f"{working_dir}/{model_type}_horovod_checkpoint_weights.ckpt")

test_pdf = pd.read_parquet(test_path.replace("file://", ""))
X_test = np.stack(test_pdf["features"].values)  # Stack per-row feature arrays into an (n_rows, 24) matrix
y_test = test_pdf["label"]

Let's evaluate the model and compare its test MSE with the vanilla ALS baseline.

In [37]:
# Get MSE loss of model
loss, _ = model.evaluate(X_test, y_test)
print(f"Default model loss (MSE): {loss}, ALS test MSE: {testMse}")

### Train and Evaluate Model with Scaled Sigmoid

Now train the model with the scaled sigmoid activation function by setting `model_type` to `sigmoid`.

In [39]:
hr = HorovodRunner(np=-1)  # np=-1 runs a single process on the driver (using all workers is very slow)
hr.run(run_training_horovod_petastorm, model_type="sigmoid")

How does the scaled sigmoid model compare?

In [41]:
model_type = "sigmoid"
model = load_model(f"{working_dir}/{model_type}_horovod_checkpoint_weights.ckpt",
                   custom_objects={"sigmoid_activation": sigmoid_activation})  # register the custom activation

# Get loss of model
loss, _ = model.evaluate(X_test, y_test)
print(f"Scaled sigmoid model loss (MSE): {loss}, ALS test MSE: {testMse}")


&copy; 2020 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>