# Anomaly Detection of Hydrogen Pressure Multivariate Time Series

In [None]:
import pandas as pd
import seaborn as sns
from sklearn.model_selection import train_test_split, KFold, GridSearchCV
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix, make_scorer
import numpy as np
from sklearn.svm import OneClassSVM
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.neighbors import LocalOutlierFactor
from sklearn.ensemble import BaggingClassifier
from sklearn.ensemble import AdaBoostClassifier
import pygad
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from matplotlib.legend_handler import HandlerPathCollection
from tsfresh import extract_features
from tsfresh.utilities.dataframe_functions import impute
from tsfresh import select_features
import plotly.graph_objects as go
import warnings
from itertools import cycle
warnings.filterwarnings("ignore")

# Dataset Description 

In [None]:
dataset_normale = pd.read_csv("raw_normal_dataset_def.csv")

print("Dataset Normale:")
print("\nDescription dataset:")
dataset_normale.Time = pd.to_datetime(dataset_normale.Time, infer_datetime_format=True, unit='s')
dataset_normale['Time'] = dataset_normale['Time'].apply(lambda x: x.replace(year=2024,month=5, day=15))
display(dataset_normale)
print(dataset_normale.info())
print(dataset_normale.iloc[:, 1:7].shape)
print(dataset_normale.iloc[:, 1:7].describe())



In [None]:

dataset_anomalo_1 = pd.read_csv("raw_anomaly1_dataset_def.csv")
print("Dataset Anomalia 1:")
print("\nDescription dataset:")
dataset_anomalo_1.Time = pd.to_datetime(dataset_anomalo_1.Time, infer_datetime_format=True, unit='s')
dataset_anomalo_1['Time'] = dataset_anomalo_1['Time'].apply(lambda x: x.replace(year=2024,month=5, day=15))
display(dataset_anomalo_1)
print(dataset_anomalo_1.info())
print(dataset_anomalo_1.iloc[:, 1:7].shape)
print(dataset_anomalo_1.iloc[:, 1:7].describe())




In [None]:
dataset_anomalo_2 = pd.read_csv("raw_anomaly2_dataset_def.csv")
print("Dataset Anomalia 2:")
print("\nDescription dataset:")
dataset_anomalo_2.Time = pd.to_datetime(dataset_anomalo_2.Time, infer_datetime_format=True, unit='s')
dataset_anomalo_2['Time'] = dataset_anomalo_2['Time'].apply(lambda x: x.replace(year=2024,month=5, day=15))
display(dataset_anomalo_2)
print(dataset_anomalo_2.info())
print(dataset_anomalo_2.iloc[:, 1:7].shape)
print(dataset_anomalo_2.iloc[:, 1:7].describe())

In [None]:
dataset_anomalo_3 = pd.read_csv("raw_anomaly3_dataset_def.csv")
print("Dataset Anomalia 3:")
print("\nDescription dataset:")
dataset_anomalo_3.Time = pd.to_datetime(dataset_anomalo_3.Time, infer_datetime_format=True, unit='s')
dataset_anomalo_3['Time'] = dataset_anomalo_3['Time'].apply(lambda x: x.replace(year=2024,month=5, day=15))
display(dataset_anomalo_3)
print(dataset_anomalo_3.info())
print(dataset_anomalo_3.iloc[:, 1:7].shape)
print(dataset_anomalo_3.iloc[:, 1:7].describe())

# Plotting Multivariate Time Series Raw Datasets

In [None]:

layout = dict(xaxis=dict(title='Time'), yaxis=dict(title='Pressure'))

fig = go.Figure(layout=layout)

fig.add_trace(go.Scatter(x=dataset_normale.Time, y=dataset_normale.PS_1, 
                         mode='markers',
                         marker=dict(color='red')))
fig.add_trace(go.Scatter(x=dataset_normale.Time, y=dataset_normale.PS_2, 
                         mode='markers',
                         marker=dict(color='blue')))
fig.add_trace(go.Scatter(x=dataset_normale.Time, y=dataset_normale.PS_3, 
                         mode='markers',
                         marker=dict(color='green')))
fig.add_trace(go.Scatter(x=dataset_normale.Time, y=dataset_normale.PS_4, 
                         mode='markers',
                         marker=dict(color='yellow')))
fig.add_trace(go.Scatter(x=dataset_normale.Time, y=dataset_normale.PS_5, 
                         mode='markers',
                         marker=dict(color='orange')))

names = cycle(['PS_1', 'PS_2','PS_3','PS_4','PS_5'])
fig.for_each_trace(lambda t:  t.update(name = next(names)))

In [None]:
layout = dict(xaxis=dict(title='Time'), yaxis=dict(title='Pressure'))

fig = go.Figure(layout=layout)

fig.add_trace(go.Scatter(x=dataset_anomalo_1.Time, y=dataset_anomalo_1.PS_1, 
                         mode='markers',
                         marker=dict(color='red')))
fig.add_trace(go.Scatter(x=dataset_anomalo_1.Time, y=dataset_anomalo_1.PS_2, 
                         mode='markers',
                         marker=dict(color='blue')))
fig.add_trace(go.Scatter(x=dataset_anomalo_1.Time, y=dataset_anomalo_1.PS_3, 
                         mode='markers',
                         marker=dict(color='green')))
fig.add_trace(go.Scatter(x=dataset_anomalo_1.Time, y=dataset_anomalo_1.PS_4, 
                         mode='markers',
                         marker=dict(color='yellow')))
fig.add_trace(go.Scatter(x=dataset_anomalo_1.Time, y=dataset_anomalo_1.PS_5, 
                         mode='markers',
                         marker=dict(color='orange')))

names = cycle(['PS_1', 'PS_2','PS_3','PS_4','PS_5'])
fig.for_each_trace(lambda t:  t.update(name = next(names)))

In [None]:
layout = dict(xaxis=dict(title='Time'), yaxis=dict(title='Pressure'))

fig = go.Figure(layout=layout)

fig.add_trace(go.Scatter(x=dataset_anomalo_2.Time, y=dataset_anomalo_2.PS_1, 
                         mode='markers',
                         marker=dict(color='red')))
fig.add_trace(go.Scatter(x=dataset_anomalo_2.Time, y=dataset_anomalo_2.PS_2, 
                         mode='markers',
                         marker=dict(color='blue')))
fig.add_trace(go.Scatter(x=dataset_anomalo_2.Time, y=dataset_anomalo_2.PS_3, 
                         mode='markers',
                         marker=dict(color='green')))
fig.add_trace(go.Scatter(x=dataset_anomalo_2.Time, y=dataset_anomalo_2.PS_4, 
                         mode='markers',
                         marker=dict(color='yellow')))
fig.add_trace(go.Scatter(x=dataset_anomalo_2.Time, y=dataset_anomalo_2.PS_5, 
                         mode='markers',
                         marker=dict(color='orange')))

names = cycle(['PS_1', 'PS_2','PS_3','PS_4','PS_5'])
fig.for_each_trace(lambda t:  t.update(name = next(names)))

In [None]:
layout = dict(xaxis=dict(title='Time'), yaxis=dict(title='Pressure'))

fig = go.Figure(layout=layout)

fig.add_trace(go.Scatter(x=dataset_anomalo_3.Time, y=dataset_anomalo_3.PS_1, 
                         mode='markers',
                         marker=dict(color='red')))
fig.add_trace(go.Scatter(x=dataset_anomalo_3.Time, y=dataset_anomalo_3.PS_2, 
                         mode='markers',
                         marker=dict(color='blue')))
fig.add_trace(go.Scatter(x=dataset_anomalo_3.Time, y=dataset_anomalo_3.PS_3, 
                         mode='markers',
                         marker=dict(color='green')))
fig.add_trace(go.Scatter(x=dataset_anomalo_3.Time, y=dataset_anomalo_3.PS_4, 
                         mode='markers',
                         marker=dict(color='yellow')))
fig.add_trace(go.Scatter(x=dataset_anomalo_3.Time, y=dataset_anomalo_3.PS_5, 
                         mode='markers',
                         marker=dict(color='orange')))

names = cycle(['PS_1', 'PS_2','PS_3','PS_4','PS_5'])
fig.for_each_trace(lambda t:  t.update(name = next(names)))

# Feature Extraction

In [None]:
dataset_normale.reset_index(inplace=True)
dataset_normale.rename(columns={'index': 'ID'}, inplace=True)
dataset_anomalo_1.reset_index(inplace=True)
dataset_anomalo_1.rename(columns={'index': 'ID'}, inplace=True)
dataset_anomalo_2.reset_index(inplace=True)
dataset_anomalo_2.rename(columns={'index': 'ID'}, inplace=True)
dataset_anomalo_3.reset_index(inplace=True)
dataset_anomalo_3.rename(columns={'index': 'ID'}, inplace=True)

y_tot = pd.concat([dataset_normale['Y'], dataset_anomalo_1['Y'],  dataset_anomalo_2['Y'],  dataset_anomalo_3['Y']], ignore_index=True)
extracted_features_norm = extract_features(dataset_normale.drop(columns='Y'), column_id="ID", column_sort="Time")

extracted_features_anom_1 = extract_features(dataset_anomalo_1.drop(columns='Y'), column_id="ID", column_sort="Time")

extracted_features_anom_2 = extract_features(dataset_anomalo_2.drop(columns='Y'), column_id="ID", column_sort="Time")

extracted_features_anom_3 = extract_features(dataset_anomalo_3.drop(columns='Y'), column_id="ID", column_sort="Time")

impute(extracted_features_norm)
impute(extracted_features_anom_1)
impute(extracted_features_anom_2)
impute(extracted_features_anom_3)

extracted_features_total = pd.concat([extracted_features_norm,  extracted_features_anom_1, extracted_features_anom_2,  extracted_features_anom_3], ignore_index=True)
display(extracted_features_total)

# Feature Selection

In [None]:
features_filtered = select_features(extracted_features_total, y_tot)
display(features_filtered)


# Split Dataset : Train, Validation and Test

In [None]:
X = features_filtered.values  
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_train, X_temp, y_train, y_temp = train_test_split(X_scaled, y_tot, test_size=0.2, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.1, random_state=42)

# Anomaly Detection Algorithms

# One Class SVM:  Hyperparameter Optimization with GridSearchCV

In [None]:
oneclass_svm = OneClassSVM()
 
param_grid = {
    'kernel': ['linear', 'poly', 'rbf', 'sigmoid'],
    'nu': np.linspace(0.01, 1, 5),
    'gamma': np.linspace(0.01, 1, 5)
}
 
f1_scorer = make_scorer(f1_score, average='binary')
grid_search = GridSearchCV(estimator=oneclass_svm, param_grid=param_grid, scoring=f1_scorer, cv=5, n_jobs=-1, verbose=2)
grid_search.fit(X_train, y_train)
 
print("Best parameters found: ", grid_search.best_params_)
print("Best cross-validation F1 score: ", grid_search.best_score_)
 
best_model = grid_search.best_estimator_
y_pred_val = best_model.predict(X_val)
accuracy_val = accuracy_score(y_val, y_pred_val)
f1_score_val = f1_score(y_val, y_pred_val, pos_label=1)
confusion_matrix_val = confusion_matrix(y_val, y_pred_val, labels=[1,-1])
print('accuracy_val: ',accuracy_val)
print('f1_score_val: ',f1_score_val)
 
 
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix_val, annot=True, cmap='Blues', fmt='g', xticklabels=[1, -1], yticklabels=[1, -1])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Validation Set')
plt.show()
 
 
y_pred_test = best_model.predict(X_test)
accuracy_test = accuracy_score(y_test, y_pred_test)
f1_score_test = f1_score(y_test, y_pred_test, pos_label=1)
confusion_matrix_test = confusion_matrix(y_test, y_pred_test, labels=[1,-1])
print('accuracy_test: ',accuracy_test)
print('f1_score_test: ',f1_score_test)
 
 
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix_test, annot=True, cmap='Blues', fmt='g', xticklabels=[1, -1], yticklabels=[1, -1])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Test Set')
plt.show()

# Isolation Forest:  Hyperparameter Optimization with GridSearchCV

In [None]:
iso_forest = IsolationForest()
 
param_grid = {
    'n_estimators': list(range(1,5)),
    'contamination': np.linspace(0.1, 0.5, 5) ,
    'max_features': np.linspace(0.1, 1.0, 5),
    'bootstrap': True,
    'random_state': 42
}
 
 
f1_scorer = make_scorer(f1_score, average='binary')
grid_search = GridSearchCV(estimator=iso_forest, param_grid=param_grid, scoring=f1_scorer, cv=5, n_jobs=-1, verbose=2)
grid_search.fit(X_train, y_train)
 
print("Best parameters found: ", grid_search.best_params_)
print("Best cross-validation F1 score: ", grid_search.best_score_)
 
best_model = grid_search.best_estimator_
y_pred_val = best_model.predict(X_val)
accuracy_val = accuracy_score(y_val, y_pred_val)
f1_score_val = f1_score(y_val, y_pred_val, pos_label=1)
confusion_matrix_val = confusion_matrix(y_val, y_pred_val, labels=[1,-1])
print('accuracy_val: ',accuracy_val)
print('f1_score_val: ',f1_score_val)
 
 
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix_val, annot=True, cmap='Blues', fmt='g', xticklabels=[1, -1], yticklabels=[1, -1])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Validation Set')
plt.show()
 
 
y_pred_test = best_model.predict(X_test)
accuracy_test = accuracy_score(y_test, y_pred_test)
f1_score_test = f1_score(y_test, y_pred_test, pos_label=1)
confusion_matrix_test = confusion_matrix(y_test, y_pred_test, labels=[1,-1])
print('accuracy_test: ',accuracy_test)
print('f1_score_test: ',f1_score_test)
 
 
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix_test, annot=True, cmap='Blues', fmt='g', xticklabels=[1, -1], yticklabels=[1, -1])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Test Set')
plt.show()

# Random Forest:  Hyperparameter Optimization with GridSearchCV

In [None]:
rand_forest = RandomForestClassifier()
 
param_grid = {
    'n_estimators': list(range(1,5)),
    'max_depth': list(range(1,10)),
    'random_state': 42,
    'max_samples':  np.linspace(0.1, 0.5, 5),
    'bootstrap': True
}
 
f1_scorer = make_scorer(f1_score, average='binary')
grid_search = GridSearchCV(estimator=rand_forest, param_grid=param_grid, scoring=f1_scorer, cv=5, n_jobs=-1, verbose=2)
grid_search.fit(X_train, y_train)
 
print("Best parameters found: ", grid_search.best_params_)
print("Best cross-validation F1 score: ", grid_search.best_score_)
 
best_model = grid_search.best_estimator_
y_pred_val = best_model.predict(X_val)
accuracy_val = accuracy_score(y_val, y_pred_val)
f1_score_val = f1_score(y_val, y_pred_val, pos_label=1)
confusion_matrix_val = confusion_matrix(y_val, y_pred_val, labels=[1,-1])
print('accuracy_val: ',accuracy_val)
print('f1_score_val: ',f1_score_val)
 
 
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix_val, annot=True, cmap='Blues', fmt='g', xticklabels=[1, -1], yticklabels=[1, -1])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Validation Set')
plt.show()
 
 
y_pred_test = best_model.predict(X_test)
accuracy_test = accuracy_score(y_test, y_pred_test)
f1_score_test = f1_score(y_test, y_pred_test, pos_label=1)
confusion_matrix_test = confusion_matrix(y_test, y_pred_test, labels=[1,-1])
print('accuracy_test: ',accuracy_test)
print('f1_score_test: ',f1_score_test)
 
 
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix_test, annot=True, cmap='Blues', fmt='g', xticklabels=[1, -1], yticklabels=[1, -1])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Test Set')
plt.show()

# Local Outlier Factor:  Hyperparameter Optimization with GridSearchCV

In [None]:
loc_outl = LocalOutlierFactor()
 
param_grid = {
    'algorithm': ['ball_tree', 'kd_tree'],
    'n_neighbors': list(range(10,15)),
    'leaf_size' : list(range(30,35)),
    'contamination': np.linspace(0.1, 0.5, 5), 
    'novelty': True
}
 
f1_scorer = make_scorer(f1_score, average='binary')
grid_search = GridSearchCV(estimator=loc_outl, param_grid=param_grid, scoring=f1_scorer, cv=5, n_jobs=-1, verbose=2)
grid_search.fit(X_train, y_train)
 
print("Best parameters found: ", grid_search.best_params_)
print("Best cross-validation F1 score: ", grid_search.best_score_)
 
best_model = grid_search.best_estimator_
y_pred_val = best_model.predict(X_val)
accuracy_val = accuracy_score(y_val, y_pred_val)
f1_score_val = f1_score(y_val, y_pred_val, pos_label=1)
confusion_matrix_val = confusion_matrix(y_val, y_pred_val, labels=[1,-1])
print('accuracy_val: ',accuracy_val)
print('f1_score_val: ',f1_score_val)
 
 
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix_val, annot=True, cmap='Blues', fmt='g', xticklabels=[1, -1], yticklabels=[1, -1])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Validation Set')
plt.show()
 
 
y_pred_test = best_model.predict(X_test)
accuracy_test = accuracy_score(y_test, y_pred_test)
f1_score_test = f1_score(y_test, y_pred_test, pos_label=1)
confusion_matrix_test = confusion_matrix(y_test, y_pred_test, labels=[1,-1])
print('accuracy_test: ',accuracy_test)
print('f1_score_test: ',f1_score_test)
 
 
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix_test, annot=True, cmap='Blues', fmt='g', xticklabels=[1, -1], yticklabels=[1, -1])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Test Set')
plt.show()

In [None]:
X_scores = loc_outl.negative_outlier_factor_
 
 
y_pred_val = loc_outl.predict(X_val)
n_errors = (y_pred_val != y_val).sum()
 
 
def update_legend_marker_size(handle, orig):
    "Personalizza la dimensione del marcatore della legenda"
    handle.update_from(orig)
    handle.set_sizes([20])
 
plt.scatter(X_train[:, 0], X_train[:, 1], color="k", s=3.0, label="Data points")
radius = (X_scores.max() - X_scores) / (X_scores.max() - X_scores.min())
scatter = plt.scatter(
    X_train[:, 0],
    X_train[:, 1],
    s=1000 * radius,
    edgecolors="r",
    facecolors="none",
    label="Outlier scores",
)
plt.axis("tight")
plt.xlim((-5, 5))
plt.ylim((-5, 5))
plt.xlabel("prediction errors: %d" % (n_errors))
plt.legend(
    handler_map={scatter: HandlerPathCollection(update_func=update_legend_marker_size)}
)
plt.title("Local Outlier Factor (LOF)")
plt.show()

# Bagging With One Class SVM:  Hyperparameter Optimization with GridSearchCV

In [None]:
bagging_model = BaggingClassifier()
 
param_grid = {
    'base_estimator': oneclass_svm,
    'n_estimators': list(range(1,8)),
    'max_samples':  np.linspace(0.1, 0.5, 5), 
    'bootstrap': True,
    'random_state':42
}
 
f1_scorer = make_scorer(f1_score, average='binary')
grid_search = GridSearchCV(estimator=bagging_model, param_grid=param_grid, scoring=f1_scorer, cv=5, n_jobs=-1, verbose=2)
grid_search.fit(X_train, y_train)
 
print("Best parameters found: ", grid_search.best_params_)
print("Best cross-validation F1 score: ", grid_search.best_score_)
 
best_model = grid_search.best_estimator_
y_pred_val = best_model.predict(X_val)
accuracy_val = accuracy_score(y_val, y_pred_val)
f1_score_val = f1_score(y_val, y_pred_val, pos_label=1)
confusion_matrix_val = confusion_matrix(y_val, y_pred_val, labels=[1,-1])
print('accuracy_val: ',accuracy_val)
print('f1_score_val: ',f1_score_val)
 
 
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix_val, annot=True, cmap='Blues', fmt='g', xticklabels=[1, -1], yticklabels=[1, -1])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Validation Set')
plt.show()
 
 
y_pred_test = best_model.predict(X_test)
accuracy_test = accuracy_score(y_test, y_pred_test)
f1_score_test = f1_score(y_test, y_pred_test, pos_label=1)
confusion_matrix_test = confusion_matrix(y_test, y_pred_test, labels=[1,-1])
print('accuracy_test: ',accuracy_test)
print('f1_score_test: ',f1_score_test)
 
 
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix_test, annot=True, cmap='Blues', fmt='g', xticklabels=[1, -1], yticklabels=[1, -1])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Test Set')
plt.show()

# Boosting (AdaBoost) With One Class SVM:  Hyperparameter Optimization with GridSearchCV

In [None]:
boosting_model = AdaBoostClassifier()
 
param_grid = {
    'base_estimator': oneclass_svm,
    'n_estimators': list(range(1,4)),
    'learning_rate':  np.linspace(0.1, 1, 5),
    'random_state':42
}
 
f1_scorer = make_scorer(f1_score, average='binary')
grid_search = GridSearchCV(estimator=boosting_model, param_grid=param_grid, scoring=f1_scorer, cv=5, n_jobs=-1, verbose=2)
grid_search.fit(X_train, y_train)
 
print("Best parameters found: ", grid_search.best_params_)
print("Best cross-validation F1 score: ", grid_search.best_score_)
 
best_model = grid_search.best_estimator_
y_pred_val = best_model.predict(X_val)
accuracy_val = accuracy_score(y_val, y_pred_val)
f1_score_val = f1_score(y_val, y_pred_val, pos_label=1)
confusion_matrix_val = confusion_matrix(y_val, y_pred_val, labels=[1,-1])
print('accuracy_val: ',accuracy_val)
print('f1_score_val: ',f1_score_val)
 
 
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix_val, annot=True, cmap='Blues', fmt='g', xticklabels=[1, -1], yticklabels=[1, -1])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Validation Set')
plt.show()
 
 
y_pred_test = best_model.predict(X_test)
accuracy_test = accuracy_score(y_test, y_pred_test)
f1_score_test = f1_score(y_test, y_pred_test, pos_label=1)
confusion_matrix_test = confusion_matrix(y_test, y_pred_test, labels=[1,-1])
print('accuracy_test: ',accuracy_test)
print('f1_score_test: ',f1_score_test)
 
 
plt.figure(figsize=(8, 6))
sns.heatmap(confusion_matrix_test, annot=True, cmap='Blues', fmt='g', xticklabels=[1, -1], yticklabels=[1, -1])
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix Test Set')
plt.show()