<center>
    <a href="https://www.aus.edu/"><img src="https://i.imgur.com/pdZvnSD.png" width=200> </a>
</center>

<h1 align=center><font size = 5>Anomaly Detection</font></h1>
<h1 align=center><font size = 5>Prepared by Alex Aklson, Ph.D.</font></h1>
<h1 align=center><font size = 5>November 7, 2024</font></h1>

Import libraries and packages.

In [1]:
import numpy as np
import pandas as pd

from scipy.stats import norm
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score

Read the aircraft engine data.

In [2]:
aircraft_data = pd.read_csv('/content/aircraft_engine.csv')

In [3]:
aircraft_data.shape

(1020, 3)

Split the data into normal and anomalous records.

In [5]:
normal_data = aircraft_data[aircraft_data['label'] == 0]
anomaly_data = aircraft_data[aircraft_data['label'] == 1]

Use 60% of the normal data for training and hold out the remaining 40% for validation and testing.

In [6]:
train_normal, valid_test_normal = train_test_split(
    normal_data,
    test_size=0.4,
    random_state=42
)

Split the held-out normal data evenly into a validation set and a test set.

In [7]:
val_normal, test_normal = train_test_split(
    valid_test_normal,
    test_size=0.5,
    random_state=42
)
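
As a quick check on the two-stage split, the sketch below applies the same calls to a hypothetical frame of 1000 rows (the frame and its column name `x` are made up for illustration) and confirms that the result is a 60/20/20 partition.

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# Hypothetical frame of 1000 normal records; the column name is illustrative.
normal = pd.DataFrame({"x": np.arange(1000)})

# Stage 1: 60% training, 40% held out.
train, holdout = train_test_split(normal, test_size=0.4, random_state=42)

# Stage 2: split the held-out 40% in half, giving 20% validation and 20% test.
val, test = train_test_split(holdout, test_size=0.5, random_state=42)

print(len(train), len(val), len(test))  # prints: 600 200 200

# The three pieces are disjoint and together cover every row.
covered = set(train.index) | set(val.index) | set(test.index)
print(covered == set(normal.index))
```

Fixing `random_state` in both calls makes the partition reproducible across runs.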

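Split the anomaly data evenly between validation and test. No anomalies go into the training set, because the model is fit on normal data only.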

In [8]:
val_anomaly, test_anomaly = train_test_split(
    anomaly_data,
    test_size=0.5,
    random_state=42
)

Build the validation and test sets by combining their normal and anomalous records.

In [9]:
val_data = pd.concat(
    [val_normal, val_anomaly], ignore_index=True
)

In [10]:
test_data = pd.concat(
    [test_normal, test_anomaly], ignore_index=True
)

Fit the model by estimating the mean and standard deviation of each feature on the normal training data.

In [12]:
mean_1 = train_normal['heat_generated'].mean()
std_1 = train_normal['heat_generated'].std()

In [13]:
mean_2 = train_normal['vibration_intensity'].mean()
std_2 = train_normal['vibration_intensity'].std()
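
The per-feature estimates can also be computed in a single call. The sketch below uses a hypothetical stand-in for `train_normal` (the generated values are made up) and highlights one detail that is easy to miss: pandas `.std()` returns the sample standard deviation (`ddof=1`), while `np.std` defaults to the population version (`ddof=0`).

```python
import numpy as np
import pandas as pd

# Hypothetical stand-in for train_normal; the values are made up for illustration.
rng = np.random.default_rng(0)
train = pd.DataFrame({
    "heat_generated": rng.normal(50.0, 5.0, size=600),
    "vibration_intensity": rng.normal(10.0, 2.0, size=600),
})

# One call returns a table with rows 'mean' and 'std' and one column per feature.
params = train.agg(["mean", "std"])
print(params)

# pandas uses ddof=1 by default; NumPy uses ddof=0 unless told otherwise.
heat = train["heat_generated"].to_numpy()
sample_std = np.std(heat, ddof=1)
population_std = np.std(heat)
print(sample_std, population_std)
```

With several hundred training rows the two estimates differ only slightly, but mixing them between fitting and scoring would make results harder to reproduce.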

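Compute the Gaussian density of each feature for every point in the validation set, then multiply the two to get an overall score. These values are probability densities rather than probabilities, and taking their product treats the features as independent.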

In [15]:
val_data['prob_feature_1'] = norm.pdf(val_data['heat_generated'], mean_1, std_1)
val_data['prob_feature_2'] = norm.pdf(val_data['vibration_intensity'], mean_2, std_2)
val_data['overall_prob'] = val_data['prob_feature_1'] * val_data['prob_feature_2']
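
To make the scoring step concrete, here is a minimal sketch with hypothetical parameters and two made-up points. It also shows the equivalent computation in log space, which may be preferable when many features are multiplied and the product risks underflow; that variant is a suggestion, not part of the notebook's method.

```python
import numpy as np
from scipy.stats import norm

# Hypothetical fitted parameters for two features (illustrative values only).
means = np.array([50.0, 10.0])
stds = np.array([5.0, 2.0])

points = np.array([
    [50.0, 10.0],  # exactly at both feature means
    [80.0, 20.0],  # several standard deviations away on both features
])

# Product of per-feature Gaussian densities, one score per row.
density = norm.pdf(points, means, stds).prod(axis=1)

# Same quantity in log space: a sum of log-densities.
log_density = norm.logpdf(points, means, stds).sum(axis=1)

print(density)
print(log_density)

# A density is not a probability: a narrow Gaussian exceeds 1 at its peak.
narrow_peak = norm.pdf(0.0, loc=0.0, scale=0.1)
print(narrow_peak)
```

The point far from both means receives a score many orders of magnitude smaller than the point at the means, which is what the threshold later exploits.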

Use the validation set to estimate the threshold epsilon ($\epsilon$).

In [16]:
sorted_val_data = val_data.sort_values(
    by='overall_prob', ascending=False
).reset_index(drop=True)

In [17]:
first_anomalous_index = sorted_val_data[sorted_val_data['label'] == 1].index[0]
first_anomalous_score = sorted_val_data.loc[first_anomalous_index, 'overall_prob']
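
The sketch below walks through the same ranking logic on six made-up validation scores. Because the frame is sorted in descending order, the first anomalous row is the anomaly with the highest score. The example also shows why a strict `<` comparison against that exact score would miss that one anomaly.

```python
import pandas as pd

# Made-up validation scores; label 1 marks an anomaly.
val = pd.DataFrame({
    "overall_prob": [0.90, 0.02, 0.75, 0.001, 0.60, 0.0005],
    "label":        [0,    1,    0,    1,     0,    1],
})

# Rank from most to least likely under the fitted model.
ranked = val.sort_values(by="overall_prob", ascending=False).reset_index(drop=True)

# First anomalous row in the ranking = highest-scoring anomaly.
first_idx = ranked[ranked["label"] == 1].index[0]
epsilon = ranked.loc[first_idx, "overall_prob"]
print(first_idx, epsilon)

# A strict comparison leaves out the anomaly whose score equals epsilon.
flagged_strict = int((val["overall_prob"] < epsilon).sum())
flagged_inclusive = int((val["overall_prob"] <= epsilon).sum())
print(flagged_strict, flagged_inclusive)
```

In this toy data all three anomalies are caught only with the inclusive comparison, or with a threshold placed slightly above the highest anomalous score.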

first_anomalous_score is the highest score among the validation anomalies, so it could serve as the threshold $\epsilon$.

Alternatively, we can add a small buffer above it to reduce false negatives.

In [18]:
margin = 0.01 * (sorted_val_data['overall_prob'].max() - sorted_val_data['overall_prob'].min())
threshold = first_anomalous_score + margin
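
A different way to pick $\epsilon$, not the one used in this notebook, is to sweep candidate thresholds over the validation scores and keep the one with the best F1 score. The helper `best_epsilon_by_f1` below is a hypothetical name, and the scores are made up; treat it as a sketch of the idea rather than a tuned implementation.

```python
import numpy as np

def best_epsilon_by_f1(scores, labels):
    """Return (epsilon, f1) maximizing F1 when score < epsilon flags an anomaly."""
    scores = np.asarray(scores, dtype=float)
    labels = np.asarray(labels, dtype=int)

    # Candidate thresholds: midpoints between consecutive distinct scores.
    uniq = np.unique(scores)
    candidates = (uniq[:-1] + uniq[1:]) / 2.0

    best_eps, best_f1 = None, -1.0
    for eps in candidates:
        pred = scores < eps
        tp = int(np.sum(pred & (labels == 1)))
        fp = int(np.sum(pred & (labels == 0)))
        fn = int(np.sum(~pred & (labels == 1)))
        denom = 2 * tp + fp + fn
        f1 = 2 * tp / denom if denom else 0.0
        if f1 > best_f1:  # ties keep the smaller threshold
            best_eps, best_f1 = float(eps), f1
    return best_eps, best_f1

# Made-up validation scores; label 1 marks an anomaly.
scores = [0.90, 0.02, 0.75, 0.001, 0.60, 0.0005]
labels = [0, 1, 0, 1, 0, 1]

eps, f1 = best_epsilon_by_f1(scores, labels)
print(eps, f1)
```

Compared with the buffer approach, this trades some recall for precision when anomalies and normal points overlap, so the choice depends on how costly a missed anomaly is.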

Evaluate the anomaly detection model on the test data.

In [20]:
test_data['prob_feature_1'] = norm.pdf(test_data['heat_generated'], mean_1, std_1)

In [21]:
test_data['prob_feature_2'] = norm.pdf(test_data['vibration_intensity'], mean_2, std_2)

In [22]:
test_data['overall_prob'] = test_data['prob_feature_1'] * test_data['prob_feature_2']

In [23]:
test_data['predicted_label'] = (test_data['overall_prob'] < threshold).astype(int)
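
Since the validation and test sets go through the same scoring steps, the logic can be wrapped in one function. The sketch below assumes a hypothetical helper `flag_anomalies` and made-up parameters; it returns a copy rather than modifying the input frame.

```python
import pandas as pd
from scipy.stats import norm

def flag_anomalies(df, params, threshold):
    """Score rows with a product of Gaussian densities and flag low scores.

    params maps each feature name to a (mean, std) pair.
    """
    out = df.copy()
    out["overall_prob"] = 1.0
    for feature, (mean, std) in params.items():
        out["overall_prob"] *= norm.pdf(out[feature], mean, std)
    # A score below the threshold is predicted anomalous (label 1).
    out["predicted_label"] = (out["overall_prob"] < threshold).astype(int)
    return out

# Hypothetical parameters and points, for illustration only.
params = {"heat_generated": (50.0, 5.0), "vibration_intensity": (10.0, 2.0)}
points = pd.DataFrame({
    "heat_generated": [50.0, 80.0],
    "vibration_intensity": [10.0, 20.0],
})

scored = flag_anomalies(points, params, threshold=1e-4)
print(scored)
```

Keeping the parameters in a dictionary means adding a third feature only requires one more entry, with no change to the scoring code.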

Calculate the evaluation metrics.

In [24]:
precision = precision_score(test_data['label'], test_data['predicted_label'])

In [25]:
recall = recall_score(test_data['label'], test_data['predicted_label'])

In [26]:
f1 = f1_score(test_data['label'], test_data['predicted_label'])

In [27]:
conf_matrix = confusion_matrix(test_data['label'], test_data['predicted_label'])

In [29]:
print("Threshold: {}".format(threshold))
print("Precision: {}".format(np.round(precision, 3)))
print("Recall: {}".format(np.round(recall, 3)))
print("F1 Score: {}".format(np.round(f1, 3)))
print("Confusion Matrix:")
print(conf_matrix)

Threshold: 0.00026769751119968285
Precision: 0.833
Recall: 1.0
F1 Score: 0.909
Confusion Matrix:
[[198   2]
 [  0  10]]
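
The metrics can be recomputed by hand from the confusion matrix, which is a useful cross-check on the printed values. In scikit-learn's layout, rows are true labels and columns are predicted labels, so the matrix above reads as 198 true negatives, 2 false positives, 0 false negatives, and 10 true positives.

```python
import numpy as np

# Confusion matrix copied from the output above.
conf_matrix = np.array([[198, 2],
                        [0, 10]])

# ravel() on a 2x2 matrix yields tn, fp, fn, tp in that order.
tn, fp, fn, tp = (int(v) for v in conf_matrix.ravel())

precision = tp / (tp + fp)  # 10 / 12
recall = tp / (tp + fn)     # 10 / 10
f1 = 2 * precision * recall / (precision + recall)

print(round(precision, 3), round(recall, 3), round(f1, 3))  # prints: 0.833 1.0 0.909
```

Rounding these values to zero decimal places would report all three as 1.0 and hide the two false positives.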
