<a href="https://colab.research.google.com/github/MarkDC95/anomaly-detection-honours-project/blob/main/results.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Import libraries
#import Helper as hp
import IPython.display as ipd
import tensorflow as tf
# import tensorflow_io as tfio
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

# Import librosa after pandas and numpy
import librosa
import librosa.display

import os
import sys
import warnings
warnings.filterwarnings('ignore')

from IPython.display import display
from IPython.display import Audio
import random

from sklearn.metrics import roc_curve, auc
from sklearn.metrics import confusion_matrix #plot_confusion_matrix
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

from tensorflow.keras.layers import Input, Conv2D, BatchNormalization, Activation, Flatten, Dense,Reshape, Conv2DTranspose
from tensorflow.keras import Model
from keras.backend import int_shape
from tensorflow.keras.callbacks import EarlyStopping

In [None]:
def get_model(inputDim, latentDim):
    """
    define the keras model
    the model based on the simple convolutional auto encoder
    """
    input_img = Input(shape=(inputDim[0], inputDim[1], 1))

    # encoder
    x = Conv2D(256, (5, 5),strides=(1,2), padding='same')(input_img)   #32x128 -> 32x64
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = Conv2D(128, (5, 5),strides=(1,2), padding='same')(x)           #32x32
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = Conv2D(64, (5, 5),strides=(2,2), padding='same')(x)          #16x16
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    volumeSize = int_shape(x)
    # at this point the representation size is latentDim i.e. latentDim-dimensional
    x = Conv2D(latentDim, (4,4), strides=(1,1), padding='same')(x)
    encoded = Flatten()(x)


    # decoder
    x = Dense(volumeSize[1] * volumeSize[2] * volumeSize[3])(encoded)
    x = Reshape((volumeSize[1], volumeSize[2], 64))(x)                #4x4

    x = Conv2DTranspose(128, (5, 5),strides=(2,2), padding='same')(x)   #32x32
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = Conv2DTranspose(256, (5, 5),strides=(1,2), padding='same')(x)   #32x64
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    decoded = Conv2DTranspose(1, (5, 5),strides=(1,2), padding='same')(x)

    return Model(inputs=input_img, outputs=decoded)

In [None]:
#AutoEncoder model base purohitt
def autoencoder(input_shape):
    # Encoder
    inputs = tf.keras.Input(shape=input_shape)
    x = tf.keras.layers.Flatten()(inputs)

    x = tf.keras.layers.Dense(512)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)

    x = tf.keras.layers.Dense(512)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)

    x = tf.keras.layers.Dense(512)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)

    encoded = tf.keras.layers.Dense(8)(x)

    # Decoder
    x = tf.keras.layers.Dense(512)(encoded)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)

    x = tf.keras.layers.Dense(512)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)

    x = tf.keras.layers.Dense(512)(x)
    x = tf.keras.layers.BatchNormalization()(x)
    x = tf.keras.layers.Activation('relu')(x)

    x = tf.keras.layers.Dense(np.prod(input_shape))(x)
    decoded = tf.keras.layers.Reshape(input_shape)(x)
    # Autoencoder
    autoencoder_model = tf.keras.models.Model(inputs, decoded)
    return autoencoder_model


In [None]:
# Get the CAE model
cae_model = get_model(inputDim = (128, 32), latentDim=32)  # You can customize the latent dimension
early_stopping = EarlyStopping(monitor='val_loss', patience=5, min_delta=0.001)

# Compile the model
learning_rate = 0.01
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

# Compile the model
cae_model.compile(optimizer=optimizer, loss='mean_squared_error')  # Using MSE as the loss function

In [None]:
ids = ['fan_id_00','fan_id_02','fan_id_04','fan_id_06']#['pump_id_00','pump_id_02','pump_id_04','pump_id_06']
random_states = [42, 52, 62]
snr = ['-6 dB', '0 dB', '6 dB']
# Create an empty DataFrame to store the results
for idx in ids:
    results_df = pd.DataFrame(columns=[
        "Machine Type",
        "Machine Number",
        "SNR",
        "Random State Run Number",
        "ROC-AUC",
        "Accuracy",
        "Precision",
        "Recall",
        "F1-Score"
    ])
    machine_type = 'Fan'#'Pump'
    id = idx

    for noise in snr:
        for j in random_states:
            random_state = j
            tf.random.set_seed= random_state
            np.random.seed = random_state

            file_path = fr"/content/drive/MyDrive/cnn preprocess/{id}/{noise}/{str(j)}/"
            X_train =np.load(fr"{file_path}X_train.npy")
            X_test =np.load(fr"{file_path}X_test.npy")
            ground_truth_file = np.load(fr"{file_path}ground_truth.npy")

            # Create the autoencoder model
            input_shape = (128, 32)

            # Reshape the data to (650*9, 128, 32, 1) for training
            X_train_reshaped = X_train.reshape(-1, np.prod(input_shape))
            X_test_reshaped = X_test.reshape(-1, np.prod(input_shape))



            autoencoder_model = autoencoder((np.prod(input_shape),))

            # Compile the model
            learning_rate = 0.01
            optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
            autoencoder_model.compile(optimizer=optimizer, loss='mse')

            # Train the model on the segmented data
            history=autoencoder_model.fit(X_train.reshape(-1, np.prod(input_shape)), X_train.reshape(-1, np.prod(input_shape)),
                                            epochs=15,
                                            batch_size=32,
                                            shuffle=True,
                                            validation_split=0.1)

            # Make predictions on the test data
            reconstructed_data = autoencoder_model.predict(X_test_reshaped)
            # Calculate the reconstruction error for each sample
            mse = np.mean(np.square(X_test_reshaped - reconstructed_data), axis=1)
            ground_truth = list(ground_truth_file)

            # MSE of entire concatenated sample
            outcome_arr = []
            for i in range(len(mse)):
                if i % 9 == 0:
                    outcome_arr.append(np.average(mse[i:i+9]))
            print(f" the {len(outcome_arr)} samples have been concatenated")

            # ROC AUC SCORE
            fpr, tpr, thresholds = roc_curve(ground_truth, outcome_arr)
            roc_auc = auc(fpr, tpr)

            # get the best threshold using Youdens Index
            J = tpr - fpr
            ix = np.argmax(J)
            best_thresh = thresholds[ix]
            print('Best Threshold= %f' % (best_thresh))

            outcome = [0 if i < best_thresh else 1 for i in outcome_arr]

            y_pred = outcome

            accuracy = accuracy_score(ground_truth, y_pred)
            precision = precision_score(ground_truth, y_pred)
            recall = recall_score(ground_truth, y_pred)
            f1 = f1_score(ground_truth, y_pred)

            # Open the text file in append mode
                    # Append the results to the DataFrame
            results_df = results_df.append({
                "Machine Type": machine_type,
                "Machine Number": id,
                "SNR": noise,
                "Random State Run Number": random_state,
                "ROC-AUC": roc_auc,
                "Accuracy": accuracy,
                "Precision": precision,
                "Recall": recall,
                "F1-Score": f1
            }, ignore_index=True)

            #Loss Curve
            plt.figure()
            plt.plot(history.history['loss'], lw=2, color='darkblue')
            plt.plot(history.history['val_loss'], lw=2, color='darkorange')
            plt.title('Autoencoder Loss for DAE model')
            plt.xlabel('Epoch')
            plt.ylabel('Loss')
            plt.legend(['Train', 'Validation'], loc='upper right')
            file_path = f"/content/drive/MyDrive/DAE Results/Loss Curves/{id} {noise} {random_state} Loss Curve.png"
            plt.savefig(file_path)
            plt.close()


            #mse curve
            halfway_point = len(y_pred)/2  # enter halfway point
            plt.figure(figsize=(15, 5))
            # Plot the entire data
            # Create discrete x-values for the bar plot
            x_values = np.arange(len(y_pred))
            # Plot the entire data as discrete bars
            plt.scatter(x_values, outcome_arr, marker='o', label='MSE', s=5, color='black')
            # plt.plot(outcome_arr, )
            # Highlight the right half with a different background color
            plt.axhline(y=best_thresh, color='r', linestyle='-', label=f'threshold = {best_thresh:.4f}')
            plt.axvspan(halfway_point, len(outcome_arr), facecolor='green', alpha=0.2, label='Abnormal Test Data')
            plt.xlabel("Audio sample")
            plt.ylabel("MSE")
            plt.title("MSE of Test Data")
            plt.axvspan(0, halfway_point, facecolor='white', alpha=0.1, label='Normal Test Data')
            plt.legend()  # Add a legend to label the plot elements
            file_path = f"/content/drive/MyDrive/DAE Results/MSE Graphs/{id} {noise} {random_state} MSE Graph.png"
            plt.savefig(file_path)
            plt.close()

            #ROC Plot
            plt.figure()
            lw = 2
            plt.plot(fpr, tpr, color='darkorange',
                    lw=lw, label='ROC curve (area = %0.3f)' % roc_auc)
            plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
            plt.xlim([0.0, 1.0])
            plt.ylim([0.0, 1.05])
            plt.xlabel('False Positive Rate')
            plt.ylabel('True Positive Rate')
            plt.title('Receiver operating characteristic')
            plt.legend(loc="lower right")
            file_path = f"/content/drive/MyDrive/DAE Results/ROC Curves/{id} {noise} {random_state} ROC Curve.png"
            plt.savefig(file_path)
            plt.close()

            # confusion matrix
            conf_mat = confusion_matrix(ground_truth, outcome)
            plt.figure()
            sns.heatmap(conf_mat, cmap="Greens", annot=True, cbar_kws={"label": "Count"}, fmt='d',
                        xticklabels=[0, 1], yticklabels=[0, 1])
            plt.xlabel("Predicted Values")
            plt.ylabel("Actual Values")
            plt.title("Confusion Matrix")
            file_path = f"/content/drive/MyDrive/DAE Results/Confusion Matrices/{id} {noise} {random_state} confusion_matrix.png"
            plt.savefig(file_path)
            plt.close()

            model_path = f"/content/drive/MyDrive/DAE Results/Models/{id} {noise} {random_state} model"
            autoencoder_model.save(model_path)

            print(f"{id} {noise} {random_state} completed")



    csv_file_path = f'/content/drive/MyDrive/DAE Results/Result Metrics/results_{id}_cnn.csv'
    #Save the DataFrame to a CSV file
    results_df.to_csv(csv_file_path, index=False)



Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
 the 650 samples have been concatenated
Best Threshold= 4.297944
fan_id_00 -6 dB 42 completed
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
 the 650 samples have been concatenated
Best Threshold= 4.429513
fan_id_00 -6 dB 52 completed
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
 the 650 samples have been concatenated
Best Threshold= 5.739589
fan_id_00 -6 dB 62 completed
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
 the 650 samples have been concate

# CAE Model

In [None]:
ids =['fan_id_00','fan_id_02','fan_id_04','fan_id_06']
#Change the id to any of the following series
#['pump_id_00','pump_id_02','pump_id_04','pump_id_06']
#['fan_id_00','fan_id_02','fan_id_04','fan_id_06']#
random_states = [42, 52, 62]
snr = ['-6 dB', '0 dB', '6 dB']
# Create an empty DataFrame to store the results
for idx in ids:
    results_df = pd.DataFrame(columns=[
        "Machine Type",
        "Machine Number",
        "SNR",
        "Random State Run Number",
        "ROC-AUC",
        "Accuracy",
        "Precision",
        "Recall",
        "F1-Score"
    ])
    machine_type = 'Fan'#'Pump'
    id = idx

    for noise in snr:
        for j in random_states:
            random_state = j
            tf.random.set_seed= random_state
            np.random.seed = random_state

            file_path = fr"/content/drive/MyDrive/cnn preprocess/{id}/{noise}/{str(j)}/"
            X_train =np.load(fr"{file_path}X_train.npy")
            X_test =np.load(fr"{file_path}X_test.npy")
            ground_truth_file = np.load(fr"{file_path}ground_truth.npy")

            # Reshape the data to (650*9, 128, 32, 1) for training
            X_train_reshaped = X_train.reshape(-1, 128, 32, 1)
            X_test_reshaped = X_test.reshape(-1,128,32,1)


            #####################
            # Get the CAE model
            cae_model = get_model(inputDim = (128, 32), latentDim=32)  # You can customize the latent dimension
            early_stopping = EarlyStopping(monitor='val_loss', patience=5, min_delta=0.001)

            # Compile the model
            learning_rate = 0.001
            optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

            # Compile the model
            cae_model.compile(optimizer=optimizer, loss='mean_squared_error')  # Using MSE as the loss function

            # Train the model
            epochs = 25  # You can adjust the number of training epochs
            batch_size = 32  # You can adjust the batch size

            # Use the fit method to train the model
            history = cae_model.fit(X_train_reshaped,
                                    X_train_reshaped,
                                    epochs=epochs,
                                    verbose=0,
                                    batch_size=batch_size,
                                    validation_split=0.2,
                                    callbacks=[early_stopping])

            # Make predictions on the test data
            reconstructed_data = cae_model.predict(X_test_reshaped)
            # Calculate the reconstruction error for each sample
            mse = np.mean(np.square(X_test_reshaped - reconstructed_data), axis=1)
            ground_truth = list(ground_truth_file)

            # MSE of entire concatenated sample
            outcome_arr = []
            for i in range(len(mse)):
                if i % 9 == 0:
                    outcome_arr.append(np.average(mse[i:i+9]))
            print(f" the {len(outcome_arr)} samples have been concatenated")

            # ROC AUC SCORE
            fpr, tpr, thresholds = roc_curve(ground_truth, outcome_arr)
            roc_auc = auc(fpr, tpr)

            # get the best threshold using Youdens Index
            J = tpr - fpr
            ix = np.argmax(J)
            best_thresh = thresholds[ix]
            print('Best Threshold= %f' % (best_thresh))

            outcome = [0 if i < best_thresh else 1 for i in outcome_arr]

            y_pred = outcome

            accuracy = accuracy_score(ground_truth, y_pred)
            precision = precision_score(ground_truth, y_pred)
            recall = recall_score(ground_truth, y_pred)
            f1 = f1_score(ground_truth, y_pred)

            # Open the text file in append mode
                    # Append the results to the DataFrame
            results_df = results_df.append({
                "Machine Type": machine_type,
                "Machine Number": id,
                "SNR": noise,
                "Random State Run Number": random_state,
                "ROC-AUC": roc_auc,
                "Accuracy": accuracy,
                "Precision": precision,
                "Recall": recall,
                "F1-Score": f1
            }, ignore_index=True)

            #Loss Curve
            plt.figure()
            plt.plot(history.history['loss'], lw=2, color='darkblue')
            plt.plot(history.history['val_loss'], lw=2, color='darkorange')
            plt.title('Autoencoder Loss for CAE model')
            plt.xlabel('Epoch')
            plt.ylabel('Loss')
            plt.legend(['Train', 'Validation'], loc='upper right')
            file_path = f"/content/drive/MyDrive/CAE Results/Loss Curves/{id} {noise} {random_state} Loss Curve.png"
            plt.savefig(file_path)
            plt.close()


            #mse curve
            halfway_point = len(mse)/2  # enter halfway point
            plt.figure(figsize=(15, 5))
            # Plot the entire data
            # Create discrete x-values for the bar plot
            x_values = np.arange(len(outcome_arr))
            # Plot the entire data as discrete bars
            plt.scatter(x_values, outcome_arr, marker='o', label='MSE', s=5, color='black')
            # plt.plot(outcome_arr, )
            # Highlight the right half with a different background color
            plt.axhline(y=best_thresh, color='r', linestyle='-', label=f'threshold = {best_thresh:.4f}')
            plt.axvspan(halfway_point, len(outcome_arr), facecolor='green', alpha=0.2, label='Abnormal Test Data')
            plt.xlabel("Audio sample")
            plt.ylabel("MSE")
            plt.title("MSE of Test Data")
            plt.axvspan(0, halfway_point, facecolor='white', alpha=0.1, label='Normal Test Data')
            plt.legend()  # Add a legend to label the plot elements
            file_path = f"/content/drive/MyDrive/CAE Results/MSE Graphs/{id} {noise} {random_state} MSE Graph.png"
            plt.savefig(file_path)
            plt.close()

            #ROC Plot
            plt.figure()
            lw = 2
            plt.plot(fpr, tpr, color='darkorange',
                    lw=lw, label='ROC curve (area = %0.3f)' % roc_auc)
            plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
            plt.xlim([0.0, 1.0])
            plt.ylim([0.0, 1.05])
            plt.xlabel('False Positive Rate')
            plt.ylabel('True Positive Rate')
            plt.title('Receiver operating characteristic')
            plt.legend(loc="lower right")
            file_path = f"/content/drive/MyDrive/CAE Results/ROC Curves/{id} {noise} {random_state} ROC Curve.png"
            plt.savefig(file_path)
            plt.close()

            # confusion matrix
            conf_mat = confusion_matrix(ground_truth, outcome)
            plt.figure()
            sns.heatmap(conf_mat, cmap="Greens", annot=True, cbar_kws={"label": "Count"}, fmt='d',
                        xticklabels=[0, 1], yticklabels=[0, 1])
            plt.xlabel("Predicted Values")
            plt.ylabel("Actual Values")
            plt.title("Confusion Matrix")
            file_path = f"/content/drive/MyDrive/CAE Results/Confusion Matrices/{id} {noise} {random_state} confusion_matrix.png"
            plt.savefig(file_path)
            plt.close()

            model_path = f"/content/drive/MyDrive/CAE Results/Models/{id} {noise} {random_state} model"
            cae_model.save(model_path)

            print(f"{id} {noise} {random_state} completed")



    csv_file_path = f'/content/drive/MyDrive/CAE Results/Result Metrics/results_{id}_cnn.csv'
    #Save the DataFrame to a CSV file
    results_df.to_csv(csv_file_path, index=False)

#print(f"Results saved to {csv_file_path}")


 the 650 samples have been concatenated
Best Threshold= 2.142015
fan_id_00 -6 dB 42 completed
 the 650 samples have been concatenated
Best Threshold= 3.700976
fan_id_00 -6 dB 52 completed
 the 650 samples have been concatenated
Best Threshold= 2.767649
fan_id_00 -6 dB 62 completed
 the 650 samples have been concatenated
Best Threshold= 1.900491
fan_id_00 0 dB 42 completed
 the 650 samples have been concatenated
Best Threshold= 2.793451
fan_id_00 0 dB 52 completed
 the 650 samples have been concatenated
Best Threshold= 2.389297
fan_id_00 0 dB 62 completed
 the 650 samples have been concatenated
Best Threshold= 2.394622
fan_id_00 6 dB 42 completed
 the 650 samples have been concatenated
Best Threshold= 4.814653
fan_id_00 6 dB 52 completed
 the 650 samples have been concatenated
Best Threshold= 3.859504
fan_id_00 6 dB 62 completed
 the 650 samples have been concatenated
Best Threshold= 1.415458
fan_id_02 -6 dB 42 completed
 the 650 samples have been concatenated
Best Threshold= 1.420355
f