# Distributed Keras *
<sub>* Lack of a better name, but at least it describes what it is :)</sub>

**Joeri Hermans** (Technical Student, IT-DB-SAS, CERN)             
*Departement of Knowledge Engineering*         
*Maastricht University, The Netherlands*

In [None]:
!(date +%d\ %B\ %G)

This presentation will give the reader an introduction to the topic of distributed deep learning (DDL) and to the issues which need to be taken into consideration when applying this technique. Furthermore, we will introduce a DDL framework based on [Apache Spark](https://spark.apache.org/) and [Keras](https://keras.io).

## Contents

- [Introduction and problem statement](#Distributed-Deep-Learning,-an-introduction.)
  - [Model parallelism](#Model-parallelism)
  - [Data parallelism](#Data-parallelism)
- [Conclusion](#Conclusion)
- [References](#References)

## Distributed Deep Learning, an introduction.

Unsupervised feature learning and deep learning has shown that being able to train large models can dramatically improve performance. However, consider the problem of training a deep network with billions of parameters. How do we achieve this without waiting for days, or even weeks, and thus leaving more time to tune the model? Dean et al. [[1]](https://papers.nips.cc/paper/4687-large-scale-distributed-deep-networks.pdf) proposed an training paradigm which allows us to train a model on multiple physical machines. The authors describe two methods to achieve this, i.e., **data parallelism** and **model parallelism**<sup>1</sup>.

### Model parallelism

In model parallelism a *single* model is distributed over multiple machines. The performance benefits of distributeing a deep network across multiple machines depends on the connectivity structure and computational needs of the model. Models with a large number of parameters typically benefit from access to more CPUs and memory, up to the point where communication costs, i.e., propagation of the weight updates and synchronization mechanisms, dominate [[1]](https://papers.nips.cc/paper/4687-large-scale-distributed-deep-networks.pdf).

![Model Parallelism](../resources/model_parallelism.png)

### Data parallelism

TODO

![Data Parallelism](../resources/data_parallelism.png)

<sub>**1:** Hybrids are possible as well.</sub>

In [None]:
import numpy as np

import time

import requests

from keras.optimizers import *
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation

from pyspark import SparkContext
from pyspark import SparkConf

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from distkeras.distributed import *
from distkeras.utils import *

In [None]:
# Modify these variables according to your needs.
application_name = "Distributed Keras Notebook"
using_spark_2 = False
yarn = "p01001532067275.cern.ch:8088" # Address:port of ResourceManager
if not yarn:
    # Tell master to use local resources.
    master = "local[*]"
    num_cores = 3
    num_executors = 1
else:
    # Tell master to use YARN.
    master = "yarn-client"
    max_num_executors = 7
    num_cores = 4

In [None]:
# Check if YARN is specified.
if yarn:
    # Build the ResourceManager metrics URI.
    yarn_metrics_uri = "http://" + yarn + "/ws/v1/cluster/metrics"
    # Fetch the number of available nodes
    response = requests.get(yarn_metrics_uri)
    data = response.json()
    # Fetch the number of active nodes.
    num_active_nodes = int(data['clusterMetrics']['activeNodes'])
    # Assign the number of executors.
    num_executors = num_active_nodes
    if num_executors > max_num_executors:
        num_executors = max_num_executors

In [None]:
# This variable is derived from the number of cores and executors, and will 
# be used to assign the number of model trainers.
num_workers = num_executors * num_cores

print("Number of desired executors: " + `num_executors`)
print("Number of desired cores / executor: " + `num_cores`)
print("Total number of workers: " + `num_workers`)

In [None]:
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'

## Preparing a Spark Context

In order to read our (big) dataset into our Spark Cluster, we first need a Spark Context. However, since Spark 2.0 there are some changes in the initialization of a Spark Context. For example, SQLContext and HiveContext do not have to be initialized seperatly anymore, i.e., the initialization process is simplified.

In [None]:
conf = SparkConf()
conf.set("spark.app.name", application_name)
conf.set("spark.master", master)
conf.set("spark.executor.cores", `num_cores`)
conf.set("spark.executor.instances", `num_executors`)
conf.set("spark.locality.wait", "0")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

# Check if the user is running Spark 2.0 +
if using_spark_2:
    sc = SparkSession.builder.config(conf=conf) \
            .appName(application_name) \
            .getOrCreate()
else:
    # Create the Spark context.
    sc = SparkContext(conf=conf)
    # Add the missing imports
    from pyspark import SQLContext
    sqlContext = SQLContext(sc)

In [None]:
# Check if we are using Spark 2.0
if using_spark_2:
    reader = sc
else:
    reader = sqlContext
# Read the dataset.
raw_dataset = reader.read.format('com.databricks.spark.csv') \
                    .options(header='true', inferSchema='true').load("data/atlas_higgs.csv")

In [None]:
# Double-check the inferred schema.
raw_dataset.printSchema()

## Dataset preprocessing and normalization

Since Spark's MLlib has some nice features for some distributed dataprocessing, we follow MLlib (dataframe) API in order to ensure compatibility. What it basically boils down to, is that all the features (which can have different type) will be aggregated into a single column. More information on Spark MLlib (and other APIs) can be found here: [http://spark.apache.org/docs/latest/ml-guide.html](http://spark.apache.org/docs/latest/ml-guide.html)

In the following steps we will show you how to extract the desired columns from the dataset and prepare the for further processing.

In [None]:
# First, we would like to extract the desired features from the raw dataset.
# We do this by constructing a list with all desired columns.
features = raw_dataset.columns
features.remove('EventId')
features.remove('Weight')
features.remove('Label')
# Next, we use Spark's VectorAssembler to "assemble" (create) a vector of all desired features.
# http://spark.apache.org/docs/latest/ml-features.html#vectorassembler
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")
# This transformer will take all columns specified in features, and create an additional column "features"
# which will contain all the desired features aggregated into a single vector.
dataset = vector_assembler.transform(raw_dataset)

In [None]:
# Apply feature normalization with standard scaling. This will transform a feature to have mean 0, and std 1.
# http://spark.apache.org/docs/latest/ml-features.html#standardscaler
standard_scaler = StandardScaler(inputCol="features", outputCol="features_normalized", withStd=True, withMean=True)
standard_scaler_model = standard_scaler.fit(dataset)
dataset = standard_scaler_model.transform(dataset)

In [None]:
# If we look at the dataset, the Label column consists of 2 entries, i.e., b (background), and s (signal).
# Our neural network will not be able to handle these characters, so instead, we convert it to an index
# so we can indicate that output neuron with index 0 is background, and 1 is signal.
# http://spark.apache.org/docs/latest/ml-features.html#stringindexer
label_indexer = StringIndexer(inputCol="Label", outputCol="label_index").fit(dataset)
dataset = label_indexer.transform(dataset)

In [None]:
# Define some properties of the neural network for later use.
nb_classes = 2 # Number of output classes (signal and background)
nb_features = len(features)

In [None]:
# We observe that Keras is not able to work with these indexes. What it actually
# expects is a vector with an identical size to the output layer. Our framework provides
# functionality to do this with ease. What it basically does, given an expected vector dimension,
# it prepares zero vector with the specified dimensionality, and will set the neuron with a specific
# label index to one.

# For example:
# 1. Assume we have a label index: 3
# 2. Output dimensionality: 5
# With these parameters, we obtain the following vector in the DataFrame column: [0,0,0,1,0]

label_vector_transformer = LabelVectorTransformer(output_dim=nb_classes, input_col="label_index", output_col="label")
dataset = label_vector_transformer.transform(dataset).toDF()
# Only select the columns we need (less data shuffling) while training.
dataset = dataset.select("features_normalized", "label_index", "label")

In [None]:
# Shuffle the dataset.
dataset = shuffle(dataset)

# Note: we also support shuffling in the trainers by default.
# However, since this would require a shuffle for every training we will only do it once here.
# If you want, you can enable the training shuffling by specifying shuffle=True in the train() function.

In [None]:
# Finally, we create a trainingset and a testset.
(trainingSet, testSet) = dataset.randomSplit([0.7, 0.3])
trainingSet.cache()
testSet.cache()

## Model construction

We will now construct a relatively simple Keras model (without any modifications) which, hopefully, will be able to classify the dataset.

In [None]:
model = Sequential()
model.add(Dense(600, input_shape=(nb_features,)))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(600))
model.add(Activation('relu'))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))

In [None]:
# Summarize the model.
model.summary()

### Worker Optimizer and Loss

In order to evaluate the gradient on the model replicas, we have to specify an optimizer and a loss method. For this, we just follow the Keras API as defined in the documentation: [https://keras.io/optimizers/](https://keras.io/optimizers/) and [https://keras.io/objectives/](https://keras.io/objectives/).

In [None]:
optimizer = 'adagrad'
loss = 'categorical_crossentropy'

## Training

In the following cells we will train and evaluate the model using different distributed trainers, however, we will as well provide a baseline metric using a **SingleTrainer**, which is basically an instance of the Adagrad optimizer running on Spark.

Furthermore, we will also evaluate every training using Spark's MulticlassClassificationEvaluator [https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.MulticlassClassificationEvaluator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.MulticlassClassificationEvaluator).

### Evaluator

We will evaluate all algorithms using the F1 [https://en.wikipedia.org/wiki/F1_score](https://en.wikipedia.org/wiki/F1_score) metric.

In [None]:
metric_name = "f1"
evaluator = MulticlassClassificationEvaluator(metricName=metric_name, predictionCol="predicted_index", labelCol="label_index")

In [None]:
def evaluate(model):
    global testSet
    
    # Clear the prediction column from the testset.
    testSet = testSet.select("features_normalized", "label_index", "label")
    # Apply a prediction from a trained model.
    predictor = ModelPredictor(keras_model=trained_model, features_col="features_normalized")
    testSet = predictor.predict(testSet).toDF()
    # Transform the prediction vector to an indexed label.
    testSet = index_transformer.transform(testSet).toDF()
    # Store the F1 score of the SingleTrainer.
    score = evaluator.evaluate(testSet)
    
    return score

However, before we can evaluate the prediction, we will also need to converted the neural network prediction, which is a vector which has the same dimensionality as the output layer, to a label index. This is shown in the cell below.

In [None]:
index_transformer = LabelIndexTransformer(output_dim=nb_classes)

We will also have to keep track of the evaluated models.

In [None]:
results = {}
time_spent = {}

In [None]:
# Distribute the training and test set to the workers.
testSet = testSet.repartition(num_workers)
trainingSet = trainingSet.repartition(num_workers)

### SingleTrainer

In [None]:
time_start = time.time()
single_trainer = SingleTrainer(keras_model=model, loss=loss, worker_optimizer=optimizer, 
                               features_col="features_normalized", num_epoch=1, batch_size=64)
trained_model = single_trainer.train(trainingSet)
dt = time.time() - time_start
time_spent['single_trainer'] = dt

# Note that this time also includes shuffling the data from different places. If you run this
# for the second time (after each other), it will be a lot faster since the data is already
# on the physical machine.
print("Time spent (SingleTrainer): " + `dt` + " seconds.")

In [None]:
score = evaluate(trained_model)
results['single_trainer'] = score

print("F1 (SingleTrainer): " + `score`)

### EASGD

In [None]:
time_start = time.time()
easgd_trainer = EASGD(keras_model=model, loss=loss, worker_optimizer=optimizer, 
                      features_col="features_normalized", batch_size=64, num_epoch=1,
                      num_workers=num_workers, rho=5.0, learning_rate=0.04)
trained_model = easgd_trainer.train(trainingSet)
dt = time.time() - time_start
time_spent['easgd_trainer'] = dt

print("Time spent (EASGD): " + `dt` + " seconds.")

In [None]:
score = evaluate(trained_model)
results['easgd_trainer'] = score

print("F1 (EASGD): " + `score`)

### Asynchronous EASGD

In [None]:
time_start = time.time()
async_easgd_trainer = AsynchronousEASGD(keras_model=model, features_col="features_normalized",
                                        batch_size=10, num_workers=num_workers, rho=5.0, learning_rate=0.05,
                                        worker_optimizer=optimizer, loss=loss, communication_window=30)
trained_model = async_easgd_trainer.train(trainingSet)
dt = time.time() - time_start
time_spent['async_easgd_trainer'] = dt

print("Time spent (Asynchronous EASGD): " + `dt` + " seconds.")

In [None]:
score = evaluate(trained_model)
results['async_easgd_trainer'] = score

print("F1 (Asynchronous EASGD): " + `score`)

### Asynchronous EAMSGD

In [None]:
time_start = time.time()
async_eamsgd_trainer = AsynchronousEAMSGD(keras_model=model, features_col="features_normalized",
                                          batch_size=10, num_workers=num_workers, rho=5.0, learning_rate=0.05,
                                          worker_optimizer=optimizer, loss=loss, communication_window=10, momentum=0.85)
trained_model = async_eamsgd_trainer.train(trainingSet)
dt = time.time() - time_start
time_spent['async_eamsgd_trainer'] = dt

print("Time spend (Asynchronous EAMSGD): " + `dt` + " seconds.")

In [None]:
score = evaluate(trained_model)
results['async_eamsgd_trainer'] = score

print("F1 (Asynchronous EAMSGD): " + `score`)

### DOWNPOUR

In [None]:
time_start = time.time()
downpour_trainer = DOWNPOUR(keras_model=model, worker_optimizer=optimizer, loss=loss, features_col="features_normalized", batch_size=62,
                   num_workers=num_workers, learning_rate=0.05)
trained_model = downpour_trainer.train(trainingSet)
dt = time.time() - time_start
time_spent['downpour_trainer'] = dt

print("Time spent (DOWNPOUR): " + `dt` + " seconds.")

In [None]:
score = evaluate(trained_model)
results['downpour_trainer'] = score

print("F1 (DOWNPOUR): " + `score`)

### Summary of initial experiments

In [None]:
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
# Plot the F1 score of the trainers.
plt.bar(range(len(results)), results.values(), 0.2, align='center', color='b')
plt.xticks(range(len(results)), results.keys(), rotation=25)
plt.xlabel('Trainers')
plt.ylabel('F1')
plt.title("F1 score with different trainers - (higher is better)")
plt.show()

In [None]:
# Plot the training time of the trainers.
plt.bar(range(len(time_spent)), time_spent.values(), 0.2, align='center', color='b')
plt.xticks(range(len(time_spent)), time_spent.keys(), rotation=25)
plt.xlabel('Trainers')
plt.ylabel('Seconds')
plt.title("Training time - (lower is better)")
plt.show()

In [None]:
print("Number of parallel workers: " + `num_workers`)
improvement = (time_spent['single_trainer'] / time_spent['async_easgd_trainer']) * 100.0
print("Improvement using AEASGD: " + `improvement` + " %")

## Conclusion

## References