In [1]:
import kfp
import kfp.components as comp
from kfp.components import InputPath, OutputPath
import kfp.dsl as dsl
from kfp.aws import use_aws_secret
from typing import NamedTuple
from itertools import product

%load_ext watermark
%watermark -v -iv

kfp 1.1.2
CPython 3.6.9
IPython 7.11.1


In [2]:
# In Kubeflow v1.1.0, in-cluster communication from a notebook to Kubeflow Pipelines
# is not supported yet. As a workaround, pass the authservice session cookie to the
# KFP client.
# https://www.kubeflow.org/docs/aws/pipeline/#authenticate-kubeflow-pipeline-using-sdk-inside-cluster
import os

# Connection settings come from the environment so that the session cookie (a
# credential) and the cluster address do not have to be hard-coded in the notebook;
# the previous values remain as fallbacks.
KFP_HOST = os.environ.get('KFP_HOST', 'http://192.168.52.92:30178/pipeline')
authservice_session = 'authservice_session=' + os.environ.get('KFP_AUTHSERVICE_SESSION', '<cookie>')
namespace = os.environ.get('KFP_NAMESPACE', 'anonymous')

client = kfp.Client(host=KFP_HOST, cookies=authservice_session)
client.list_experiments(namespace=namespace)

{'experiments': [{'created_at': datetime.datetime(2020, 12, 16, 7, 31, 48, tzinfo=tzlocal()),
                  'description': None,
                  'id': '2e84ecc3-6228-4675-906f-a6e538a7ab1c',
                  'name': 'Default',
                  'resource_references': [{'key': {'id': 'anonymous',
                                                   'type': 'NAMESPACE'},
                                           'name': None,
                                           'relationship': 'OWNER'}],
                  'storage_state': 'STORAGESTATE_AVAILABLE'}],
 'next_page_token': None,
 'total_size': 1}

## Component: Load Raw Data

In [3]:
def load_raw_data(path: str, raw_data_path: OutputPath('CSV')):
    """Download the household power consumption dataset and store it as CSV.

    The raw file is semicolon-separated; its ``Date`` and ``Time`` columns are
    merged into a single ``dt`` index column, and ``'nan'`` / ``'?'`` entries are
    read as missing values.

    Args:
      path: Local file the dataset is downloaded to and then read from.
      raw_data_path: Output file (provided by KFP) the parsed data is written to.

    Raises:
      urllib.error.URLError: If the dataset cannot be downloaded.
    """
    # Imports must live inside the function: the lightweight component only ships
    # this function's source code to the container.
    import urllib.request

    import pandas as pd

    dataset_url = ('https://raw.githubusercontent.com/felix-exel/mlflow/master/'
                   'household_power_consumption.txt')

    # Download with the standard library instead of apt-get + curl: no package
    # manager is needed, and network/HTTP failures raise instead of being silently
    # ignored (subprocess.call discarded every exit code). The file is saved to
    # `path` so that the file read below is exactly the file that was fetched.
    urllib.request.urlretrieve(dataset_url, path)

    df = pd.read_csv(path, sep=';',
                     parse_dates={'dt': ['Date', 'Time']}, infer_datetime_format=True,
                     low_memory=False, na_values=['nan', '?'], index_col='dt')

    print('Writing raw data to', raw_data_path)
    df.to_csv(raw_data_path)

In [4]:
# Wrap `load_raw_data` as a KFP component on a slim Python image with pinned pandas.
load_raw_data_op = comp.func_to_container_op(
    load_raw_data,
    base_image='python:3.7-slim',
    packages_to_install=['pandas==1.0.5'],
)

## Component: Filling NaNs with Mean

In [5]:
def filling_nans_with_mean(input_data_path: InputPath('CSV'), output_data_path: OutputPath('CSV')):
    """Replace missing values in every column with that column's mean.

    Args:
      input_data_path: CSV file (provided by KFP) with a ``dt`` index column.
      output_data_path: Output CSV file (provided by KFP) for the filled data.
    """
    import pandas as pd

    df = pd.read_csv(input_data_path, sep=',', header=0, infer_datetime_format=True,
                     low_memory=False, parse_dates=True, index_col='dt')

    # Vectorized per-column fill: df.mean() yields one mean per column and fillna
    # aligns that Series on the column labels. Assumes all columns are numeric
    # (non-numeric columns are skipped by df.mean() and would stay unfilled).
    # NOTE(review): the means are taken over the whole dataset, i.e. before the
    # train/test split, so test-period values influence the filled training data.
    df_filled = df.fillna(df.mean())

    print(df_filled.isnull().sum())

    df_filled.to_csv(output_data_path)

In [6]:
# Wrap `filling_nans_with_mean` as a KFP component on a slim Python image with pinned pandas.
filling_nans_with_mean_op = comp.func_to_container_op(
    filling_nans_with_mean,
    base_image='python:3.7-slim',
    packages_to_install=['pandas==1.0.5'],
)

## Component: Split Data

In [7]:
def split_data(input_data_path: InputPath('CSV'), output_train_data_path: OutputPath('CSV'),
               output_test_data_path: OutputPath('CSV'), train_fraction: float = 0.8):
    """Split the time series chronologically into a training and a test set.

    Args:
      input_data_path: CSV file (provided by KFP) with a ``dt`` index column.
      output_train_data_path: Output CSV file for the first ``train_fraction`` rows.
      output_test_data_path: Output CSV file for the remaining rows.
      train_fraction: Share of rows (0 < train_fraction < 1) used for training.

    Raises:
      ValueError: If ``train_fraction`` is not strictly between 0 and 1.
    """
    import pandas as pd

    if not 0.0 < train_fraction < 1.0:
        raise ValueError('train_fraction must be strictly between 0 and 1, got %r' % train_fraction)

    df = pd.read_csv(input_data_path, sep=',', header=0, infer_datetime_format=True,
                     low_memory=False, parse_dates=True, index_col='dt')

    # No shuffling: the earliest rows train the model, the latest rows test it.
    # .iloc makes the positional slicing explicit on the datetime index.
    number_training = round(df.shape[0] * train_fraction)

    train_data = df.iloc[:number_training]
    test_data = df.iloc[number_training:]

    train_data.to_csv(output_train_data_path)
    test_data.to_csv(output_test_data_path)

In [8]:
# Wrap `split_data` as a KFP component on a slim Python image with pinned pandas.
split_data_op = comp.func_to_container_op(
    split_data,
    base_image='python:3.7-slim',
    packages_to_install=['pandas==1.0.5'],
)

## Component: Standardization

In [9]:
def standardization(input_train_data_path: InputPath('CSV'), input_test_data_path: InputPath('CSV'),
                    output_train_data_path: OutputPath('CSV'), output_test_data_path: OutputPath('CSV')):
    """Z-score both splits column-wise using statistics of the training split only.

    Args:
      input_train_data_path: Training CSV (provided by KFP) with a ``dt`` index column.
      input_test_data_path: Test CSV (provided by KFP) with a ``dt`` index column.
      output_train_data_path: Output CSV file for the standardized training data.
      output_test_data_path: Output CSV file for the standardized test data.
    """
    import pandas as pd

    train_data = pd.read_csv(input_train_data_path, sep=',', header=0, infer_datetime_format=True,
                             low_memory=False, parse_dates=True, index_col='dt')

    test_data = pd.read_csv(input_test_data_path, sep=',', header=0, infer_datetime_format=True,
                            low_memory=False, parse_dates=True, index_col='dt')

    # Mean and std come from the training data only and are reused for the test
    # data, so no information from the test period leaks into the scaling.
    mean = train_data.mean(axis=0)
    std = train_data.std(axis=0)
    # A constant training column has std == 0; dividing by it would turn the column
    # into NaN/inf and poison training. Divide such columns by 1 (only centre them).
    std = std.replace(0, 1)

    train_data_standardized = (train_data - mean) / std
    test_data_standardized = (test_data - mean) / std

    print(train_data_standardized.describe())

    train_data_standardized.to_csv(output_train_data_path)
    test_data_standardized.to_csv(output_test_data_path)

In [10]:
# Wrap `standardization` as a KFP component on a slim Python image with pinned pandas.
standardization_op = comp.func_to_container_op(
    standardization,
    base_image='python:3.7-slim',
    packages_to_install=['pandas==1.0.5'],
)

## Component: Model Training

In [11]:
def training(input_train_data_path: InputPath('CSV'),
             input_test_data_path: InputPath('CSV'),
             batch_size: int,
             window_length: int,
             future_length: int,
             dropout_fc: float,
             hidden_layer_size: int,
             n_output_features: int,
             mlpipeline_metrics_path: OutputPath('Metrics')):
    """Train a 2-layer LSTM forecaster, log it to MLflow and export KFP metrics.

    The model sees a sliding window of ``window_length`` past rows (all columns)
    and predicts the first ``n_output_features`` columns for the next
    ``future_length`` rows. Hyperparameters and per-epoch Keras metrics are logged
    to an MLflow run, the SavedModel is logged and registered in the MLflow model
    registry, and the test-set MSE/MAE are written to ``mlpipeline_metrics_path``
    in the Kubeflow Pipelines metrics JSON format.

    Args:
      input_train_data_path: Standardized training CSV with a ``dt`` index column.
      input_test_data_path: Standardized test CSV with a ``dt`` index column.
      batch_size: Batch size for training and evaluation.
      window_length: Number of past time steps per input window.
      future_length: Number of future time steps to predict.
      dropout_fc: Dropout rate before the final fully connected layer.
      hidden_layer_size: Number of units of each LSTM layer.
      n_output_features: Number of leading columns to predict.
      mlpipeline_metrics_path: Output file (provided by KFP) for the metrics JSON.
    """
    import json
    import os

    # GitPython (used by MLflow to record git information) reads this variable
    # when it is imported; set it before importing mlflow so it is always seen.
    os.environ["GIT_PYTHON_REFRESH"] = "quiet"

    import mlflow
    import mlflow.tensorflow
    import numpy as np
    import pandas as pd
    import tensorflow as tf
    import tensorflow.keras as keras

    def set_mlflow_settings():
        """Point MLflow at the tracking server and model registry.

        MLFLOW_REGISTRY_URI / MLFLOW_TRACKING_URI environment variables override
        the in-cluster defaults below.
        """
        # NOTE(review): the default registry URI embeds database credentials;
        # prefer injecting MLFLOW_REGISTRY_URI from a Kubernetes secret.
        registry_uri = os.environ.get(
            'MLFLOW_REGISTRY_URI',
            'mysql+pymysql://mlflow:mlflow@mysql.mlflow.svc.cluster.local:3306/mlflow')
        tracking_uri = os.environ.get(
            'MLFLOW_TRACKING_URI',
            'http://mlflow-service.mlflow.svc.cluster.local:5001')

        mlflow.tracking.set_registry_uri(registry_uri)
        mlflow.tracking.set_tracking_uri(tracking_uri)

    set_mlflow_settings()

    train_data = pd.read_csv(input_train_data_path, sep=',', header=0, infer_datetime_format=True,
                             low_memory=False, parse_dates=True, index_col='dt')

    test_data = pd.read_csv(input_test_data_path, sep=',', header=0, infer_datetime_format=True,
                            low_memory=False, parse_dates=True, index_col='dt')

    def build_lstm_2_layer_model(window_length=50, future_length=1, n_input_features=7,
                                 n_output_features=3, units_lstm_layer=30, dropout_rate=0.2):
        """Builds a 2-layer LSTM model with the Keras functional API.

        Args:
          window_length: Number of past time steps in each input window.
          future_length: Number of time steps that will be predicted in the future.
          n_input_features: Number of features that will be used as input.
          n_output_features: Number of features that will be predicted.
          units_lstm_layer: Number of units of each LSTM layer.
          dropout_rate: Dropout rate before the final fully connected Dense layer.
        Returns:
          keras.models.Model mapping inputs of shape
          (batch, window_length, n_input_features) to outputs of shape
          (batch, future_length, n_output_features).
        """
        inputs = keras.layers.Input(
            shape=[window_length, n_input_features], dtype=np.float32)

        # Layer 1: returns the full sequence plus its final hidden/cell state.
        lstm1_output, lstm1_state_h, lstm1_state_c = keras.layers.LSTM(units=units_lstm_layer, return_state=True,
                                                                       return_sequences=True)(inputs)
        lstm1_state = [lstm1_state_h, lstm1_state_c]

        # Layer 2: starts from layer 1's final state.
        lstm2_output, lstm2_state_h, lstm2_state_c = keras.layers.LSTM(units=units_lstm_layer, return_state=True,
                                                                       return_sequences=True)(lstm1_output,
                                                                                              initial_state=lstm1_state)

        # Flatten the whole output sequence for the fully connected head.
        reshaped = tf.reshape(lstm2_output,
                              [-1, window_length * units_lstm_layer])
        dropout = tf.keras.layers.Dropout(dropout_rate)(reshaped)

        fc_layer = keras.layers.Dense(n_output_features * future_length, kernel_initializer='he_normal', dtype=tf.float32)(
            dropout)

        output = tf.reshape(fc_layer,
                            [-1, future_length, n_output_features])

        model = keras.models.Model(inputs=[inputs],
                                   outputs=[output])
        return model

    def apply_sliding_window_tf_data_api(train_data_x, batch_size=64, window_length=50, future_length=1,
                                         n_output_features=1, validate=False, shift=1):
        """Applies a sliding window on the fly by using the TF Data API.

        Input window k covers rows [k*shift, k*shift + window_length) of all
        columns; its target covers the following ``future_length`` rows of the
        first ``n_output_features`` columns.

        Args:
          train_data_x: Input data as NumPy array, shape (rows, n_features).
          batch_size: Batch size.
          window_length: Window length (number of input time steps).
          future_length: Number of time steps that will be predicted in the future.
          n_output_features: Number of leading features that will be predicted.
          validate: True if the input is a validation set that must not be shuffled.
          shift: Number of rows the sliding window advances per example.
        Returns:
          tf.data.Dataset of (input window, target window) batches.
        """
        def make_window_dataset(ds, window_size=5, shift=1, stride=1):
            windows = ds.window(window_size, shift=shift, stride=stride)

            def sub_to_batch(sub):
                # Turn each window sub-dataset into one tensor; drop incomplete tails.
                return sub.batch(window_size, drop_remainder=True)

            return windows.flat_map(sub_to_batch)

        X = tf.data.Dataset.from_tensor_slices(train_data_x.astype(np.float32))
        # Targets start right after the first input window.
        y = tf.data.Dataset.from_tensor_slices(
            train_data_x[window_length:, :n_output_features].astype(np.float32))
        ds_x = make_window_dataset(
            X, window_size=window_length, shift=shift, stride=1)
        ds_y = make_window_dataset(
            y, window_size=future_length, shift=shift, stride=1)

        if validate:
            return tf.data.Dataset.zip((ds_x, ds_y)).batch(batch_size).prefetch(1)
        return tf.data.Dataset.zip((ds_x, ds_y)).cache() \
            .shuffle(buffer_size=200000, reshuffle_each_iteration=True).batch(batch_size).prefetch(5)

    class MlflowLogging(tf.keras.callbacks.Callback):
        """Keras callback that logs every epoch-end metric to the active MLflow run."""

        def on_epoch_end(self, epoch, logs=None):
            # `logs` defaults to None; treat that as "nothing to log" instead of crashing.
            for key, value in (logs or {}).items():
                mlflow.log_metric(str(key), value, step=epoch)

    # enable gpu memory growth if a gpu is available
    gpu_devices = tf.config.experimental.list_physical_devices('GPU')
    for device in gpu_devices:
        tf.config.experimental.set_memory_growth(device, True)

    with mlflow.start_run():
        mlflow.log_param('batch_size', batch_size)
        mlflow.log_param('window_length', window_length)
        mlflow.log_param('hidden_layer_size', hidden_layer_size)
        mlflow.log_param('dropout_fc_layer', dropout_fc)
        mlflow.log_param('future_length', future_length)
        mlflow.log_param('n_output_features', n_output_features)

        # The input width is taken from the data instead of relying on the
        # builder's hard-coded default of 7 columns.
        model = build_lstm_2_layer_model(window_length=window_length,
                                         future_length=future_length,
                                         n_input_features=train_data.shape[1],
                                         n_output_features=n_output_features,
                                         units_lstm_layer=hidden_layer_size,
                                         dropout_rate=dropout_fc)

        train_data_sliding_window = apply_sliding_window_tf_data_api(train_data.values,
                                                                     batch_size=batch_size,
                                                                     window_length=window_length,
                                                                     future_length=future_length,
                                                                     n_output_features=n_output_features)

        test_data_sliding_window = apply_sliding_window_tf_data_api(test_data.values,
                                                                    batch_size=batch_size,
                                                                    window_length=window_length,
                                                                    future_length=future_length,
                                                                    n_output_features=n_output_features,
                                                                    validate=True)

        model.compile(loss='mse', optimizer=keras.optimizers.Nadam(learning_rate=1e-3),
                      metrics=['mse', 'mae'])

        # Shuffling happens inside the tf.data pipeline; Keras ignores `shuffle`
        # for tf.data.Dataset inputs, so it is not passed.
        model.fit(train_data_sliding_window, initial_epoch=0, epochs=10,
                  validation_data=test_data_sliding_window,
                  callbacks=[MlflowLogging()])

        # evaluate() returns [loss, mse, mae], following the compile() metrics order.
        eval_results = model.evaluate(test_data_sliding_window)
        print(eval_results)

        # Export MSE and MAE in the Kubeflow Pipelines metrics format.
        kfp_metrics = {
            'metrics': [
                {'name': 'mse', 'numberValue': float(eval_results[1])},
                {'name': 'mae', 'numberValue': float(eval_results[2])},
            ]}

        with open(mlpipeline_metrics_path, 'w') as f:
            json.dump(kfp_metrics, f)

        # Save the model and log/register it in the MLflow artifact store (S3 bucket).
        model_path = './model'
        os.makedirs(model_path, exist_ok=True)  # tolerate re-runs in the same working dir
        model.save(model_path)

        mlflow.tensorflow.log_model(tf_saved_model_dir=model_path,
                                    tf_meta_graph_tags='serve',
                                    tf_signature_def_key='serving_default',
                                    artifact_path='saved_model',
                                    registered_model_name='Electric Power Consumption Forecasting')

In [12]:
# Wrap `training` as a KFP component on the TensorFlow 2.3.1 image.
# NOTE(review): mlflow, boto3 and pymysql are unpinned and pandas only has a lower
# bound, so the step's environment can change between runs; consider pinning.
training_op = comp.func_to_container_op(
    training,
    base_image='tensorflow/tensorflow:2.3.1',
    packages_to_install=['pandas>=1.0.5', 'mlflow', 'boto3', 'pymysql'],
)

## Creating a Pipeline

In [13]:
@dsl.pipeline(
    name='Training pipeline',
    description='Training pipeline for time series forecasting on household power consumption dataset.'
)
def training_pipeline(path='./household_power_consumption.txt'):
    """Chain the preprocessing components and fan out one training step per grid point.

    Args:
      path: Location the load step downloads the raw dataset to and reads it from.
    """

    def nth_output(task, position):
        # Outputs are selected by their position in ``task.outputs`` (not by name),
        # so this relies on the order in which the component declares its outputs.
        output_names = list(task.outputs.keys())
        return task.outputs[str(output_names[position])]

    load_raw_data_task = load_raw_data_op(path).set_display_name('Load Raw Data')

    filling_nans_with_mean_op_task = (
        filling_nans_with_mean_op(nth_output(load_raw_data_task, 0))
        .after(load_raw_data_task)
        .set_display_name('Filling NaNs with Mean'))

    split_data_task = (
        split_data_op(nth_output(filling_nans_with_mean_op_task, 0))
        .after(filling_nans_with_mean_op_task)
        .set_display_name('Split Data'))

    standardization_task = (
        standardization_op(nth_output(split_data_task, 0), nth_output(split_data_task, 1))
        .after(split_data_task)
        .set_display_name('Standardization'))

    standardized_train = nth_output(standardization_task, 0)
    standardized_test = nth_output(standardization_task, 1)

    # Hyperparameter grid: every combination (Cartesian product) becomes one step.
    grid_search_dic = {'hidden_layer_size': [20, 40],
                       'batch_size': [64],
                       'future_length': [5],
                       'window_length': [50],
                       'dropout_fc': [0.0, 0.2],
                       'n_output_features': [1]}
    param_names = list(grid_search_dic)

    for run_number, values in enumerate(product(*grid_search_dic.values()), start=1):
        params = dict(zip(param_names, values))
        training_op(standardized_train,
                    standardized_test,
                    params['batch_size'],
                    params['window_length'],
                    params['future_length'],
                    params['dropout_fc'],
                    params['hidden_layer_size'],
                    params['n_output_features']) \
            .after(standardization_task) \
            .set_display_name('Model Training ' + str(run_number)) \
            .apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

## Creating a Pipeline Run

In [14]:
# Submit a single run of the pipeline, passing the dataset location explicitly.
arguments = dict(path='./household_power_consumption.txt')
client.create_run_from_pipeline_func(training_pipeline,
                                     arguments=arguments,
                                     namespace=namespace)

RunPipelineResult(run_id=0613dbf9-d870-4477-b4b3-549edff88e16)

## Uploading the Pipeline so Others Can Reuse It

In [15]:
# Compile the pipeline into a workflow package and register it in KFP so that
# others can start runs from the UI.
# NOTE(review): uploading again under an existing pipeline name presumably fails;
# consider client.upload_pipeline_version for subsequent versions.
kfp.compiler.Compiler().compile(pipeline_func=training_pipeline,
                                package_path='workflow.yaml')
client.upload_pipeline(pipeline_package_path='workflow.yaml',
                       pipeline_name='Training Pipeline .')

{'created_at': datetime.datetime(2020, 12, 16, 8, 3, 40, tzinfo=tzlocal()),
 'default_version': {'code_source_url': None,
                     'created_at': datetime.datetime(2020, 12, 16, 8, 3, 40, tzinfo=tzlocal()),
                     'id': '1d6a0b49-b620-40c8-9c2a-e9b0d9860ac0',
                     'name': 'Training Pipeline .',
                     'package_url': None,
                     'parameters': [{'name': 'path',
                                     'value': './household_power_consumption.txt'}],
                     'resource_references': [{'key': {'id': '1d6a0b49-b620-40c8-9c2a-e9b0d9860ac0',
                                                      'type': 'PIPELINE'},
                                              'name': None,
                                              'relationship': 'OWNER'}]},
 'description': None,
 'error': None,
 'id': '1d6a0b49-b620-40c8-9c2a-e9b0d9860ac0',
 'name': 'Training Pipeline .',
 'parameters': [{'name': 'path', 'value': './household_po