In [None]:
from sklearn import metrics
import joblib

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
import warnings
warnings.simplefilter('ignore')
warnings.filterwarnings('ignore')

In [None]:
from numpy.random import seed
seed(42)  # seeds NumPy's global random generator
import tensorflow as tf
# Seed TensorFlow too: Keras weight initialisers draw from TensorFlow's random
# generator, which the NumPy seed above does not control.
tf.random.set_seed(42)
tf.get_logger().setLevel('ERROR')  # only show TensorFlow log records at ERROR level or above

In [None]:
import pytz
from datetime import timedelta, datetime
# Set local timezone for InfluxDB based times calculations
local_tz = pytz.timezone('America/Toronto')
# Current date (kernel clock, naive) formatted as YYYY-MM-DD; it is the date part of
# the optional "today" results file name used when loading the data.
today = datetime.today().date().isoformat()

#### Note: if you have not generated new data for the model training, a sample data set is provided in this repository.
#### To use it, make sure that the next cell executes the instruction that references the file `lt_results_2022-10-01.csv`, and not the one that builds the file name from today's date.

In [None]:
# Load the load-test results, using the DateTime column as a parsed datetime index.
# `infer_datetime_format` is deliberately not passed: the argument is deprecated since
# pandas 2.0, where consistent-format inference is already the default behaviour.
# Swap the two lines below to read a freshly generated file named after today's date.
#data = pd.read_csv('lt_results_'+today+'.csv', index_col='DateTime', parse_dates=True)
data = pd.read_csv('lt_results_2022-10-01.csv', index_col='DateTime', parse_dates=True)

In [None]:
data.head()

In [None]:
data = data.drop(columns=['req2xx', 'testDurationSeconds'])

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(data, data.mean_tps,
                                                    test_size=0.1,
                                                    random_state=0) # we are setting the seed here
X_train.shape, X_test.shape

In [None]:
target_var = 'mean_tps'
X_train = X_train.drop(target_var, axis=1)
X_test = X_test.drop(target_var, axis=1)

In [None]:
from sklearn.preprocessing import StandardScaler

In [None]:
stdScaler = StandardScaler()
targetStdScaler = StandardScaler()

In [None]:
X_train_scaled = stdScaler.fit_transform(X_train.values)
y_train_scaled = targetStdScaler.fit_transform(y_train.values.reshape(-1,1))

In [None]:
X_test_scaled = stdScaler.transform(X_test.values)
y_test_scaled = targetStdScaler.transform(y_test.values.reshape(-1,1))

In [None]:
# Neural Nets imports
from tensorflow.keras.models import Sequential, load_model, save_model
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization 
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras.regularizers import l1, l2, l1_l2

In [None]:
import os

In [None]:
# Retrieve AWS access from the data connection attached to the workbench.
# Every variable is mandatory: a missing one raises KeyError right here.
(aws_access_key_id,
 aws_secret_access_key,
 endpoint_url,
 bucket_name,
 region_name) = [os.environ[var_name] for var_name in ("AWS_ACCESS_KEY_ID",
                                                       "AWS_SECRET_ACCESS_KEY",
                                                       "AWS_S3_ENDPOINT",
                                                       "AWS_S3_BUCKET",
                                                       "AWS_DEFAULT_REGION")]

In [None]:
# Training settings shared by the model-building functions below.
verboseLevel = 0       # verbosity passed to the Keras callbacks and to model.fit()
validationSplit = 0.2  # fraction of the training data model.fit() holds out for validation
batchSize = 30         # batch_size passed to model.fit()
epochs = 1000          # number of epochs passed to model.fit()

In [None]:
# callback preparation
# Halve the learning rate whenever val_loss has not improved for 2 consecutive epochs.
# min_lr must be lower than the optimizer's starting rate (the models below use
# Adam(learning_rate=0.001)): with min_lr equal to that starting rate the callback
# can never lower the rate and is effectively a no-op.
reduce_lr = ReduceLROnPlateau(monitor='val_loss',
                              factor=0.5,
                              patience=2,
                              verbose=verboseLevel,
                              mode='min',
                              min_lr=1e-5)

In [None]:
inputSize = X_train_scaled.shape[1]
colList = ['HiddenLayers', 'R2Score', 'MAE', 'MSE', 'MAPE', 'H5FileName', 'TrainHistory', 'TrainPredictions']

In [None]:
target_loss = 'mae'
#loss = 'mse'
measure_metrics = ['mae', 'mse']
#measure_metrics = ['mae']

In [None]:
# This function is the reference for creating and training the models inside the Jupyter notebook pod. We will evaluate it against the Ray cluster distributed one
def createModel(testResultsFrame, layerSize, loops, target_loss, measure_metrics,
                y_train, X_train, y_test, X_test, targetScaler, labelSet):
    """Train `loops` MLPs of increasing depth and add their test scores to a results frame.

    Iteration ``i`` (``0 <= i < loops``) builds a Sequential network made of one
    input-facing Dense layer, ``i`` further Dense layers (all ``layerSize`` units, ReLU),
    a BatchNormalization layer and a single linear output unit. The network is trained
    with Adam, the best epoch (lowest ``val_loss``) is checkpointed to disk, reloaded
    and scored on the test data after undoing the target scaling.

    Besides its arguments, the function reads the notebook-level names ``verboseLevel``,
    ``validationSplit``, ``batchSize``, ``epochs``, ``reduce_lr`` and ``colList``.

    Parameters
    ----------
    testResultsFrame : pandas.DataFrame
        Existing results (columns ``colList``); it is not modified.
    layerSize : int
        Number of units in every hidden Dense layer.
    loops : int
        Number of models to train; model ``i`` has ``i`` additional hidden layers.
    target_loss, measure_metrics
        Loss and metrics passed to ``model.compile``.
    y_train, X_train
        Training target and features passed to ``model.fit``. ``X_train.shape[1]``
        defines the input width of the network.
    y_test, X_test
        Test target and test features. Predictions are inverse-transformed with
        ``targetScaler`` before being compared with ``y_test``.
    targetScaler
        Fitted scaler whose ``inverse_transform`` maps predictions back to target units.
    labelSet : str
        Label used in the checkpoint file names.

    Returns
    -------
    pandas.DataFrame
        ``testResultsFrame`` followed by one row per trained model, re-indexed from 0.
        If ``loops`` is 0 the input frame is returned as is.
    """
    print(f'Creating model using layer size = {layerSize} on set = {labelSet}.\n')
    # Take the input width from the data actually passed in rather than from a notebook global.
    featureCount = X_train.shape[1]
    resultFrames = []
    for i in range(loops):
        print(f'Training on {i} hidden layers\n')
        model = Sequential()
        model.add(Dense(layerSize, kernel_initializer='normal',
                        input_dim=featureCount, activation='relu'))
        for _ in range(i):
            model.add(Dense(layerSize,
                            kernel_initializer='normal', activation='relu'))
        model.add(BatchNormalization())
        model.add(Dense(1, kernel_initializer='normal',
                        activation='linear'))

        optmzr = Adam(learning_rate=0.001)
        model.compile(optimizer=optmzr, loss=target_loss, metrics=measure_metrics)

        # Only the epoch with the lowest validation loss is kept on disk.
        model_h5_name = 'mlp_' + str(layerSize)+ '_' + str(i) + '_model_std_' + labelSet + '.keras'
        checkpoint_nn_std = ModelCheckpoint(model_h5_name,
                                 monitor='val_loss',
                                 verbose=verboseLevel,
                                 save_best_only=True,
                                 mode='min')
        callbacks_list_nn_std = [checkpoint_nn_std, reduce_lr]

        history_MLP_std = model.fit(X_train, y_train,
                                    batch_size=batchSize,
                                    validation_split=validationSplit,
                                    epochs=epochs, verbose=verboseLevel,
                                    callbacks=callbacks_list_nn_std)

        # Reload the best checkpoint instead of scoring the last-epoch weights.
        model_new = load_model(model_h5_name)
        y_pred_scaled = model_new.predict(X_test)
        # Score in the original target units.
        y_pred = targetScaler.inverse_transform(y_pred_scaled)
        r2_score = metrics.r2_score(y_test, y_pred)
        mae = metrics.mean_absolute_error(y_test, y_pred)
        mse = metrics.mean_squared_error(y_test, y_pred)
        mape = metrics.mean_absolute_percentage_error(y_test, y_pred)

        # One single-row frame per model; the object array keeps the History object and
        # the prediction array as individual cells.
        row = [i, r2_score, mae, mse, mape, model_h5_name, history_MLP_std, y_pred]
        resultFrames.append(
            pd.DataFrame(np.array(row, dtype=object).reshape(1, len(colList)), columns=colList))

        tf.keras.backend.clear_session()
        del model
        del model_new

    if not resultFrames:
        return testResultsFrame
    # DataFrame.append was removed in pandas 2.0; concatenate once after the loop instead.
    return pd.concat([testResultsFrame, *resultFrames], ignore_index=True)


In [None]:
%%time
testResDataFrame = pd.DataFrame(columns=colList)
layerSize = 64
loops = 15
testResDataFrame = createModel(testResDataFrame, layerSize, loops, 
                        target_loss, measure_metrics,
                        y_train_scaled, X_train_scaled,
                        y_test, X_test_scaled, 
                        targetStdScaler, 'all')


In [None]:
# Plot the MAE recorded for each trained model (one point per row of the results table).
fig, ax = plt.subplots(figsize=(20, 10))
ax.plot(testResDataFrame['MAE'])
ax.set_title('Training Scores MLP')
ax.set_ylabel('Score')
ax.set_xlabel('Iteration')
ax.legend(['MAE'], loc='upper right')
plt.show()

In [None]:
# Determine the IDX value where the MAE is smallest
minMaeIDX = testResDataFrame.loc[testResDataFrame['MAE']==testResDataFrame['MAE'].min()].index[0]

In [None]:
testResDataFrame.iloc[minMaeIDX]

In [None]:
# Predictions stored for the lowest-MAE model, plotted against the true test values.
y_pred_MLP_std = testResDataFrame.loc[minMaeIDX, 'TrainPredictions']
fig, ax = plt.subplots(figsize=(20, 10))
ax.scatter(range(len(y_test)), y_test, label="Original Data", alpha=0.6, c='red')
ax.scatter(range(len(y_pred_MLP_std)), y_pred_MLP_std, label="Predicted Data",
           alpha=0.6, c='black')
ax.set_ylabel('Mean TPS')
ax.set_xlabel('Test Records')
ax.set_title('MLP Std Model for X_test dataset prediction vs original')
ax.legend()
plt.show()

In [None]:
# In-cluster address of the MLflow server; used as both tracking and registry URI
# by the remote training task.
MLFLOW_URI = 'http://mlflow-server.mlflow-server.svc.cluster.local:8080/'

In [None]:
# Ray setting, exported before Ray is imported and initialised.
# NOTE(review): presumably silences Ray's reporting of unhandled task errors - confirm against the Ray docs.
os.environ.update({"RAY_IGNORE_UNHANDLED_ERRORS": "1"})

In [None]:
import logging
import ray
from codeflare_sdk import TokenAuthentication, Cluster, ClusterConfiguration
from codeflare_sdk import generate_cert

In [None]:
# Authenticate against the OpenShift API with a bearer token.
# The token is a credential and must never be committed with the notebook: it is read
# from the OCP_TOKEN environment variable, with an interactive prompt as fallback.
# Run `oc whoami -t` on the authenticated cluster to obtain a token. Any token that
# was ever stored in a shared copy of this notebook should be revoked.
from getpass import getpass

auth = TokenAuthentication(
    token = os.environ.get("OCP_TOKEN") or getpass("OpenShift API token: "),
    # The API server URL is not secret; OCP_API_SERVER overrides the default cluster.
    server = os.environ.get("OCP_API_SERVER",
                            "https://api.cluster-ffqgg.ffqgg.sandbox1386.opentlc.com:6443"),
    skip_tls = False
)
auth.login()

In [None]:
# Create required TLS cert and export the environment variables to enable TLS
generate_cert.generate_tls_cert('raycluster-complete', 'raycluster')
generate_cert.export_env('raycluster-complete', 'raycluster')

In [None]:
ray_endpoint = 'ray://raycluster-complete-head-svc.raycluster.svc.cluster.local:10001' # ensure your ray cluster URL is correct
ray.shutdown()
ray.init(address=ray_endpoint, logging_level=logging.ERROR, log_to_driver=False)

In [None]:
import tf2onnx

In [None]:
import mlflow

In [None]:
from ray.air.integrations.mlflow import setup_mlflow

In [None]:
@ray.remote
def createRemoteModel(layerSize, loops, target_loss, measure_metrics,
                y_train, X_train, y_test, X_test, targetScaler, labelSet):
    """Ray task: train one MLP, score it on the test data and record the run in MLflow.

    The network has one input-facing Dense layer, ``loops`` further Dense layers (all
    ``layerSize`` units, ReLU), a BatchNormalization layer and a single linear output
    unit. The best epoch (lowest ``val_loss``) is checkpointed, reloaded and scored on
    ``X_test`` after undoing the target scaling with ``targetScaler``.

    A prediction-vs-original scatter plot, the training parameters, the four scores and
    an ONNX export of the reloaded model are logged to MLflow under the experiment
    ``mlasp-1-{labelSet}-{loops}``.

    Besides its arguments, the function reads the notebook-level names ``MLFLOW_URI``,
    ``verboseLevel``, ``validationSplit``, ``batchSize``, ``epochs`` and ``reduce_lr``.

    Parameters
    ----------
    layerSize : int
        Number of units in every hidden Dense layer.
    loops : int
        Number of additional hidden Dense layers of this single model.
    target_loss, measure_metrics
        Loss and metrics passed to ``model.compile``.
    y_train, X_train
        Training target and features; ``X_train.shape[1]`` defines the input width.
    y_test, X_test
        Test target and test features.
    targetScaler
        Fitted scaler whose ``inverse_transform`` maps predictions back to target units.
    labelSet : str
        Label used in the checkpoint file name and the MLflow experiment name.

    Returns
    -------
    list
        ``[loops, r2_score, mae, mse, mape]``.
    """
    mlflow_exp_name = f'mlasp-1-{labelSet}-{loops}'
    mlflow_ray = setup_mlflow(config=None,
                              tracking_uri=MLFLOW_URI,
                              registry_uri=MLFLOW_URI,
                              create_experiment_if_not_exists=True,
                              rank_zero_only=False,
                              experiment_name=mlflow_exp_name)

    try:
        print(f'Creating model using {loops} hidden layers of size = {layerSize} on set = {labelSet}.\n')
        model = Sequential()
        # Take the input width from the data actually passed in rather than from a notebook global.
        model.add(Dense(layerSize, kernel_initializer='normal',
                        input_dim=X_train.shape[1], activation='relu'))
        for _ in range(loops):
            model.add(Dense(layerSize,
                            kernel_initializer='normal', activation='relu'))
        model.add(BatchNormalization())
        model.add(Dense(1, kernel_initializer='normal',
                        activation='linear'))

        optmzr = Adam(learning_rate=0.001)
        model.compile(optimizer=optmzr, loss=target_loss, metrics=measure_metrics)

        # Only the epoch with the lowest validation loss is kept on disk.
        model_h5_name = 'mlp_' + str(layerSize)+ '_' + str(loops) + '_model_std_' + labelSet + '.keras'
        checkpoint_nn_std = ModelCheckpoint(model_h5_name,
                                 monitor='val_loss',
                                 verbose=verboseLevel,
                                 save_best_only=True,
                                 mode='min')
        callbacks_list_nn_std = [checkpoint_nn_std, reduce_lr]

        model.fit(X_train, y_train,
                  batch_size=batchSize,
                  validation_split=validationSplit,
                  epochs=epochs, verbose=verboseLevel,
                  callbacks=callbacks_list_nn_std)

        # Reload the best checkpoint instead of scoring the last-epoch weights.
        model_new = load_model(model_h5_name)
        y_pred_scaled = model_new.predict(X_test)
        # Score in the original target units.
        y_pred = targetScaler.inverse_transform(y_pred_scaled)
        r2_score = metrics.r2_score(y_test, y_pred)
        mae = metrics.mean_absolute_error(y_test, y_pred)
        mse = metrics.mean_squared_error(y_test, y_pred)
        mape = metrics.mean_absolute_percentage_error(y_test, y_pred)

        fig = plt.figure(figsize=(20,10))
        try:
            plt.scatter(range(y_test.shape[0]),y_test,label="Original Data", alpha=0.6, c='red')
            plt.scatter(range(y_pred.shape[0]),y_pred,label="Predicted Data",
                        alpha=0.6, c='black')
            plt.ylabel('Mean TPS')
            plt.xlabel('Test Records')
            plt.title('MLP StdScaler Model for X_test dataset prediction vs original')
            plt.legend()

            mlflow_ray.log_figure(fig,f"{mlflow_exp_name}.png")
        finally:
            # Figures are not released automatically; close it so worker memory does not grow.
            plt.close(fig)

        mlflow_ray.log_param("batch_size", batchSize)
        mlflow_ray.log_param("layer_size", layerSize)
        mlflow_ray.log_param("hidden_layers", loops)
        mlflow_ray.log_param("activation_function", "relu")
        mlflow_ray.log_param("dense_kernel_initializer", "normal")
        mlflow_ray.log_param("epochs", epochs)
        mlflow_ray.log_param("learning_rate", 0.001)
        mlflow_ray.log_param("optimizer", "adam")

        mlflow_ray.log_metric("mae", mae)
        mlflow_ray.log_metric("mse", mse)
        mlflow_ray.log_metric("mape", mape)
        mlflow_ray.log_metric("r2_score", r2_score)

        model_onnx,_ = tf2onnx.convert.from_keras(model_new)
        mlflow_ray.onnx.log_model(model_onnx, f"model-{mlflow_exp_name}")

        return [loops, r2_score, mae, mse, mape]
    finally:
        # Close the MLflow run explicitly so it is not left active in this worker process,
        # where a later task could otherwise pick it up.
        mlflow_ray.end_run()


In [None]:
%%time

testResDataFrame2 = []
layerSize = 64
loops = 15


for i in range(loops):
    rowResult = createRemoteModel.remote(layerSize, i, 
                        target_loss, measure_metrics,
                        y_train_scaled, X_train_scaled,
                        y_test, X_test_scaled, 
                        targetStdScaler, labelSet='all_ray')
    testResDataFrame2.append(rowResult)


In [None]:
# Despite its name this is the list of values returned by createRemoteModel.remote()
# (one per submitted task), not a DataFrame.
testResDataFrame2

In [None]:
%%time

# Resolve every reference in the list; each resolved item is the
# [loops, r2_score, mae, mse, mape] row returned by the corresponding task.
tResDF2 = ray.get(testResDataFrame2)

In [None]:
colList2=colList.copy()

In [None]:
df = pd.DataFrame(tResDF2, columns=colList2[0:5])

In [None]:
df

In [None]:
minMaeIDX_ray = df.loc[df['MAE']==df['MAE'].min()].index[0]

In [None]:
df.iloc[minMaeIDX_ray]

In [None]:
# Plot the MAE returned by each Ray-trained model (one point per row of df).
fig, ax = plt.subplots(figsize=(20, 10))
ax.plot(df['MAE'])
ax.set_title('Training Scores MLP')
ax.set_ylabel('Score')
ax.set_xlabel('Iteration')
ax.legend(['MAE'], loc='upper right')
plt.show()

In [None]:
# Feature columns, in the order they were fed to the scaler and the models.
X_test.columns

In [None]:
# One row of the (reduced) data set, for comparison with the hand-written record below.
data.head(1)

In [None]:
# A single hand-written sample. stdScaler was fitted on a plain array (X_train.values),
# so values are matched by position.
# NOTE(review): presumably listed in X_test.columns order - verify.
record = [[True, 21, 277, 1712, 262, 7, 31, 5]]

In [None]:
# Scale the sample with the statistics learned on the training split.
test_rec = stdScaler.transform(record)

In [None]:
test_rec

In [None]:
# Reload the checkpoint of the lowest-MAE model from the in-notebook training run.
test_model = load_model(testResDataFrame['H5FileName'][minMaeIDX])

In [None]:
# Prediction in scaled target units.
test_pred = test_model.predict(test_rec)

In [None]:
test_pred

In [None]:
# Map the scaled prediction back to the original target units.
targetStdScaler.inverse_transform(test_pred)

### Save the scalers for the inference calls

In [None]:
# Persist the fitted target scaler (used to map scaled predictions back to target units).
joblib.dump(targetStdScaler,'target_scaler.pkl')

In [None]:
# Persist the fitted feature scaler (used to scale inputs before prediction).
joblib.dump(stdScaler,'standard_scaler.pkl')

### Export to ONNX to run on the RHODS model server

In [None]:
# Convert the reloaded Keras model to ONNX; output_path names the file tf_mlasp.onnx,
# which is the file uploaded to the S3 bucket in the upload step.
model_onnx, _ = tf2onnx.convert.from_keras(test_model, output_path='tf_mlasp.onnx')

### Upload model to S3 bucket

In [None]:
import boto3
import botocore

In [None]:
# S3 ODF GW client
# NOTE(review): region_name is fixed to 'default' here; the region read from
# AWS_DEFAULT_REGION earlier in the notebook is not used - confirm this is intended.
s3_client_config = botocore.client.Config(signature_version='s3')
s3_odf = boto3.client(
    service_name='s3',
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name='default',
    endpoint_url=endpoint_url,
    config=s3_client_config,
)

In [None]:
s3_odf.upload_file('tf_mlasp.onnx', bucket_name, 'models/tf_mlasp.onnx')

In [None]:
ray.shutdown()

In [None]:
s3_odf.upload_file('lt_results_2022-10-01.csv', bucket_name, 'data/lt_results_2022-10-01.csv')