# Testing

In [None]:
# Prediction margin, expressed in time steps: the only parameter to set in this notebook.
# Recommended values: 5 or 10 (i.e. 0.5 s or 1 s).
# It selects the test CSV and the models_/results_/threshold_ directories used below,
# and it is the RUL value under which a sample is labelled as a fault (label 0).
margin = 10

## Import libraries and define utility functions

In [None]:
import os
import pickle
import random
import sys
from math import floor

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from tensorflow import keras
from tensorflow.keras import layers, callbacks

In [None]:
def mean(l):
    """Return the arithmetic mean of the sequence ``l``.

    Raises ZeroDivisionError when ``l`` is empty.
    """
    return sum(l) / len(l)

#### Set up global variables

In [None]:
# Render every float shown by pandas with exactly four decimal places.
pd.set_option("display.float_format", "{:.4f}".format)

In [None]:
# Per-margin working directories. The trailing slash is part of each value because
# file names are appended to them by plain string concatenation in the cells below.
models_path = "models_" + str(margin) + "/"        # saved Keras models (read only here)
results_path = "results_" + str(margin) + "/"      # pickled results of the stored runs (read only here)
threshold_path = "threshold_" + str(margin) + "/"  # signal / threshold / label dumps (written here)

# threshold_path is the only directory this notebook writes into: create it up front so
# that the first pickle.dump does not fail with FileNotFoundError on a fresh checkout.
os.makedirs(threshold_path, exist_ok=True)

In [None]:
# Column names follow the pattern "<channel>_<statistic>_w<window>", optionally prefixed
# with "differencing_". Within one window the statistic varies slowest and the channel
# fastest, which is the column order every list below must keep.
_CHANNELS = ("Gz", "Ax", "Ay")
_STATISTICS = ("mean", "std", "min", "max")


def _window_feature_names(window, differenced=False):
    """Return the 12 column names of one window size, plain or differenced."""
    prefix = "differencing_" if differenced else ""
    return [
        f"{prefix}{channel}_{statistic}_w{window}"
        for statistic in _STATISTICS
        for channel in _CHANNELS
    ]


w5_features_no_diff = _window_feature_names(5)
w5_features_diff = _window_feature_names(5, differenced=True)

w10_features_no_diff = _window_feature_names(10)
w10_features_diff = _window_feature_names(10, differenced=True)

w15_features_no_diff = _window_feature_names(15)
w15_features_diff = _window_feature_names(15, differenced=True)

w20_features_no_diff = _window_feature_names(20)
w20_features_diff = _window_feature_names(20, differenced=True)

_all_no_diff = (
    w5_features_no_diff
    + w10_features_no_diff
    + w15_features_no_diff
    + w20_features_no_diff
)
_all_diff = (
    w5_features_diff
    + w10_features_diff
    + w15_features_diff
    + w20_features_diff
)

# Named column sets; each one ends with the 'label' column.
features = {
    "all_features": _all_no_diff + _all_diff + ['label'],
    "w5_features": w5_features_no_diff + w5_features_diff + ['label'],
    "w10_features": w10_features_no_diff + w10_features_diff + ['label'],
    "w15_features": w15_features_no_diff + w15_features_diff + ['label'],
    "w20_features": w20_features_no_diff + w20_features_diff + ['label'],
    "no_diff_features": _all_no_diff + ['label'],
    "diff_features": _all_diff + ['label'],
}

#### Data import

In [None]:
# Test split matching the selected margin; the first CSV column becomes the index.
test_csv_path = f"5G_IIoT_RUL_dataset/test/test_{margin}.csv"
df = pd.read_csv(test_csv_path, index_col=[0])

In [None]:
# Per-column summary statistics of the loaded test set.
df.describe()

## Remaining useful life (RUL)

The following plot shows the Remaining Useful Life (RUL), namely the number of time steps left before a failure occurs.

In [None]:
# RUL (the "label" column) of every sample of the test set, in file order.
# Title and axis labels are set so that the figure can be read on its own.
fig, ax = plt.subplots()
ax.plot(df["label"])
ax.set(title="Remaining Useful Life over the test set",
       xlabel="Sample index",
       ylabel="RUL (time steps)")
plt.show()

# Machine learning: testing phase

## Dataset preprocessing for machine learning models

In this section, RUL labels are converted to binary labels in order to perform classification instead of regression: a sample is labelled `1` (`not_fault`) when its RUL is at least `margin`, and `0` (`fault`) otherwise.

For the `AutoEncoder` model, the dataset is partitioned so that the training set contains neither faults nor samples that anticipate a fault. In other words, every training sample must have a RUL of at least `good_samples_thr` (twice the prediction margin).

We basically need an entire section of dataset where faults are not present.

In [None]:
def _binarize_labels(frame, threshold):
    """Replace the RUL in ``frame['label']`` with 1 (RUL >= threshold) or 0, in place."""
    frame['label'] = (frame['label'] >= threshold).map({True: 1, False: 0})
    return frame


def build_dataset_for_ml_model(df, training_columns, split_size=0.75, as_list=False, ae=False):
    """Split ``df`` into run-to-failure segments and build binary-labelled train/test sets.

    A segment is a run of consecutive rows ending with a row whose ``label`` (RUL) is 0.
    Rows located after the last fault do not belong to any segment and are dropped.
    The first ``floor(n_segments * split_size)`` segments form the training set and the
    remaining ones the test set (segments are not shuffled).

    Parameters
    ----------
    df : pandas.DataFrame
        Source data; must contain every column listed in ``training_columns``.
    training_columns : list of str
        Columns to keep; must include ``"label"``.
    split_size : float
        Fraction of segments assigned to the training set. With ``split_size >= 1``
        only the training set is returned.
    as_list : bool
        If True, each set is returned as a list with one DataFrame per segment;
        otherwise each set is a single 2-D NumPy array.
    ae : bool
        If True, training segments only keep the rows whose RUL is at least
        ``2 * margin`` (samples far from any fault). The test set is never filtered.

    Returns
    -------
    ``training_set`` when ``split_size >= 1``, else ``(training_set, test_set)``.
    In every returned set ``label`` is 1 when the RUL is at least the global
    ``margin`` and 0 otherwise. An empty set is returned as an empty list or as an
    array with zero rows.
    """
    df_main = df[training_columns]
    good_samples_thr = margin * 2

    # Positional offsets of the fault rows. Positions (not index labels) are required
    # because the segments are cut with iloc; the two only coincide for a 0-based index.
    fault_positions = np.flatnonzero((df_main["label"] == 0).to_numpy())

    segments = []
    previous = 0
    for position in fault_positions:
        segments.append(df_main.iloc[previous:position + 1, :])
        previous = position + 1

    # If split_size is 1, there will be no val/test set
    train_size = floor(len(segments) * split_size)
    with_test_set = split_size < 1

    train_parts = []
    for segment in segments[:train_size]:
        if ae:
            segment = segment[segment["label"] >= good_samples_thr]
        train_parts.append(segment.copy())
    test_parts = [segment.copy() for segment in segments[train_size:]]

    if as_list:
        training_set = [_binarize_labels(part, margin) for part in train_parts]
        test_set = [_binarize_labels(part, margin) for part in test_parts]
    else:
        # Zero-row frame with the right columns, used when a split has no segment.
        empty = df_main.iloc[0:0]
        # A single concat per set instead of growing a frame inside a loop.
        train_frame = pd.concat(train_parts) if train_parts else empty.copy()
        training_set = _binarize_labels(train_frame, margin).to_numpy()
        if with_test_set:
            test_frame = pd.concat(test_parts) if test_parts else empty.copy()
            test_set = _binarize_labels(test_frame, margin).to_numpy()

    if with_test_set:
        return training_set, test_set
    return training_set

## Cost model for threshold optimization and performance evaluation

In [None]:
# One row per evaluated model, appended by the evaluation cells below:
# [model, seed, columns, params, params_idx] followed by the list returned by
# performance_evaluation. Re-running an evaluation cell appends a duplicate row,
# so re-run this cell first to start from an empty list.
all_perf = []

In [None]:
# Unit costs of the cost model: the value charged by false_positive_cost for one
# false alarm, and the multiplier applied by false_negative_cost to a missed sample.
BASE_FP = 0.2
BASE_FN = 1

def false_positive_cost(i, is_fault, fault_found):
    """Return the cost of one false alarm, which is always ``BASE_FP``.

    The three arguments are not used; the signature mirrors ``false_negative_cost``.
    """
    return BASE_FP

def false_negative_cost(i, is_fault, fault_found):
    """Return the cost of not raising the alarm at sample ``i`` of a fault window.

    Parameters
    ----------
    i : int
        Position of the sample being evaluated.
    is_fault : array-like of bool
        Per-sample fault flags; must expose ``shape[0]`` and integer indexing.
    fault_found : bool
        Whether the current fault window has already been detected.

    Returns
    -------
    ``0`` if the window has already been detected. Otherwise
    ``(margin + 1 - j) * BASE_FN``, where ``j`` is the number of steps (at most the
    global ``margin``) to the end of the window, i.e. to the first non-fault sample or
    to the end of the series: the closer the end, the higher the cost. When the window
    does not end within ``margin`` steps the cost is ``0``, the value the same formula
    gives for ``j = margin + 1`` (the original fell through and returned ``None``,
    which broke the callers' ``cost += ...``).
    """
    if fault_found:
        return 0

    n_samples = is_fault.shape[0]
    for j in range(1, margin + 1):
        ahead = i + j
        if ahead >= n_samples or not is_fault[ahead]:
            return (margin + 1 - j) * BASE_FN
    return 0

In [None]:
def threshold_optimization(signal, rul, start, end, n_steps):
    """Grid-search the alarm threshold that minimises the total cost on ``signal``.

    ``n_steps`` evenly spaced thresholds between ``start`` and ``end`` (both included)
    are evaluated. A sample is a fault when its ``rul`` is 0; an alarm is raised when
    ``signal >= threshold``. Alarms on non-fault samples are charged through
    ``false_positive_cost`` and fault samples with ``signal <= threshold`` through
    ``false_negative_cost``.

    Returns ``(best_cost, best_thr, all_cost, all_thr)``; on ties the first (lowest)
    threshold reaching the minimum is kept.
    """
    is_fault = (rul == 0)
    n_samples = signal.shape[0]

    all_thr, all_cost = [], []
    best_cost, best_thr = sys.maxsize, -1

    for candidate in np.linspace(start, end, n_steps):
        total = 0
        detected = False  # True once the current fault window has raised an alarm
        for idx in range(n_samples):
            value = signal[idx]
            if is_fault[idx]:
                if value >= candidate:
                    detected = True
                if value <= candidate:
                    total += false_negative_cost(idx, is_fault, detected)
            else:
                # Leaving (or outside) a fault window resets the detection flag.
                detected = False
                if value >= candidate:
                    total += false_positive_cost(idx, is_fault, detected)

        all_thr.append(candidate)
        all_cost.append(total)
        if total < best_cost:
            best_cost, best_thr = total, candidate

    return best_cost, best_thr, all_cost, all_thr

In [None]:
def performance_evaluation(signal, thr, rul, steps_per_second=10):
    """Evaluate the alarm ``signal >= thr`` against the fault windows described by ``rul``.

    A sample belongs to a fault window when its ``rul`` is 0; a window is a maximal run
    of consecutive fault samples.

    Parameters
    ----------
    signal : array-like
        Anomaly score of every sample (higher means more anomalous).
    thr : float
        Alarm threshold.
    rul : array-like
        Per-sample label, same length as ``signal``.
    steps_per_second : int or float
        Number of samples per second, used to convert the anticipation from time
        steps to seconds (10 keeps the original hard-coded conversion).

    Returns
    -------
    list
        ``[cost, mean_a, tp, fn, fp]``: total cost (``false_positive_cost`` for every
        alarm outside a window, ``false_negative_cost`` for every window sample with
        ``signal <= thr``), mean anticipation in seconds, detected windows, missed
        windows and number of false-alarm samples.
    """
    is_fault = (rul == 0)
    alarm = (signal >= thr)
    n_samples = len(rul)

    fp, fn, tp = 0, 0, 0
    cost = 0
    anticipation = []

    fault_found = False
    start = 0  # first sample of the current fault window
    for i in range(n_samples):
        in_window = is_fault[i]

        # A window opens on a healthy -> fault transition, or on the very first sample
        # when the series begins inside a window (the original left `start` unbound there).
        if in_window and (i == 0 or not is_fault[i - 1]):
            start = i

        if in_window and not fault_found and alarm[i]:
            tp += 1
            fault_found = True
            # Steps left in a `margin`-long window when the first alarm is raised.
            anticipation.append((margin - 1) - (i - start))

        # A window closes on its last fault sample. Requiring a fault sample here avoids
        # counting a missed fault when the series simply ends in the healthy state.
        window_closes = in_window and (i == n_samples - 1 or not is_fault[i + 1])
        if window_closes and not fault_found:
            fn += 1

        if in_window and signal[i] <= thr:
            # `or 0` guards against a cost function that returns None for this sample.
            cost += false_negative_cost(i, is_fault, fault_found) or 0

        if not in_window:
            fault_found = False
            if alarm[i]:
                fp += 1
                cost += false_positive_cost(i, is_fault, fault_found)

    # As in the original, the mean is reported as 0 unless the summed anticipation is
    # strictly positive (this also covers the case where no window was detected).
    total_anticipation = sum(anticipation)
    if total_anticipation > 0:
        mean_a = total_anticipation / len(anticipation) / steps_per_second
    else:
        mean_a = 0

    return [cost, mean_a, tp, fn, fp]

## Baseline: raw signal pre-anomaly detection

In [None]:
# Stored raw-signal run to evaluate (seed and feature are part of its file name).
seed = 100
feature = "Ax_diff"
run_id = f"raw_signal-{feature}-s{seed}-p0"

# Only the cost and threshold stored with that run are used here.
# NOTE(review): pickle.load can execute arbitrary code; only open trusted result files.
f = f"{results_path}{run_id}"
with open(f, "rb") as file:
    _, _, _, best_cost_raw, best_thr_raw, _, _, _ = pickle.load(file)

# split_size=1: every segment of the test data is returned in one array.
training_columns = [feature, "label"]
test_set_raw = build_dataset_for_ml_model(df, training_columns=training_columns, split_size=1)

# The anomaly signal is the negated feature; the last column is the binarised label.
test_raw_signal = -test_set_raw[:, 0]
test_raw_rul = test_set_raw[:, -1]

# Persist signal, threshold and labels of this run.
t = f"{threshold_path}{run_id}"
to_serialize = (test_raw_signal, best_thr_raw, test_raw_rul)
with open(t, "wb") as file:
    pickle.dump(to_serialize, file)

test_perf_raw = performance_evaluation(test_raw_signal, best_thr_raw, test_raw_rul)
all_perf.append(["raw_signal", seed, feature, {}, "0", *test_perf_raw])

## Autoencoders

In [None]:
# Autoencoder layer configurations; the position in the list is the "-p<idx>" suffix
# of the saved model and result files.
params_ae = [
    {"hidden_ae": [16, 8, 2, 8, 16]},
    {"hidden_ae": [64, 24, 9, 24, 64]},
    {"hidden_ae": [128, 56, 18, 56, 128]},
]

# Stored run to evaluate.
seed = 200
columns = "w5_features"
params_idx = 2
params = params_ae[params_idx]
run_id = f"autoencoder-{columns}-s{seed}-p{params_idx}"

ae = keras.models.load_model(f"{models_path}{run_id}")
# NOTE(review): pickle.load can execute arbitrary code; only open trusted result files.
f = f"{results_path}{run_id}"
with open(f, "rb") as file:
    training_columns, _, _, best_cost_ae, best_thr_ae, _, _, _ = pickle.load(file)

test_set_ae = build_dataset_for_ml_model(df, training_columns=training_columns, split_size=1)

test_preds_ae = ae.predict(test_set_ae[:, :-1])

# Anomaly signal: squared reconstruction error summed over the feature columns.
reconstruction_error_ae = test_preds_ae - test_set_ae[:, :-1]
test_signal_ae = pd.Series(data=np.sum(np.square(reconstruction_error_ae), axis=1))
test_rul_ae = test_set_ae[:, -1]

# Persist signal, threshold and labels of this run.
t = f"{threshold_path}{run_id}"
to_serialize = (test_signal_ae, best_thr_ae, test_rul_ae)
with open(t, "wb") as file:
    pickle.dump(to_serialize, file)

test_perf_ae = performance_evaluation(test_signal_ae, best_thr_ae, test_rul_ae)
all_perf.append(["autoencoder", seed, columns, params, params_idx, *test_perf_ae])

## Dense Neural Network

In [None]:
# Dense-network configurations; the position in the list is the "-p<idx>" suffix of
# the saved model and result files.
params_mlp = [
    {"hidden_mlp": []},
    {"hidden_mlp": [32]},
    {"hidden_mlp": [64, 32]},
    {"hidden_mlp": [128, 64, 32]},
]

# Stored run to evaluate.
seed = 2800
columns = "w5_features"
params_idx = 3
params = params_mlp[params_idx]
run_id = f"mlp-{columns}-s{seed}-p{params_idx}"

mlp = keras.models.load_model(f"{models_path}{run_id}")
# NOTE(review): pickle.load can execute arbitrary code; only open trusted result files.
f = f"{results_path}{run_id}"
with open(f, "rb") as file:
    training_columns, _, _, best_cost_mlp, best_thr_mlp, _, _, _ = pickle.load(file)

test_set_mlp = build_dataset_for_ml_model(df, training_columns=training_columns, split_size=1)

test_preds_mlp = mlp.predict(test_set_mlp[:, :-1]).ravel()

# Label 1 means "not fault", so the anomaly signal is the complement of the prediction.
test_signal_mlp = pd.Series(data=(1 - test_preds_mlp))
test_rul_mlp = test_set_mlp[:, -1]

# Persist signal, threshold and labels of this run.
t = f"{threshold_path}{run_id}"
to_serialize = (test_signal_mlp, best_thr_mlp, test_rul_mlp)
with open(t, "wb") as file:
    pickle.dump(to_serialize, file)

test_perf_mlp = performance_evaluation(test_signal_mlp, best_thr_mlp, test_rul_mlp)
all_perf.append(["mlp", seed, columns, params, params_idx, *test_perf_mlp])

## Convolutional Neural Network

In [None]:
def sliding_window_2D(data, w_len, stride=1):
    """Cut the rows of ``data`` into overlapping windows of ``w_len`` consecutive rows.

    Parameters
    ----------
    data : pandas.DataFrame
        Table with one row per time step.
    w_len : int
        Number of rows per window.
    stride : int
        Step between the first rows of two consecutive windows.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n_windows, w_len, n_columns)``. ``n_windows`` is 0 when
        ``data`` has fewer than ``w_len`` rows (the original raised in ``reshape``
        when it had fewer than ``w_len - 1`` rows, because the slice stop went
        negative and the shifted tables ended up with different lengths).
    """
    values = data.to_numpy()
    # Number of valid window start positions before the stride is applied.
    n_starts = len(data) - w_len + 1
    if n_starts <= 0:
        return values[:0].reshape(0, w_len, values.shape[1])

    # shifted[i] holds the i-th row of every window; a middle axis is added so that the
    # shifted tables can be joined along the window dimension.
    shifted = [values[i:n_starts + i:stride][:, np.newaxis, :] for i in range(w_len)]
    return np.concatenate(shifted, axis=1)


def sliding_window_by_fault(data, cols, w_len, stride=1):
    """Apply a sliding window to every segment of ``data`` and stack the results.

    Parameters
    ----------
    data : iterable of pandas.DataFrame
        One frame per segment; each must contain a ``'label'`` column.
    cols : list of str
        Column names of the segments. ``'label'`` is excluded from the window
        features by name; the list itself is left untouched (the original popped its
        last element in place, which forced callers to pass a copy).
    w_len : int
        Window length, in rows.
    stride : int
        Step between two consecutive windows.

    Returns
    -------
    (res_w, res_r)
        ``res_w`` stacks the windows of all segments and ``res_r`` holds, for every
        window, the label of its last row. Windows never span two segments.
    """
    feature_cols = [col for col in cols if col != 'label']

    windows, labels = [], []
    for segment in data:
        windows.append(sliding_window_2D(segment[feature_cols], w_len, stride))
        # Label of the last row of each window.
        labels.append(segment['label'].iloc[w_len - 1::stride])

    return np.concatenate(windows), np.concatenate(labels)

In [None]:
# CNN configurations. The position of an entry in this list is the "-p<idx>" suffix of
# the saved model and result files, so entries must not be reordered or removed.
# Only "w_len" (sliding-window length) is read in this notebook; the other keys are
# presumably consumed when the models are built and trained -- TODO confirm.
params_cnn = [{"filters": 1, "kernel_size": 3, "hidden": [32], "w_len": 5},
              {"filters": 4, "kernel_size": 3, "hidden": [32], "w_len": 5},
              {"filters": 1, "kernel_size": 5, "hidden": [32], "w_len": 5},
              {"filters": 4, "kernel_size": 5, "hidden": [32], "w_len": 5},
              {"filters": 4, "kernel_size": 5, "hidden": [64, 32], "w_len": 5},
              {"filters": 1, "kernel_size": 3, "hidden": [32], "w_len": 10},
              {"filters": 4, "kernel_size": 3, "hidden": [32], "w_len": 10},
              {"filters": 1, "kernel_size": 5, "hidden": [32], "w_len": 10},
              {"filters": 4, "kernel_size": 5, "hidden": [32], "w_len": 10},
              {"filters": 4, "kernel_size": 5, "hidden": [64, 32], "w_len": 10},
              {"filters": 4, "kernel_size": 7, "hidden": [128, 64, 32], "w_len": 10},
              {"filters": 1, "kernel_size": 3, "hidden": [64, 32], "w_len": 5},
              {"filters": 1, "kernel_size": 5, "hidden": [64, 32], "w_len": 5},
              {"filters": 1, "kernel_size": 3, "hidden": [64, 32], "w_len": 10},
              {"filters": 1, "kernel_size": 5, "hidden": [64, 32], "w_len": 10}]

In [None]:
# Stored CNN run to evaluate.
seed = 2200
columns = "all_features"
params_idx = 11
params = params_cnn[params_idx]
run_id = f"conv_nn-{columns}-s{seed}-p{params_idx}"

cnn = keras.models.load_model(f"{models_path}{run_id}")
# NOTE(review): pickle.load can execute arbitrary code; only open trusted result files.
f = f"{results_path}{run_id}"
with open(f, "rb") as file:
    training_columns, _, _, best_cost_cnn, best_thr_cnn, _, _, _ = pickle.load(file)

# One frame per segment, so that the sliding windows never span two segments.
test_set_cnn = build_dataset_for_ml_model(
    df, training_columns=training_columns, split_size=1, as_list=True
)
# A copy of the column list is passed because the helper may modify its argument.
ts_sw, ts_sw_r = sliding_window_by_fault(test_set_cnn, training_columns.copy(), params["w_len"])

test_preds_cnn = cnn.predict(ts_sw).ravel()

# Label 1 means "not fault", so the anomaly signal is the complement of the prediction.
test_signal_cnn = pd.Series(data=(1 - test_preds_cnn))
test_rul_cnn = ts_sw_r

# Persist signal, threshold and labels of this run.
t = f"{threshold_path}{run_id}"
to_serialize = (test_signal_cnn, best_thr_cnn, test_rul_cnn)
with open(t, "wb") as file:
    pickle.dump(to_serialize, file)

test_perf_cnn = performance_evaluation(test_signal_cnn, best_thr_cnn, test_rul_cnn)
all_perf.append(["conv_nn", seed, columns, params, params_idx, *test_perf_cnn])

## Recurrent Neural Network (LSTM)

In [None]:
def create_dataset_3D(X, y, time_steps):
    """Build the windowed inputs and targets of a recurrent model.

    Window ``k`` holds rows ``k`` to ``k + time_steps - 1`` of ``X`` and is paired with
    ``y[k + time_steps]``, i.e. the target of the row that follows the window.

    Returns ``(Xs, ys)``: the stacked windows and the targets as a column vector of
    shape ``(n_windows, 1)``.
    """
    n_windows = len(X) - time_steps
    windows = [X[first:first + time_steps, :] for first in range(n_windows)]
    targets = [y[first + time_steps] for first in range(n_windows)]
    return np.array(windows), np.array(targets).reshape(-1, 1)

In [None]:
# Recurrent-network configurations, shared by the LSTM, GRU and BiLSTM cells below.
# The position of an entry is the "-p<idx>" suffix of the saved model and result files.
# Only "time_steps" (input window length) is read in this notebook; "units" is
# presumably the recurrent layer size used at training time -- TODO confirm.
params_rnn = [{"time_steps": 5, "units": 64},
              {"time_steps": 5, "units": 128},
              {"time_steps": 10, "units": 64},
              {"time_steps": 10, "units": 128}]

In [None]:
# Stored LSTM run to evaluate.
seed = 200
columns = "w5_features"
params_idx = 1
params = params_rnn[params_idx]
run_id = f"lstm-{columns}-s{seed}-p{params_idx}"

model_lstm = keras.models.load_model(f"{models_path}{run_id}")
# NOTE(review): pickle.load can execute arbitrary code; only open trusted result files.
f = f"{results_path}{run_id}"
with open(f, "rb") as file:
    training_columns, _, _, best_cost_lstm, best_thr_lstm, _, _, _ = pickle.load(file)

test_set_lstm = build_dataset_for_ml_model(df, training_columns=training_columns, split_size=1)
# Features are every column but the last one, which holds the binarised label.
X_test_lstm, y_test_lstm = create_dataset_3D(
    test_set_lstm[:, :-1], test_set_lstm[:, -1], params["time_steps"]
)
test_preds_lstm = model_lstm.predict(X_test_lstm).ravel()

# Label 1 means "not fault", so the anomaly signal is the complement of the prediction.
test_signal_lstm = pd.Series(data=(1 - test_preds_lstm))
test_rul_lstm = y_test_lstm

# Persist signal, threshold and labels of this run.
t = f"{threshold_path}{run_id}"
to_serialize = (test_signal_lstm, best_thr_lstm, test_rul_lstm)
with open(t, "wb") as file:
    pickle.dump(to_serialize, file)

test_perf_lstm = performance_evaluation(test_signal_lstm, best_thr_lstm, test_rul_lstm)
all_perf.append(["lstm", seed, columns, params, params_idx, *test_perf_lstm])

## Recurrent Neural Network (GRU)

In [None]:
# Stored GRU run to evaluate.
seed = 300
columns = "all_features"
params_idx = 0
params = params_rnn[params_idx]
run_id = f"gru-{columns}-s{seed}-p{params_idx}"

model_gru = keras.models.load_model(f"{models_path}{run_id}")
# NOTE(review): pickle.load can execute arbitrary code; only open trusted result files.
f = f"{results_path}{run_id}"
with open(f, "rb") as file:
    training_columns, _, _, best_cost_gru, best_thr_gru, _, _, _ = pickle.load(file)

test_set_gru = build_dataset_for_ml_model(df, training_columns=training_columns, split_size=1)
# Features are every column but the last one, which holds the binarised label.
X_test_gru, y_test_gru = create_dataset_3D(
    test_set_gru[:, :-1], test_set_gru[:, -1], params["time_steps"]
)
test_preds_gru = model_gru.predict(X_test_gru).ravel()

# Label 1 means "not fault", so the anomaly signal is the complement of the prediction.
test_signal_gru = pd.Series(data=(1 - test_preds_gru))
test_rul_gru = y_test_gru

# Persist signal, threshold and labels of this run.
t = f"{threshold_path}{run_id}"
to_serialize = (test_signal_gru, best_thr_gru, test_rul_gru)
with open(t, "wb") as file:
    pickle.dump(to_serialize, file)

test_perf_gru = performance_evaluation(test_signal_gru, best_thr_gru, test_rul_gru)
all_perf.append(["gru", seed, columns, params, params_idx, *test_perf_gru])

## Recurrent Neural Network (BiLSTM)

In [None]:
# Stored BiLSTM run to evaluate.
seed = 100
columns = "w15_features"
params_idx = 1
params = params_rnn[params_idx]
run_id = f"bilstm-{columns}-s{seed}-p{params_idx}"

model_bilstm = keras.models.load_model(f"{models_path}{run_id}")
# NOTE(review): pickle.load can execute arbitrary code; only open trusted result files.
f = f"{results_path}{run_id}"
with open(f, "rb") as file:
    training_columns, _, _, best_cost_bilstm, best_thr_bilstm, _, _, _ = pickle.load(file)

test_set_bilstm = build_dataset_for_ml_model(df, training_columns=training_columns, split_size=1)
# Features are every column but the last one, which holds the binarised label.
X_test_bilstm, y_test_bilstm = create_dataset_3D(
    test_set_bilstm[:, :-1], test_set_bilstm[:, -1], params["time_steps"]
)
test_preds_bilstm = model_bilstm.predict(X_test_bilstm).ravel()

# Label 1 means "not fault", so the anomaly signal is the complement of the prediction.
test_signal_bilstm = pd.Series(data=(1 - test_preds_bilstm))
test_rul_bilstm = y_test_bilstm

# Persist signal, threshold and labels of this run.
t = f"{threshold_path}{run_id}"
to_serialize = (test_signal_bilstm, best_thr_bilstm, test_rul_bilstm)
with open(t, "wb") as file:
    pickle.dump(to_serialize, file)

test_perf_bilstm = performance_evaluation(test_signal_bilstm, best_thr_bilstm, test_rul_bilstm)
all_perf.append(["bilstm", seed, columns, params, params_idx, *test_perf_bilstm])

## Transformers

In [None]:
# Transformer configurations. The position of an entry in this list is the "-p<idx>"
# suffix of the saved model and result files, so entries must not be reordered.
# Only "w_len" (sliding-window length) is read in this notebook; the other keys are
# presumably the architecture hyper-parameters used at training time -- TODO confirm.
params_transformer = [{"head_size": 128, "num_heads": 2, "ff_dim": 2, "num_transformer_blocks": 2,
                       "mlp_units": [64], "mlp_dropout": 0.3, "dropout": 0.25, "w_len": 5},
                      {"head_size": 128, "num_heads": 2, "ff_dim": 2, "num_transformer_blocks": 2,
                       "mlp_units": [64], "mlp_dropout": 0.3, "dropout": 0.25, "w_len": 10},
                      {"head_size": 128, "num_heads": 2, "ff_dim": 2, "num_transformer_blocks": 2,
                       "mlp_units": [64], "mlp_dropout": 0.3, "dropout": 0.25, "w_len": 15},
                      {"head_size": 256, "num_heads": 4, "ff_dim": 4, "num_transformer_blocks": 4,
                       "mlp_units": [128], "mlp_dropout": 0.4, "dropout": 0.25, "w_len": 5},
                      {"head_size": 256, "num_heads": 4, "ff_dim": 4, "num_transformer_blocks": 4,
                       "mlp_units": [128], "mlp_dropout": 0.4, "dropout": 0.25, "w_len": 10},
                      {"head_size": 256, "num_heads": 4, "ff_dim": 4, "num_transformer_blocks": 4,
                       "mlp_units": [128], "mlp_dropout": 0.4, "dropout": 0.25, "w_len": 15},
                      {"head_size": 128, "num_heads": 4, "ff_dim": 2, "num_transformer_blocks": 4,
                       "mlp_units": [128], "mlp_dropout": 0.4, "dropout": 0.25, "w_len": 10},
                      {"head_size": 128, "num_heads": 4, "ff_dim": 2, "num_transformer_blocks": 2,
                       "mlp_units": [64], "mlp_dropout": 0.3, "dropout": 0.25, "w_len": 10},
                      {"head_size": 256, "num_heads": 2, "ff_dim": 3, "num_transformer_blocks": 4,
                       "mlp_units": [128, 64], "mlp_dropout": 0.4, "dropout": 0.25, "w_len": 15},
                      {"head_size": 256, "num_heads": 3, "ff_dim": 3, "num_transformer_blocks": 3,
                       "mlp_units": [128], "mlp_dropout": 0.4, "dropout": 0.25, "w_len": 10},]

In [None]:
# Stored transformer run to evaluate.
seed = 301
columns = "w15_features"
params_idx = 3
params = params_transformer[params_idx]
run_id = f"transformer-{columns}-s{seed}-p{params_idx}"

transformer = keras.models.load_model(f"{models_path}{run_id}")
# NOTE(review): pickle.load can execute arbitrary code; only open trusted result files.
f = f"{results_path}{run_id}"
with open(f, "rb") as file:
    training_columns, _, _, best_cost, best_thr, _, _, _ = pickle.load(file)

# One frame per segment, so that the sliding windows never span two segments.
test_set = build_dataset_for_ml_model(
    df, training_columns=training_columns, split_size=1, as_list=True
)
# A copy of the column list is passed because the helper may modify its argument.
ts_sw, ts_sw_r = sliding_window_by_fault(test_set, training_columns.copy(), params["w_len"])

test_preds = transformer.predict(ts_sw).ravel()

# Label 1 means "not fault", so the anomaly signal is the complement of the prediction.
test_signal = pd.Series(data=(1 - test_preds))
test_rul = ts_sw_r

# Persist signal, threshold and labels of this run.
t = f"{threshold_path}{run_id}"
to_serialize = (test_signal, best_thr, test_rul)
with open(t, "wb") as file:
    pickle.dump(to_serialize, file)

test_perf = performance_evaluation(test_signal, best_thr, test_rul)
all_perf.append(["transformer", seed, columns, params, params_idx, *test_perf])

## Analysis over the test set

In [None]:
# One row per evaluated model: run identification followed by the five values
# returned by performance_evaluation.
summary_columns = [
    "model", "seed", "columns", "params", "params_idx",
    "cost", "anticipation", "detected_faults", "missed_faults", "false_alarms",
]
df_res = pd.DataFrame(all_perf, columns=summary_columns)
df_res.to_csv(f"testing_summary_{margin}.csv")