In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

In [None]:
# Load the arrays stored in the NumPy archive.
# SECURITY NOTE(review): allow_pickle=True lets np.load unpickle object arrays,
# which can execute arbitrary code. Only open archives you trust; if
# 'Final.npz' holds plain numeric/string arrays, switch to allow_pickle=False.
loaded_Data = np.load('Final.npz', allow_pickle=True)

# One row per array name found in the archive. Kept so that any later cell
# that still reads `colData` keeps working.
colData = pd.DataFrame(loaded_Data)

# Build the DataFrame with one column per stored array. `NpzFile.files` lists
# the array names directly, so no positional lookup through `colData` is needed.
Data = pd.DataFrame({name: loaded_Data[name] for name in loaded_Data.files})
Data

In [None]:
# Example: pick a minimum number of samples per device and tabulate how many
# devices reach it (True) versus fall short of it (False).
samples = 50
df = pd.DataFrame(
    Data['Device']
    .value_counts()
    .ge(samples)
    .value_counts()
)
df

In [None]:


# Define an autoencoder model using TensorFlow/Keras
def create_autoencoder(input_dim, output_activation='linear'):
    """Build and compile a symmetric dense autoencoder.

    Architecture: input_dim -> 64 -> 32 -> 16 -> 32 -> 64 -> input_dim, with
    ReLU on every hidden layer. The model is compiled with the Adam optimizer
    and mean-squared-error loss.

    Parameters
    ----------
    input_dim : int
        Number of features per sample.
    output_activation : str, optional
        Activation of the reconstruction layer. Defaults to 'linear' because
        the caller in this notebook feeds StandardScaler output (zero-mean,
        unbounded, frequently negative); a 'sigmoid' output is confined to
        (0, 1) and cannot reproduce such values. Pass 'sigmoid' only for
        inputs already scaled to [0, 1].

    Returns
    -------
    (autoencoder, encoder)
        `autoencoder` maps an input to its reconstruction; `encoder` maps an
        input to the 16-unit bottleneck. Both models share the same layers, so
        training the autoencoder also trains the encoder.
    """
    input_layer = Input(shape=(input_dim,))

    # Encoder half: progressively compress down to the 16-unit bottleneck.
    encoded = Dense(64, activation='relu')(input_layer)
    encoded = Dense(32, activation='relu')(encoded)
    encoded = Dense(16, activation='relu')(encoded)

    # Decoder half: mirror of the encoder.
    decoded = Dense(32, activation='relu')(encoded)
    decoded = Dense(64, activation='relu')(decoded)
    decoded = Dense(input_dim, activation=output_activation)(decoded)

    autoencoder = Model(input_layer, decoded)
    encoder = Model(input_layer, encoded)
    autoencoder.compile(optimizer=Adam(), loss='mse')
    return autoencoder, encoder

# Function to train the autoencoder
def train_autoencoder(X, input_dim, epochs=100, batch_size=32, validation_split=0.2,
                      patience=10, checkpoint_path='best_autoencoder.keras', seed=42):
    """Create an autoencoder, fit it to reconstruct X, and restore the best weights.

    Parameters
    ----------
    X : array-like of shape (n_samples, input_dim)
        Training samples; used as both input and reconstruction target.
    input_dim : int
        Number of features per sample, forwarded to `create_autoencoder`.
    epochs : int, optional
        Maximum number of training epochs.
    batch_size : int, optional
        Mini-batch size.
    validation_split : float, optional
        Fraction of the rows held out to compute `val_loss`.
    patience : int, optional
        Epochs without `val_loss` improvement before early stopping.
    checkpoint_path : str, optional
        File the best model (lowest `val_loss`) is saved to and reloaded from.
    seed : int or None, optional
        Seed for the one-off row permutation applied before fitting. None
        draws a fresh, non-reproducible permutation.

    Returns
    -------
    (autoencoder, encoder)
        The trained autoencoder, carrying the checkpointed best weights, and
        the encoder that shares its layers.
    """
    autoencoder, encoder = create_autoencoder(input_dim)

    # Keras carves the validation split out of the *last* rows of X, before any
    # shuffling. The caller passes rows grouped device by device, so without a
    # permutation val_loss would be measured on the trailing devices only and
    # would steer early stopping / checkpointing with an unrepresentative set.
    X = np.asarray(X)
    X = X[np.random.default_rng(seed).permutation(len(X))]

    # Stop when val_loss stalls and keep the best epoch on disk.
    callbacks = [
        EarlyStopping(monitor='val_loss', patience=patience, verbose=1),
        ModelCheckpoint(filepath=checkpoint_path, monitor='val_loss', save_best_only=True, verbose=1)
    ]

    autoencoder.fit(X, X, epochs=epochs, batch_size=batch_size, shuffle=False,
                    validation_split=validation_split, callbacks=callbacks)

    # Early stopping leaves the weights of the last epoch in memory; reload the
    # best ones written by the checkpoint callback.
    autoencoder.load_weights(checkpoint_path)
    return autoencoder, encoder

# Per-device EER percentages, filled by the evaluation loop further down.
array_EER = []

# Device identifiers in order of first appearance. `device` keeps its original
# one-column DataFrame form for any cell that still indexes it as device[0][i].
device_ids = Data['Device'].unique()
device = pd.DataFrame(device_ids)

# Column layout of the train/test frames: the expected feature columns first,
# then any additional column present in Data.
split_columns = ['HTL','VTL','Slope','MSE','MAE','MedAE','CoD','HA','VA','HMP','VMP',
                 'Device']
split_columns = split_columns + [col for col in Data.columns if col not in split_columns]

# Collect the per-device slices and concatenate once at the end: growing a
# DataFrame with pd.concat inside the loop is quadratic, and starting from an
# empty object-dtype frame triggers the pandas deprecation about empty entries
# in concat dtype resolution.
train_parts = []
test_parts = []
for device_id in device_ids:
    device_rows = Data[Data['Device'] == device_id]
    total = len(device_rows)
    # Devices with fewer than 10 rows are left out of both sets.
    if total >= 10:
        # Leading ~90% of the device's rows for training, the remainder for
        # testing. The cut-off expression is kept as-is so the split is unchanged.
        n_train = int(total - total * 0.1)
        train_parts.append(device_rows.iloc[:n_train])
        test_parts.append(device_rows.iloc[n_train:total])

if train_parts:
    Train = pd.concat(train_parts, ignore_index=True).reindex(columns=split_columns)
    Test = pd.concat(test_parts, ignore_index=True).reindex(columns=split_columns)
else:
    # No device has enough rows: keep well-formed empty frames instead of
    # letting pd.concat raise on an empty list.
    Train = pd.DataFrame(columns=split_columns)
    Test = pd.DataFrame(columns=split_columns)
devices = Train['Device'].unique()

# Standardize the pooled training rows of all devices, then pre-train a single
# autoencoder on them.
scaler = StandardScaler()
train_data = scaler.fit_transform(Train.drop(columns='Device').to_numpy())
input_dim = train_data.shape[1]

autoencoder, encoder = train_autoencoder(train_data, input_dim=input_dim, epochs=50)

def _far_frr_sweep(genuine_errors, imposter_errors, n_thresholds=100):
    """Sweep acceptance thresholds and locate the equal error rate (EER).

    A sample is accepted when its reconstruction error is not above the
    threshold. Thresholds are evenly spaced between the smallest and largest
    genuine error.

    Returns (thresholds, FAR, FRR, eer_threshold, eer), where FAR is the
    fraction of imposter samples accepted and FRR the fraction of genuine
    samples rejected at each threshold. The EER is the mean of FAR and FRR at
    the first threshold where |FAR - FRR| is smallest.
    """
    genuine_errors = np.asarray(genuine_errors)
    imposter_errors = np.asarray(imposter_errors)
    thresholds = np.linspace(genuine_errors.min(), genuine_errors.max(), n_thresholds)

    # Compare every threshold with every sample at once: (n_thresholds, n_samples).
    genuine_rejected = genuine_errors[None, :] > thresholds[:, None]
    imposter_accepted = ~(imposter_errors[None, :] > thresholds[:, None])
    FRR = genuine_rejected.mean(axis=1)
    FAR = imposter_accepted.mean(axis=1)

    # np.argmin returns the first minimum, i.e. the lowest such threshold.
    best = int(np.argmin(np.abs(FAR - FRR)))
    return thresholds, FAR, FRR, thresholds[best], (FAR[best] + FRR[best]) / 2


for device in devices:
    # Genuine attempts: held-out rows of this device.
    X_Test = Test[Test['Device'] == device].drop('Device', axis=1)
    # Imposter attempts: held-out rows of every other device.
    X_Imposter = Test[Test['Device'] != device].drop('Device', axis=1)
    if X_Test.empty or X_Imposter.empty:
        # Error rates are undefined without both populations.
        print('Skipping device', device, ': no genuine or imposter test rows')
        continue

    # Pass plain arrays: the scaler was fitted on an array, so handing it a
    # DataFrame would raise a feature-name mismatch warning on every call.
    X_test = scaler.transform(X_Test.values)
    X_test_decoded = autoencoder.predict(X_test, verbose=0)
    X_imposter = scaler.transform(X_Imposter.values)
    X_imposter_decoded = autoencoder.predict(X_imposter, verbose=0)

    # Calculate reconstruction errors (per-sample mean squared error)
    reconstruction_error_test = np.mean(np.square(X_test - X_test_decoded), axis=1)
    reconstruction_error_imposter = np.mean(np.square(X_imposter - X_imposter_decoded), axis=1)

    # Sweep through a range of threshold values
    thresholds, FAR, FRR, pos, EER = _far_frr_sweep(
        reconstruction_error_test, reconstruction_error_imposter)

    # Record exactly one EER per device, after the sweep has finished.
    array_EER.append(round(EER * 100, 2))

    print('EER :', EER * 100)
    # Plot the FAR and FRR curves
    plt.plot(thresholds, FAR, label='FAR')
    plt.plot(thresholds, FRR, label='FRR')
    plt.plot(pos, EER, 'ro', label='EER')
    plt.xlabel('Threshold')
    plt.ylabel('Error rate')
    plt.title(f'FAR / FRR - device {device}')
    plt.legend()
    plt.show()

    # Plot the ROC curve
    plt.plot(FAR, 1 - np.array(FRR))
    plt.xlabel('False Acceptance Rate (FAR)')
    plt.ylabel('True Positive Rate (TPR)')
    plt.title('ROC Curve')
    plt.show()
