In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, ConvLSTM2D, BatchNormalization, TimeDistributed, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Nadam, Adam
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import cv2
import os
from time import time
import pandas as pd

# Configuration
os.makedirs("model_comparison", exist_ok=True)
DATA_PATHS = {
    'UCSDped1': {'train': 'UCSDped1_train.npy', 'test': 'UCSDped1_test.npy'},
    'UCSDped2': {'train': 'UCSDped2_train.npy', 'test': 'UCSDped2_test.npy'}
}
SEQ_LENGTH = 10  # Augmenté pour meilleure capture temporelle
IMG_SIZE = (64, 64)
BATCH_SIZE = 16

# Modèles améliorés

def build_enhanced_cae()
    input_img = Input(shape=(*IMG_SIZE, 1))
    
    # Encoder avec régularisation
    x = Conv2D(32, (3, 3), activation='relu', padding='same', 
               kernel_regularizer=l2(0.001))(input_img)
    x = MaxPooling2D((2, 2), padding='same')(x)
    x = Conv2D(64, (3, 3), activation='relu', padding='same',
               kernel_regularizer=l2(0.001))(x)
    encoded = MaxPooling2D((2, 2), padding='same')(x)
    
    # Decoder
    x = Conv2D(64, (3, 3), activation='relu', padding='same')(encoded)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    decoded = Conv2D(1, (3, 3), activation='sigmoid', padding='same')(x)
    
    model = Model(input_img, decoded)
    model.compile(optimizer=Nadam(learning_rate=0.0001), loss='mse')
    return model

def build_enhanced_convlstm():
    inputs = Input(shape=(SEQ_LENGTH, *IMG_SIZE, 1))
    
    # Encoder amélioré
    x = ConvLSTM2D(32, (3, 3), padding='same', return_sequences=True,
                  kernel_regularizer=l2(0.001))(inputs)
    x = BatchNormalization()(x)
    x = Dropout(0.2)(x)
    x = ConvLSTM2D(16, (3, 3), padding='same', return_sequences=True,
                  kernel_regularizer=l2(0.001))(x)
    
    # Decoder
    x = TimeDistributed(Conv2D(16, (3, 3), padding='same', activation='relu'))(x)
    outputs = TimeDistributed(Conv2D(1, (3, 3), padding='same', activation='sigmoid'))(x)
    
    model = Model(inputs, outputs)
    model.compile(optimizer=Adam(learning_rate=0.00001), loss='mse')
    return model

# Pipeline d'évaluation

def evaluate_model(model, X_train, X_test, model_name, dataset_name):
    history = model.fit(
        X_train, X_train,
        validation_split=0.2,
        epochs=50,
        batch_size=BATCH_SIZE,
        callbacks=[
            EarlyStopping(patience=5, monitor='val_loss', min_delta=0.001),
            ReduceLROnPlateau(factor=0.5, patience=3, min_lr=1e-6)
        ],
        verbose=0
    )
    
    # Calcul des erreurs
    reconstructions = model.predict(X_test, batch_size=BATCH_SIZE)
    errors = np.mean(np.square(X_test - reconstructions), axis=tuple(range(1, X_test.ndim)))
    threshold = np.percentile(errors, 90)  # Seuil à 90% pour plus de sensibilité
    
    return {
        'history': history,
        'errors': errors,
        'threshold': threshold,
        'anomaly_rate': np.mean(errors > threshold) * 100
    }

# Visualisation comparative

def plot_comparison(results):
    plt.figure(figsize=(15, 6))
    
    # Courbes d'apprentissage
    plt.subplot(1, 2, 1)
    for model in results:
        plt.plot(model['history'].history['val_loss'], 
                label=f"{model['name']} (final={model['history'].history['val_loss'][-1]:.4f})")
    plt.title('Comparaison des Loss de Validation')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    
    # Taux d'anomalies
    plt.subplot(1, 2, 2)
    model_names = [m['name'] for m in results]
    anomaly_rates = [m['anomaly_rate'] for m in results]
    plt.bar(model_names, anomaly_rates, color=['blue', 'orange'])
    for i, v in enumerate(anomaly_rates):
        plt.text(i, v, f"{v:.2f}%", ha='center')
    plt.title('Taux de Détection d\'Anomalies')
    plt.ylabel('Pourcentage')
    
    plt.tight_layout()
    plt.savefig("model_comparison/comparison_results.png")
    plt.close()

# Execution principale

def main():
    dataset_name = 'UCSDped1'  # À changer pour UCSDped2 si besoin
    train_data = np.load(DATA_PATHS[dataset_name]['train'], mmap_mode='r')[:200]
    test_data = np.load(DATA_PATHS[dataset_name]['test'], mmap_mode='r')[:50]
    
    # Préparation des données
    def preprocess(frames):
        processed = []
        for frame in frames:
            frame = cv2.resize(frame.astype('float32'), IMG_SIZE) / 255.0
            processed.append(frame[..., np.newaxis])
        return np.array(processed)
    
    # CAE
    X_train_cae = preprocess(train_data)
    X_test_cae = preprocess(test_data)
    cae = build_enhanced_cae()
    cae_results = evaluate_model(cae, X_train_cae, X_test_cae, 'CAE', dataset_name)
    cae_results['name'] = 'CAE'
    
    # ConvLSTM
    def create_sequences(frames):
        return np.array([frames[i:i+SEQ_LENGTH] for i in range(len(frames)-SEQ_LENGTH+1)])
    
    X_train_lstm = create_sequences(preprocess(train_data))
    X_test_lstm = create_sequences(preprocess(test_data))
    convlstm = build_enhanced_convlstm()
    lstm_results = evaluate_model(convlstm, X_train_lstm, X_test_lstm, 'ConvLSTM', dataset_name)
    lstm_results['name'] = 'ConvLSTM'
    
    # Visualisation
    plot_comparison([cae_results, lstm_results])
    
    # Sauvegarde des résultats
    pd.DataFrame({
        'Model': ['CAE', 'ConvLSTM'],
        'Validation Loss': [cae_results['history'].history['val_loss'][-1], 
                          lstm_results['history'].history['val_loss'][-1]],
        'Anomaly Rate (%)': [cae_results['anomaly_rate'], 
                           lstm_results['anomaly_rate']]
    }).to_csv("model_comparison/metrics.csv", index=False)

if __name__ == "__main__":
    main()

[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 61ms/step
