In [None]:
# URL of the MLflow tracking and model-registry server used by the training and
# evaluation tasks below. Export MLFLOW_URI before starting the kernel to point at
# another instance; otherwise the local default is used.
import os

MLFLOW_URI = os.environ.get('MLFLOW_URI', 'http://localhost:8080/')

In [None]:
# Ray Tune search space: every hyper-parameter is an independent categorical choice
# over the listed candidates (insertion order preserved).
param_space = {
    key: tune.choice(candidates)
    for key, candidates in {
        "n_epochs": [200],                  # single value: upper bound, EarlyStopping(patience=5) may stop sooner
        "n_layers": [1, 3, 5, 10],
        "n_dense_layers": [1, 3, 5],
        "learning_rate": [0.003, 0.001],    # passed to the Adam optimizer
        "activation": ['tanh', 'relu'],
        "conv_kernel": [2, 3],
        "n_neurons": [32, 64, 96, 128, 256],
        "T": [5, 10, 15, 20],               # time-window size passed to prepare_dataset (logged as time_window)
        "dense_dp": [0.2, 0.3, 0.4],        # NOTE(review): presumably dropout for the dense layers — confirm in create_model
        "DP": [0, 0.2, 0.4],                # NOTE(review): presumably dropout for the other layers — confirm in create_model
    }.items()
}

In [None]:
def generate_nsteps_forecast(x_test, y_test, nn_model, pred_ahead):
    """Recursive multi-step forecast over the whole test set.

    Starting from a true input window of ``x_test``, the model predicts ``pred_ahead``
    steps ahead, feeding each prediction back into the input window. After each cycle
    the window is re-anchored on the next true window from ``x_test`` (the index right
    after the last predicted step) before a new prediction cycle starts.

    Args:
        x_test: array of input windows; ``x_test[i]`` is reshaped to ``(1, -1, 1)`` for
            ``nn_model.predict``.
        y_test: targets indexed like ``x_test``. NOTE(review): ``y_test[i]`` is assumed to be
            the value that follows window ``x_test[i]`` — confirm against prepare_dataset.
        nn_model: object exposing ``predict(x, verbose=0)`` whose ``[0, 0]`` entry is the
            next value, and ``get_config()`` (used only in the error message).
        pred_ahead: forecast horizon per cycle; must be >= 1.

    Returns:
        ``(y_pred, avg_ae, avg_se)`` where ``y_pred`` is a 1-D array of ``len(x_test)``
        predictions and ``avg_ae`` / ``avg_se`` are the mean absolute / squared error of the
        ``pred_ahead``-th prediction of every cycle that ran the full horizon, as Python
        floats (NaN when no cycle ran the full horizon).

    Raises:
        ValueError: if ``pred_ahead < 1`` (the forecast would never advance).
    """
    if pred_ahead < 1:
        raise ValueError(f'pred_ahead must be >= 1, got {pred_ahead}')

    max_len = x_test.shape[0]
    y_pred = []
    ae_errors_at_p = []
    se_errors_at_p = []
    index = 0
    while index < max_len:
        # Re-anchor on the true window at the current position.
        last_x = x_test[index]
        # Never forecast past the end of the test set.
        steps = min(pred_ahead, max_len - index)
        p = 0
        for sequence in range(steps):
            x_crt_input = last_x.reshape(1, -1, 1)
            try:
                p_vector = nn_model.predict(x_crt_input, verbose=0)
                p = p_vector[0, 0]  # 1x1 array -> scalar
            except Exception:
                # Best-effort: report and substitute 0 so the remaining series is still produced.
                print(f'Prediction error for x={x_crt_input} at sequence={sequence} for start index={index} when pred_ahead={pred_ahead}')
                print(f'Model config was:{nn_model.get_config()}')
                p = 0

            y_pred.append(p)

            # Slide the window: drop the oldest value and append the new prediction
            # (np.roll returns a copy, so x_test is never modified).
            last_x = np.roll(last_x, -1)
            last_x[-1] = p

        index += steps

        # Score the horizon-``pred_ahead`` prediction of every full cycle, including the
        # last one when it ends exactly at the end of the test set.
        if steps == pred_ahead:
            ae_errors_at_p.append(np.absolute(p - y_test[index - 1]))
            se_errors_at_p.append(np.square(p - y_test[index - 1]))

    y_pred_array = np.array(y_pred)
    avg_ae = float(np.mean(ae_errors_at_p)) if ae_errors_at_p else float('nan')
    avg_se = float(np.mean(se_errors_at_p)) if se_errors_at_p else float('nan')
    return (y_pred_array, avg_ae, avg_se)

In [None]:
def train_model(config):
    """Ray Tune trainable: train, evaluate and log one hyper-parameter configuration.

    Steps:
      1. build the network with ``create_model(config)``, compile it with MSE loss and Adam,
         and train with early stopping while checkpointing the lowest-``val_loss`` epoch to
         ``model_name`` (a ``.keras`` file);
      2. reload that best checkpoint and derive 3-sigma AE/SE anomaly thresholds from its
         one-step predictions on the training split;
      3. score the same checkpoint on the test split (R2, MAE, MAPE, MSE, Pearson r);
      4. create a dedicated MLflow experiment and log figures, the checkpoint model,
         the thresholds, params and metrics into a new run;
      5. report the metrics (plus run identifiers) to Ray Tune via ``train.report``.

    Args:
        config: one sample of ``param_space``; this function reads ``n_neurons``, ``n_epochs``,
            ``learning_rate``, ``T``, ``n_layers``, ``n_dense_layers``, ``activation`` and
            ``dense_dp`` (``create_model`` may read more).

    Metrics that raise or evaluate to NaN are replaced by sentinels (110 for R2, 100 for the
    others) so the values reported to Tune are never NaN.
    """
    import mlflow

    def _metric_or_default(compute, default):
        # Best-effort metric as a plain float; sentinel when it raises or is NaN.
        try:
            value = float(compute())
        except Exception:
            return default
        return default if np.isnan(value) else value

    n_neurons = config['n_neurons']
    n_epochs = config['n_epochs']
    learning_rate = config['learning_rate']
    T = config['T']

    model_exp = f'{modelTypeName}'
    UUID = uuid.uuid4().hex

    mlflow_exp_name = f'{modelTypeName}-{UUID}-T_{T}-LY_{config["n_layers"]}-DLY_{config["n_dense_layers"]}-NN_{n_neurons}'

    mlflow.set_tracking_uri(MLFLOW_URI)
    mlflow.set_registry_uri(MLFLOW_URI)

    nn_model = create_model(config)
    nn_model.compile(loss='mse',
                     metrics=["mse"],  # on tf 2.11-2.13 use metrics="mse"
                     optimizer=Adam(learning_rate=learning_rate))

    X_train, Y_train, X_test, Y_test = prepare_dataset(dataFrame[dataColumnName], T)

    model_name = (f"{modelTypeName}-T_{T}-LY_{config['n_layers']}-DLY_{config['n_dense_layers']}"
                  f"-NN_{n_neurons}-LR_{learning_rate}-epochs_{n_epochs}-{UUID}.keras")
    # Stop after 5 epochs without val_loss improvement; keep the best epoch on disk.
    es = EarlyStopping(monitor='val_loss', mode='min', verbose=0, patience=5)
    mc = ModelCheckpoint(model_name, monitor='val_loss', mode='min', save_best_only=True)

    nn_model.fit(X_train, Y_train,
                 epochs=n_epochs,
                 validation_data=(X_test, Y_test),
                 callbacks=[es, mc],
                 verbose=0)

    # Evaluate the best checkpoint; fall back to the in-memory model if none was written.
    try:
        saved_model = load_model(model_name)
    except Exception:
        saved_model = nn_model
        save_model(nn_model, model_name)

    # Reconstruct signal using the train data and get the AE and SE errors to obtain the 3-sigma thresholds
    y_pred_train = saved_model.predict(X_train, verbose=0)

    errors_ae_train = calculate_absolute_prediction_errors(Y_train, y_pred_train)
    lo_3sigma_ae, up_3sigma_ae = calculate_3sigma_threshold(errors_ae_train)
    anomalies_ae_train = calculate_3sigma_anomalies(errors_ae_train, lo_3sigma_ae, up_3sigma_ae)

    errors_se_train = calculate_squared_prediction_errors(Y_train, y_pred_train)
    lo_3sigma_se, up_3sigma_se = calculate_3sigma_threshold(errors_se_train)
    anomalies_se_train = calculate_3sigma_anomalies(errors_se_train, lo_3sigma_se, up_3sigma_se)

    three_sigma_thresholds = {
        'lo_3sigma_ae': lo_3sigma_ae,
        'up_3sigma_ae': up_3sigma_ae,
        'lo_3sigma_se': lo_3sigma_se,
        'up_3sigma_se': up_3sigma_se,
    }

    # Test spike detection using 3-sigma rule on the test data
    y_predict = saved_model.predict(X_test, verbose=0)
    errors_ae = calculate_absolute_prediction_errors(Y_test, y_predict)
    anomalies_ae = calculate_3sigma_anomalies(errors_ae, lo_3sigma_ae, up_3sigma_ae)
    errors_se = calculate_squared_prediction_errors(Y_test, y_predict)
    anomalies_se = calculate_3sigma_anomalies(errors_se, lo_3sigma_se, up_3sigma_se)

    r2 = _metric_or_default(lambda: r2_score(Y_test, y_predict), 110)
    mae = _metric_or_default(lambda: mean_absolute_error(Y_test, y_predict), 100)
    mape = _metric_or_default(lambda: mean_absolute_percentage_error(Y_test, y_predict), 100)
    mse = _metric_or_default(lambda: mean_squared_error(Y_test, y_predict), 100)
    pcc = _metric_or_default(lambda: np.corrcoef(Y_test, y_predict.flatten())[0, 1], 100)

    experiment_id = mlflow.create_experiment(mlflow_exp_name)
    with mlflow.start_run(run_name=mlflow_exp_name, experiment_id=experiment_id) as mlflowrun:
        run_id = mlflowrun.info.run_id

        fig = plt.figure(figsize=(20,15))
        plt.title("Spikes Train Data")
        plt.plot(Y_train,label="Original Data", alpha=0.6, c='gray')
        plt.plot(y_pred_train,label="Predict 1-step Forecast", alpha=0.6, c='red', linewidth=3)
        plt.scatter(np.where(anomalies_ae_train==True), y_pred_train[np.where(anomalies_ae_train==True)],
                    alpha=0.8, color='green', s=350, label="3-Sigma Anomalies AE")
        plt.scatter(np.where(anomalies_se_train==True), y_pred_train[np.where(anomalies_se_train==True)],
                    alpha=0.8, color='magenta', s=150, label="3-Sigma Anomalies SE")
        plt.legend()
        figName = f"Y_train_spikes-T_{T}.png"
        mlflow.log_figure(fig, figName)
        plt.close(fig)

        fig = plt.figure(figsize=(20,15))
        plt.title("Spikes Test Data T=" + str(T) + " with predict 1 on "+ str(model_exp) +": NN=" + str(n_neurons) + " epochs=" + str(n_epochs) +
                  " lr=" + str(learning_rate))
        plt.plot(y_predict,label="Predict 1-step Forecast", alpha=0.6, c='red', linewidth=3)
        plt.plot(Y_test,label="Original Data", alpha=0.6, c='black')
        plt.scatter(np.where(anomalies_ae==True), y_predict[np.where(anomalies_ae==True)],
                    alpha=0.8, color='green', s=350, label="3-Sigma Anomalies AE")
        plt.scatter(np.where(anomalies_se==True), y_predict[np.where(anomalies_se==True)],
                    alpha=0.8, color='magenta', s=150, label = "3-Sigma Anomalies SE")
        plt.legend()
        figName = f"Y_predict-1-step-spikes-T_{T}.png"
        mlflow.log_figure(fig, figName)
        plt.close(fig)

        try:
            signature = infer_signature(X_test, y_predict)
            # Register the best checkpoint, i.e. the model the metrics and thresholds above
            # were computed from. nn_model holds the last-epoch weights because EarlyStopping
            # is not configured with restore_best_weights.
            mlflow.tensorflow.log_model(saved_model, model_name,
                                        signature = signature,
                                        registered_model_name = model_name)
        except Exception:
            print(f'Ray-MLFlow: Could not save model {model_name}')

        try:
            mlflow.log_dict(three_sigma_thresholds, "three_sigma_thresholds.json")
            mlflow.log_param("n_layer_size", n_neurons)
            mlflow.log_param("n_layers", config['n_layers'])
            mlflow.log_param("n_dense_layers", config['n_dense_layers'])
            mlflow.log_param("activation_fn", config['activation'])
            mlflow.log_param("epochs", n_epochs)
            mlflow.log_param("learning_rate", learning_rate)
            mlflow.log_param("optimizer", "adam")
            mlflow.log_param("time_window", config['T'])
            mlflow.log_param("dense_dp", config['dense_dp'])
            mlflow.log_param("model_exp", model_exp)

            mlflow.log_metric("mae", mae)
            mlflow.log_metric("mse", mse)
            mlflow.log_metric("mape", mape)
            mlflow.log_metric("r2_score", r2)
            mlflow.log_metric("pearson_corr_coef", pcc)

        except Exception:
            # Fallback: keep the params/metrics as a single JSON artifact.
            log_metric_dict = {
                'r2_score': r2,
                'mae': mae,
                'mape': mape,
                'mse': mse,
                'pcc': pcc
            }
            log_param_dict = {
                "n_layer_size": n_neurons,
                "n_layers": config['n_layers'],
                "n_dense_layers": config['n_dense_layers'],
                "activation_fn": config['activation'],
                "epochs": n_epochs,
                "learning_rate": learning_rate,
                "optimizer": "adam",
                "time_window": config['T'],
                "dense_dp": config['dense_dp'],
                "model_exp": model_exp
            }
            exception_param_metric_dict = {
                'log_param_dict': log_param_dict,
                'log_metric_dict': log_metric_dict,
            }
            mlflow.log_dict(exception_param_metric_dict, "exception_param_metric_dict.json")

    train.report({"mse":mse, "mae":mae, "mape":mape, "r2":r2, "mlflow_exp":mlflow_exp_name, "model_name":model_name, "T":T, "run_id":run_id}) # for Ray>=2.7
    
    # air.session.report({"mse":mse, "mae":mae, "mape":mape, "r2":r2, "mlflow_exp":mlflow_exp_name, "model_name":model_name, "T":T, "run_id":run_id})

In [None]:
def tune_model(num_training_iterations, num_samples):
    """Run the Ray Tune search of ``train_model`` over ``param_space``.

    Trials are ranked by the reported ``mse`` (lower is better), at most 10 run at once,
    and each trial reserves a placement group of two 1-CPU bundles.

    NOTE(review): ``train_model`` calls ``train.report`` only once, so ``training_iteration``
    presumably never reaches the scheduler's ``grace_period=5`` and AsyncHyperBand will not
    prune any trial — confirm whether the scheduler is still wanted.

    Args:
        num_training_iterations: value of the ``training_iteration`` stop criterion.
        num_samples: number of configurations sampled from ``param_space``.

    Returns:
        The ``ResultGrid`` returned by ``Tuner.fit()``.
    """
    # We have a cluster of 10 worker nodes whose pods request 1 CPU with a 2 CPU limit.
    trial_resources = tune.PlacementGroupFactory([{'CPU': 1.0}] * 2)
    trainable = tune.with_resources(train_model, resources=trial_resources)

    scheduler = AsyncHyperBandScheduler(
        time_attr="training_iteration", max_t=10, grace_period=5
    )
    search_settings = tune.TuneConfig(
        metric="mse",
        mode="min",
        scheduler=scheduler,
        num_samples=num_samples,
        max_concurrent_trials=10,
    )
    run_settings = air.RunConfig(
        name=modelTypeName,
        verbose=1,
        stop={"training_iteration": num_training_iterations},
    )

    return tune.Tuner(
        trainable,
        tune_config=search_settings,
        run_config=run_settings,
        param_space=param_space,
    ).fit()

In [None]:
@ray.remote
def run_n_step_evaluation(model_name, run_id, T, predict_ahead):
    """Ray task: evaluate a registered model with a recursive ``predict_ahead``-step forecast.

    Loads version 1 of the registered model ``model_name``, rebuilds the test split for
    window size ``T``, computes the one-step and ``predict_ahead``-step forecasts, flags
    anomalies with the 3-sigma thresholds stored in training run ``run_id``, and logs two
    comparison figures plus ``<predict_ahead>-step-metric.json`` back into that run.

    Metrics that raise or evaluate to NaN are replaced by the sentinel 100 (consistent with
    the guards in train_model); all metric values are plain Python floats so the dict is
    JSON-serializable. ``agv_ae_at_nStep`` / ``agv_se_at_nStep`` may still be NaN when no
    forecast cycle covered the full horizon.

    Returns:
        ``{'predict_ahead_<k>': {metric_name: value}}`` — the same dict logged to MLflow.
    """
    import mlflow

    def _metric_or_default(compute, default):
        # Best-effort metric as a plain float; sentinel when it raises or is NaN.
        try:
            value = float(compute())
        except Exception:
            return default
        return default if np.isnan(value) else value

    mlflow.set_tracking_uri(MLFLOW_URI)
    mlflow.set_registry_uri(MLFLOW_URI)

    model = mlflow.tensorflow.load_model(f'models:/{model_name}/1')

    params = mlflow.get_run(run_id).to_dictionary()['data']['params']
    n_neurons = params['n_layer_size']
    learning_rate = params['learning_rate']
    n_epochs = params['epochs']
    model_exp = params['model_exp']

    _, _, X_test, Y_test = prepare_dataset(dataFrame[dataColumnName], T)

    y_predict = model.predict(X_test, verbose=0)
    y_pred_nsteps, avg_ae, avg_se = generate_nsteps_forecast(X_test, Y_test, model, predict_ahead)

    errors_ae2 = calculate_absolute_prediction_errors(Y_test, y_pred_nsteps)
    errors_se2 = calculate_squared_prediction_errors(Y_test, y_pred_nsteps)

    n_step_metrics = {
        f'predict_ahead_{predict_ahead}': {
            'r2_nStep': _metric_or_default(lambda: r2_score(Y_test, y_pred_nsteps), 100),
            'mae_nStep': _metric_or_default(lambda: mean_absolute_error(Y_test, y_pred_nsteps), 100),
            'mape_nStep': _metric_or_default(lambda: mean_absolute_percentage_error(Y_test, y_pred_nsteps), 100),
            'mse_nStep': _metric_or_default(lambda: mean_squared_error(Y_test, y_pred_nsteps), 100),
            'pcc_nStep': _metric_or_default(lambda: np.corrcoef(Y_test, y_pred_nsteps.flatten())[0, 1], 100),
            'agv_ae_at_nStep': float(avg_ae),
            'agv_se_at_nStep': float(avg_se),
        }
    }

    # Re-open the training run so the figures and metrics land next to the thresholds.
    with mlflow.start_run(run_id=run_id, nested=True) as run:
        artifact_uri = run.info.artifact_uri
        three_sigma_thresholds = mlflow.artifacts.load_dict(artifact_uri + "/three_sigma_thresholds.json")

        anomalies_ae2 = calculate_3sigma_anomalies(errors_ae2,
                                                   three_sigma_thresholds['lo_3sigma_ae'],
                                                   three_sigma_thresholds['up_3sigma_ae'])
        anomalies_se2 = calculate_3sigma_anomalies(errors_se2,
                                                   three_sigma_thresholds['lo_3sigma_se'],
                                                   three_sigma_thresholds['up_3sigma_se'])

        fig = plt.figure(figsize=(20,15))
        plt.title("Compare forecasts T=" + str(T) + " predict_ahead=" + str(predict_ahead) + " with predict 1" +
                 " for DNN "+ modelTypeName +": NN=" + str(n_neurons) + " LR= " + str(learning_rate) + " epochs=" + str(n_epochs))
        plt.plot(Y_test,label="Original Data", alpha=0.6, c='red',linewidth=2)
        plt.plot(y_predict,label="Predicted Data 1-step", alpha=0.6, c='black', linewidth=2)
        plt.plot(y_pred_nsteps,label="Predicted Data " + str(predict_ahead) + "-steps", alpha=0.6, c='blue', linewidth=2)
        plt.legend()
        figName = f"compare-forecasts-1_{predict_ahead}.png"
        mlflow.log_figure(fig, figName)
        plt.close(fig)

        fig = plt.figure(figsize=(20,15))
        plt.title("Predict Spikes T=" + str(T) + " with predict " + str(predict_ahead) + " on " + str(model_exp) + ": NN="
                  + str(n_neurons) + " epochs=" + str(n_epochs) + " lr=" + str(learning_rate))
        plt.plot(y_pred_nsteps,label="Predict " + str(predict_ahead) + "-step Forecast", alpha=0.6, c='red', linewidth=3)
        plt.plot(Y_test,label="Original Data", alpha=0.6, c='black')
        plt.scatter(np.where(anomalies_ae2==True), y_pred_nsteps[np.where(anomalies_ae2==True)],
                    alpha=0.8, color='green', s=350, label="Anomalies AE")
        plt.scatter(np.where(anomalies_se2==True), y_pred_nsteps[np.where(anomalies_se2==True)],
                    alpha=0.8, color='magenta', s=150, label = "Anomalies SE")
        plt.legend()
        figName = f"Y-predict-spikes-step-{predict_ahead}-with-T_{T}.png"
        mlflow.log_figure(fig, figName)
        plt.close(fig)

        fname = f'{predict_ahead}-step-metric.json'
        mlflow.log_dict(n_step_metrics, fname)

    return n_step_metrics


In [None]:
@ray.remote
def run_new_data_evaluation(model_name, run_id, T, predict_ahead, trial_fname):
    """Ray task: compare one-step and ``predict_ahead``-step forecasts of a registered model.

    Loads version 1 of the registered model ``model_name``, rebuilds the test split for
    window size ``T``, computes both forecasts, flags anomalies with the 3-sigma thresholds
    stored in training run ``run_id``, and logs two figures plus two JSON metric files
    (tagged with ``trial_fname``) back into that run.

    NOTE(review): the data comes from the notebook globals ``dataFrame[dataColumnName]``;
    ``trial_fname`` is only used in titles and artifact names. Ray serializes the globals a
    remote function references when it ships the function to workers, so reassigning
    ``dataFrame`` in the notebook afterwards may not reach them — confirm with the caller.

    Metrics that raise or evaluate to NaN are replaced by sentinels (110 for the one-step R2,
    100 otherwise); all metric values are plain Python floats so the dicts are
    JSON-serializable. ``agv_ae_at_nStep`` / ``agv_se_at_nStep`` may still be NaN when no
    forecast cycle covered the full horizon.

    Returns:
        ``{'one_step_metrics': {...}, 'n_step_metrics': {'predict_ahead_<k>': {...}}}``.
    """
    import mlflow

    def _metric_or_default(compute, default):
        # Best-effort metric as a plain float; sentinel when it raises or is NaN.
        try:
            value = float(compute())
        except Exception:
            return default
        return default if np.isnan(value) else value

    mlflow.set_tracking_uri(MLFLOW_URI)
    mlflow.set_registry_uri(MLFLOW_URI)

    model = mlflow.tensorflow.load_model(f'models:/{model_name}/1')

    params = mlflow.get_run(run_id).to_dictionary()['data']['params']
    model_exp = params['model_exp']

    _, _, X_test, Y_test = prepare_dataset(dataFrame[dataColumnName], T)

    y_predict = model.predict(X_test, verbose=0)
    errors_ae = calculate_absolute_prediction_errors(Y_test, y_predict)
    errors_se = calculate_squared_prediction_errors(Y_test, y_predict)

    y_pred_nsteps, avg_ae, avg_se = generate_nsteps_forecast(X_test, Y_test, model, predict_ahead)
    errors_ae2 = calculate_absolute_prediction_errors(Y_test, y_pred_nsteps)
    errors_se2 = calculate_squared_prediction_errors(Y_test, y_pred_nsteps)

    one_step_metrics = {
        'r2_1Step': _metric_or_default(lambda: r2_score(Y_test, y_predict), 110),
        'mae_1Step': _metric_or_default(lambda: mean_absolute_error(Y_test, y_predict), 100),
        'mape_1Step': _metric_or_default(lambda: mean_absolute_percentage_error(Y_test, y_predict), 100),
        'mse_1Step': _metric_or_default(lambda: mean_squared_error(Y_test, y_predict), 100),
        'pcc_1Step': _metric_or_default(lambda: np.corrcoef(Y_test, y_predict.flatten())[0, 1], 100),
    }

    n_step_metrics = {
        f'predict_ahead_{predict_ahead}': {
            'r2_nStep': _metric_or_default(lambda: r2_score(Y_test, y_pred_nsteps), 100),
            'mae_nStep': _metric_or_default(lambda: mean_absolute_error(Y_test, y_pred_nsteps), 100),
            'mape_nStep': _metric_or_default(lambda: mean_absolute_percentage_error(Y_test, y_pred_nsteps), 100),
            'mse_nStep': _metric_or_default(lambda: mean_squared_error(Y_test, y_pred_nsteps), 100),
            'pcc_nStep': _metric_or_default(lambda: np.corrcoef(Y_test, y_pred_nsteps.flatten())[0, 1], 100),
            'agv_ae_at_nStep': float(avg_ae),
            'agv_se_at_nStep': float(avg_se),
        }
    }

    # Re-open the training run so the figures and metrics land next to the thresholds.
    with mlflow.start_run(run_id=run_id, nested=True) as run:
        artifact_uri = run.info.artifact_uri
        three_sigma_thresholds = mlflow.artifacts.load_dict(artifact_uri + "/three_sigma_thresholds.json")

        anomalies_ae = calculate_3sigma_anomalies(errors_ae,
                                                   three_sigma_thresholds['lo_3sigma_ae'],
                                                   three_sigma_thresholds['up_3sigma_ae'])
        anomalies_se = calculate_3sigma_anomalies(errors_se,
                                                   three_sigma_thresholds['lo_3sigma_se'],
                                                   three_sigma_thresholds['up_3sigma_se'])

        anomalies_ae2 = calculate_3sigma_anomalies(errors_ae2,
                                                   three_sigma_thresholds['lo_3sigma_ae'],
                                                   three_sigma_thresholds['up_3sigma_ae'])
        anomalies_se2 = calculate_3sigma_anomalies(errors_se2,
                                                   three_sigma_thresholds['lo_3sigma_se'],
                                                   three_sigma_thresholds['up_3sigma_se'])

        fig = plt.figure(figsize=(20,15))
        title = "Compare Forecasts for T=" + str(T) + " with predict 1 on "+ str(model_exp) + " for trial " + str(trial_fname)
        plt.title(title)
        plt.plot(y_predict,label="Predict 1-step Forecast", alpha=0.6, c='red', linewidth=3)
        plt.plot(Y_test,label="Original Data", alpha=0.6, c='black')
        plt.scatter(np.where(anomalies_ae==True), y_predict[np.where(anomalies_ae==True)],
                    alpha=0.8, color='green', s=350, label="Anomalies AE")
        plt.scatter(np.where(anomalies_se==True), y_predict[np.where(anomalies_se==True)],
                    alpha=0.8, color='magenta', s=150, label = "Anomalies SE")
        plt.legend()
        figName = f"Y_predict_ahead-1-step-T_{T}-fname-{trial_fname}.png"
        mlflow.log_figure(fig, figName)
        plt.close(fig)

        fig = plt.figure(figsize=(20,15))
        title = "Compare Forecasts for T=" + str(T) + " with predict_ahead= "+ str(predict_ahead) + " on model " + str(model_exp) + " for trial " + str(trial_fname)
        plt.title(title)
        plt.plot(y_pred_nsteps,label="Predict n-step Forecast", alpha=0.6, c='red', linewidth=3)
        plt.plot(Y_test,label="Original Data", alpha=0.6, c='black')
        plt.scatter(np.where(anomalies_ae2==True), y_pred_nsteps[np.where(anomalies_ae2==True)],
                    alpha=0.8, color='green', s=350, label="Anomalies AE")
        plt.scatter(np.where(anomalies_se2==True), y_pred_nsteps[np.where(anomalies_se2==True)],
                    alpha=0.8, color='magenta', s=150, label = "Anomalies SE")
        plt.legend()
        figName = f"Y_predict_ahead-{predict_ahead}-step-T_{T}-fname-{trial_fname}.png"
        mlflow.log_figure(fig, figName)
        plt.close(fig)

        fname_nstep = f'{predict_ahead}-{trial_fname}-nstep-metric.json'
        fname_1step = f'{predict_ahead}-{trial_fname}-1step-metric.json'
        mlflow.log_dict(n_step_metrics, fname_nstep)
        mlflow.log_dict(one_step_metrics, fname_1step)

    result = {
        'one_step_metrics' : one_step_metrics,
        'n_step_metrics' : n_step_metrics
    }
    return result
