In [1]:
# libraries importing
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from sklearn import metrics
import numpy as np
import os

colors = ['#1f77b4','#ff7f0e','#2ca02c','#d62728','#9467bd','#8c564b','#e377c2','#7f7f7f','#bcbd22','#17becf']

from algorithms.Vanilla_LSTM import Vanilla_LSTM
from process_data import process_data

## Data loading

In [2]:
datasets = process_data()

valve1_X =  datasets["valve1_X"]
valve1_y = datasets["valve1_y"]
valve2_X = datasets["valve2_X"]
valve2_y = datasets["valve2_y"]
other_anomaly_X = datasets["other_anomaly_X"]
other_anomaly_y = datasets["other_anomaly_y"]

In [3]:
# hyperparameters selection
N_STEPS = 5
EPOCHS = 10
BATCH_SIZE = 32
VAL_SPLIT = 0.2
Qs = np.arange(0.55, 0.90, 0.05) # quantile for upper control limit (UCL) selection
PARAMS = [N_STEPS, EPOCHS, BATCH_SIZE, VAL_SPLIT]
model = Vanilla_LSTM(PARAMS)

In [4]:
def test_train_split(df_X, df_y):
    size_train = int(df_X.shape[0]*0.8)
    size_test = df_X.shape[0] - size_train
    x_train = df_X[:size_train]
    y_train = df_y[:size_train].anomaly
    x_test = df_X[-size_test:]
    y_test = df_y[-size_test:].anomaly
    return x_train, y_train, x_test, y_test

In [5]:
def split_sequences(sequences, n_steps):
    X, y = list(), list()
    for i in range(len(sequences)):
        # find the end of this pattern
        end_ix = i + n_steps
        # check if we are beyond the dataset
        if end_ix > len(sequences)-1:
            break
        # gather input and output parts of the pattern
        seq_x, seq_y = sequences[i:end_ix, :], sequences[end_ix, :]
        X.append(seq_x)
        y.append(seq_y)
    return np.array(X), np.array(y)

#### Test model for valve 1

In [6]:
results_valve1 = pd.DataFrame(columns = ['Q', 'TPR', 'TNR', 'PPV', 'NPV', 'FPR', 'FNR', 'FDR', 'ACC'])

x_train, y_train, x_test, y_test = test_train_split(valve1_X, valve1_y)
x_train_steps, y_train_steps = split_sequences(np.array([row.values for i, row in x_train.iterrows()]), N_STEPS)
x_test_steps, y_test_steps = split_sequences(np.array([row.values for i, row in x_test.iterrows()]), N_STEPS)

model.fit(x_train_steps,y_train_steps)

for Q in Qs:

    # results predicting
    residuals_train = pd.DataFrame(y_train_steps - model.predict(x_train_steps)).abs().sum(axis=1)
    UCL = residuals_train.quantile(Q)

    # train predicting
    lstm_residuals = pd.DataFrame(y_train_steps - model.predict(x_train_steps)).abs().sum(axis=1)
    yhat_train = pd.Series((lstm_residuals > UCL).astype(int).values, 
                                index=x_train[N_STEPS:].index).fillna(0)

    # test prediction
    lstm_residuals = pd.DataFrame(y_test_steps - model.predict(x_test_steps)).abs().sum(axis=1)
    yhat_test = pd.Series((lstm_residuals > UCL).astype(int).values, 
                                index=x_test[N_STEPS:].index).fillna(0)

    conf_matrix = metrics.confusion_matrix(y_test[N_STEPS:], yhat_test)

    TN, FP, FN, TP = conf_matrix.ravel()

    # Sensitivity, hit rate, recall, or true positive rate
    TPR = TP/(TP+FN)
    # Specificity or true negative rate
    TNR = TN/(TN+FP)
    # Precision or positive predictive value
    PPV = TP/(TP+FP)
    # Negative predictive value
    NPV = TN/(TN+FN)
    # Fall out or false positive rate FAR false alarm rate
    FPR = FP/(FP+TN)
    # False negative rate MAR missing alarm rate
    FNR = FN/(TP+FN)
    # False discovery rate
    FDR = FP/(TP+FP)
    # Overall accuracy
    ACC = (TP+TN)/(TP+FP+FN+TN)

    row = dict(Q = Q,
            TPR = TPR,
            TNR = TNR,
            PPV = PPV,
            NPV = NPV,
            FPR = FPR,
            FNR = FNR,
            FDR = FDR,
            ACC = ACC)

    results_valve1 = pd.concat([results_valve1, pd.DataFrame(row, index = [0])], ignore_index = True)



  results_valve1 = pd.concat([results_valve1, pd.DataFrame(row, index = [0])], ignore_index = True)




In [7]:
for i, col in enumerate(results_valve1.columns[1:]):
    fig = go.Figure()

    fig.add_trace(go.Scatter(mode='lines+text', x=results_valve1.Q, y=results_valve1[f'{col}'],
                            marker=dict(color=colors[i]),
                            texttemplate='%{y:.2f}', textposition='top center',
                            textfont=dict(color=colors[i], size=12),
                            name=f'{col}',
                            showlegend=True)
                    )

    fig.update_layout(height=400,width=900, template='plotly_white',
                    title=dict(text=f'{col} with different Q values', font=dict(size=18), x=.5, y=.95),
                    yaxis=dict(title=f'{col}', side='left', showgrid=True,),
                    xaxis=dict(title='Q', showgrid=False),
                    legend=dict(orientation="h", yanchor="bottom", y=1, x=0.5, xanchor="center"),
                    )

    fig.show()

#### Test model for valve 2

In [8]:
results_valve2 = pd.DataFrame(columns = ['Q', 'TPR', 'TNR', 'PPV', 'NPV', 'FPR', 'FNR', 'FDR', 'ACC'])

x_train, y_train, x_test, y_test = test_train_split(valve2_X, valve2_y)
x_train_steps, y_train_steps = split_sequences(np.array([row.values for i, row in x_train.iterrows()]), N_STEPS)
x_test_steps, y_test_steps = split_sequences(np.array([row.values for i, row in x_test.iterrows()]), N_STEPS)

model.fit(x_train_steps,y_train_steps)

for Q in Qs:

    # results predicting
    residuals_train = pd.DataFrame(y_train_steps - model.predict(x_train_steps)).abs().sum(axis=1)
    UCL = residuals_train.quantile(Q)

    # train predicting
    lstm_residuals = pd.DataFrame(y_train_steps - model.predict(x_train_steps)).abs().sum(axis=1)
    yhat_train = pd.Series((lstm_residuals > UCL).astype(int).values, 
                                index=x_train[N_STEPS:].index).fillna(0)

    # test prediction
    lstm_residuals = pd.DataFrame(y_test_steps - model.predict(x_test_steps)).abs().sum(axis=1)
    yhat_test = pd.Series((lstm_residuals > UCL).astype(int).values, 
                                index=x_test[N_STEPS:].index).fillna(0)

    conf_matrix = metrics.confusion_matrix(y_test[N_STEPS:], yhat_test)

    TN, FP, FN, TP = conf_matrix.ravel()

    # Sensitivity, hit rate, recall, or true positive rate
    TPR = TP/(TP+FN)
    # Specificity or true negative rate
    TNR = TN/(TN+FP)
    # Precision or positive predictive value
    PPV = TP/(TP+FP)
    # Negative predictive value
    NPV = TN/(TN+FN)
    # Fall out or false positive rate FAR false alarm rate
    FPR = FP/(FP+TN)
    # False negative rate MAR missing alarm rate
    FNR = FN/(TP+FN)
    # False discovery rate
    FDR = FP/(TP+FP)
    # Overall accuracy
    ACC = (TP+TN)/(TP+FP+FN+TN)

    row = dict(Q = Q,
            TPR = TPR,
            TNR = TNR,
            PPV = PPV,
            NPV = NPV,
            FPR = FPR,
            FNR = FNR,
            FDR = FDR,
            ACC = ACC)

    results_valve2 = pd.concat([results_valve2, pd.DataFrame(row, index = [0])], ignore_index = True)




The behavior of DataFrame concatenation with empty or all-NA entries is deprecated. In a future version, this will no longer exclude empty or all-NA columns when determining the result dtypes. To retain the old behavior, exclude the relevant entries before the concat operation.





In [9]:
for i, col in enumerate(results_valve2.columns[1:]):
    fig = go.Figure()

    fig.add_trace(go.Scatter(mode='lines+text', x=results_valve2.Q, y=results_valve2[f'{col}'],
                            marker=dict(color=colors[i]),
                            texttemplate='%{y:.2f}', textposition='top center',
                            textfont=dict(color=colors[i], size=12),
                            name=f'{col}',
                            showlegend=True)
                    )

    fig.update_layout(height=400,width=900, template='plotly_white',
                    title=dict(text=f'{col} with different Q values', font=dict(size=18), x=.5, y=.95),
                    yaxis=dict(title=f'{col}', side='left', showgrid=True,),
                    xaxis=dict(title='Q', showgrid=False),
                    legend=dict(orientation="h", yanchor="bottom", y=1, x=0.5, xanchor="center"),
                    )

    fig.show()

#### Test model for other anomalies

In [10]:
results_other_anomaly = pd.DataFrame(columns = ['Q', 'TPR', 'TNR', 'PPV', 'NPV', 'FPR', 'FNR', 'FDR', 'ACC'])

x_train, y_train, x_test, y_test = test_train_split(other_anomaly_X, other_anomaly_y)
x_train_steps, y_train_steps = split_sequences(np.array([row.values for i, row in x_train.iterrows()]), N_STEPS)
x_test_steps, y_test_steps = split_sequences(np.array([row.values for i, row in x_test.iterrows()]), N_STEPS)

model.fit(x_train_steps,y_train_steps)

for Q in Qs:

    # results predicting
    residuals_train = pd.DataFrame(y_train_steps - model.predict(x_train_steps)).abs().sum(axis=1)
    UCL = residuals_train.quantile(Q)

    # train predicting
    lstm_residuals = pd.DataFrame(y_train_steps - model.predict(x_train_steps)).abs().sum(axis=1)
    yhat_train = pd.Series((lstm_residuals > UCL).astype(int).values, 
                                index=x_train[N_STEPS:].index).fillna(0)

    # test prediction
    lstm_residuals = pd.DataFrame(y_test_steps - model.predict(x_test_steps)).abs().sum(axis=1)
    yhat_test = pd.Series((lstm_residuals > UCL).astype(int).values, 
                                index=x_test[N_STEPS:].index).fillna(0)

    conf_matrix = metrics.confusion_matrix(y_test[N_STEPS:], yhat_test)

    TN, FP, FN, TP = conf_matrix.ravel()

    # Sensitivity, hit rate, recall, or true positive rate
    TPR = TP/(TP+FN)
    # Specificity or true negative rate
    TNR = TN/(TN+FP)
    # Precision or positive predictive value
    PPV = TP/(TP+FP)
    # Negative predictive value
    NPV = TN/(TN+FN)
    # Fall out or false positive rate FAR false alarm rate
    FPR = FP/(FP+TN)
    # False negative rate MAR missing alarm rate
    FNR = FN/(TP+FN)
    # False discovery rate
    FDR = FP/(TP+FP)
    # Overall accuracy
    ACC = (TP+TN)/(TP+FP+FN+TN)

    row = dict(Q = Q,
            TPR = TPR,
            TNR = TNR,
            PPV = PPV,
            NPV = NPV,
            FPR = FPR,
            FNR = FNR,
            FDR = FDR,
            ACC = ACC)

    results_other_anomaly = pd.concat([results_other_anomaly, pd.DataFrame(row, index = [0])], ignore_index = True)




The behavior of DataFrame concatenation with empty or all-NA entries is deprecated. In a future version, this will no longer exclude empty or all-NA columns when determining the result dtypes. To retain the old behavior, exclude the relevant entries before the concat operation.





In [11]:
for i, col in enumerate(results_other_anomaly.columns[1:]):
    fig = go.Figure()

    fig.add_trace(go.Scatter(mode='lines+text', x=results_other_anomaly.Q, y=results_other_anomaly[f'{col}'],
                            marker=dict(color=colors[i]),
                            texttemplate='%{y:.2f}', textposition='top center',
                            textfont=dict(color=colors[i], size=12),
                            name=f'{col}',
                            showlegend=True)
                    )

    fig.update_layout(height=400,width=900, template='plotly_white',
                    title=dict(text=f'{col} with different Q values', font=dict(size=18), x=.5, y=.95),
                    yaxis=dict(title=f'{col}', side='left', showgrid=True,),
                    xaxis=dict(title='Q', showgrid=False),
                    legend=dict(orientation="h", yanchor="bottom", y=1, x=0.5, xanchor="center"),
                    )

    fig.show()