# Deep Learning Network Traffic Classifier
This notebook demonstrates how to train and evaluate two deep learning models, an MLP and a CNN, using datasets generated by LUCX. The key cells are located at the bottom of the notebook and enable both training and testing of the models.  
The training phase is performed with randomized search, which tunes the main parameters of the deep learning models. The final two cells implement the testing phase: the first uses the test set produced by the parser, while the second allows the user to evaluate the model on live traffic, either captured directly from a network interface or obtained from a pre-recorded traffic trace in `pcap` format.

## Python script
This notebook can be converted into a Python script by running the following command:
```jupyter nbconvert --to python NIDS.ipynb```.

Once exported, the resulting `NIDS.py` script can be executed to train and test the model using the arguments defined in the second cell of the notebook. For example:

```python NIDS.py --train sample_dataset/ --model_type mlp --model nids_model.keras```

or

```python NIDS.py --predict sample_dataset/ --model nids_model.keras --dataset_type DOS2019 --test_iterations 2```

**Note**: Before exporting the notebook, modify the code in the last three cells by commenting out the line that uses the static paths for training or prediction, and uncommenting the line that uses the args parameters. For example, the current training cell is written as follows (to allow testing directly within the notebook):

```train(dataset_path='./sample_dataset', model_type='mlp', model_path='./nids_model.keras')```

```#train(dataset_path=args.train, model_type=args.model_type, model_path=args.model)```

Before converting the notebook into a Python script, it should be updated as:

```#train(dataset_path='./sample_dataset', model_type='mlp', model_path='./nids_model.keras')```

```train(dataset_path=args.train, model_type=args.model_type, model_path=args.model)```

Apply the same modification to the other two cells.
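
Each of the three entry points (`train`, `predict` and `predict_live`) returns without doing anything when its first argument is `None`. The exported script can therefore call all of them, and only those whose command-line option was supplied will run. The sketch below reproduces that guard with hypothetical stub functions; the real ones are defined later in the notebook.

```python
from argparse import Namespace

calls = []

# Stubs with the same None-guard as the notebook's functions (illustration only).
def train(dataset_path, model_type, model_path):
    if dataset_path is not None:
        calls.append(('train', dataset_path, model_type, model_path))

def predict(dataset_path, model_path):
    if dataset_path is not None:
        calls.append(('predict', dataset_path, model_path))

# Equivalent of: python NIDS.py --train sample_dataset/ --model_type mlp
args = Namespace(train='sample_dataset/', predict=None,
                 model_type='mlp', model='nids_model.keras')

train(dataset_path=args.train, model_type=args.model_type, model_path=args.model)
predict(dataset_path=args.predict, model_path=args.model)

print(calls)   # only the train call is recorded
```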

In [None]:
# Copyright (c) 2025 @ FBK - Fondazione Bruno Kessler
# Author: Roberto Doriguzzi-Corin
# Project: LUCX: LUCID network traffic parser eXtended
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import argparse
import pyshark
import numpy as np
import pprint
from scipy.stats import uniform, randint
import tensorflow as tf
import matplotlib.pyplot as plt
from scikeras.wrappers import KerasClassifier
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import GridSearchCV,RandomizedSearchCV
from tensorflow.keras.models import Sequential,load_model, save_model
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Dense, Input, Conv2D, Dropout, GlobalMaxPooling2D, Flatten
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.utils import set_random_seed
from lucx_network_traffic_parser import *
from util_functions import *

# We need the following to get around “RuntimeError: This event loop is already running” when using Pyshark within Jupyter notebooks.
# Not needed in stand-alone Python projects
import nest_asyncio
nest_asyncio.apply()  

EPOCHS = 1000
TEST_ITERATIONS=10
MAX_FLOW_LEN = 10
TIME_WINDOW = 10

SEED = 0
np.random.seed(SEED)
set_random_seed(SEED)

# Argument parser
The following cell defines the arguments accepted by the `NIDS.py` Python script exported from this notebook. The arguments ```--train``` and ```--predict``` indicate the folder containing the dataset: the script loads the ```hdf5``` files for training and testing, respectively. The argument ```--predict_live``` performs predictions either on live traffic captured from a network interface (e.g., ```eth0``` or ```lo```) or on a pre-recorded traffic trace (e.g., ```ddos-chunk.pcap```). In both cases the argument is a string: the name of the interface in the first case, the path to the ```pcap``` file in the second. The argument ```--model``` indicates the path where the trained model is saved during training and from which it is loaded to make predictions (```--predict``` or ```--predict_live```).
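
The sketch below mirrors a subset of these arguments to show how command-line strings map to parsed values. The explicit argument list and the `--unrelated` option are assumptions made so that the example runs without a real command line. `parse_known_args` returns unrecognized options separately instead of raising an error, which is presumably why the notebook uses it: a Jupyter kernel adds its own options to the command line.

```python
import argparse

# Reduced copy of the notebook's parser, for illustration only.
parser = argparse.ArgumentParser(description="A DL-based NIDS for DDoS attack detection")
parser.add_argument('-t', '--train', nargs='?', type=str, default=None)
parser.add_argument('-pl', '--predict_live', nargs='?', type=str, default=None)
parser.add_argument('-m', '--model', type=str, default='nids_model.keras')
parser.add_argument('-i', '--test_iterations', type=int, default=10)

# Hypothetical command line: live prediction on the loopback interface,
# plus one option that the parser does not define.
argv = ['--predict_live', 'lo', '--test_iterations', '2', '--unrelated', 'x']
args, unknown = parser.parse_known_args(argv)

print(args.predict_live)     # interface name or pcap path, always a string
print(args.test_iterations)  # converted to int by argparse
print(args.train)            # None, because --train was not given
print(unknown)               # arguments the parser did not recognize
```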

In [None]:
parser = argparse.ArgumentParser(description="A DL-based NIDS for DDoS attack detection")
args = parser.parse_args(args=[])
parser.add_argument('-t', '--train', nargs='?', type=str,  default=None, help="Start the training process")
parser.add_argument('-p', '--predict', nargs='?', type=str,  default=None, help="Perform a prediction on pre-processed data")
parser.add_argument('-pl', '--predict_live', nargs='?', type=str, default=None, help='Perform a prediction on live traffic or on a pre-recorded traffic trace in pcap format')
parser.add_argument('-m', '--model', type=str, default = 'nids_model.keras', help='File containing the model in keras format')
parser.add_argument('-n', '--model_type', nargs='?', type=str,  default='mlp', help="Model type: MLP or CNN")
parser.add_argument('-d', '--dataset_type', type=str, default = 'DOS2019', help='Identifier of the dataset to be used (e.g., "DOS2019")')
parser.add_argument('-i', '--test_iterations', type=int, default = TEST_ITERATIONS, help='Number of test iterations to be performed')

args, unknown = parser.parse_known_args()
print("see all args:", args)

# Get the format of the labels
The labels of the samples can be: (i) binary (0/1), (ii) multiclass one-hot encoded, or (iii) multiclass integers. The following function is called before training a model to size the output layer and to select the matching loss function.
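
As an illustration with made-up values, the three formats differ in array dimensionality and in the range of values, which is what the function below inspects.

```python
import numpy as np

# Toy label arrays (made-up values) for four samples.
y_binary = np.array([0, 1, 1, 0])           # (i) binary labels
y_onehot = np.array([[1, 0, 0],
                     [0, 1, 0],
                     [0, 0, 1],
                     [0, 1, 0]])             # (ii) one-hot encoding, 3 classes
y_integer = np.array([0, 1, 2, 1])          # (iii) integer class ids, 3 classes

for name, y in [('binary', y_binary), ('one-hot', y_onehot), ('integer', y_integer)]:
    print(name, 'ndim =', y.ndim, 'shape =', y.shape, 'unique =', np.unique(y).tolist())

# The two multiclass encodings describe the same classes.
assert (np.argmax(y_onehot, axis=1) == y_integer).all()
```

Binary labels lead to a single output unit, while both multiclass encodings lead to one output unit per class (three in this toy example).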

In [None]:
def get_labels_info(y):    
    # dimension of labels
    output_dim = 1
    labels = 'binary'
    if y.ndim == 1: # binary classification or multiclass integer labels
        if len(np.unique(y)) > 2 or max(y) > 1: # multiclass classification
            output_dim=max(y)+1
            labels = 'integer'
    elif y.ndim == 2: # one-hot encoding multiclass
        output_dim = y.shape[1]
        labels = 'one-hot'

    return output_dim, labels

# Classification metrics
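The cell below converts the raw model outputs into class labels before computing the metrics. A minimal sketch of that conversion, using made-up probabilities and assuming that class 0 is the benign class:

```python
import numpy as np

# Made-up outputs of a sigmoid layer: one probability per sample.
sigmoid_out = np.array([[0.10], [0.73], [0.50]])
binary_labels = (sigmoid_out.ravel() >= 0.5).astype(int)
print(binary_labels)   # [0 1 1]

# Made-up outputs of a softmax layer: one probability per class.
softmax_out = np.array([[0.7, 0.2, 0.1],
                        [0.1, 0.3, 0.6]])
class_labels = np.argmax(softmax_out, axis=1)
print(class_labels)    # [0 2]

# Collapse multiclass predictions to benign (0) vs attack (1),
# assuming class 0 is the benign class.
print(np.minimum(1, class_labels))   # [0 1]
```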

In [None]:
def evaluate_metrics(y_true, y_pred, model, samples, labels=None, data_source=None, prediction_time=None):

    # --- Normalize predictions ---
    if y_pred.ndim > 1 and y_pred.shape[1] > 1:
        # Multiclass: take argmax along classes axis (softmax)
        if labels == 'BINARY':
            # Special case: binary classification with one-hot encoding
            y_pred_labels = np.minimum(1,np.argmax(y_pred, axis=1))
        else:
            y_pred_labels = np.argmax(y_pred, axis=1)
    else:
        # Binary: threshold at 0.5
        y_pred_labels = (y_pred.ravel() >= 0.5).astype(int)

    # --- Normalize ground truth ---
    if y_true.ndim > 1 and y_true.shape[1] > 1:
        # One-hot encoded ground truth
        y_true_labels = np.argmax(y_true, axis=1)
    else:
        y_true_labels = y_true.ravel().astype(int)

    # --- Compute metrics ---
    acc = accuracy_score(y_true_labels, y_pred_labels)
    prec = precision_score(y_true_labels, y_pred_labels, average='weighted', zero_division=0)
    rec = recall_score(y_true_labels, y_pred_labels, average='weighted', zero_division=0)
    f1 = f1_score(y_true_labels, y_pred_labels, average='weighted', zero_division=0)

    # --- Print results ---
    print ("----------------------------")
    print(f"Model    : {model}")
    print(f"Samples  : {samples}")
    if data_source is not None:
        print(f"Data Source: {data_source}")
    print(f"Accuracy : {acc:.4f}")
    print(f"F1 Score : {f1:.4f}")
    print(f"Precision: {prec:.4f}")
    print(f"Recall   : {rec:.4f}")
    if prediction_time is not None:
        print(f"Prediction Time: {prediction_time:.4f} seconds")
    print ("----------------------------")
    if len(np.unique(y_true_labels)) <= 2 and max(y_true_labels) < 2: # binary classification
        traffic_classes = DDOS_ATTACK_CLASSES['BINARY'] 
    else:    
        traffic_classes = DDOS_ATTACK_CLASSES[labels] if labels in DDOS_ATTACK_CLASSES and labels is not None else range(len(np.unique(y_true_labels)))
    
    y_unique, counts = np.unique(y_pred_labels, return_counts=True)
    for traffic_class in np.unique(y_true_labels):
        print("{} samples: {}".format(traffic_classes[traffic_class], sum(y_true_labels == traffic_class)))
    print ("----------------------------")

# Implementation of the DL models
The following cell implements two deep learning models: an MLP and a CNN. Both can be trained using the traffic flow representations generated by LUCX. The MLP is trained on the flattened version of the data, whereas the CNN is trained on the array-like version.
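
The difference between the two representations can be sketched with placeholder data. The number of features per packet (`N_FEATURES`) and the number of samples are hypothetical; the shapes follow the `Input` layers declared below, `(input_shape[1],)` for the MLP and `(input_shape[1], input_shape[2], 1)` for the CNN. The channel axis is added explicitly here for illustration only.

```python
import numpy as np

MAX_FLOW_LEN = 10    # packets per sample, as set in the first cell
N_FEATURES = 11      # hypothetical number of per-packet features
N_SAMPLES = 4        # hypothetical number of samples

# Array-like samples: one (packets x features) matrix per flow fragment.
X_array = np.zeros((N_SAMPLES, MAX_FLOW_LEN, N_FEATURES), dtype=np.float32)

# CNN view: the matrix plus a trailing channel axis.
X_cnn = X_array[..., np.newaxis]
print(X_cnn.shape)   # (4, 10, 11, 1)

# MLP view: each matrix flattened into a single feature vector.
X_mlp = X_array.reshape(N_SAMPLES, -1)
print(X_mlp.shape)   # (4, 110)
```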

In [None]:
def create_mlp_model(input_shape,output_shape=1, labels='binary', optimizer=Adam, dense_layers=4, hidden_units=2, learning_rate = 0.01,dropout_rate=0):
    model = Sequential(name  = "mlp")

    model.add(Input(shape=(input_shape[1],)))
    model.add(Dense(hidden_units, activation='relu'))
    for layer in range(dense_layers):
        model.add(Dense(hidden_units, activation='relu', name='hidden-fc' + str(layer)))
        model.add(Dropout(dropout_rate))
    model.add(Dense(output_shape, activation='softmax' if output_shape > 1 else 'sigmoid', name='fc2'))

    if labels == 'one-hot':
        model.compile(optimizer=optimizer(learning_rate=learning_rate), loss='categorical_crossentropy', metrics=['accuracy'])
    elif labels == 'integer':
        model.compile(optimizer=optimizer(learning_rate=learning_rate), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    else:  # binary
        model.compile(optimizer=optimizer(learning_rate=learning_rate), loss='binary_crossentropy', metrics=['accuracy'])

    model.summary()
    return model

def create_cnn_model(input_shape,output_shape=1,labels='binary', optimizer=Adam, filters = 100, kernel_size=(3,3), strides=(1,1), padding='same',learning_rate = 0.01,dropout_rate=0.1):
    model = Sequential(name  = "cnn")

    model.add(Input(shape=(input_shape[1], input_shape[2], 1)))
    model.add(Conv2D(filters=filters, kernel_size=kernel_size, data_format='channels_last', activation='relu', padding=padding, strides=strides))
    model.add(Dropout(dropout_rate))
    model.add(GlobalMaxPooling2D())
    model.add(Flatten())
    model.add(Dense(output_shape, activation='softmax' if output_shape > 1 else 'sigmoid', name='fc2'))

    if labels == 'one-hot':
        model.compile(optimizer=optimizer(learning_rate=learning_rate), loss='categorical_crossentropy', metrics=['accuracy'])
    elif labels == 'integer':
        model.compile(optimizer=optimizer(learning_rate=learning_rate), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    else:  # binary
        model.compile(optimizer=optimizer(learning_rate=learning_rate), loss='binary_crossentropy', metrics=['accuracy'])

    model.summary()
    return model

# Prediction on static test set

In [None]:
def predict(dataset_path, model_path, dataset_type=None,test_iterations=TEST_ITERATIONS):
    if dataset_path is not None:
        X_test, y_test = load_dataset(dataset_path + "/*" + '-test.hdf5')

        # Only models saved in the native Keras format are accepted
        if model_path is None or not model_path.endswith('.keras'):
            print ("No valid model specified!")
            return

        model = load_model(model_path)

        pt0 = time.time()
        for i in range(test_iterations):
            y_pred = model.predict(X_test, batch_size=16,verbose=0)   
        pt1 = time.time()
        # Average time per iteration over the whole test set
        prediction_time = (pt1 - pt0) / test_iterations
        evaluate_metrics(y_test, y_pred,model.name,X_test.shape[0],dataset_type, dataset_path,prediction_time)

# Prediction on live traffic

In [None]:
def predict_live(source,model_path,dataset_type=None,test_iterations=TEST_ITERATIONS):
    if source is not None:
        if source.endswith('.pcap'):
            pcap_file = source
            cap = pyshark.FileCapture(pcap_file)
            data_source = pcap_file.split('/')[-1].strip()
        else:
            cap =  pyshark.LiveCapture(interface=source)
            data_source = source

        print ("Prediction on network traffic from: ", source)

        if model_path is not None:
            model = load_model(model_path)
        else:
            print ("Invalid model path: ", model_path) 
            return

        # load the labels, if available
        labels = parse_labels(dataset_type)

        # Statistics on live traffic are computed considering benign vs malicious only
        mc_labels = multiclass_labels('BINARY')


        if model.name == 'mlp':
            mins, maxs = static_min_max(flatten=True,time_window=TIME_WINDOW,max_flow_len=MAX_FLOW_LEN)
        else:
            mins, maxs = static_min_max(flatten=False,time_window=TIME_WINDOW,max_flow_len=MAX_FLOW_LEN)

        iteration = 0
        while (iteration < test_iterations):
            samples = process_live_traffic(cap, labels, mc_labels, max_flow_len=MAX_FLOW_LEN, traffic_type="all",time_window=TIME_WINDOW)
            if len(samples) > 0:
                X,y_true,flow_ids = dataset_to_list_of_fragments(samples)
                if model.name == 'mlp':
                    X = flatten_samples(X)
                    X = np.array(normalize(X, mins, maxs))
                else:
                    X = np.array(normalize_and_padding(X, mins, maxs, MAX_FLOW_LEN))

                if labels is not None:
                    y_true = np.array(y_true)
                else:
                    y_true = None
                
                pt0 = time.time()
                y_pred = model.predict(X, batch_size=16,verbose=0)
                pt1 = time.time()
                prediction_time = pt1 - pt0
                iteration += 1
                # hf = h5py.File('./live-prediction-test.hdf5', 'w')
                # hf.create_dataset('set_x', data=X)
                # hf.create_dataset('set_y', data=y_true)
                # hf.create_dataset('set_y_pred', data=y_pred)
                # hf.close()

                print ("Time window ", iteration)
                evaluate_metrics(y_true, y_pred,model.name,X.shape[0],'BINARY', source, prediction_time)

# Hyperparameter tuning
Hyperparameter tuning with random search and k-fold cross-validation.
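
The search space in the next cell is defined with SciPy distributions, whose parameterization is easy to misread: `uniform(loc, scale)` samples from the interval [loc, loc + scale], and `randint(low, high)` samples integers from `low` to `high - 1`. The sketch below mimics these distributions with the standard library to show the ranges explored for the MLP. It is an illustrative stand-in, not what `RandomizedSearchCV` runs internally, and the helper name `sample_mlp_params` is hypothetical.

```python
import random

rng = random.Random(0)

def sample_mlp_params():
    """Draw one hypothetical MLP candidate from the same ranges as the notebook."""
    return {
        # scipy.stats.uniform(0.0001, 0.01) covers [0.0001, 0.0101]
        'learning_rate': rng.uniform(0.0001, 0.0001 + 0.01),
        # scipy.stats.randint(4, 8) yields 4, 5, 6 or 7
        'hidden_units': rng.randrange(4, 8),
        # scipy.stats.randint(1, 4) yields 1, 2 or 3
        'dense_layers': rng.randrange(1, 4),
        'optimizer': rng.choice(['SGD', 'Adam']),
    }

# The notebook uses n_iter=2, i.e. two sampled candidates.
candidates = [sample_mlp_params() for _ in range(2)]
for params in candidates:
    print(params)
```

With `n_iter=2` and `cv=2`, the search fits two candidates on two folds each, followed by a final refit of the best candidate on the full training set (scikit-learn's default `refit=True`).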

In [None]:
def train(dataset_path, model_type, model_path):
    if dataset_path is not None:
        X_train, y_train = load_dataset(dataset_path + "/*" + '-train.hdf5')
        X_val, y_val = load_dataset(dataset_path + "/*" + '-val.hdf5')

        output_dim, labels = get_labels_info(y_train)

        param_mlp_dist = {
            'optimizer': [SGD, Adam],
            'model__learning_rate': uniform(0.0001, 0.01),
            'model__hidden_units': randint(4,8),
            'model__dense_layers': randint(1,4)
        }

        param_cnn_dist = {
            ### ADD YOUR CODE HERE ###
            'model__learning_rate' : uniform(0.0001, 0.01),
            'model__filters' : randint(16,64),
            'optimizer' : [SGD,Adam],
            'model__kernel_size': [(2,2),(3,3),(2,3)],
            'model__strides': [(1,1),(2,2)],
            'model__padding' : ['same', 'valid']
            ##########################
        }

        if model_type == 'mlp':
            param_dist = param_mlp_dist
            create_model = create_mlp_model
        else:
            param_dist = param_cnn_dist
            create_model = create_cnn_model


        early_stopping = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=5, restore_best_weights=True)
        model = KerasClassifier(model=create_model, input_shape=X_train.shape,  output_shape=output_dim, labels=labels, batch_size=32, verbose=1,callbacks=[early_stopping])
        random_search = RandomizedSearchCV(estimator=model, param_distributions=param_dist, n_iter=2, cv=2, random_state=SEED)
        random_search_result = random_search.fit(X_train, y_train,epochs=100, validation_data=(X_val, y_val))


        # Print the best parameters and corresponding accuracy
        print("Best parameters found: ", random_search.best_params_)
        print("Best cross-validated accuracy: {:.2f}".format(random_search.best_score_))


        # Save the best model
        best_model = random_search.best_estimator_.model_
        if model_path is not None:
            print ("Model saved as: " + model_path)
            best_model.save(model_path)
        else:
            model_path = './nids_model-' + model_type + '.keras'
            print ("Model saved as: " + model_path)
            best_model.save(model_path)

# Train your model
Train the model by calling the `train` method above with the appropriate arguments: `dataset_path`, `model_type`, and `model_path`.

The uncommented line in the cell below invokes `train` using the default values, allowing the command to run directly within the notebook. With the current defaults, the next cell will execute:

```train(dataset_path='./sample_dataset', model_type='mlp', model_path='./nids_model.keras')```

You can pass different arguments directly to the `train` call below to specify alternative dataset paths, model types, or model locations.

Before exporting the notebook to a Python script, remember to comment out the first line and uncomment the second one, which calls the `train` method using the arguments provided through the command line.

In [None]:
# Train the model

train(dataset_path='./sample_dataset', model_type='mlp', model_path='./nids_model.keras')
#train(dataset_path=args.train, model_type=args.model_type, model_path=args.model)

# Make predictions on the test set
Test a pre‑trained model by calling the `predict` method with the appropriate arguments: `dataset_path`, `model_path`, `dataset_type`, and `test_iterations`.

The method call is already configured with default values so that the command can be executed directly within the notebook.

With the current defaults, the next cell will run:

```predict(dataset_path='./sample_dataset', model_path='./nids_model.keras', dataset_type='DOS2019', test_iterations=10)```

You can pass different arguments directly to the `predict` call below to specify alternative dataset paths, model locations, dataset types and number of test iterations.

Before exporting the notebook to a Python script, remember to comment out the first line and uncomment the second one, which calls the `predict` method using the arguments provided through the command line.

**Note:**  
- `dataset_type` is used to correctly compute the accuracy metrics.  
- `test_iterations` specifies how many times the test should be repeated to compute the average processing time.
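
Repeating the same prediction several times reduces the influence of one-off costs, such as start-up overhead on the first call. A minimal sketch of the timing pattern, with a hypothetical `fake_predict` standing in for `model.predict`:

```python
import time

def average_runtime(fn, iterations):
    """Call fn() `iterations` times and return the mean wall-clock time per call."""
    start = time.perf_counter()
    for _ in range(iterations):
        fn()
    return (time.perf_counter() - start) / iterations

# Hypothetical stand-in for model.predict(X_test, ...).
def fake_predict():
    return sum(i * i for i in range(10_000))

avg = average_runtime(fake_predict, iterations=10)
print(f"Average prediction time: {avg:.6f} seconds")
```

Dividing the elapsed time by the number of iterations is what turns the total into an average.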

In [None]:
# Predictions on the test set

predict(dataset_path='./sample_dataset', model_path='./nids_model.keras', dataset_type='DOS2019', test_iterations=10)
#predict(dataset_path=args.predict, model_path=args.model, dataset_type=args.dataset_type,test_iterations=args.test_iterations)

# Make predictions using a pcap file
The following cell allows testing a pre-trained model with live traffic by either collecting traffic from a network interface or by reading the packets from a pre-recorded traffic trace in `pcap` format. 

Call the `predict_live` method with the appropriate arguments: `source`, `model_path`, `dataset_type`, and `test_iterations`.

The method call is already configured with default values so that the command can be executed directly within the notebook.

With the current defaults, the next cell will run:

```predict_live(source='./sample_dataset/DOS2019_Test_PCAPs/ddos-chunk.pcap', model_path='./nids_model.keras', dataset_type='DOS2019', test_iterations=2)```

You can pass different arguments directly to the `predict_live` call below to specify an alternative pcap path or network interface, model location, dataset type, and number of test iterations.

Before exporting the notebook to a Python script, remember to comment out the first line and uncomment the second one, which calls the `predict_live` method using the arguments provided through the command line.

**Note:**  
- `source` is either the path to a pcap file or the name of a network interface (e.g., `eth0` or `lo`).
- `dataset_type` is used to correctly compute the accuracy metrics.  
- `test_iterations` specifies how many time windows of traffic are processed; the metrics and the prediction time are reported separately for each window.
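
The choice between the two kinds of source is made from the string itself: a value ending in `.pcap` is opened as a trace file, and anything else is treated as an interface name. A stand-alone sketch of that rule follows; the helper name `describe_source` is hypothetical and no packets are captured.

```python
import os

def describe_source(source):
    """Return (kind, label) for a predict_live-style source string."""
    if source.endswith('.pcap'):
        # Pre-recorded trace: label it with the file name only.
        return 'file', os.path.basename(source)
    # Anything else is treated as a network interface name.
    return 'interface', source

print(describe_source('./sample_dataset/DOS2019_Test_PCAPs/ddos-chunk.pcap'))
print(describe_source('lo'))
```

One consequence of this rule is that a capture saved with a different extension, such as `.pcapng`, would be interpreted as an interface name.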

In [None]:
# Predictions on a pcap file

predict_live(source='./sample_dataset/DOS2019_Test_PCAPs/ddos-chunk.pcap', model_path='./nids_model.keras', dataset_type='DOS2019', test_iterations=2)
#predict_live(source=args.predict_live,model_path=args.model,dataset_type=args.dataset_type, test_iterations=args.test_iterations)