In [9]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, losses
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_curve, auc
from utils.preprocess_data_KDD_plus_test import preprocess_data
from utils.plot import plot_roc_curve, plot_combined_roc_curve, save_loss_curve, save_combined_loss_curve

In [10]:
# Input CSV locations, resolved relative to the working directory
train_file_path, test_file_path = (
    os.path.join('data', csv_name) for csv_name in ('KDDTrain+.csv', 'KDDTest+.csv')
)
preprocessing_models_folder = 'preprocessing_pipeline'

# Hand the paths to the project preprocessing helper, which returns the
# train/test features and labels used by every later cell
X_train, X_test, y_train, y_test = preprocess_data(
    train_file_path=train_file_path,
    test_file_path=test_file_path,
    preprocessing_models_folder=preprocessing_models_folder,
)

# Plots for this notebook go into an "autoencoder" folder under the working directory
current_directory = os.path.abspath(os.getcwd())
output_folder = os.path.join(current_directory, "autoencoder")

In [11]:
def build_autoencoder(input_shape):
    """Build and compile a small dense autoencoder.

    The network is Dense(64) -> Dense(32) -> Dense(64) -> Dense(n_features),
    with ReLU on the hidden layers and a sigmoid on the output, compiled with
    the Adam optimizer and mean squared error loss.

    Args:
        input_shape: Shape of one sample (without the batch axis), e.g.
            ``(n_features,)``.

    Returns:
        A compiled ``tf.keras`` model mapping an input to its reconstruction.
    """
    keras_layers = tf.keras.layers

    # Dense layers transform the LAST axis of their input, so the reconstruction
    # must be as wide as the last axis of the input. For the usual 1-element
    # shape this is the same value as input_shape[0].
    n_features = input_shape[-1]

    input_layer = keras_layers.Input(shape=input_shape)

    # Encoder: compress down to a 32-unit bottleneck
    encoded = keras_layers.Dense(64, activation='relu')(input_layer)
    encoded = keras_layers.Dense(32, activation='relu')(encoded)

    # Decoder: expand back to the input width.
    # NOTE(review): the sigmoid bounds reconstructions to (0, 1); this assumes
    # the features are scaled into that range -- confirm against preprocess_data.
    decoded = keras_layers.Dense(64, activation='relu')(encoded)
    decoded = keras_layers.Dense(n_features, activation='sigmoid')(decoded)

    autoencoder = tf.keras.models.Model(input_layer, decoded)
    autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
    return autoencoder

In [12]:

def autoencoder_kfold(X, k=5, epochs=10, batch_size=256, output_folder=output_folder, y=None):
    """Cross-validate the autoencoder with K-Fold and plot a ROC curve per fold.

    For every fold a fresh model from ``build_autoencoder`` is trained to
    reconstruct the training split. Each validation sample is then scored by
    its mean squared reconstruction error, and that score is compared with the
    validation labels through ``roc_curve`` / ``auc``. One ROC plot is written
    per fold, plus a combined plot at the end.

    Args:
        X: Feature array; must support indexing with integer index arrays
            along axis 0.
        k: Number of folds.
        epochs: Training epochs per fold.
        batch_size: Mini-batch size passed to ``fit``.
        output_folder: Directory that receives the plots; created if missing.
        y: Labels aligned with ``X`` along axis 0. When omitted, the
            notebook-level ``y_train`` is used so existing calls keep working.

    Returns:
        list: One entry per fold, each being the per-epoch training loss
        recorded in the Keras history.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Positional (array) indexing of the labels, independent of any index the
    # label container may carry. Falls back to the notebook global.
    labels = np.asarray(y_train if y is None else y)

    kf = KFold(n_splits=k, shuffle=True, random_state=42)

    all_fpr, all_tpr, all_auc = [], [], []
    all_fold_losses = []

    for fold_number, (train_index, val_index) in enumerate(kf.split(X), start=1):
        print(f"running fold-{fold_number}")
        X_train_fold, X_val_fold = X[train_index], X[val_index]

        # Build and train a fresh Autoencoder for this fold
        input_shape = (X_train_fold.shape[1],)
        autoencoder = build_autoencoder(input_shape)
        history = autoencoder.fit(
            X_train_fold, X_train_fold,
            epochs=epochs,
            batch_size=batch_size,
            shuffle=True,
            validation_data=(X_val_fold, X_val_fold),
            verbose=0,
        )

        # Keep the per-epoch training loss of this fold
        all_fold_losses.append(history.history['loss'])

        # Score every validation sample with ONE number: the mean squared
        # reconstruction error over all non-sample axes. Reducing only axis 1
        # leaves a 2-D score when X has more than two dimensions, which
        # roc_curve rejects ("y should be a 1d array") -- presumably the cause
        # of the failure recorded in this notebook; confirm X's shape.
        # For 2-D X this is identical to a mean over axis 1.
        X_val_reconstructed = autoencoder.predict(X_val_fold)
        squared_error = np.square(np.asarray(X_val_fold) - X_val_reconstructed)
        reconstruction_error = np.mean(
            squared_error, axis=tuple(range(1, squared_error.ndim))
        )

        # Compute ROC curve and AUC for the current fold
        fpr, tpr, _ = roc_curve(labels[val_index], reconstruction_error)
        roc_auc = auc(fpr, tpr)
        all_fpr.append(fpr)
        all_tpr.append(tpr)
        all_auc.append(roc_auc)

        # Plot ROC curve for the current fold
        plot_roc_curve(fpr, tpr, roc_auc, f'fold-{fold_number}', output_folder)

        print(f"fold-{fold_number} complete")

    # Plot combined ROC curves for all folds
    plot_combined_roc_curve(all_fpr, all_tpr, all_auc, k, output_folder)

    print(f"K-Fold cross-validation completed. Results saved in {output_folder}")

    return all_fold_losses  # Returned so the caller can plot the loss curves


In [13]:
def final_evaluation(X, X_test, y_test, epochs=10, batch_size=256, output_folder=output_folder):
    """Train one autoencoder on ``X`` and evaluate it on the test set.

    A fresh model from ``build_autoencoder`` is fitted to reconstruct ``X``.
    Each test sample is scored by its mean squared reconstruction error, the
    score is compared with ``y_test`` through ``roc_curve`` / ``auc``, and the
    resulting ROC curve is plotted under the name ``'test_final'``.

    Args:
        X: Training feature array.
        X_test: Test feature array with the same per-sample shape as ``X``.
        y_test: Test labels aligned with ``X_test`` along axis 0.
        epochs: Number of training epochs.
        batch_size: Mini-batch size passed to ``fit``.
        output_folder: Directory that receives the plot; created if missing.

    Returns:
        list: Per-epoch training loss recorded in the Keras history.
    """
    print("Final evaluation on test set:")

    # Do not rely on an earlier cell having created the folder
    os.makedirs(output_folder, exist_ok=True)

    # Build and train the final Autoencoder model
    input_shape = (X.shape[1],)
    autoencoder_final = build_autoencoder(input_shape)
    history_final = autoencoder_final.fit(
        X, X,
        epochs=epochs,
        batch_size=batch_size,
        shuffle=True,
        verbose=0,
    )

    # Score every test sample with ONE number: the mean squared reconstruction
    # error over all non-sample axes. Reducing only axis 1 would leave a 2-D
    # score for inputs with more than two dimensions, which roc_curve rejects.
    # For 2-D inputs this is identical to a mean over axis 1.
    X_test_reconstructed = autoencoder_final.predict(X_test)
    squared_error = np.square(np.asarray(X_test) - X_test_reconstructed)
    reconstruction_error_test = np.mean(
        squared_error, axis=tuple(range(1, squared_error.ndim))
    )

    # ROC Curve for test data
    fpr_test, tpr_test, _ = roc_curve(y_test, reconstruction_error_test)
    roc_auc_test = auc(fpr_test, tpr_test)
    plot_roc_curve(fpr_test, tpr_test, roc_auc_test, 'test_final', output_folder)

    print(f"Final test evaluation and plots saved in {output_folder}")

    return history_final.history['loss']  # Per-epoch training loss of the final model

In [14]:
# Cross-validate on the training data and keep the per-fold training losses
all_fold_losses = autoencoder_kfold(
    X_train,
    k=5,
    epochs=10,
    batch_size=256,
    output_folder=output_folder,
)


running fold-1
[1m784/784[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 509us/step


ValueError: y should be a 1d array, got an array of shape (25084, 16) instead.

In [15]:

# Train on the full training data, score the test set, and keep the final model's training loss
final_model_loss = final_evaluation(
    X_train,
    X_test,
    y_test,
    epochs=10,
    batch_size=256,
    output_folder=output_folder,
)



In [16]:
# Pass the K-Fold and final-model loss histories to the combined loss-curve helper
save_combined_loss_curve(
    all_fold_losses,
    final_model_loss,
    output_folder,
)