In [None]:
# Impostazioni globali
# --------------------------
from sklearn.neural_network import MLPClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.ensemble import RandomForestClassifier


# Dataset choice: True selects the NSL-KDD branch below, False the CICIDS2017 branch.
NSL_DATASET = False
# True selects binary_mapping as MAPPING and enables undersampling and the
# hold-out training run; False selects multiclass_mapping.
BINARY = False
# Enables the XGBoost-based feature selection step.
XGB = True
# Enables the plain k-fold cross-validation run.
KFOLD_TRAINING = False
# Enables the stratified k-fold cross-validation run.
KFOLDSTRAT_TRAINING = True
# Enables the MLP grid search (and disables the hold-out training run).
GRID_SEARCH = False

# Classifiers that the training routines iterate over.
models = [
    GaussianNB(),
    LogisticRegression(),
    LinearDiscriminantAnalysis(),
    RandomForestClassifier(),
    MLPClassifier(hidden_layer_sizes=(20, 20), activation="relu", solver="adam", random_state=42)
    ]
# Hyper-parameter grid handed to grid_search for the MLP classifier.
PARAM_GRID = {
    'hidden_layer_sizes': [(20,), (50,), (20, 20), (50, 20), (50, 50)],
    'activation': ['relu', 'tanh'],
    'solver': ['adam', 'sgd'],
}
# --------------------------
# Mapping convention (as consumed by extract_X_and_Y): when a value is a list,
# every raw label in the list is mapped to the dictionary key; when a value is
# a scalar, the key itself is the raw label and it is mapped to the value.
# Raw labels not covered by the mapping are replaced with "Malicious".
if NSL_DATASET:
    DATASET_PATH = "./datasets/nslkdd/"

    binary_mapping = {
        "normal": "BENIGN"
    }
    multiclass_mapping = {
        "normal": "BENIGN",
        "R2L": ["warezclient", "guess_passwd", "ftp_write", "multihop", "imap", "phf", "spy", "warezmaster"],
        "DoS":["neptune", "teardrop", "smurf", "pod", "back", "land"],
        "U2R":["rootkit", "buffer_overflow", "loadmodule", "perl"],
        "Probe":["ipsweep", "portsweep", "nmap", "satan"]
    }

    # Name of the target column for this dataset.
    Y_LABEL = "attack"
else: # CICIDS
    DATASET_PATH = "./datasets/cicids2017/"

    binary_mapping = {
        "BENIGN": "BENIGN"
    }
    # NOTE(review): the "�" in the Web Attack labels looks like a mis-encoded
    # dash; presumably it matches the labels exactly as they appear in the
    # CSV files - confirm before normalizing it.
    multiclass_mapping = {
        "BENIGN": "BENIGN",
        "Brute Force": ["FTP-Patator", "SSH-Patator"],
        "DoS":["DoS slowloris", "DoS Slowhttptest", "DoS Hulk", "DoS GoldenEye", "Heartbleed"],
        "Web Attack":["Web Attack � Brute Force", "Web Attack � XSS", "Web Attack � Sql Injection"],
        "Infiltration":"Infiltration",
        "DDoS":"DDoS",
        "PortScan":"PortScan",
        "Bot":"Bot"
    }
    
    # Name of the target column for this dataset.
    Y_LABEL = "Label" 

# Pick the label mapping that matches the selected classification scenario.
if BINARY:
    MAPPING = binary_mapping
else:
    MAPPING = multiclass_mapping

In [None]:
# Import del dataset
# --------------------------
import os
import pandas as pd

def dataset_import(dataset_path, verbose=True):
    """
    Load every data file found directly inside ``dataset_path`` and
    concatenate them into a single data frame.

    Only regular files are read: sub-directories and hidden entries (names
    starting with ".", such as notebook checkpoints) are skipped. Files are
    read in sorted name order so that the resulting row order, and therefore
    every seeded split performed later, is reproducible across machines.

    Args:
        dataset_path: Folder that contains the dataset files.
        verbose: When True, print the number of imported records.

    Returns:
        A data frame with the rows of all files and a fresh 0..n-1 index.

    Raises:
        ValueError: If the folder contains no data file.
    """

    # Collect the regular, non-hidden files in a deterministic order
    file_paths = []
    for file_name in sorted(os.listdir(dataset_path)):
        file_path = os.path.join(dataset_path, file_name)
        if file_name.startswith(".") or not os.path.isfile(file_path):
            continue
        file_paths.append(file_path)

    if not file_paths:
        raise ValueError(f"No data files found in {dataset_path!r}")

    # Read each file and stack the frames; ignore_index avoids the repeated
    # per-file row labels that a plain concat would keep
    data_frames = [pd.read_csv(file_path) for file_path in file_paths]
    df = pd.concat(data_frames, ignore_index=True)

    # Report the number of imported rows
    if verbose:
        print(f"Numero di record importati nel dataframe: {df.shape[0]}")

    return df


# Load the raw records from the folder selected in the settings cell.
df = dataset_import(DATASET_PATH)

if NSL_DATASET:
    # Assign the NSL-KDD column names positionally.
    # NOTE(review): dataset_import reads with pandas' default header handling;
    # if the NSL-KDD files have no header row, the first record of each file
    # is consumed as a header - confirm against the actual files.
    columns = [
        'duration', 'protocol_type', 'service', 'flag',
        'src_bytes', 'dst_bytes', 'land', 'wrong_fragment',
        'urgent', 'hot', 'num_failed_logins', 'logged_in',
        'num_compromised', 'root_shell', 'su_attempted', 'num_root',
        'num_file_creations', 'num_shells', 'num_access_files',
        'num_outbound_cmds', 'is_host_login', 'is_guest_login',
        'count', 'srv_count', 'serror_rate', 'srv_serror_rate',
        'rerror_rate', 'srv_rerror_rate', 'same_srv_rate',
        'diff_srv_rate', 'srv_diff_host_rate', 'dst_host_count',
        'dst_host_srv_count', 'dst_host_same_srv_rate',
        'dst_host_diff_srv_rate', 'dst_host_same_src_port_rate',
        'dst_host_srv_diff_host_rate', 'dst_host_serror_rate',
        'dst_host_srv_serror_rate', 'dst_host_rerror_rate',
        'dst_host_srv_rerror_rate', 'attack', 'level',
    ]
    df.columns = columns

In [None]:
# Dataset cleaning
# --------------------------
import numpy as np

def data_cleaning(df, verbose=True):
    """
    Clean the dataset by removing rows with null values, rows with
    infinite values and duplicated rows.

    The frame is modified in place (column names are stripped, rows are
    dropped, the index is reset) and the same object is returned.

    Each reported figure is the number of rows actually removed by the
    corresponding step, measured as the difference in row count.

    Args:
        df: Data frame to clean; its column labels must be strings.
        verbose: When True, print how many rows each step removed.

    Returns:
        The cleaned data frame with a fresh 0..n-1 index.
    """

    # Strip surrounding whitespace from the column names
    df.columns = df.columns.str.strip()

    # Drop rows that contain null values
    rows_before = len(df)
    df.dropna(inplace=True)
    null_rows = rows_before - len(df)

    if verbose:
        print(f"Rimosse {null_rows} righe con valori nulli")

    # Drop rows that contain infinite values: turn them into NaN first so
    # that dropna can remove them, whatever the float width of the column
    rows_before = len(df)
    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    df.dropna(inplace=True)
    inf_rows = rows_before - len(df)

    if verbose:
        print(f"Rimosse {inf_rows} righe con valori infiniti")

    # Drop duplicated rows (the first occurrence is kept)
    rows_before = len(df)
    df.drop_duplicates(inplace=True)
    duplicate_rows = rows_before - len(df)

    if verbose:
        print(f"Rimosse {duplicate_rows} righe duplicate")

    # Rebuild a contiguous index after the removals
    df.reset_index(drop=True, inplace=True)

    return df


# Clean the imported frame (nulls, infinities, duplicates) and rebind the result.
df = data_cleaning(df)

In [None]:
# Estrai le feature e la variabile target
# --------------------------
import matplotlib.pyplot as plt
import seaborn as sns

def plot(y, verbose=True):
    """
    Draw a count plot of the target classes and, when ``verbose`` is True,
    print the share of each class over the total number of records.
    """

    # Bar chart with one bar per target class
    plt.figure(figsize=(10, 5))
    sns.countplot(x=y, hue=y, palette="Set1", legend=False)
    plt.title("Distribuzione della variabile target")
    plt.xlabel("Classe Target")
    plt.ylabel("Conteggio")
    plt.xticks(rotation=45)
    plt.show()

    # Absolute and relative frequency of every class
    total = len(y)
    per_class = y.value_counts()
    shares = per_class / total

    if not verbose:
        return

    print("Distribuzione delle classi target:")
    for label, count, share in zip(per_class.index, per_class.to_numpy(), shares.to_numpy()):
        print(f"{label}: {share*100:.2f}% ({count}/{total})")

def extract_X_and_Y(df, mapping, y_label, verbose=True):
    """
    Separate the target column (y) from the feature columns (X) and group
    the raw target labels according to ``mapping``.

    In ``mapping`` a list value maps every listed raw label to its key,
    while a scalar value maps the key itself to that value. Raw labels not
    covered by the mapping become "Malicious".
    """

    # Show the raw target categories and their record counts
    if verbose:
        print("Conteggio dei record per ogni categoria target:\n")
        print(df[y_label].value_counts())

    # Flatten the mapping into a direct raw-label -> group lookup
    lookup = {}
    for group, members in mapping.items():
        if isinstance(members, list):
            lookup.update({member: group for member in members})
        else:
            lookup[group] = members

    # Feature columns: everything except the target
    X = df.drop(y_label, axis=1)

    # Target column with the grouped labels
    y = df[y_label].map(lookup).fillna("Malicious")

    # Show the grouped target categories and their record counts
    if verbose:
        print("\n\nConteggio dei record per ogni categoria target dopo la trasformazione:\n")
        print(y.value_counts())

        plot(y)

    return X, y


# Split the cleaned frame into the feature matrix X and the grouped target y.
X, y = extract_X_and_Y(df, MAPPING, Y_LABEL)

In [None]:
# OneHotEncoding
# --------------------------
from sklearn.preprocessing import OneHotEncoder

def one_hot_encoding(X, y, verbose=True):
    """
    Apply one-hot encoding to the object-typed columns of X.

    Args:
        X: Feature data frame.
        y: Target values; returned unchanged.
        verbose: When True, print the columns that are going to be encoded.

    Returns:
        A tuple ``(X, y)`` where the object-typed columns of X have been
        replaced by their one-hot indicator columns (appended after the
        remaining columns). When X has no object-typed column it is
        returned as is.
    """

    cols_to_ohe = X.select_dtypes("object").columns

    if verbose:
        print("Colonne da codificare con one-hot encoding:")
        print(cols_to_ohe)

    # Nothing to encode: skip the encoder entirely
    if len(cols_to_ohe) == 0:
        return X, y

    ohe = OneHotEncoder(sparse_output=False)

    encoded = ohe.fit_transform(X[cols_to_ohe])
    encoded_names = ohe.get_feature_names_out(cols_to_ohe)
    # Reuse X's index: concat aligns on row labels, so a default RangeIndex
    # here would misalign rows (and introduce NaNs) for any other index
    ohe_df = pd.DataFrame(encoded, columns=encoded_names, index=X.index)
    X = pd.concat([X.drop(columns=cols_to_ohe), ohe_df], axis=1)

    return X, y


# Replace any object-typed feature column with its one-hot indicator columns.
X, y = one_hot_encoding(X, y)

In [None]:
# Undersampling
# --------------------------
from imblearn.under_sampling import RandomUnderSampler

def undersampling(X, y, verbose=True, random_state=42):
    """
    Balance the target classes by random undersampling.

    Args:
        X: Feature data.
        y: Target values.
        verbose: When True, plot the class distribution after resampling.
        random_state: Seed for the sampler, so that the selected subset is
            reproducible like the other seeded steps of the pipeline. Pass
            None to get a different random subset on every call.

    Returns:
        The resampled ``(X, y)`` pair.
    """

    sampler = RandomUnderSampler(random_state=random_state)
    X, y = sampler.fit_resample(X, y)

    if verbose:
        plot(y)

    return X, y


# Class balancing is applied only in the binary scenario.
if BINARY:
    X, y = undersampling(X, y)

In [None]:
# Feature Selection con XGBoost
# --------------------------
from xgboost import XGBClassifier
from sklearn.feature_selection import SelectFromModel
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder


def dataset_training_split_with_validation(X, y, p_train, p_val, p_test, random_state=42, shuffle=True):
    """
    Split the features X and the target y into train, validation and test
    subsets according to the given proportions.

    Args:
        X: Feature data.
        y: Target values.
        p_train: Fraction of rows for the training set, in (0, 1).
        p_val: Fraction of rows for the validation set, > 0.
        p_test: Fraction of rows for the test set, > 0.
        random_state: Seed used by both splits.
        shuffle: Whether to shuffle before splitting. Stratification is
            applied only when shuffling, because scikit-learn does not
            support a stratified split with ``shuffle=False``.

    Returns:
        ``(X_train, X_val, X_test, y_train, y_val, y_test)``.

    Raises:
        ValueError: If a proportion is out of range or the three
            proportions do not sum to 1.
    """

    # Reject proportions that would otherwise fail with a division by zero
    # or silently produce subsets of a different size than requested
    if not (0.0 < p_train < 1.0) or p_val <= 0.0 or p_test <= 0.0:
        raise ValueError(
            "p_train must be in (0, 1) and p_val, p_test must be positive; "
            f"got p_train={p_train}, p_val={p_val}, p_test={p_test}"
        )
    if abs((p_train + p_val + p_test) - 1.0) > 1e-9:
        raise ValueError(
            "p_train + p_val + p_test must sum to 1; "
            f"got {p_train + p_val + p_test}"
        )

    # First split: the training set versus a temporary remainder that will
    # be divided into validation and test. Stratifying keeps the class
    # proportions of y in every subset.
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y,
        stratify=y if shuffle else None,
        test_size=(1.0 - p_train),
        random_state=random_state,
        shuffle=shuffle)

    # Share of the remainder that goes to the test set
    fraction = p_test / (p_val + p_test)
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp,
        stratify=y_temp if shuffle else None,
        test_size=fraction,
        random_state=random_state,
        shuffle=shuffle)

    return X_train, X_val, X_test, y_train, y_val, y_test

def get_feature_importances(original_data, clf_params, verbose=False):
    """
    Compute the feature importances with an XGBoost classifier trained on
    the training subset and evaluated on the validation subset.

    Args:
        original_data: Sequence ``(X_train, X_val, X_test, y_train, y_val,
            y_test)``; only the train and validation parts are used.
        clf_params: Keyword arguments for ``XGBClassifier``.
        verbose: When True, print the importance of every feature.

    Returns:
        The ``feature_importances_`` array of the fitted classifier, in the
        column order of ``X_train``.
    """

    # Unpack the parts that are needed
    X_train, X_val, _, y_train, y_val, _ = original_data

    # Build the XGBoost classifier
    xgb_clf = XGBClassifier(**clf_params)

    # Encode both label sets with ONE encoder fitted on the training labels:
    # two independently fitted encoders can assign different integer codes
    # to the same class when the validation set lacks one of the classes
    encoder = LabelEncoder().fit(y_train)

    # Train the classifier, monitoring the validation set
    xgb_clf.fit(X_train, encoder.transform(y_train),
                eval_set=[(X_val, encoder.transform(y_val))], verbose=False)

    # Read the feature importances
    feature_importances = xgb_clf.feature_importances_

    if verbose:
        # Pad the names to the longest one so the table is aligned
        max_feature_name_length = max(len(name) for name in X_train.columns)
        print(f"{'Feature':<{max_feature_name_length}}\tImportanza")
        for name, importance in zip(X_train.columns, feature_importances):
            print(f"{name:<{max_feature_name_length}}\t{importance*100:.2f}%")

    return feature_importances

def select_features_by_threshold(X, y, threshold=0.01, verbose=True):
    """
    Select the features whose XGBoost importance exceeds a threshold.

    The data is split into train/validation/test parts (75/10/15), an
    XGBoost classifier is fitted on the train part with early stopping on
    the validation part, and the features with importance strictly greater
    than ``threshold`` are kept.

    Args:
        X: Feature data frame.
        y: Target series.
        threshold: Minimum importance (exclusive) for a feature to be kept.
        verbose: When True, print the selected features and importances.

    Returns:
        The names of the selected features, sorted by decreasing
        importance. The list is empty when no feature passes the threshold.
    """

    # Choose objective and metric from the number of target classes
    if y.nunique() > 2:
        objective = "multi:softprob"
        eval_metric = "mlogloss"
    else:
        objective = "binary:logistic"
        eval_metric = "logloss"

    clf_params = dict(
        objective=objective,
        n_estimators=50,
        eval_metric=eval_metric,
        max_depth=3,
        learning_rate=0.1,
        subsample=0.8,
        colsample_bytree=0.8,
        reg_alpha=0.1,
        early_stopping_rounds=10,
        random_state=42)

    # Split the dataset
    splitted_dataset = dataset_training_split_with_validation(X, y, 0.75, 0.10, 0.15, 42, True)

    # Importances are reported in the column order of the training part
    feature_names = splitted_dataset[0].columns
    feature_importances = get_feature_importances(splitted_dataset, clf_params)

    # Keep the features above the threshold, most important first
    sorted_feature_tuples = sorted(
        ((name, importance)
         for name, importance in zip(feature_names, feature_importances)
         if importance > threshold),
        key=lambda pair: pair[1],
        reverse=True)
    sorted_feature_names = [name for name, _ in sorted_feature_tuples]

    # Report how many features were selected
    print(f"Il numero di feature selezionate è: {len(sorted_feature_names)}/{len(feature_names)}","\n")

    # Report the selected features and their importances
    if verbose:
        # default= keeps the report working when nothing passes the threshold
        max_feature_name_length = max(
            (len(name) for name in sorted_feature_names), default=len("Feature"))
        print(f"{'Feature':<{max_feature_name_length}}\tImportanza\n")
        for name, importance in sorted_feature_tuples:
            print(f"{name:<{max_feature_name_length}}\t{importance*100:.2f}%")

    return sorted_feature_names


# NOTE(review): the selection is computed from the full X and y, before the
# train/test split performed in the next cell, so rows that later end up in
# the test set take part in choosing the features - confirm this is intended.
if XGB:
    # Keep the features whose importance exceeds 1%
    feature_names = select_features_by_threshold(X, y, threshold=0.01)
    X = X[feature_names]
else:
    # No selection: every column is used
    feature_names = X.columns

In [None]:
# Split del dataset
# --------------------------
# Hold out 33% of the rows for testing. The result is kept as a single
# sequence (X_train, X_test, y_train, y_test) that the following cells index
# and unpack.
# NOTE(review): unlike the split used for feature selection, no stratify
# argument is passed here.
training_split = train_test_split(X, y, test_size=0.33, shuffle=True, random_state=42)

In [None]:
# Standardizzazione delle feature
# --------------------------
from sklearn.preprocessing import StandardScaler

def standardize_features(training_split):
    """
    Standardize the train and test features held in ``training_split``.

    The scaler is fitted on the training features (position 0) only and
    then applied to both position 0 and position 1; the two entries are
    overwritten in place and nothing is returned.
    """

    train_features = training_split[0]

    # Learn mean and variance from the training features only
    scaler = StandardScaler().fit(train_features)

    # Overwrite both feature sets with their standardized version
    training_split[0] = scaler.transform(train_features)
    training_split[1] = scaler.transform(training_split[1])


# Standardize the train/test features inside training_split (modified in place).
standardize_features(training_split)

In [None]:
# Training
# --------------------------
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import cross_validate
from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.metrics import confusion_matrix
from sklearn.metrics import roc_curve
from sklearn.metrics import auc
from sklearn.preprocessing import label_binarize

def accuracy_report(y_test, y_pred):
    """
    Print accuracy, confusion matrix and classification report for the
    given true and predicted labels.
    """
    # Accuracy first, on a single line
    print(f"Accuracy: {accuracy_score(y_test, y_pred):.10f}")

    # Then each remaining metric under its own heading
    sections = (
        ("Confusion Matrix:", confusion_matrix),
        ("Classification Report:", classification_report),
    )
    for heading, metric in sections:
        result = metric(y_test, y_pred)
        print(heading)
        print(result)

def feature_report(features, importances):
    """
    Print the ten features with the highest importance.
    """
    ranking = pd.DataFrame({"Feature": features, "Importance": importances})
    ranking = ranking.sort_values(by="Importance", ascending=False)

    # Show the most important features
    print("\nFeature più importanti:")
    print(ranking.head(10))

def plot_confusion_matrix(model, y_test, y_pred):
    """
    Show the confusion matrix of the predictions as an annotated heatmap.

    ``model`` is accepted for symmetry with the other plotting helpers but
    is not used.
    """
    # Confusion matrix of true versus predicted labels
    matrix = confusion_matrix(y_test, y_pred)

    # Heatmap with the raw counts written in each cell
    plt.figure(figsize=(5, 4))
    sns.heatmap(matrix, annot=True, fmt="d", cmap="Blues", cbar=False)
    decorations = (
        (plt.title, "Matrice di confusione"),
        (plt.xlabel, "Etichette Predette"),
        (plt.ylabel, "Etichette Vere"),
    )
    for apply_text, text in decorations:
        apply_text(text)
    plt.show()

def plot_roc_curve(model, X_test, y_test, pos_label="Malicious"):
    """
    Plot the ROC curve of a fitted binary classifier.

    Args:
        model: Fitted classifier exposing ``classes_`` and
            ``predict_proba``.
        X_test: Test features.
        y_test: True test labels.
        pos_label: Label treated as the positive class.

    Raises:
        ValueError: If ``pos_label`` is not among ``model.classes_``.
    """
    # predict_proba columns follow the order of model.classes_, so look the
    # positive class up instead of assuming it is the second column
    classes = list(model.classes_)
    if pos_label not in classes:
        raise ValueError(f"pos_label {pos_label!r} not found in model classes {classes}")
    y_pred_proba = model.predict_proba(X_test)[:, classes.index(pos_label)]

    # Compute the ROC curve and the area under it
    fpr, tpr, _ = roc_curve(y_test, y_pred_proba, pos_label=pos_label)
    roc_auc = auc(fpr, tpr)

    # Draw the ROC curve together with the chance diagonal
    plt.figure()
    plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend()
    plt.show()

def train(models, training_split, feature_names):
    """
    Fit every model on the training part of ``training_split`` and report
    its results on the test part: accuracy report, feature importances
    (when the model exposes them), confusion matrix and ROC curve.
    """
    X_train, X_test, y_train, y_test = training_split

    for estimator in models:
        print(f"\n\nTraining con modello: {estimator.__class__.__name__}")
        print("--------------------------------------------------")

        estimator.fit(X_train, y_train)
        predictions = estimator.predict(X_test)

        accuracy_report(y_test, predictions)

        # Only some estimators expose per-feature importances
        importances = getattr(estimator, "feature_importances_", None)
        if importances is not None:
            feature_report(feature_names, importances)

        # Show the confusion matrix
        plot_confusion_matrix(estimator, y_test, predictions)

        # Show the ROC curve
        plot_roc_curve(estimator, X_test, y_test)

def train_k_fold(models, kfold, training_data):
    """
    Evaluate every model with k-fold cross-validation and print the mean
    and spread of each collected score.

    Args:
        models: Iterable of estimators to evaluate.
        kfold: Number of folds, or a cross-validation splitter exposing
            ``get_n_splits``.
        training_data: Pair ``(X, y)`` with features and target.
    """
    X, y = training_data

    scoring = ["accuracy", "precision_weighted", "recall_weighted", "f1_weighted"]

    # Number of folds for the report header: interpolating the splitter
    # itself would print its repr instead of the number
    n_splits = kfold if isinstance(kfold, int) else kfold.get_n_splits(X, y)

    for model in models:
        print(f"\n\nTraining con modello: {model.__class__.__name__}")
        print("--------------------------------------------------")

        scores = cross_validate(model, X, y, cv=kfold, scoring=scoring)

        # Report the mean of every collected score with a +/- 2 std band
        print(f"Media delle metriche di valutazione dopo {n_splits}-fold cross validation:")

        for name, values in scores.items():
            print(f"\t{name}: {values.mean():.2f} (+/- {values.std() * 2:.2f})")

def train_k_fold_strat(models, skf, training_data):
    """
    Evaluate every model with stratified k-fold cross-validation and print
    the per-class precision, recall, F1-score and support averaged over the
    folds, plus the average accuracy.

    Args:
        models: Iterable of estimators to evaluate.
        skf: Cross-validation splitter exposing ``split(X, y)``.
        training_data: Pair ``(X, y)``; both must support ``.iloc``.
    """
    X, y = training_data
    target_names = np.unique(y)
    metric_names = ("precision", "recall", "f1-score", "support")

    for model in models:
        print(f"\n\nTraining con modello: {model.__class__.__name__}")
        print("--------------------------------------------------")

        # Fit and evaluate the model on every fold, keeping one report each
        reports = []

        for train_index, test_index in skf.split(X, y):
            X_train, X_test = X.iloc[train_index], X.iloc[test_index]
            y_train, y_test = y.iloc[train_index], y.iloc[test_index]

            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)

            # labels= pins the report to the full class list: without it a
            # fold that lacks a class makes target_names mismatch and raise
            report = classification_report(
                y_test, y_pred,
                labels=target_names,
                target_names=target_names,
                output_dict=True)
            reports.append(report)

        # Average every per-class metric and the accuracy over the folds
        num_folds = len(reports)
        avg_report = {
            label: {
                metric: sum(report[label][metric] for report in reports) / num_folds
                for metric in metric_names
            }
            for label in target_names
        }
        avg_report["accuracy"] = sum(report["accuracy"] for report in reports) / num_folds

        # Print the averaged report
        print("Report finale dopo la cross-validation stratificata:")
        for label in target_names:
            print(f"Class: {label}")
            print(f"  Precision: {avg_report[label]['precision']:.4f}")
            print(f"  Recall: {avg_report[label]['recall']:.4f}")
            print(f"  F1-Score: {avg_report[label]['f1-score']:.4f}")
            print(f"  Support: {avg_report[label]['support']:.2f}\n")
        print(f"Overall Accuracy: {avg_report['accuracy']:.4f}")


# NOTE(review): both cross-validation runs receive X and y as they are at
# this point; standardize_features only rewrote the entries of
# training_split, so these features are not standardized - confirm this is
# intended.
if KFOLD_TRAINING:
    # Plain 5-fold cross-validation with seeded shuffling
    kfold = KFold(n_splits=5, shuffle=True, random_state=42)
    train_k_fold(models, kfold, [X, y])

if KFOLDSTRAT_TRAINING:
    # Stratified 5-fold cross-validation (created without shuffling)
    skf = StratifiedKFold(n_splits=5)
    train_k_fold_strat(models, skf, [X, y])

# The hold-out training run, which also plots a ROC curve, is executed only
# in the binary scenario and when no grid search is requested.
if BINARY and not(GRID_SEARCH):
    train(models, training_split, feature_names)

In [None]:
import time
from sklearn.model_selection import GridSearchCV
from sklearn.neural_network import MLPClassifier

def grid_search(training_split, param_grid, verbose=True):
    """
    Run a grid search over ``param_grid`` for an MLP classifier and report
    the score and fit time of every parameter combination.

    Returns the best parameter combination together with its mean
    cross-validated score.
    """

    # Only the training part of the split is searched on
    X_train, _, y_train, _ = training_split

    # 3-fold grid search over a seeded MLP
    search = GridSearchCV(
        estimator=MLPClassifier(random_state=42, verbose=False),
        param_grid=param_grid,
        cv=3,
        return_train_score=True,
    )

    # Time the whole search
    started_at = time.time()
    search.fit(X_train, y_train)
    total_time = time.time() - started_at

    # One row per parameter combination
    results = search.cv_results_
    if verbose:
        print(f"{'Params':<60} {'Mean Test Score':<20} {'Fit Time (s)':<15}")
        print("=" * 100)
        rows = zip(results['mean_test_score'], results['params'], results['mean_fit_time'])
        for mean_score, params, fit_time in rows:
            print(f"{str(params):<60} {mean_score:<20.4f} {fit_time:<15.4f}")

    # Best combination found and its score
    best_params, best_score = search.best_params_, search.best_score_
    if verbose:
        print("\nBest Parameters:", best_params)
        print("Best Score:", best_score)
        print(f"Total Time for Grid Search: {total_time:.2f} seconds")

    return best_params, best_score


# Optional hyper-parameter search for the MLP on the hold-out training split.
if GRID_SEARCH:
    grid_search(training_split, PARAM_GRID)