<a href="https://colab.research.google.com/github/ABRD123/anamoly-detection/blob/main/LSTMAutoEncoders_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install TensorFlow into the running kernel's environment (no version is pinned here).
%pip install tensorflow



In [None]:
# @title import libraries
import os
import pandas as pd
import numpy as np
import joblib
import seaborn as sns
sns.set(color_codes=True)
import matplotlib.pyplot as plt
%matplotlib inline

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from numpy.random import seed
import tensorflow as tf
import logging


from keras.layers import Input, Dropout, Dense, LSTM, TimeDistributed, RepeatVector
from keras.models import Model
from keras import regularizers

In [None]:
# @title Set seeds for reproducibility
# Fix the global NumPy and TensorFlow random generators so that repeated runs
# start from the same random state.
seed(10)
tf.random.set_seed(42)

In [None]:
# @title Step 1: Load the dataset
# Read every file found directly inside `data_dir` and stack them into one DataFrame.
data_dir = 'UCR'  # Directory containing your files
all_data = []     # List to store each file's data

# os.listdir() returns entries in arbitrary, platform-dependent order. Sorting
# makes the row order of `merged_data` (and therefore every seeded split derived
# from it) identical on every run and every machine.
for filename in sorted(os.listdir(data_dir)):
    file_path = os.path.join(data_dir, filename)
    if os.path.isfile(file_path):  # Skip sub-directories
        # Files are tab-separated and read without a header row,
        # so the first line of each file is treated as data.
        data = pd.read_csv(file_path, sep='\t', header=None)
        all_data.append(data)

# Fail with a clear message instead of pd.concat's generic
# "No objects to concatenate" error when the directory holds no files.
if not all_data:
    raise FileNotFoundError(f"No data files found in directory: {data_dir!r}")

# Concatenate all data into a single DataFrame with a fresh 0..n-1 index
merged_data = pd.concat(all_data, ignore_index=True)

print(merged_data.shape)
merged_data.head()

(156134, 1)


Unnamed: 0,0
0,-34.021073
1,-31.984995
2,-31.461852
3,-36.254056
4,-34.811771


In [None]:
# Split Data into Train, Validation, and Test Sets
def split_data(data, test_size=0.2, val_size=0.2):
    """Randomly split `data` into train, validation and test subsets.

    Both fractions are expressed relative to the full dataset, so the defaults
    produce a 60% / 20% / 20% train / validation / test split.

    Args:
        data: Array-like or DataFrame accepted by ``train_test_split``.
        test_size: Fraction of all rows assigned to the test set.
        val_size: Fraction of all rows assigned to the validation set.

    Returns:
        Tuple ``(train_data, val_data, test_data)``.

    Raises:
        ValueError: If either fraction is outside (0, 1) or the two fractions
            together leave no rows for training.
    """
    if not (0 < test_size < 1 and 0 < val_size < 1 and test_size + val_size < 1):
        raise ValueError(
            "test_size and val_size must each be in (0, 1) and sum to less than 1; "
            f"got test_size={test_size}, val_size={val_size}"
        )

    # First carve off the test set, sized as a fraction of the whole dataset.
    train_val_data, test_data = train_test_split(
        data, test_size=test_size, random_state=42)

    # The remainder holds (1 - test_size) of the rows, so rescale val_size to
    # keep the validation set at `val_size` of the *original* dataset.
    val_fraction = val_size / (1 - test_size)
    train_data, val_data = train_test_split(
        train_val_data, test_size=val_fraction, random_state=42)

    return train_data, val_data, test_data

# @title Step 2: Split the raw data into train / validation / test sets
# (the data is not normalized yet; scaling is applied afterwards in Step 3)
train_data, val_data, test_data = split_data(merged_data)

In [None]:
# Report the (rows, columns) shape of the training and test splits.
for label, frame in (("Training dataset shape:", train_data),
                     ("Test dataset shape:", test_data)):
    print(label, frame.shape)

Training dataset shape: (124907, 1)
Test dataset shape: (7807, 1)


In [None]:
# Sanity check: report any training column whose values cannot be cast to float.
# The cast result is discarded; only a ValueError from the cast matters here.
for column_name, column_values in train_data.items():
    try:
        column_values.astype(float)
    except ValueError:
        print(f"Non-numeric data found in column: {column_name}")
        print(column_values)

In [None]:
# @title Step 3: Normalize the data
# The scaler learns its min/max from the training split only and is then
# applied to both the training and the test split.
scaler = MinMaxScaler().fit(train_data)
X_train = scaler.transform(train_data)
X_test = scaler.transform(test_data)

# Persist the fitted scaler to disk for later reuse.
scaler_filename = "scaler_data"
joblib.dump(scaler, scaler_filename)

['scaler_data']

In [None]:
# @title Step 4: Reshape inputs for LSTM [samples, timesteps, features]
# Insert a single-timestep axis between the sample axis and the feature axis.
train_samples, train_features = X_train.shape[0], X_train.shape[1]
X_train = X_train.reshape(train_samples, 1, train_features)

test_samples, test_features = X_test.shape[0], X_test.shape[1]
X_test = X_test.reshape(test_samples, 1, test_features)

print("Training data shape:", X_train.shape)
print("Test data shape:", X_test.shape)

Training data shape: (124907, 1, 1)
Test data shape: (7807, 1, 1)


In [None]:
# @title Step 5: Define the autoencoder network model
def autoencoder_model(X):
    """Build an uncompiled LSTM autoencoder sized from a sample array.

    Args:
        X: Array whose shape is read as (samples, timesteps, features); only
            ``X.shape[1]`` and ``X.shape[2]`` are used.

    Returns:
        A Keras ``Model`` that maps a (timesteps, features) sequence to a
        reconstruction with the same shape.
    """
    n_timesteps, n_features = X.shape[1], X.shape[2]
    inputs = Input(shape=(n_timesteps, n_features))

    # Encoder: a 16-unit sequence layer followed by a 4-unit layer that emits
    # only its final output. The L2 coefficient is 0.00, so it adds no penalty.
    encoded = LSTM(16, activation='relu', return_sequences=True,
                   kernel_regularizer=regularizers.l2(0.00))(inputs)
    bottleneck = LSTM(4, activation='relu', return_sequences=False)(encoded)

    # Decoder: repeat the bottleneck vector once per timestep, then mirror the
    # encoder widths (4 -> 16) and project each timestep back to n_features.
    repeated = RepeatVector(n_timesteps)(bottleneck)
    decoded = LSTM(4, activation='relu', return_sequences=True)(repeated)
    decoded = LSTM(16, activation='relu', return_sequences=True)(decoded)
    reconstruction = TimeDistributed(Dense(n_features))(decoded)

    return Model(inputs=inputs, outputs=reconstruction)

In [None]:
# @title Step 6: Create the autoencoder model
# Size the network from the training array, then configure it to minimise
# mean absolute error with the Adam optimizer and print the layer summary.
model = autoencoder_model(X_train)
model.compile(loss='mae', optimizer='adam')
model.summary()

In [None]:
# @title Step 7: Fit the model to the data
nb_epochs = 100
batch_size = 10

# Validate on the dedicated validation split created in Step 2 instead of an
# ad-hoc slice of the training array. It goes through the same scaler (fitted
# on the training split) and the same [samples, 1, features] reshape as the
# training and test inputs.
X_val = scaler.transform(val_data)
X_val = X_val.reshape(X_val.shape[0], 1, X_val.shape[1])

# The autoencoder is trained to reproduce its own input, so inputs and targets
# are the same array. `.history` keeps only the per-epoch metrics dict.
history = model.fit(X_train, X_train, epochs=nb_epochs, batch_size=batch_size,
                    validation_data=(X_val, X_val)).history

Epoch 1/100
[1m11867/11867[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m62s[0m 5ms/step - loss: 0.0199 - val_loss: 0.0042
Epoch 2/100
[1m11867/11867[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 5ms/step - loss: 0.0040 - val_loss: 0.0024
Epoch 3/100
[1m11867/11867[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m78s[0m 5ms/step - loss: 0.0026 - val_loss: 0.0015
Epoch 4/100
[1m11867/11867[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m55s[0m 5ms/step - loss: 0.0017 - val_loss: 9.2271e-04
Epoch 5/100
[1m11867/11867[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 5ms/step - loss: 0.0016 - val_loss: 0.0014
Epoch 6/100
[1m11867/11867[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m82s[0m 5ms/step - loss: 0.0013 - val_loss: 2.8245e-04
Epoch 7/100
[1m11867/11867[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m57s[0m 5ms/step - loss: 0.0013 - val_loss: 0.0016
Epoch 8/100
[1m11867/11867[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m78s[0m 4ms/step - loss: 0.0013 - v

In [None]:
# @title Step 8: Plot the training losses
# Draw the per-epoch training and validation loss curves on one axis.
fig, ax = plt.subplots(figsize=(14, 6), dpi=80)
for history_key, line_format, curve_label in (('loss', 'b', 'Train'),
                                              ('val_loss', 'r', 'Validation')):
    ax.plot(history[history_key], line_format, label=curve_label, linewidth=2)
ax.set_title('Model loss', fontsize=16)
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss (mae)')
ax.legend(loc='upper right')
plt.show()

In [None]:
# @title Step 9: Plot the loss distribution of the training set
# Reconstruct the training samples and flatten [samples, 1, features] back to
# [samples, features], labelled like the original training frame.
X_pred = model.predict(X_train)
X_pred = pd.DataFrame(
    X_pred.reshape(X_pred.shape[0], X_pred.shape[2]),
    columns=train_data.columns,
    index=train_data.index,
)
Xtrain = X_train.reshape(X_train.shape[0], X_train.shape[2])

# Per-sample reconstruction error: mean absolute error across the feature columns.
scored = pd.DataFrame(index=train_data.index)
scored['Loss_mae'] = (X_pred - Xtrain).abs().mean(axis=1)
scored['Log_Loss_mae'] = np.log1p(scored['Loss_mae'])

# Histogram (with KDE overlay) of log(1 + loss).
plt.figure(figsize=(16,9), dpi=80)
plt.title('Loss Distribution', fontsize=16)
sns.histplot(scored['Log_Loss_mae'], bins = 20, kde= True, color = 'blue');

In [None]:
# @title Step 10: Score the test set and flag anomalies
# Reconstruct the test samples and flatten [samples, 1, features] back to
# [samples, features], labelled like the original test frame.
X_pred = model.predict(X_test)
X_pred = X_pred.reshape(X_pred.shape[0], X_pred.shape[2])
X_pred = pd.DataFrame(X_pred, columns=test_data.columns)
X_pred.index = test_data.index

# Per-sample reconstruction error: mean absolute error across the feature columns.
scored = pd.DataFrame(index=test_data.index)
Xtest = X_test.reshape(X_test.shape[0], X_test.shape[2])
scored['Loss_mae'] = np.mean(np.abs(X_pred-Xtest), axis = 1)

# Derive the anomaly threshold from the reconstruction error on the training
# data rather than from a hard-coded constant, so it follows the loss scale of
# this model and dataset. The per-sample error is averaged over the timestep
# and feature axes of the [samples, timesteps, features] arrays.
ANOMALY_QUANTILE = 0.99
train_loss = np.mean(np.abs(model.predict(X_train) - X_train), axis=(1, 2))
threshold = float(np.quantile(train_loss, ANOMALY_QUANTILE))

scored['Threshold'] = threshold
scored['Anomaly'] = scored['Loss_mae'] > scored['Threshold']
scored.head()

In [None]:
# @title Loss_mae
# Histogram of the per-sample reconstruction loss, with the top and right
# axis borders hidden.
loss_axes = scored['Loss_mae'].plot(kind='hist', bins=20, title='Loss_mae')
loss_axes.spines[['top', 'right']].set_visible(False)

In [None]:
# @title Step 11: Calculate the same metrics for the training set
# and merge all data in a single dataframe for plotting
X_pred_train = model.predict(X_train)
X_pred_train = X_pred_train.reshape(X_pred_train.shape[0], X_pred_train.shape[2])
X_pred_train = pd.DataFrame(X_pred_train, columns=train_data.columns)
X_pred_train.index = train_data.index

# Per-sample reconstruction error on the training set.
scored_train = pd.DataFrame(index=train_data.index)
scored_train['Loss_mae'] = np.mean(np.abs(X_pred_train-Xtrain), axis = 1)

# Take the threshold from the training loss distribution (its 99th percentile).
# A quantile of the test losses would use the evaluation data to tune the
# detector and would flag a fixed 1% of test rows regardless of their content.
threshold = scored_train['Loss_mae'].quantile(0.99)
scored_train['Threshold'] = threshold
scored_train['Anomaly'] = scored_train['Loss_mae'] > scored_train['Threshold']

# Pick the test rows out of `scored` by index label and re-flag them with the
# same threshold, so train and test rows are judged by one rule. Selecting by
# label (rather than appending to `scored` as-is) keeps this cell idempotent:
# re-running it does not append the training rows a second time.
scored_test = scored.loc[test_data.index, ['Loss_mae']].copy()
scored_test['Threshold'] = threshold
scored_test['Anomaly'] = scored_test['Loss_mae'] > scored_test['Threshold']

scored = pd.concat([scored_train, scored_test])

In [None]:
# @title Step 12: Plot reconstruction loss with detected anomalies

# The train/test split shuffles rows, so `scored` is not in index order.
# Sort by index first; otherwise the line plot connects points in shuffled
# order and zig-zags back and forth across the x axis.
scored_sorted = scored.sort_index()
anomaly_mask = scored_sorted['Anomaly']

fig, ax = plt.subplots(figsize=(14, 8))
ax.plot(scored_sorted.index, scored_sorted['Loss_mae'], label='Loss_mae', color='blue')
ax.axhline(y=threshold, color='red', linestyle='--', label='Threshold')
ax.scatter(scored_sorted.index[anomaly_mask], scored_sorted.loc[anomaly_mask, 'Loss_mae'],
           color='orange', label='Anomalies')
ax.set_yscale('log')  # losses span several orders of magnitude
ax.set_xlabel('Index')
ax.set_ylabel('Loss_mae')
ax.legend()
ax.set_title('Anomaly Detection based on Loss_mae')
plt.show()

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

# Predicted anomalies from the model
y_pred = scored_train['Anomaly'].astype(int)  # Convert boolean (True/False) to integer (1/0)

# Ground-truth labels (1 for anomaly, 0 for normal) must come from a source
# that is independent of the model. Comparing the predictions with themselves
# would report perfect scores no matter how the detector behaves, so no metric
# is computed until real labels are supplied.
# TODO: set `y_true` to a 0/1 sequence aligned row-for-row with `scored_train`.
y_true = None

if y_true is None:
    # Keep the metric names defined, but make it obvious they are not real results.
    accuracy = precision = recall = f1 = float('nan')
    print("No ground-truth anomaly labels available; "
          "classification metrics were not computed.")
else:
    accuracy = accuracy_score(y_true, y_pred)
    # zero_division=0 avoids a warning when no sample is predicted or labelled positive.
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)

    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1-Score: {f1:.4f}")