In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_curve
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Lambda
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import tensorflow.keras.backend as K
import tensorflow as tf
from IPython.display import clear_output

In [None]:
# Open the compressed archive that holds one array per column.
# NOTE(review): allow_pickle=True unpickles object arrays, which can execute arbitrary code;
# only load 'Final.npz' from a trusted source.
loaded_Data = np.load('Final.npz', allow_pickle=True)

# Wrapping the archive in a DataFrame gives a helper frame whose column 0 is used below as the
# list of array names stored in the archive.
colData = pd.DataFrame(loaded_Data)

# Assemble the working table: one column per stored array, keyed by its name in the archive.
Data = pd.DataFrame({array_name: loaded_Data[array_name] for array_name in colData[0]})
Data

In [None]:
# Example for choosing the number of samples per device:
# tabulate how many devices do (True) / do not (False) have at least `samples` rows.
samples = 50
rows_per_device = Data['Device'].value_counts()
has_enough_rows = rows_per_device >= samples
df = pd.DataFrame(has_enough_rows.value_counts())
df

In [None]:
# Siamese architecture: shared feature extractor
def create_base_network(input_dim):
    """Build the feature extractor shared by both branches of the Siamese network.

    Args:
        input_dim: number of features in one input vector.

    Returns:
        A Keras ``Model`` that maps an ``(input_dim,)`` vector through three ReLU dense layers
        of 128, 64 and 32 units.
    """
    net_in = Input(shape=(input_dim,))
    hidden = net_in
    # Stack the dense layers in order, from widest to narrowest.
    for units in (128, 64, 32):
        hidden = Dense(units, activation='relu')(hidden)
    return Model(net_in, hidden)

# Distance function
def euclidean_distance(vects):
    """Euclidean distance between two batches of embeddings.

    Args:
        vects: a pair ``(x, y)`` of tensors with matching shapes; the squared differences are
            summed over axis 1 (presumably the feature axis of a ``(batch, features)`` tensor).

    Returns:
        A tensor of distances with the summed axis kept as size 1.
    """
    x, y = vects
    sum_square = K.sum(K.square(x - y), axis=1, keepdims=True)
    # The derivative of sqrt is unbounded at 0, so two identical embeddings would yield NaN
    # gradients and poison the weights. Clamp to epsilon before taking the root.
    return K.sqrt(K.maximum(sum_square, K.epsilon()))

def contrastive_loss(y_true, y_pred, margin=1):
    """Contrastive loss for a Siamese network that outputs a distance.

    Pairs labelled 1 (same class) are penalised by their squared distance; pairs labelled 0
    (different class) are penalised only while their distance is below ``margin``.

    Args:
        y_true: pair labels, 1 for a matching pair and 0 for a non-matching pair.
        y_pred: predicted distances for the pairs.
        margin: distance beyond which a non-matching pair contributes no loss. Defaults to 1,
            the value that used to be hard-coded, so Keras can still call this with two arguments.

    Returns:
        The mean loss over all pairs.
    """
    # Labels may arrive as integers when the function is called directly; align the dtype with
    # the predictions so the arithmetic below does not fail on mixed types.
    y_true = K.cast(y_true, y_pred.dtype)
    same_class_term = y_true * K.square(y_pred)
    different_class_term = (1 - y_true) * K.square(K.maximum(margin - y_pred, 0))
    return K.mean(same_class_term + different_class_term)

# Prepare pairs of data
def create_pairs(data, labels, rng=None):
    """Build alternating positive/negative training pairs for a Siamese network.

    For every class, each example ``i`` (except the last) produces two pairs, in this order:
    a positive pair with example ``i + 1`` of the same class (target 1), then a negative pair
    with a randomly drawn example of a randomly drawn other class (target 0).

    Args:
        data: array of examples, indexed along its first axis.
        labels: array of class labels, one per example.
        rng: optional ``numpy.random.Generator`` used for the negative draws. When omitted the
            global ``np.random`` state is used, so ``np.random.seed`` still controls the result.

    Returns:
        ``(pairs, targets)`` where ``pairs`` stacks the pairs along the first axis and
        ``targets`` holds the matching 1/0 labels.

    Raises:
        ValueError: if pairs have to be generated but fewer than two distinct labels exist,
            so no negative example can be drawn.
    """
    data = np.asarray(data)
    labels = np.asarray(labels)
    randint = np.random.randint if rng is None else rng.integers

    # Group the examples once per class; classes are addressed by position from here on so the
    # label values themselves never have to go through a random-choice call.
    examples_by_class = [data[labels == label] for label in np.unique(labels)]
    n_classes = len(examples_by_class)

    pairs, targets = [], []
    for class_pos, examples in enumerate(examples_by_class):
        if len(examples) > 1 and n_classes < 2:
            raise ValueError(
                "create_pairs needs at least two distinct labels to draw negative pairs"
            )
        for i in range(len(examples) - 1):
            pairs.append([examples[i], examples[i + 1]])
            targets.append(1)

            # Draw uniformly among the other classes: pick from n_classes - 1 slots and skip
            # over the current class instead of rebuilding a candidate list every iteration.
            other_pos = randint(n_classes - 1)
            if other_pos >= class_pos:
                other_pos += 1
            negatives = examples_by_class[other_pos]
            pairs.append([examples[i], negatives[randint(len(negatives))]])
            targets.append(0)
    return np.array(pairs), np.array(targets)
# Train / evaluate loop: retrain the Siamese network until at least one device reaches an
# equal error rate (EER) of 0.2 % or less.
def _distances_to_reference(model, ref, candidates):
    """Return the model distance between one reference row and every candidate row."""
    if len(candidates) == 0:
        # Nothing to score; skip the model call, which cannot handle an empty batch.
        return np.empty(0)
    return model.predict([np.repeat(ref, len(candidates), axis=0), candidates]).flatten()


def _compute_eer(dist_genuine, dist_imposter):
    """Return the EER (as a fraction) separating genuine from imposter distances.

    Ground truth is 1 for genuine samples and 0 for imposters; the negated distance is the
    score, so a smaller distance counts as "more genuine". Returns 0.5 when one group is empty.
    """
    # The labels must come from sample identity. Deriving them from thresholded predictions
    # (as was done before) scores the model against its own decisions and makes the EER
    # meaningless.
    y_true = np.concatenate([np.ones(len(dist_genuine)), np.zeros(len(dist_imposter))])
    y_scores = np.concatenate([dist_genuine, dist_imposter])

    if len(np.unique(y_true)) < 2:
        print("Warning: y_true has only one class, skipping EER computation.")
        return 0.5

    fpr, tpr, _ = roc_curve(y_true, -y_scores)
    fnr = 1 - tpr
    # The EER sits where false-positive and false-negative rates are closest to each other.
    eer_index = np.nanargmin(np.abs(fpr - fnr))
    return (fpr[eer_index] + fnr[eer_index]) / 2


# --- Per-device train/test split ---------------------------------------------------------
# The split is deterministic (first `samples` rows of each device, 90 % / 10 %), so it is
# built once here instead of on every retry. Parts are collected in lists and concatenated
# once, which avoids growing a DataFrame row block by row block.
split_idx = int(samples * 0.9)
train_parts, test_parts = [], []
for dev_id in Data['Device'].unique():
    device_rows = Data[Data['Device'] == dev_id]
    if len(device_rows) >= samples:
        device_rows = device_rows.head(samples)
        train_parts.append(device_rows.iloc[:split_idx])
        test_parts.append(device_rows.iloc[split_idx:])
if not train_parts:
    raise ValueError(f"No device has at least {samples} samples; nothing to train on.")
Train = pd.concat(train_parts, ignore_index=True)
Test = pd.concat(test_parts, ignore_index=True)

# Fit the scaler on the training rows only, then reuse it for the test rows.
scaler = StandardScaler()
X_train_all = scaler.fit_transform(Train.drop('Device', axis=1).values)
y_train_all = Train['Device'].values
X_test_all = scaler.transform(Test.drop('Device', axis=1).values)
y_test_all = Test['Device'].values

input_dim = X_train_all.shape[1]
devices = Train['Device'].unique()

# NOTE(review): the retry below has no upper bound; it keeps retraining until the best
# device reaches the target EER. Interrupt the kernel if it does not converge.
minn = 50
while minn > 0.2:
    # Clear the output of the previous attempt
    clear_output(wait=True)

    array_EER = []
    device_data = {}

    # Create Siamese network (fresh weights for every attempt)
    base_network = create_base_network(input_dim)

    input_a = Input(shape=(input_dim,))
    input_b = Input(shape=(input_dim,))
    processed_a = base_network(input_a)
    processed_b = base_network(input_b)
    distance = Lambda(euclidean_distance)([processed_a, processed_b])
    siamese_net = Model([input_a, input_b], distance)
    siamese_net.compile(loss=contrastive_loss, optimizer=Adam(1e-3))

    # Generate training pairs (negatives are re-drawn on every attempt)
    pairs, targets = create_pairs(X_train_all, y_train_all)

    # Train the Siamese network
    # NOTE(review): the checkpoint keeps the lowest-loss epoch, while the evaluation below
    # uses the weights of the last epoch; the two can differ.
    callbacks = [
        EarlyStopping(monitor='loss', patience=5, verbose=1),
        ModelCheckpoint('New_best_siamese_CLEA50.keras', save_best_only=True, monitor='loss', verbose=1)
    ]
    siamese_net.fit([pairs[:, 0], pairs[:, 1]], targets, batch_size=64, epochs=20, callbacks=callbacks)

    # EER computation per device
    Rep = 0
    Count = 0
    for device in devices:
        X_test = X_test_all[y_test_all == device]
        X_imposter = X_test_all[y_test_all != device]
        X_train = X_train_all[y_train_all == device]
        # The first training row of the device serves as its enrolment reference.
        ref = X_train[:1]

        dist_genuine = _distances_to_reference(siamese_net, ref, X_test)
        dist_imposter = _distances_to_reference(siamese_net, ref, X_imposter)

        eer = _compute_eer(dist_genuine, dist_imposter)
        eer_percent = round(eer * 100, 2)
        array_EER.append(eer_percent)
        print(eer_percent)

        Rep = Rep + 1
        if eer_percent == 50.0:
            Count = Count + 1

        # Temporary per-device record, used to pick the best device afterwards
        device_data[device] = {
            "eer": eer_percent,
            "X_train": X_train,
            "X_test": X_test,
            "X_imposter": X_imposter
        }

        # Give up on this attempt when the first five devices all sit at chance level.
        # This leaves the for loop only; the while loop then retrains.
        if Rep == 5 and Count == 5:
            print("Break !!!")
            break

    # Select the best device (lowest EER)
    best_device = min(device_data, key=lambda d: device_data[d]["eer"])
    best = device_data[best_device]

    # Save its data to .npz
    np.savez("New_best_device_data_CLEA50.npz",
             X_train=best["X_train"],
             X_test=best["X_test"],
             X_imposter=best["X_imposter"],
             device=best_device)

    # Summary
    print('EERs for all devices:', array_EER)
    print('Min EER:', min(array_EER))
    print('Max EER:', max(array_EER))
    minn = min(array_EER)