In [14]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from statsmodels.graphics.tsaplots import plot_pacf
from statsmodels.graphics.tsaplots import plot_acf
from matplotlib.pyplot import figure
from sklearn.metrics import f1_score, confusion_matrix, roc_auc_score, recall_score, precision_score

In [2]:
def calculate_score(data_frame, score_type, no_sensors):
    """
    :param data_frame: Complete Dataframe
    :param no_sensors: Number of Sensors in Dataframe
    :param start_time: Start point. Value from time column
    :param max_delta: Number of samples. max_time = (no_samples * (time between two samples))
    :param step_size: It is the number of samples added to current time_delta to get next time_delta
    :return:
    """
    labels_pred = data_frame.iloc[:, (4 * no_sensors) + 2:(5 * no_sensors) + 2]
    labels_true = data_frame.iloc[:, (5 * no_sensors) + 2].values
    
    result_score = np.zeros((no_sensors))

    for sensor in range(no_sensors):
        anomaly_pred = labels_pred.iloc[:, sensor].values
        if score_type == "f1_score":
            result = f1_score(labels_true, anomaly_pred, average="binary")
            index_names = ["f1 sensor_" + str(i) for i in range(no_sensors)]
        elif score_type == "precision_score":
            result = precision_score(labels_true, anomaly_pred, average="binary")
            index_names = ["precision sensor_" + str(i) for i in range(no_sensors)]
        elif score_type == "recall_score":
            result = recall_score(labels_true, anomaly_pred, average="binary")
            index_names = ["recall sensor_" + str(i) for i in range(no_sensors)]   
        else:
            result = 0
            index_names = ["error" for i in range(no_sensors)]
            
        result_score[sensor] = result

    score_df = pd.Series(data=result_score, index=index_names)
    return score_df

In [56]:
def get_confusion_matrix(data_frame, no_sensors, specific_sensor):
    labels_pred = data_frame.iloc[:, (4 * no_sensors) + 2:(5 * no_sensors) + 2]
    labels_true = data_frame.iloc[:, (5 * no_sensors) + 2].values
    
    # No errors are on sawtooth signal (labels_true in dataframe is regarding sine wave)
    if specific_sensor == 1:
        labels_true = np.full((data_frame.shape[0]), 0)
    
    anomaly_pred = labels_pred.iloc[:, specific_sensor].values
    return confusion_matrix(labels_true, anomaly_pred).ravel()

In [21]:
def visualise_metric_per_sensor(results, title):
    fig, axes = plt.subplots(results.shape[1]-1, 1, figsize=(10,20),constrained_layout=False)
    ax = axes.ravel()
    t = results.loc[:,"delta_t"]
    columns = results.columns
    for i in range(results.shape[1]-1): 
        sns.lineplot(data=results, 
                     x=t, 
                     y=columns[i], 
                     ax=ax[i],
                     linewidth=1,
                     color="black")
        ax[i].set_xlabel("delta t [in samples]")
    plt.tight_layout()
    plt.subplots_adjust(top=0.95)
    plt.suptitle(title, fontsize=16)

In [4]:
def visualise_metric_machine(results, score_type, phase):
    t = results.loc[:,"delta_t"]
    complete_title = "CPPS Data - Beginning of Phase '{}''".format(phase)
    
    # Caluculate Metric for hole machine (sum over sensors and devide by no_sensors)
    labels = results.drop(columns="delta_t", axis=0)
    result_machine = labels.sum(axis=1) / results.shape[1]
    
    # Visualise Results
    sns.lineplot(x=t, 
                 y=result_machine, 
                 linewidth=1,
                 color="black")
    plt.xlabel("delta t [in samples]")
    plt.ylabel("{} over all dim".format(score_type))
    #plt.tight_layout()
    plt.title(complete_title, fontsize=16, y=1.12)

# Evaluation of Prediction Interval Metric
## Setup

In [7]:
all_data = pd.read_csv("../../files/prediction/MLE/artifical_2_signals.csv", sep=";")

In [8]:
all_data.head()

Unnamed: 0,ID,sine_signal target,sawtooth_signal target,sine_signal mu predicted,sawtooth_signal mu predicted,sine_signal sigma predicted,sawtooth_signal sigma predicted,mean normalised residual,sine_signal normalised residual,sawtooth_signal normalised residual,Anomaly Sensor_1,Anomaly Sensor_2,anomaly
0,100.0,0.094079,-1.559943,0.042451,0.508776,0.122669,1.103673,-0.726763,0.42087,-1.874395,0,0,0
1,101.0,0.213996,-1.146374,0.158888,-0.624207,0.116932,0.750029,-0.112455,0.471285,-0.696196,0,0,0
2,102.0,0.350721,-1.672505,0.346661,-0.944065,0.117273,0.630579,-0.560284,0.034625,-1.155193,0,0,0
3,103.0,0.56445,-0.276615,0.533433,-0.875655,0.118977,0.530954,0.694465,0.260696,1.128233,0,0,0
4,104.0,0.692688,-0.955894,0.720765,-0.810976,0.120779,0.521465,-0.255184,-0.232465,-0.277904,0,0,0


# Evaluation Metrics
## F1-score

In [11]:
f1_score = calculate_score(all_data, "f1_score", 2)
print("F1 score: {}".format(f1_score.sum()/2))

F1 score: 0.3038645570587786


## Precision Score

In [15]:
precision_score = calculate_score(all_data, "precision_score", 2)
print("Precision score: {}".format(precision_score.sum()/2))

Precision score: 0.3011042735042735


## Recall Score

In [16]:
recall_score = calculate_score(all_data, "recall_score", 2)
print("Recall score: {}".format(recall_score.sum()/2))

Recall score: 0.3075


## Confusion Matrix
### Sensor with errors

In [53]:
print("Positive --> Anomaly")
print("Negative --> Normal Behaviour")
print("--"*15)
tn, fp, fn, tp = get_confusion_matrix(all_data, 2, 0)
print("Sensor No. {}:".format(1))
print("True negative: {}".format(tn))
print("False positive: {}".format(fp))
print("False negative: {}".format(fn))
print("True positive: {}".format(tp))

Positive --> Anomaly
Negative --> Normal Behaviour
------------------------------
11022
Sensor No. 1:
True negative: 11022
False positive: 278
False negative: 253
True positive: 347


### Sensor without errors

In [57]:
print("Positive --> Anomaly")
print("Negative --> Normal Behaviour")
print("--"*15)
tn, fp, fn, tp = get_confusion_matrix(all_data, 2, 1)
print("Sensor No. {}:".format(2))
print("True negative: {}".format(tn))
print("False positive: {}".format(fp))
print("False negative: {}".format(fn))
print("True positive: {}".format(tp))

Positive --> Anomaly
Negative --> Normal Behaviour
------------------------------
Sensor No. 2:
True negative: 11432
False positive: 468
False negative: 0
True positive: 0
