# Distributed Training of RNN Models on an HDInsight Spark Cluster

This notebook demonstrates energy consumption prediction using Distributed Keras on an HDInsight Spark cluster. It uses distkeras package to train a GRU model and an LSTM model in a distrbuted manner on the Spark cluster. The data used is the NYISO data which describes the hourly energy consumption of New York City.

This example requires the following packages
- distkeras
- keras
- tensorflow

In [1]:
import os
import time
import pyspark

import datetime as dt
import numpy as np
import pyspark.sql.functions as F

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import *

from distkeras.evaluators import *
from distkeras.predictors import *
from distkeras.trainers import *
from distkeras.transformers import *
from distkeras.utils import *

from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import GRU

from pyspark.ml.feature import VectorAssembler

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
155,application_1526382914088_0018,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.
Using TensorFlow backend.

In [2]:
## Setup the pyspark environment

# First, setup the Spark variables. You can modify them according to your need, e.g. increasing num_executors to reduce training time.
application_name = 'Distributed Keras NYISO'
master = 'yarn-client'
num_processes = 2 
num_executors = 2 #4, 3, 2, 1

# This variable is derived from the number of cores and executors, and will be used to assign the number of model trainers.
num_workers = num_executors * num_processes

print('Number of desired executors: ' + str(num_executors))
print('Number of desired processes per executor: ' + str(num_processes))
print('Total number of workers: ' + str(num_workers))

# Use the DataBricks CSV reader, which has some nice functionality regarding invalid values.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'

# Modify the Spark configuration
conf = pyspark.SparkConf()
conf.set('spark.app.name', application_name)
conf.set('spark.master', master)
conf.set('spark.executor.cores', num_processes)
conf.set('spark.executor.instances', num_executors)
conf.set('spark.executor.memory', '2g')
conf.set('spark.locality.wait', '0')
conf.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer');
conf.set('spark.local.dir', '/tmp/' + get_os_username() + '/dist-keras');

# Create the Spark context
sc.stop()
sc = pyspark.SparkContext(conf=conf)
sqlc = pyspark.sql.SQLContext(sc)

Number of desired executors: 2
Number of desired processes per executor: 2
Total number of workers: 4

In [3]:
## Define variables
LEN_SEQ_IN = 24
LEN_SEQ_OUT = 1
LEN_EXTRA_IN = LEN_SEQ_OUT
LEN_TEST_DATA = 120
N_UNITS = 128

BATCH_SIZE = 32
COM_WINDOW = 5
N_EPOCHS = 20

input_container = 'nyiso'
storage_account = 'publicdat'
storage_key = 'GNDru9kic+4jXle1roq4klFzwQ3Jy9CvsvMJl8o5b8W/DoX6v14PE8aPuX48V3r9yc3tuKP3NKCSUHqFobib4A=='
data_path = 'wasb://{}@{}.blob.core.windows.net/NYISO_data_1region_small.csv'.format(input_container, storage_account)

## Load and Prepare Data

In [4]:
## Load data

# Attach the blob storage to the spark cluster  
def attach_storage_container(spark, account, key):
    config = spark._sc._jsc.hadoopConfiguration()
    setting = 'fs.azure.account.key.' + account + '.blob.core.windows.net'
    if not config.get(setting):
        config.set(setting, key)

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
attach_storage_container(spark, storage_account, storage_key)
time.sleep(4) 

# Load the input dataset
raw_df = sqlc.read.format('com.databricks.spark.csv') \
                  .options(header='true', inferSchema='true') \
                  .load(data_path)

In [5]:
# Convert data type and select columns
func = F.udf (lambda x: dt.datetime.strptime(x[:19], '%m/%d/%Y %H:%M:%S'), TimestampType())
raw_df = raw_df.withColumn('TimeStamp', func(F.col('TimeStamp')))
df = raw_df.select(['TimeStamp', 'Name', 'HourAvgLoad', 'temperature'])
df.show(3)

+--------------------+------+-----------+-----------+
|           TimeStamp|  Name|HourAvgLoad|temperature|
+--------------------+------+-----------+-----------+
|2016-01-02 00:00:...|N.Y.C.|   4836.967|      34.44|
|2016-01-02 01:00:...|N.Y.C.|   4613.692|      33.79|
|2016-01-02 02:00:...|N.Y.C.|   4448.508|      33.13|
+--------------------+------+-----------+-----------+
only showing top 3 rows

In [6]:
# Normalize the data
orig_min = df.select(F.min('temperature')).collect()[0][0]
orig_max = df.select(F.max('temperature')).collect()[0][0]
transformer = MinMaxTransformer(n_min=0.0, n_max=1.0, \
                                o_min=orig_min, o_max=orig_max, \
                                input_col='temperature', \
                                output_col='NormTemp', \
                                is_vector=False)
df = transformer.transform(df)
orig_min = df.select(F.min('HourAvgLoad')).collect()[0][0]
orig_max = df.select(F.max('HourAvgLoad')).collect()[0][0]
transformer = MinMaxTransformer(n_min=0.0, n_max=1.0, \
                                o_min=orig_min, o_max=orig_max, \
                                input_col='HourAvgLoad', \
                                output_col='NormLoad', \
                                is_vector=False)
df = transformer.transform(df)
df.show(3)

+--------------------+------+-----------+-----------+-------------------+-------------------+
|           TimeStamp|  Name|HourAvgLoad|temperature|           NormTemp|           NormLoad|
+--------------------+------+-----------+-----------+-------------------+-------------------+
|2016-01-02 00:00:...|N.Y.C.|   4836.967|      34.44| 0.3608378292605522|0.13131163940862922|
|2016-01-02 01:00:...|N.Y.C.|   4613.692|      33.79| 0.3539617052787475|0.09975432572999143|
|2016-01-02 02:00:...|N.Y.C.|   4448.508|      33.13|0.34697979477414576|0.07640749647148892|
+--------------------+------+-----------+-----------+-------------------+-------------------+
only showing top 3 rows

In [7]:
# Create input features and output targets
wSpec = Window.partitionBy('Name').orderBy('TimeStamp')
for n_lag in range(LEN_SEQ_IN, 0, -1):
    df = df.withColumn('NormLoad_lag'+str(n_lag), F.lag(F.col('NormLoad'), count = n_lag).over(wSpec))
for n_lag in range(1, LEN_SEQ_OUT+1):
    df = df.withColumn('NormLoad_next'+str(n_lag), F.lead(F.col('NormLoad'), count = n_lag).over(wSpec))
    df = df.withColumn('OrigNormLoad_next'+str(n_lag), F.lead(F.col('HourAvgLoad'), count = n_lag).over(wSpec))
    df = df.withColumn('NormTemp_next'+str(n_lag), F.lead(F.col('NormTemp'), count = n_lag).over(wSpec))

In [8]:
# Drop null values
df = df.na.drop()
df.show(1)

+--------------------+------+-----------+-----------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+
|           TimeStamp|  Name|HourAvgLoad|temperature|           NormTemp|           NormLoad|     NormLoad_lag24|     NormLoad_lag23|     NormLoad_lag22|      NormLoad_lag21|     NormLoad_lag20|     NormLoad_lag19|     NormLoad_lag18|     NormLoad_lag17|     NormLoad_lag16|     NormLoad_lag15|    NormLoad_lag14|     NormLoad_lag13|     NormLoad_lag12|     NormLoad_lag11

In [9]:
# Assemble all the features
features = ['NormLoad_lag'+str(n) for n in range(LEN_SEQ_IN, 0, -1)] + ['NormTemp_next'+str(n) for n in range(1, LEN_SEQ_OUT+1)]
vector_assembler = VectorAssembler(inputCols=features, outputCol='features')
df = vector_assembler.transform(df)
df.show(1)

+--------------------+------+-----------+-----------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+--------------------+
|           TimeStamp|  Name|HourAvgLoad|temperature|           NormTemp|           NormLoad|     NormLoad_lag24|     NormLoad_lag23|     NormLoad_lag22|      NormLoad_lag21|     NormLoad_lag20|     NormLoad_lag19|     NormLoad_lag18|     NormLoad_lag17|     NormLoad_lag16|     NormLoad_lag15|    NormLoad_lag14|     NormLoad_lag13|     NormLoad_lag1

In [10]:
# Reshape the vectors into the format that Keras requires
reshape_transformer = ReshapeTransformer('features', 'feature_matrix', (LEN_SEQ_IN+LEN_EXTRA_IN, 1))
df = reshape_transformer.transform(df)
df.show(1)

+--------------------+------+-----------+-----------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+--------------------+--------------------+
|           TimeStamp|  Name|HourAvgLoad|temperature|           NormTemp|           NormLoad|     NormLoad_lag24|     NormLoad_lag23|     NormLoad_lag22|      NormLoad_lag21|     NormLoad_lag20|     NormLoad_lag19|     NormLoad_lag18|     NormLoad_lag17|     NormLoad_lag16|     NormLoad_lag15|    NormLoad_lag14|     NormLoad_lag

In [11]:
# Assemble all the target variables
targets = ['NormLoad_next'+str(n) for n in range(1, LEN_SEQ_OUT+1)]
vector_assembler = VectorAssembler(inputCols=targets, outputCol='labels')
df = vector_assembler.transform(df)

targets = ['OrigNormLoad_next'+str(n) for n in range(1, LEN_SEQ_OUT+1)]
vector_assembler = VectorAssembler(inputCols=targets, outputCol='labels2')
df = vector_assembler.transform(df)

df.show(1)

+--------------------+------+-----------+-----------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+--------------------+--------------------+--------------------+----------+
|           TimeStamp|  Name|HourAvgLoad|temperature|           NormTemp|           NormLoad|     NormLoad_lag24|     NormLoad_lag23|     NormLoad_lag22|      NormLoad_lag21|     NormLoad_lag20|     NormLoad_lag19|     NormLoad_lag18|     NormLoad_lag17|     NormLoad_lag16|     NormLoad_lag15|    

In [12]:
# Reshape the labels
reshape_transformer = ReshapeTransformer('labels', 'label_matrix', (LEN_SEQ_OUT, 1))
df = reshape_transformer.transform(df)
df.show(1)

+--------------------+------+-----------+-----------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+--------------------+--------------------+--------------------+----------+--------------------+
|           TimeStamp|  Name|HourAvgLoad|temperature|           NormTemp|           NormLoad|     NormLoad_lag24|     NormLoad_lag23|     NormLoad_lag22|      NormLoad_lag21|     NormLoad_lag20|     NormLoad_lag19|     NormLoad_lag18|     NormLoad_lag17|     NormLoad_lag16|   

In [13]:
# Partition the data into training and testing sets
df_train = df.limit(df.count() - LEN_TEST_DATA)
df2 = df
df2 = df2.orderBy('TimeStamp', ascending=False)
df_test = df2.limit(LEN_TEST_DATA).orderBy('TimeStamp', ascending=True)

In [14]:
df_train.show(1)

+--------------------+------+-----------+-----------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+--------------------+--------------------+--------------------+----------+--------------------+
|           TimeStamp|  Name|HourAvgLoad|temperature|           NormTemp|           NormLoad|     NormLoad_lag24|     NormLoad_lag23|     NormLoad_lag22|      NormLoad_lag21|     NormLoad_lag20|     NormLoad_lag19|     NormLoad_lag18|     NormLoad_lag17|     NormLoad_lag16|   

In [15]:
df_test.show(1)

+--------------------+------+-----------+-----------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+--------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+------------------+-------------------+------------------+-------------------+-------------------+-------------------+------------------+------------------+--------------------+--------------------+--------------------+----------+--------------------+
|           TimeStamp|  Name|HourAvgLoad|temperature|          NormTemp|           NormLoad|     NormLoad_lag24|     NormLoad_lag23|     NormLoad_lag22|     NormLoad_lag21|    NormLoad_lag20|      NormLoad_lag19|      NormLoad_lag18|     NormLoad_lag17|     NormLoad_lag16|     

In [16]:
# Select the desired columns to reduce network usage
df_train = df_train.select('features', 'feature_matrix', 'labels', 'labels2', 'label_matrix')
df_test = df_test.select('features', 'feature_matrix', 'labels', 'labels2', 'label_matrix')

# Repartition the data
df_train = df_train.repartition(num_workers)
df_test = df_test.repartition(num_workers)

In [17]:
# Cache the data
df_train.cache()
df_test.cache()

DataFrame[features: vector, feature_matrix: array<array<double>>, labels: vector, labels2: vector, label_matrix: array<array<double>>]

## Train and Evaluate GRU Model 

In [18]:
# Create the GRU network
gru_model = Sequential()
gru_model.add(GRU(N_UNITS, input_shape=(LEN_SEQ_IN+LEN_EXTRA_IN, 1))) 
gru_model.add(Dense(LEN_SEQ_OUT, activation='linear')) 
gru_model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
gru_1 (GRU)                  (None, 128)               49920     
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 129       
Total params: 50,049
Trainable params: 50,049
Non-trainable params: 0
_________________________________________________________________

In [19]:
# Fit the model
gru_trainer = ADAG(keras_model=gru_model, worker_optimizer='adagrad', loss='mean_squared_error',
                   num_workers=num_workers, batch_size=BATCH_SIZE, communication_window=COM_WINDOW, 
                   num_epoch=N_EPOCHS, features_col='feature_matrix', label_col='labels')
trained_gru_model = gru_trainer.train(df_train)

In [20]:
# gru_trainer = DynSGD(keras_model=gru_model, worker_optimizer='adagrad', loss='mean_squared_error',
#                    num_workers=num_workers, batch_size=BATCH_SIZE, communication_window=COM_WINDOW, 
#                    num_epoch=N_EPOCHS, features_col='feature_matrix', label_col='labels')
# trained_gru_model = gru_trainer.train(df_train)

In [21]:
print('Number of parameter updates ' + str(gru_trainer.parameter_server.num_updates))
print('Total training time in seconds ' + str(gru_trainer.get_training_time()))

Number of parameter updates 1425
Total training time in seconds 88.4753541946

In [22]:
predictor = ModelPredictor(keras_model=trained_gru_model, features_col='feature_matrix')
df_pred = predictor.predict(df_test.limit(24))
df_pred.show(24)

+--------------------+--------------------+--------------------+----------+--------------------+--------------------+
|            features|      feature_matrix|              labels|   labels2|        label_matrix|          prediction|
+--------------------+--------------------+--------------------+----------+--------------------+--------------------+
|[0.20217386818392...|[WrappedArray(0.2...|[0.04815170463909...|[4248.592]|[WrappedArray(0.0...|[0.06627381592988...|
|[0.02909929893365...|[WrappedArray(0.0...|[0.04732840891776...|[4242.767]|[WrappedArray(0.0...| [0.088986337184906]|
|[0.12240732516944...|[WrappedArray(0.1...|[0.30439454761984...|[6061.567]|[WrappedArray(0.3...|[0.34112250804901...|
|[0.30659815338656...|[WrappedArray(0.3...|[0.32994851610309...|[6242.367]|[WrappedArray(0.3...|[0.3110193908214569]|
|[0.31091731114585...|[WrappedArray(0.3...|[0.32862573075445...|[6233.008]|[WrappedArray(0.3...|[0.3101566433906555]|
|[0.28088517929190...|[WrappedArray(0.2...|[0.2466554406

In [23]:
# Convert the values to the original range
inverse_transformer = MinMaxTransformer(n_min=orig_min, n_max=orig_max, \
                                        o_min=0.0, o_max=1.0, \
                                        input_col='prediction', \
                                        output_col='prediction2', \
                                        is_vector=True)
df_pred = inverse_transformer.transform(df_pred)
df_pred.show(24)

+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+
|            features|      feature_matrix|              labels|   labels2|        label_matrix|          prediction|         prediction2|
+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+
|[0.14004507561741...|[WrappedArray(0.1...|[0.02862694626401...| [4110.45]|[WrappedArray(0.0...|[0.01342243980616...|[4002.8747414102418]|
|[0.01939755388594...|[WrappedArray(0.0...|[0.11942508659092...|[4752.867]|[WrappedArray(0.1...|[0.1151493564248085]| [4722.615259862646]|
|[0.20520091101028...|[WrappedArray(0.2...|[0.3104531560988476]|[6104.433]|[WrappedArray(0.3...|[0.3623005151748657]| [6471.264575576543]|
|[0.31344740843467...|[WrappedArray(0.3...|[0.32840439494336...|[6231.442]|[WrappedArray(0.3...|[0.3151266276836395]| [6137.498848973095]|
|[0.31198794892937...|[Wrap

In [24]:
# Compute Mean-Absolute-Percentage Errors (MAPEs)
def get_MAPE(actual, pred): 
    """
    Compute the mean-absolute-percentage-error (MAPE)
    """
    actual, pred = np.array(actual), np.array(pred)
    mape = np.mean(np.abs((actual - pred) / actual)) * 100
    if mape == np.inf:
        mape = np.nan
    return mape

actual = df_pred.select('labels2').rdd.map(lambda x: list(x[0])).collect()
pred = df_pred.select('prediction2').rdd.map(lambda x: list(x[0])).collect()
get_MAPE(actual, pred)

2.8088257141182944

## Train and Evaluate LSTM Model 

In [25]:
# Create the LSTM network
lstm_model = Sequential()
lstm_model.add(LSTM(N_UNITS, input_shape=(LEN_SEQ_IN+LEN_EXTRA_IN, 1))) 
lstm_model.add(Dense(LEN_SEQ_OUT, activation='linear')) 
lstm_model.summary()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
lstm_1 (LSTM)                (None, 128)               66560     
_________________________________________________________________
dense_2 (Dense)              (None, 1)                 129       
Total params: 66,689
Trainable params: 66,689
Non-trainable params: 0
_________________________________________________________________

In [26]:
# Fit the model
lstm_trainer = ADAG(keras_model=lstm_model, worker_optimizer='adam', loss='mean_squared_error',
                    num_workers=num_workers, batch_size=BATCH_SIZE, communication_window=COM_WINDOW, 
                    num_epoch=N_EPOCHS, features_col='feature_matrix', label_col='labels')
trained_lstm_model = lstm_trainer.train(df_train)

In [27]:
print('Number of parameter updates ' + str(lstm_trainer.parameter_server.num_updates))
print('Total training time in seconds ' + str(lstm_trainer.get_training_time()))

Number of parameter updates 1425
Total training time in seconds 99.2543179989

In [28]:
predictor = ModelPredictor(keras_model=trained_lstm_model, features_col='feature_matrix')
df_pred = predictor.predict(df_test.limit(24))
df_pred.show(24)

+--------------------+--------------------+--------------------+----------+--------------------+--------------------+
|            features|      feature_matrix|              labels|   labels2|        label_matrix|          prediction|
+--------------------+--------------------+--------------------+----------+--------------------+--------------------+
|[0.14004507561741...|[WrappedArray(0.1...|[0.02862694626401...| [4110.45]|[WrappedArray(0.0...|[-0.0090088704600...|
|[0.01939755388594...|[WrappedArray(0.0...|[0.11942508659092...|[4752.867]|[WrappedArray(0.1...|[0.1277359277009964]|
|[0.20520091101028...|[WrappedArray(0.2...|[0.3104531560988476]|[6104.433]|[WrappedArray(0.3...|[0.3263910710811615]|
|[0.31344740843467...|[WrappedArray(0.3...|[0.32840439494336...|[6231.442]|[WrappedArray(0.3...|[0.3154873549938202]|
|[0.31198794892937...|[WrappedArray(0.3...|[0.3032673745078247]|[6053.592]|[WrappedArray(0.3...|[0.3446936011314392]|
|[0.24669543937985...|[WrappedArray(0.2...|[0.1986346152

In [29]:
# Convert the values to the original range
df_pred = inverse_transformer.transform(df_pred)
df_pred.show(24)

+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+
|            features|      feature_matrix|              labels|   labels2|        label_matrix|          prediction|         prediction2|
+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+
|[0.14004507561741...|[WrappedArray(0.1...|[0.02862694626401...| [4110.45]|[WrappedArray(0.0...|[-0.0090088704600...| [3844.168241525599]|
|[0.01939755388594...|[WrappedArray(0.0...|[0.11942508659092...|[4752.867]|[WrappedArray(0.1...|[0.1277359277009964]| [4811.668045860499]|
|[0.20520091101028...|[WrappedArray(0.2...|[0.3104531560988476]|[6104.433]|[WrappedArray(0.3...|[0.3263910710811615]| [6217.197286716997]|
|[0.31344740843467...|[WrappedArray(0.3...|[0.32840439494336...|[6231.442]|[WrappedArray(0.3...|[0.3154873549938202]|[6140.0510747740855]|
|[0.31198794892937...|[Wrap

In [30]:
# Compute MAPE
actual = df_pred.select('labels2').rdd.map(lambda x: list(x[0])).collect()
pred = df_pred.select('prediction2').rdd.map(lambda x: list(x[0])).collect()
get_MAPE(actual, pred)

3.5870739850117217