# This notebook uses the Household Electric Power Consumption dataset (https://archive.ics.uci.edu/ml/datasets/individual+household+electric+power+consumption) and builds a small LSTM-based model to demonstrate MLflow Tracking

In [1]:
import mlflow
import numpy as np
import os
import shutil
import pandas as pd
import tensorflow as tf
import tensorflow.keras as keras
from itertools import product
from dotenv import load_dotenv
load_dotenv()

%load_ext watermark
%watermark -v -iv

mlflow           1.12.0
pandas           1.0.5
tensorflow.keras 2.4.0
tensorflow       2.3.1
numpy            1.18.1
CPython 3.7.4
IPython 7.8.0


## Setting Registry and Tracking URI for MLflow

In [2]:
# Use this registry URI when MLflow runs in a Docker container with a MySQL backend
#registry_uri = os.path.expandvars('mysql+pymysql://${MYSQL_USER}:${MYSQL_PASSWORD}@localhost:3306/${MYSQL_DATABASE}')

# Use this registry URI when MLflow runs locally, started with the command:
# "mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./mlruns --host 0.0.0.0"
registry_uri = 'sqlite:///mlflow.db'

tracking_uri = 'http://localhost:5000'

mlflow.tracking.set_registry_uri(registry_uri)
mlflow.tracking.set_tracking_uri(tracking_uri)
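
The commented-out MySQL URI relies on `os.path.expandvars` to replace the `${...}` placeholders with environment variables, presumably the ones that `load_dotenv()` reads from a `.env` file. A minimal sketch of that substitution follows; the credentials are placeholders invented for illustration, not values from this project.

```python
import os

# Hypothetical credentials for illustration only; in the notebook these would
# come from the environment (e.g. a .env file read by load_dotenv()).
os.environ["MYSQL_USER"] = "demo_user"
os.environ["MYSQL_PASSWORD"] = "demo_password"
os.environ["MYSQL_DATABASE"] = "mlflow_db"

template = "mysql+pymysql://${MYSQL_USER}:${MYSQL_PASSWORD}@localhost:3306/${MYSQL_DATABASE}"
example_registry_uri = os.path.expandvars(template)
print(example_registry_uri)

# Unset variables are left untouched rather than raising an error,
# so a missing .env entry shows up as a literal "${...}" in the URI.
unresolved = os.path.expandvars("${SOME_UNSET_VARIABLE_FOR_DEMO}")
print(unresolved)
```

Because unset variables pass through silently, it may be worth checking the expanded URI for a leftover `${` before handing it to MLflow.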

## Attribute Information of Household Electric Power Consumption Dataset:

1. date: Date in format dd/mm/yyyy
2. time: time in format hh:mm:ss
3. global_active_power: household global minute-averaged active power (in kilowatt)
4. global_reactive_power: household global minute-averaged reactive power (in kilowatt)
5. voltage: minute-averaged voltage (in volt)
6. global_intensity: household global minute-averaged current intensity (in ampere)
7. sub_metering_1: energy sub-metering No. 1 (in watt-hour of active energy). It corresponds to the kitchen, containing mainly a dishwasher, an oven and a microwave (hot plates are not electric but gas powered).
8. sub_metering_2: energy sub-metering No. 2 (in watt-hour of active energy). It corresponds to the laundry room, containing a washing-machine, a tumble-drier, a refrigerator and a light.
9. sub_metering_3: energy sub-metering No. 3 (in watt-hour of active energy). It corresponds to an electric water-heater and an air-conditioner.
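
The dataset description on the UCI page also notes that `global_active_power * 1000 / 60 - sub_metering_1 - sub_metering_2 - sub_metering_3` gives the active energy (in watt-hours) consumed per minute by equipment not covered by the three sub-meters. A small worked example with the values of the first row shown in this notebook; the variable names are illustrative.

```python
# Values of the first row shown in this notebook (2006-12-16 17:24:00).
global_active_power_kw = 4.216
sub_metering_1_wh = 0.0
sub_metering_2_wh = 1.0
sub_metering_3_wh = 17.0

# kW averaged over one minute -> Wh consumed in that minute.
total_wh_per_minute = global_active_power_kw * 1000 / 60

# Energy not covered by the three sub-meters (name is illustrative).
unmetered_wh = (total_wh_per_minute
                - sub_metering_1_wh - sub_metering_2_wh - sub_metering_3_wh)

print(round(total_wh_per_minute, 3), round(unmetered_wh, 3))
```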

In [3]:
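# The data contains the strings 'nan' and '?' for missing values; both are imported as NaN
# Note: only about the first 2000 rows are used for this example. read_csv below does not
# limit the rows itself, so the file was presumably truncated beforehand.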

df = pd.read_csv('./household_power_consumption.txt', sep=';', 
                 parse_dates={'dt' : ['Date', 'Time']}, infer_datetime_format=True, 
                 low_memory=False, na_values=['nan','?'], index_col='dt')
df

dt,Global_active_power,Global_reactive_power,Voltage,Global_intensity,Sub_metering_1,Sub_metering_2,Sub_metering_3
2006-12-16 17:24:00,4.216,0.418,234.84,18.4,0.0,1.0,17.0
2006-12-16 17:25:00,5.360,0.436,233.63,23.0,0.0,1.0,16.0
2006-12-16 17:26:00,5.374,0.498,233.29,23.0,0.0,2.0,17.0
2006-12-16 17:27:00,5.388,0.502,233.74,23.0,0.0,1.0,17.0
2006-12-16 17:28:00,3.666,0.528,235.68,15.8,0.0,1.0,17.0
...,...,...,...,...,...,...,...
2006-12-18 02:39:00,0.318,0.140,246.58,1.4,0.0,0.0,0.0
2006-12-18 02:40:00,0.312,0.138,245.93,1.4,0.0,0.0,0.0
2006-12-18 02:41:00,0.310,0.138,246.03,1.4,0.0,0.0,0.0
2006-12-18 02:42:00,0.308,0.138,245.98,1.4,0.0,0.0,0.0
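
To see how the `na_values` handling and the dd/mm/yyyy timestamps behave, here is a small sketch on an in-memory sample. It assumes the file layout implied by the call above (semicolon-separated `Date` and `Time` columns) and keeps only two value columns. The `'?'` row is invented for illustration, and the explicit `format=` string is an alternative to format inference, not what the notebook itself does.

```python
import io
import pandas as pd

# Three illustrative rows in the file's layout; the '?' row mimics a missing reading.
sample = io.StringIO(
    "Date;Time;Global_active_power;Voltage\n"
    "16/12/2006;17:24:00;4.216;234.84\n"
    "16/12/2006;17:25:00;?;?\n"
    "16/12/2006;17:26:00;5.374;233.29\n"
)

toy = pd.read_csv(sample, sep=';', na_values=['nan', '?'])

# Explicit day-first format instead of format inference (an alternative to the
# parse_dates/infer_datetime_format arguments used in the notebook).
toy['dt'] = pd.to_datetime(toy['Date'] + ' ' + toy['Time'], format='%d/%m/%Y %H:%M:%S')
toy = toy.drop(columns=['Date', 'Time']).set_index('dt')

print(toy.dtypes)
print(toy.isna().sum())
```

Because `'?'` is mapped to NaN at read time, the value columns come out as floats rather than strings.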


In [4]:
# Fill NaN values in every column with that column's mean

df = df.fillna(df.mean())
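
What mean imputation computes can be checked on a toy array. This is a NumPy sketch with made-up numbers; `np.nanmean` plays the role of the pandas column mean, which skips NaN by default.

```python
import numpy as np

# Toy data with one missing value per column (values are illustrative).
toy = np.array([[1.0, 10.0],
                [np.nan, 20.0],
                [3.0, np.nan]])

# Per-column mean that ignores NaN entries.
col_means = np.nanmean(toy, axis=0)

# Replace each NaN with the mean of its own column (broadcast over rows).
filled = np.where(np.isnan(toy), col_means, toy)
print(filled)
```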

## Standardization

In [5]:
mean = df.mean(axis=0)
std = df.std(axis=0)
standardized = (df - mean) / std
standardized

dt,Global_active_power,Global_reactive_power,Voltage,Global_intensity,Sub_metering_1,Sub_metering_2,Sub_metering_3
2006-12-16 17:24:00,1.451287,2.461631,-1.139460,1.554075,-0.174796,-0.176156,0.915653
2006-12-16 17:25:00,2.328673,2.621166,-1.422746,2.394718,-0.174796,-0.176156,0.799254
2006-12-16 17:26:00,2.339410,3.170675,-1.502347,2.394718,-0.174796,-0.048967,0.915653
2006-12-16 17:27:00,2.350147,3.206127,-1.396992,2.394718,-0.174796,-0.176156,0.915653
2006-12-16 17:28:00,1.029467,3.436566,-0.942798,1.078929,-0.174796,-0.176156,0.915653
...,...,...,...,...,...,...,...
2006-12-18 02:39:00,-1.538268,-0.002296,1.609117,-1.552650,-0.174796,-0.303344,-1.063131
2006-12-18 02:40:00,-1.542869,-0.020022,1.456939,-1.552650,-0.174796,-0.303344,-1.063131
2006-12-18 02:41:00,-1.544403,-0.020022,1.480351,-1.552650,-0.174796,-0.303344,-1.063131
2006-12-18 02:42:00,-1.545937,-0.020022,1.468645,-1.552650,-0.174796,-0.303344,-1.063131
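
Since the model is trained on standardized values, its forecasts are in standardized units as well. A minimal sketch of the transform and its inverse on a toy column (made-up numbers), assuming the same mean and standard deviation are kept around for the back-transformation:

```python
import numpy as np

# Toy stand-in for one column (values are illustrative, not from the dataset).
values = np.array([2.0, 4.0, 6.0, 8.0])

mean = values.mean()
std = values.std(ddof=1)   # ddof=1 matches the pandas default of DataFrame.std()

standardized = (values - mean) / std

# Multiplying by std and adding the mean maps standardized values
# (for example model predictions) back to the original scale.
restored = standardized * std + mean
print(standardized, restored)
```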


## Grid Search Hyperparameters
The dictionary below lists the hyperparameter values to train on; every combination is trained as a separate run.
MLflow tracks the parameters and metrics of each run in its backend database.

In [6]:
grid_search_dic = {'hidden_layer_size': [20, 40],
                   'batch_size': [64],
                   'future_length': [5],
                   'window_length': [50],
                   'dropout_fc': [0.0, 0.2],
                   'n_output_features': [1]}

# Cartesian product
grid_search_param = [dict(zip(grid_search_dic, v)) for v in product(*grid_search_dic.values())]
grid_search_param

[{'hidden_layer_size': 20,
  'batch_size': 64,
  'future_length': 5,
  'window_length': 50,
  'dropout_fc': 0.0,
  'n_output_features': 1},
 {'hidden_layer_size': 20,
  'batch_size': 64,
  'future_length': 5,
  'window_length': 50,
  'dropout_fc': 0.2,
  'n_output_features': 1},
 {'hidden_layer_size': 40,
  'batch_size': 64,
  'future_length': 5,
  'window_length': 50,
  'dropout_fc': 0.0,
  'n_output_features': 1},
 {'hidden_layer_size': 40,
  'batch_size': 64,
  'future_length': 5,
  'window_length': 50,
  'dropout_fc': 0.2,
  'n_output_features': 1}]
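
The number of runs is the product of the list lengths, so adding values to any key multiplies the training time. A short sketch that checks this for the grid above; `example_grid` is a copy made for illustration.

```python
from itertools import product

# Copy of the grid used in this notebook.
example_grid = {'hidden_layer_size': [20, 40],
                'batch_size': [64],
                'future_length': [5],
                'window_length': [50],
                'dropout_fc': [0.0, 0.2],
                'n_output_features': [1]}

# Expected number of runs: product of the number of values per key.
expected_runs = 1
for values in example_grid.values():
    expected_runs *= len(values)

# Same Cartesian product as in the cell above.
combinations = [dict(zip(example_grid, combo))
                for combo in product(*example_grid.values())]
print(expected_runs, len(combinations))
```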

### LSTM Model in functional API
Two-layer LSTM model for time series forecasting.
- Input: 50 rows (time steps) and all 7 columns
- Output: 5 future time steps of 'Global_active_power' column

In [7]:
def build_lstm_2_layer_model(window_length=50, future_length=1, n_input_features=7,
                     n_output_features=3, units_lstm_layer=30, dropout_rate=0.2):
    """Builds 2 Layer LSTM-based TF Model in functional API.
    Args:
      window_length: Number of time steps in each input window.
      future_length: Number of future time steps to predict.
      n_input_features: Number of input features per time step.
      n_output_features: Number of features to predict.
      units_lstm_layer: Number of units in each LSTM layer.
      dropout_rate: Dropout rate applied to the flattened LSTM output before the final dense layer.
    Returns:
      keras.models.Model
    """
    inputs = keras.layers.Input(shape=[window_length, n_input_features], dtype=np.float32)

    # Layer1
    lstm1_output, lstm1_state_h, lstm1_state_c = keras.layers.LSTM(units=units_lstm_layer, return_state=True,
                                                                   return_sequences=True)(inputs)
    lstm1_state = [lstm1_state_h, lstm1_state_c]

    # Layer 2
    lstm2_output, lstm2_state_h, lstm2_state_c = keras.layers.LSTM(units=units_lstm_layer, return_state=True,
                                                                   return_sequences=True)(lstm1_output,
                                                                                         initial_state=lstm1_state)

    reshaped = tf.reshape(lstm2_output,
                          [-1, window_length * units_lstm_layer])
    # Dropout
    dropout = tf.keras.layers.Dropout(dropout_rate)(reshaped)
    
    fc_layer = keras.layers.Dense(n_output_features * future_length, kernel_initializer='he_normal', dtype=tf.float32)(
        dropout)
    
    output = tf.reshape(fc_layer,
                        [-1, future_length, n_output_features])

    model = keras.models.Model(inputs=[inputs],
                               outputs=[output])
    return model
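
The head of the model is mostly shape bookkeeping: the sequence output of the second LSTM is flattened, projected by one dense layer, and reshaped into a forecast. The sketch below walks through those shapes in plain NumPy. Random numbers stand in for the LSTM output and the dense weights, and the sizes are illustrative.

```python
import numpy as np

batch, window_length, units = 8, 50, 20     # illustrative sizes
future_length, n_output_features = 5, 1

rng = np.random.default_rng(0)
# Stand-in for lstm2_output with return_sequences=True: (batch, time, units).
lstm2_output = rng.normal(size=(batch, window_length, units))

# Flatten time and units into one axis: (8, 1000).
flat = lstm2_output.reshape(-1, window_length * units)

# Dense projection to future_length * n_output_features values: (8, 5).
weights = rng.normal(size=(window_length * units, n_output_features * future_length))
bias = np.zeros(n_output_features * future_length)
dense_out = flat @ weights + bias

# Final reshape into (batch, future_length, n_output_features): (8, 5, 1).
forecast = dense_out.reshape(-1, future_length, n_output_features)

# The dense layer alone holds 1000 * 5 weights plus 5 biases.
n_dense_params = weights.size + bias.size
print(flat.shape, dense_out.shape, forecast.shape, n_dense_params)
```

Because the flattened axis grows with `window_length * units_lstm_layer`, the dense layer's parameter count scales with both hyperparameters.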

### Applying Sliding Window
I use the TF Data API (https://www.tensorflow.org/guide/data) to apply the sliding window on the fly during training rather than precomputing all windows, which saves memory.

The function returns a zipped tf.data.Dataset whose elements have the following shapes:
- x: (batch_size, window_length, n_features)
- y: (batch_size, future_length, n_output_features)
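
The pairing of inputs and targets can be written in plain NumPy as a reference for the shapes. The helper `sliding_window_pairs` is hypothetical and keeps every window in memory, so it only serves as a check on small arrays. It assumes each target window starts directly after its input window.

```python
import numpy as np

def sliding_window_pairs(data, window_length, future_length, n_output_features, shift=1):
    """Plain NumPy reference for the (x, y) pairing; illustrative helper only."""
    xs, ys = [], []
    start = 0
    # Stop once a full input window plus a full target window no longer fits.
    while start + window_length + future_length <= len(data):
        xs.append(data[start:start + window_length])
        target_start = start + window_length
        # Targets are the first n_output_features columns of the following rows.
        ys.append(data[target_start:target_start + future_length, :n_output_features])
        start += shift
    return np.stack(xs), np.stack(ys)

# 10 time steps, 2 features; row r holds the values [2r, 2r + 1].
toy = np.arange(20, dtype=np.float32).reshape(10, 2)
x, y = sliding_window_pairs(toy, window_length=4, future_length=2, n_output_features=1)
print(x.shape, y.shape)
```

With `shift=1`, a series of N rows yields N - window_length - future_length + 1 samples, which is 5 for this toy array.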

In [8]:
def apply_sliding_window_tf_data_api(train_data_x, batch_size=64, window_length=50, future_length=1,
                                     n_output_features=1, validate=False, shift=1):
    """Applies sliding window on the fly by using the TF Data API.
    Args:
      train_data_x: Input Data as Numpy Array, Shape (rows, n_features)
      batch_size: Batch Size.
      window_length: Window Length or Window Size.
      future_length: Number of time steps that will be predicted in the future.
      n_output_features: Number of features that will be predicted.
      validate: True if input data is a validation set and does not need to be shuffled
      shift: Shifts the Sliding Window by this Parameter.
    Returns:
      tf.data.Dataset
    """
    def make_window_dataset(ds, window_size=5, shift=1, stride=1):
        windows = ds.window(window_size, shift=shift, stride=stride)

        def sub_to_batch(sub):
            return sub.batch(window_size, drop_remainder=True)

        windows = windows.flat_map(sub_to_batch)
        return windows

    X = tf.data.Dataset.from_tensor_slices(train_data_x.astype(np.float32))
    y = tf.data.Dataset.from_tensor_slices(train_data_x[window_length:, :n_output_features].astype(np.float32))
    ds_x = make_window_dataset(X, window_size=window_length, shift=shift, stride=1)
    ds_y = make_window_dataset(y, window_size=future_length, shift=shift, stride=1)
    
    if not validate:
        train_tf_data = tf.data.Dataset.zip((ds_x, ds_y)).cache() \
            .shuffle(buffer_size=200000, reshuffle_each_iteration=True).batch(batch_size).prefetch(5)
        return train_tf_data
    else:
        return tf.data.Dataset.zip((ds_x, ds_y)).batch(batch_size).prefetch(1)  

## Custom TF Callback to log Metrics by MLflow

In [9]:
class MlflowLogging(tf.keras.callbacks.Callback):
    """Logs every metric Keras reports at the end of an epoch to the active MLflow run."""

    def on_epoch_end(self, epoch, logs=None):
        # Keras may pass logs=None, so fall back to an empty dict.
        for key, value in (logs or {}).items():
            mlflow.log_metric(str(key), float(value), step=epoch)

## Training

In [10]:
# Enable GPU memory growth if a GPU is available
gpu_devices = tf.config.experimental.list_physical_devices('GPU')
for device in gpu_devices:
    tf.config.experimental.set_memory_growth(device, True)

with mlflow.start_run() as parent_run:
    for params in grid_search_param:
        batch_size = params['batch_size']
        window_length = params['window_length']
        future_length = params['future_length']
        dropout_fc = params['dropout_fc']
        hidden_layer_size = params['hidden_layer_size']
        n_output_features = params['n_output_features']
        
        with mlflow.start_run(nested=True) as child_run:
            # Log the hyperparameters of this grid point
            mlflow.log_param('batch_size', batch_size)
            mlflow.log_param('window_length', window_length)
            mlflow.log_param('hidden_layer_size', hidden_layer_size)
            mlflow.log_param('dropout_fc_layer', dropout_fc)
            mlflow.log_param('future_length', future_length)
            mlflow.log_param('n_output_features', n_output_features)

            model = build_lstm_2_layer_model(window_length=window_length,
                                             future_length=future_length,
                                             n_output_features=n_output_features,
                                             units_lstm_layer=hidden_layer_size,
                                             dropout_rate=dropout_fc)
        
            data_sliding_window = apply_sliding_window_tf_data_api(standardized.values,
                                                                   batch_size=batch_size,
                                                                   window_length=window_length,
                                                                   future_length=future_length,
                                                                   n_output_features=n_output_features)
        
            model.compile(loss='mse', optimizer=keras.optimizers.Nadam(learning_rate=1e-3),
                          metrics=['mse', 'mae'])
        
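            # Shuffling is handled by the tf.data pipeline; Keras ignores the
            # `shuffle` argument of fit() when the input is a tf.data.Dataset.
            model.fit(data_sliding_window, initial_epoch=0, epochs=10,
                      callbacks=[MlflowLogging()])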
            
            model.save("./tmp")

            mlflow.tensorflow.log_model(tf_saved_model_dir='./tmp',
                                        tf_meta_graph_tags='serve',
                                        tf_signature_def_key='serving_default',
                                        artifact_path='saved_model',
                                        registered_model_name='Electric Power Consumption Forecasting')
            
            shutil.rmtree("./tmp")

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
INFO:tensorflow:Assets written to: ./tmp\assets


2020/12/01 11:04:05 INFO mlflow.tensorflow: Validating the specified TensorFlow model by attempting to load it in a new TensorFlow graph...
2020/12/01 11:04:07 INFO mlflow.tensorflow: Validation succeeded!
Registered model 'Electric Power Consumption Forecasting' already exists. Creating a new version of this model...
2020/12/01 11:04:08 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: Electric Power Consumption Forecasting, version 5
Created version '5' of model 'Electric Power Consumption Forecasting'.


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
INFO:tensorflow:Assets written to: ./tmp\assets


2020/12/01 11:04:21 INFO mlflow.tensorflow: Validating the specified TensorFlow model by attempting to load it in a new TensorFlow graph...
2020/12/01 11:04:23 INFO mlflow.tensorflow: Validation succeeded!
Registered model 'Electric Power Consumption Forecasting' already exists. Creating a new version of this model...
2020/12/01 11:04:23 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: Electric Power Consumption Forecasting, version 6
Created version '6' of model 'Electric Power Consumption Forecasting'.


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
INFO:tensorflow:Assets written to: ./tmp\assets


2020/12/01 11:04:35 INFO mlflow.tensorflow: Validating the specified TensorFlow model by attempting to load it in a new TensorFlow graph...
2020/12/01 11:04:38 INFO mlflow.tensorflow: Validation succeeded!
Registered model 'Electric Power Consumption Forecasting' already exists. Creating a new version of this model...
2020/12/01 11:04:38 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: Electric Power Consumption Forecasting, version 7
Created version '7' of model 'Electric Power Consumption Forecasting'.


Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
INFO:tensorflow:Assets written to: ./tmp\assets


2020/12/01 11:04:51 INFO mlflow.tensorflow: Validating the specified TensorFlow model by attempting to load it in a new TensorFlow graph...
2020/12/01 11:04:53 INFO mlflow.tensorflow: Validation succeeded!
Registered model 'Electric Power Consumption Forecasting' already exists. Creating a new version of this model...
2020/12/01 11:04:54 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: Electric Power Consumption Forecasting, version 8
Created version '8' of model 'Electric Power Consumption Forecasting'.
