<a href="https://colab.research.google.com/github/SianC7/LAIDS/blob/main/Final_1D_CNN_Baseline_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# CICIDS2017 Dataset

In [None]:
# Mount Google Drive at /content/drive; the dataset and model paths used later in
# this notebook are all located under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import pandas as pd

# --- Data Collection ---

# Set pandas display options so wide frames are printed without truncation
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)


# Get Data file path
file_path = '/content/drive/MyDrive/Colab Notebooks/Honours Project/Datasets/cicids2017_cleaned.csv'
#file_path = "/content/drive/MyDrive/Honours Project/Datasets/ADASYN_CICIDS2017_Dataset.csv" # for Claire
# file_path = "/content/drive/MyDrive/Honours Project/Datasets/CICIDS2017 ADASYN Dataset/ADASYN_CICIDS2017_Dataset.csv" #For Chris
cicids2017_df = pd.read_csv(file_path, sep=",", comment="#", header=0)
cicids2017_df.columns = cicids2017_df.columns.str.strip()  # Strip whitespace from column names


print("\nInitial samples:")
print(f"cicids2017_df shape: {cicids2017_df.shape}")
# print(cicids2017_df.head().to_string())
# print(cicids2017_df.info())

# Print unique values and their counts for 'Attack Type'
print("\nAttack Type Distribution:")
print(cicids2017_df['Attack Type'].value_counts())

# --- Label Encoding ---

# Attack types actually present in the file (used to validate the mapping below)
attack_types = cicids2017_df['Attack Type'].unique()

# Fixed mapping from attack type name to integer class label
attack_type_map = {'Normal Traffic': 0, 'Port Scanning': 1, 'Web Attacks': 2, 'Brute Force': 3, 'DDoS': 4, 'Bots': 5, 'DoS': 6} # Use the specified mapping

# Series.map() turns every value that is missing from the mapping into NaN without
# raising, which would only surface later as a confusing failure in the stratified
# split or np.bincount. Fail here instead, naming the offending values.
unmapped_attack_types = [name for name in attack_types if name not in attack_type_map]
if unmapped_attack_types:
    raise ValueError(
        f"Attack types missing from attack_type_map: {unmapped_attack_types}"
    )

# Apply label encoding; every value is mapped, so the column is safely integer-typed
cicids2017_df['Attack Type'] = cicids2017_df['Attack Type'].map(attack_type_map).astype('int64')

print("\nLabel Encoding Mapping:")
print(attack_type_map)

# Train/val/test split

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

# --- Train/val/test split ---
# Separate the encoded label column from the feature columns
y = cicids2017_df['Attack Type']
X = cicids2017_df.drop(columns='Attack Type')

# Both splits hold out 20% and use the same seed; stratification keeps the class
# proportions the same in every subset (overall: 64% train, 16% val, 20% test)
split_options = dict(test_size=0.2, random_state=42)
X_temp, X_test, y_temp, y_test = train_test_split(X, y, stratify=y, **split_options)
X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, stratify=y_temp, **split_options)

# Shuffle each subset, keeping every feature row aligned with its label
X_train, y_train = shuffle(X_train, y_train, random_state=42)
X_val, y_val = shuffle(X_val, y_val, random_state=42)
X_test, y_test = shuffle(X_test, y_test, random_state=42)


Visualisation of data split

In [None]:
import numpy as np
import matplotlib.pyplot as plt

print(f"Input Shapes -> Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")

# Per-split sample counts, indexed by encoded attack-type label
counts_per_split = [np.bincount(split_labels) for split_labels in (y_train, y_val, y_test)]

# The longest count vector decides how many attack types appear on the x-axis
num_attack_types = max(len(split_counts) for split_counts in counts_per_split)

# Right-pad the shorter vectors with zeros so a class absent from a split shows as 0
train_counts, val_counts, test_counts = (
    np.pad(split_counts, (0, num_attack_types - len(split_counts)), 'constant')
    for split_counts in counts_per_split
)


print(f"Labels distribution -> Train: {train_counts}, Val: {val_counts}, Test: {test_counts}")

# Data for plotting: one row of counts per dataset split
datasets = ['Train', 'Validation', 'Test']
all_counts = np.array([train_counts, val_counts, test_counts])

# Invert the name -> label mapping so each bar group can be titled with its attack name
reverse_attack_type_map = {encoded: name for name, encoded in attack_type_map.items()}
labels = [reverse_attack_type_map.get(i, f'Unknown {i}') for i in range(num_attack_types)]

x = np.arange(len(labels))  # one bar group per attack type
width = 0.25  # width of a single bar

fig, ax = plt.subplots(figsize=(10, 6))

# Three side-by-side bars per attack type: train on the left, validation centred, test on the right
bar_positions = (x - width, x, x + width)
rects1, rects2, rects3 = (
    ax.bar(position, split_counts, width, label=split_name)
    for position, split_counts, split_name in zip(bar_positions, all_counts, datasets)
)

# Axis text, title, tick labels (rotated so long attack names stay readable) and legend
ax.set(ylabel='Number of Samples', title='Attack Type Distribution Across Datasets')
ax.set_xticks(x)
ax.set_xticklabels(labels, rotation=45, ha="right")
ax.legend()

# Add value labels (optional, can make the plot cluttered with many categories)
def autolabel(rects):
    """Write each bar's height just above the bar.

    *rects* is an iterable of bar patches (as returned by ``ax.bar``). The
    annotations are drawn on the module-level ``ax`` created by the plotting code.
    """
    for bar in rects:
        bar_height = bar.get_height()
        bar_centre_x = bar.get_x() + bar.get_width() / 2
        ax.annotate(
            f'{bar_height}',
            xy=(bar_centre_x, bar_height),
            xytext=(0, 3),  # nudge the text 3 points above the bar top
            textcoords="offset points",
            ha='center',
            va='bottom',
            fontsize=8,
        )

# Annotate the train, validation and test bars with their sample counts
for bar_group in (rects1, rects2, rects3):
    autolabel(bar_group)


plt.tight_layout()
plt.show()

# Normalise the datasets

In [None]:
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
import numpy as np

# --- Normalize ---
# Learn the per-feature mean and standard deviation from the training split only,
# then apply those same statistics to all three splits
scaler = StandardScaler()
scaler.fit(X_train)

X_train, X_val, X_test = (
    scaler.transform(split_features) for split_features in (X_train, X_val, X_test)
)

# Print the shape of the scaled data to verify
print(f"Shape of X_train after scaling: {X_train.shape}")
print(f"Shape of X_val after scaling: {X_val.shape}")
print(f"Shape of X_test after scaling: {X_test.shape}")

# Data reshaping for 1D CNN input

In [None]:
import numpy as np

# Reshape the data for 1D CNN input
# Conv1D expects (samples, timesteps, features): every tabular feature becomes one
# timestep that carries a single value
X_train = X_train.reshape(*X_train.shape[:2], 1)
X_val = X_val.reshape(*X_val.shape[:2], 1)
X_test = X_test.reshape(*X_test.shape[:2], 1)


# --- Final Shape Confirmation ---
print("Training input shape:", X_train.shape)
print("Validation input shape:", X_val.shape)
print("Test input shape:", X_test.shape)

# Perform Hyperparameter Tuning (Bayesian Optimisation) of 2 layer CNN model

In [None]:
# # Install tuner
# !pip install keras-tuner --quiet

In [None]:
# # Setup
# import keras_tuner as kt
# import tensorflow as tf
# from tensorflow.keras import layers
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization
# from tensorflow.keras import regularizers

In [None]:
# def build_base_model(hp):
#     model = Sequential()

#     # First Conv1D block
#     model.add(Conv1D(
#         filters=hp.Choice('conv1_filters', values=[8, 16, 32, 64, 128]),
#         kernel_size=hp.Choice('conv1_kernel_size', values=[2, 3, 5]),
#         activation='relu',
#         input_shape=(52, 1) # Define input shape here
#     ))

#     model.add(BatchNormalization())
#     model.add(MaxPooling1D(2))
#     model.add(Dropout(hp.Choice('dropout1_rate', [0.0,0.2, 0.25, 0.3, 0.5])))

#     # Second Conv1D block
#     model.add(Conv1D(
#         filters=hp.Choice('conv2_filters', values=[8,16, 32, 64, 128]),
#         kernel_size=hp.Choice('conv2_kernel_size', values=[2, 3, 5]),
#         activation='relu'
#     ))
#     model.add(BatchNormalization())
#     model.add(MaxPooling1D(2))

#     # Flatten previous layers
#     model.add(Flatten())

#     # Dense layer with L2 regularization
#     model.add(Dense(
#         hp.Choice('dense_units', [8, 12, 24, 64]),
#         activation='relu',
#         kernel_regularizer=regularizers.l2(hp.Choice('dense_L2', [0.0, 0.001, 0.0001, 0.01]))
#     ))

#     # Dropout layer
#     model.add(Dropout(hp.Choice('dropout2_rate', [0.0, 0.2, 0.25, 0.3, 0.5])))

#     # Dense layer for multiclass classification with softmax activation
#     model.add(Dense(7, activation='softmax')) # Assuming 7 classes based on the value_counts output

#     # Compile
#     model.compile(
#         optimizer= tf.keras.optimizers.Adam(learning_rate=hp.Choice('learning_rate', [0.001, 0.005, 0.01])), #'adam', #add optimiser choices?
#         loss='sparse_categorical_crossentropy', # Use sparse_categorical_crossentropy for integer labels
#         metrics=['accuracy']
#     )
#     return model

In [None]:
# # --- Create tuner ---
# tuner = kt.BayesianOptimization(
#     build_base_model,
#     objective= 'val_accuracy',#kt.Objective('val_f1_score', direction='max'),# Find the hyperparameters that give the highest possible F1 score on the validation set
#     max_trials=10,
#     directory='bayesian_tuning',
#     project_name='baseline_cnn'
# )

# # --- Implement callback ---
# early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', # Specify monitor='val_loss' to track the validation loss.
#                                patience=3, #the number of epochs to wait for an improvement
#                                restore_best_weights=True) # Revert the model to its state where it outputted the lowest validation loss

# # --- Start tuning ---
# tuner.search(X_train, y_train,
#              epochs=10,
#              batch_size=64,
#              validation_data=(X_val, y_val),
#              callbacks=[early_stop])

In [None]:
# # --- Display Best Variables ---
# best_hps = tuner.get_best_hyperparameters(1)[0]
# print("Best Hyperparameters:")
# for param in best_hps.values:
#     print(f"{param}: {best_hps.get(param)}")

In [None]:
# # --- Visualise tuning results ---
# import pandas as pd
# import matplotlib.pyplot as plt

# trials = tuner.oracle.get_best_trials(num_trials=20)
# val_accuracies = [t.metrics.get_last_value('val_accuracy') for t in trials]

# plt.plot(val_accuracies, marker='o')
# plt.title('Validation Accuracy per Trial')
# plt.xlabel('Trial')
# plt.ylabel('Val Accuracy')
# plt.grid(True)
# plt.show()

# 1D CNN model

In [None]:
#%pip install tensorflow
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization, Input

# Define the 1D CNN Model
# Best Hyperparameters:
# conv1_filters: 32
# conv1_kernel_size: 2
# dropout1_rate: 0.0
# conv2_filters: 16
# conv2_kernel_size: 3
# dense_units: 64
# dense_L2: 0.0
# dropout2_rate: 0.5
# learning_rate: 0.005

# Input is (timesteps, features): one timestep per scaled feature, one value each
input_shape = (X_train.shape[1], 1)
# One output unit per attack type in the label mapping
num_classes = len(attack_type_map)

# First convolutional block: convolution -> batch norm -> pooling -> dropout
# (the dropout rate is 0.0, matching dropout1_rate above)
first_conv_block = [
    Conv1D(filters=32, kernel_size=2, activation='relu'),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),
    Dropout(0.0),
]

# Second convolutional block: convolution -> batch norm -> pooling
second_conv_block = [
    Conv1D(filters=16, kernel_size=3, activation='relu'),
    BatchNormalization(),
    MaxPooling1D(pool_size=2),
]

# Classifier head: flatten the feature maps, one hidden dense layer with 50% dropout,
# then a softmax over the attack types (the L2 factor is 0.0, matching dense_L2 above)
# L2 regularisation background: https://medium.com/@bhatadithya54764118/day-49-overfitting-and-underfitting-in-dl-regularization-techniques-8ded20baa3d6
classifier_head = [
    Flatten(),
    Dense(64, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.0)),
    Dropout(0.5),
    Dense(num_classes, activation='softmax'),
]

# Build model
model = Sequential([
    Input(shape=input_shape),
    *first_conv_block,
    *second_conv_block,
    *classifier_head,
])

# Compile: integer class labels are used directly, hence the sparse categorical loss
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.005),
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

# Summary
model.summary()

# Train 1D CNN

- Major problems with class imbalance

In [None]:
import matplotlib.pyplot as plt
import timeit
from tensorflow.keras.callbacks import EarlyStopping

# --- Early-stopping --- #TESTING
# https://medium.com/@piyushkashyap045/early-stopping-in-deep-learning-a-simple-guide-to-prevent-overfitting-1073f56b493e
# Training stops once the validation loss has not improved for `patience` epochs,
# and the weights from the best validation-loss epoch are restored afterwards.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=3,
    restore_best_weights=True,
)

# Train the model with early stopping, timing the whole fit call
start_time = timeit.default_timer()
history = model.fit(
    x=X_train,
    y=y_train,
    validation_data=(X_val, y_val),
    epochs=20,
    batch_size=64,
    callbacks=[early_stopping],
    verbose=1,
)
end_time = timeit.default_timer()
print(f"Training time: {end_time - start_time:.2f} seconds")


# Plot Accuracy and Loss side by side: (axes, train key, val key, title, train label, val label)
history_fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 5))
history_panels = (
    (acc_ax, 'accuracy', 'val_accuracy', 'Accuracy', 'Train Acc', 'Val Acc'),
    (loss_ax, 'loss', 'val_loss', 'Loss', 'Train Loss', 'Val Loss'),
)
for panel, train_key, val_key, panel_title, train_label, val_label in history_panels:
    panel.plot(history.history[train_key], label=train_label)
    panel.plot(history.history[val_key], label=val_label)
    panel.set_title(panel_title)
    panel.set_xlabel('Epoch')
    panel.set_ylabel(panel_title)
    panel.legend()

plt.tight_layout()
plt.show()



# K-fold Cross Validation 1
![K-fold cross-validation diagram](https://media.datacamp.com/legacy/v1718738336/image_0bb32b40f1.jpg)


Stratified K-fold Cross Validation

In [None]:
# from sklearn.model_selection import StratifiedKFold
# from statistics import mean, stdev
# import numpy as np
# import tensorflow as tf
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization, Input
# from tensorflow.keras.callbacks import EarlyStopping
# import matplotlib.pyplot as plt

# X = cicids2017_df.drop('Attack Type', axis=1)
# y = cicids2017_df['Attack Type']

# skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=1)
# accuracy_scores = []

# for fold, (train_index, test_index) in enumerate(skf.split(X, y)):
#     print(f"\n--- Fold {fold+1} ---")

#     # Split data
#     x_train_fold, x_test_fold = X.iloc[train_index], X.iloc[test_index]
#     y_train_fold, y_test_fold = y.iloc[train_index], y.iloc[test_index]


#     #Normalise
#     x_train_fold = scaler.fit_transform(x_train_fold)
#     x_test_fold = scaler.transform(x_test_fold)

#     #Shape for CNN Input
#     x_train_fold = x_train_fold.reshape((x_train_fold.shape[0], x_train_fold.shape[1], 1))
#     x_test_fold = x_test_fold.reshape((x_test_fold.shape[0], x_test_fold.shape[1], 1))

#     # Define model inside loop
#     model = Sequential([
#         Input(shape=(x_train_fold.shape[1], 1)),
#         Conv1D(filters=32, kernel_size=2, activation='relu'),
#         BatchNormalization(),
#         MaxPooling1D(pool_size=2),
#         Dropout(0.0),
#         Conv1D(filters=16, kernel_size=3, activation='relu'),
#         BatchNormalization(),
#         MaxPooling1D(pool_size=2),
#         Flatten(),
#         Dense(64, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.0)),
#         Dropout(0.5),
#         Dense(len(attack_type_map), activation='softmax')
#     ])

#     model.compile(
#         optimizer=tf.keras.optimizers.Adam(learning_rate=0.005),
#         loss='sparse_categorical_crossentropy',
#         metrics=['accuracy']
#     )

#     # Early stopping
#     early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

#     # Train
#     history = model.fit(
#         x_train_fold, y_train_fold,
#         epochs=10,
#         batch_size=64,
#         validation_data=(x_test_fold, y_test_fold),
#         callbacks=[early_stopping],
#         verbose=0
#     )

#     # Evaluate
#     loss, acc = model.evaluate(x_test_fold, y_test_fold, verbose=0)
#     print(f"Fold {fold+1} Accuracy: {acc:.4f}")
#     accuracy_scores.append(acc)

# # Final results
# print('\nList of accuracy scores:', accuracy_scores)
# print('Maximum Accuracy: {:.2f}%'.format(max(accuracy_scores) * 100))
# print('Minimum Accuracy: {:.2f}%'.format(min(accuracy_scores) * 100))
# print('Mean Accuracy: {:.2f}%'.format(mean(accuracy_scores) * 100))
# print('Standard Deviation: {:.4f}'.format(stdev(accuracy_scores)))

# K-Fold Cross Validation 2

Imports

In [None]:
# from sklearn.model_selection import StratifiedKFold
# from sklearn.preprocessing import StandardScaler
# from statistics import mean, stdev
# import numpy as np
# import tensorflow as tf
# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization, Input
# from tensorflow.keras.callbacks import EarlyStopping
# import matplotlib.pyplot as plt
# from sklearn.utils.class_weight import compute_class_weight



Define model creation

In [None]:
# def create_cnn_model(input_shape, num_classes):
#     model = Sequential([
#         Input(shape=input_shape),
#         Conv1D(filters=32, kernel_size=2, activation='relu'),
#         BatchNormalization(),
#         MaxPooling1D(pool_size=2),
#         Dropout(0.0),
#         Conv1D(filters=16, kernel_size=3, activation='relu'),
#         BatchNormalization(),
#         MaxPooling1D(pool_size=2),
#         Flatten(),
#         Dense(64, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.0)),
#         Dropout(0.5),
#         Dense(num_classes, activation='softmax')
#     ])
#     model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.005),
#                   loss='sparse_categorical_crossentropy',
#                   metrics=['accuracy'])
#     return model

Cross validation loop

In [None]:
# from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
# import numpy as np

# X = cicids2017_df.drop('Attack Type', axis=1).values
# y = cicids2017_df['Attack Type'].values

# # Calculate weights from the original label vector
# class_weights = compute_class_weight(
#     class_weight='balanced',
#     classes=np.unique(y),
#     y=y
# )
# class_weight_dict = dict(enumerate(class_weights))

# skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
# fold = 1
# results = []

# for train_index, test_index in skf.split(X, y):
#     print(f"\nFold {fold}")

#     X_train_fold, X_test_fold = X[train_index], X[test_index]
#     y_train_fold, y_test_fold = y[train_index], y[test_index]

#     # Scaling
#     scaler = StandardScaler()
#     X_train_scaled = scaler.fit_transform(X_train_fold)
#     X_test_scaled = scaler.transform(X_test_fold)

#     # Reshape for 1D CNN
#     X_train_scaled = X_train_scaled.reshape((X_train_scaled.shape[0], X_train_scaled.shape[1], 1))
#     X_test_scaled = X_test_scaled.reshape((X_test_scaled.shape[0], X_test_scaled.shape[1], 1))

#     # Build model
#     model = create_cnn_model(input_shape=(X_train_scaled.shape[1], 1), num_classes=len(attack_type_map))

#     # Early stopping
#     early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

#     # Train
#     history = model.fit(X_train_scaled, y_train_fold,
#                         validation_split=0.1,
#                         epochs=10,
#                         batch_size=64,
#                         callbacks=[early_stopping],
#                         class_weight=class_weight_dict,
#                         verbose=0)

#     # Predict & evaluate
#     y_pred_probs = model.predict(X_test_scaled)
#     y_pred = np.argmax(y_pred_probs, axis=1)

#     acc = accuracy_score(y_test_fold, y_pred)
#     print(f"Fold {fold} Accuracy: {acc:.4f}")
#     missing_preds = np.setdiff1d(np.unique(y_test_fold), np.unique(y_pred))
#     print(f"Classes not predicted in Fold {fold}: {missing_preds}")
#     results.append({
#         'fold': fold,
#         'accuracy': acc,
#         'report': classification_report(y_test_fold, y_pred, output_dict=True),
#         'confusion_matrix': confusion_matrix(y_test_fold, y_pred)
#     })

#     fold += 1

Summarise results

In [None]:
# import seaborn as sns

# # Average accuracy
# avg_acc = np.mean([r['accuracy'] for r in results])
# print(f"\nAverage Accuracy over 5 folds: {avg_acc:.4f}")

# # Summary of results
# for r in results:
#     fold = r['fold']
#     acc = r['accuracy']
#     f1 = r['report']['macro avg']['f1-score']
#     print(f"Fold {fold}: Accuracy = {acc:.4f}, Macro F1 = {f1:.4f}")

# # Print missing classes
# reverse_attack_type_map = {v: k for k, v in attack_type_map.items()}

# for r in results:
#     fold = r['fold']
#     cm = r['confusion_matrix']
#     pred_totals = cm.sum(axis=0)

#     missing_classes = [i for i, val in enumerate(pred_totals) if val == 0]
#     missing_class_names = [reverse_attack_type_map[i] for i in missing_classes]

#     if missing_class_names:
#         print(f"Fold {fold} missed predicting classes: {missing_class_names}")

# for r in results:
#     fold = r['fold']
#     cm = r['confusion_matrix']

#     # Plot the heatmap
#     plt.figure(figsize=(8, 6))
#     sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
#                 xticklabels=[reverse_attack_type_map[i] for i in range(cm.shape[0])],
#                 yticklabels=[reverse_attack_type_map[i] for i in range(cm.shape[0])])
#     plt.title(f"Fold {fold} Confusion Matrix")
#     plt.xlabel("Predicted")
#     plt.ylabel("True")
#     plt.tight_layout()
#     plt.show()

# # Plot accuracy across all folds
# folds = [r['fold'] for r in results]
# accuracies = [r['accuracy'] for r in results]

# plt.plot(folds, accuracies, marker='o')
# plt.title("Accuracy Across Folds")
# plt.xlabel("Fold")
# plt.ylabel("Accuracy")
# plt.grid(True)
# plt.show()

# Evaluate 1D CNN Malware Detection Results

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
import seaborn as sns # Import seaborn for heatmap plotting
import numpy as np # Import numpy
import matplotlib.pyplot as plt # Import matplotlib for plotting
import tensorflow as tf # Import tensorflow

# Get the model's predictions for the X_test dataset
y_pred_probs = model.predict(X_test)

# For multiclass classification, the prediction is the class with the highest probability
y_pred = np.argmax(y_pred_probs, axis=1)

# --- Confusion matrix ---
# Rows are true classes, columns are predicted classes. The class order is pinned
# explicitly so that row/column i always corresponds to class_ids[i]; without
# `labels=` sklearn only includes classes that occur in y_test or y_pred, which would
# shift the indices used below if a class were ever absent.
class_ids = sorted(attack_type_map.values())
cm = confusion_matrix(y_test, y_pred, labels=class_ids)

print("\nConfusion Matrix:")

# --- Heatmap ---
# Get the original attack type names from the mapping, in the same order as class_ids
reverse_attack_type_map = {v: k for k, v in attack_type_map.items()}
labels = [reverse_attack_type_map[class_id] for class_id in class_ids]

# Plot heatmap
plt.figure(figsize=(8, 6)) # Adjust figure size
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=labels, yticklabels=labels)
plt.title("Multiclass Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.show()


# Accuracy scores
print("\nAccuracy:")
print("sklearn Accuracy:", accuracy_score(y_test, y_pred))

# --- Debugging step: Print the shape of X_test ---
print(f"Shape of X_test before model.evaluate: {X_test.shape}")

test_loss, test_acc = model.evaluate(X_test, y_test, verbose=0)# Evaluate model with test set
print(f"model.evaluate Accuracy: {test_acc:.4f}")

# Classification report (accuracy, precision, recall, F1)
print("\nClassification Report:")
# target_names are the actual class names, aligned with class_ids
report = classification_report(y_test, y_pred, labels=class_ids, target_names=labels, output_dict=True)
print(classification_report(y_test, y_pred, labels=class_ids, target_names=labels))

# --- Custom Metrics for Malware vs Benign (Normal Traffic) ---
normal_traffic_label = attack_type_map.get('Normal Traffic', None)

if normal_traffic_label is not None:
    # Position of 'Normal Traffic' inside the confusion matrix
    normal_idx = class_ids.index(normal_traffic_label)

    # Benign samples = everything in the 'Normal Traffic' row; malware = every other row
    total_benign_count = np.sum(cm[normal_idx, :])
    total_malware_count = np.sum(cm) - total_benign_count

    # Binary (malware vs benign) outcomes:
    # TN: benign predicted as benign
    # FP: benign predicted as any attack class
    # FN: malware predicted as 'Normal Traffic' (the 'Normal Traffic' column minus TN)
    # TP: malware predicted as any attack class. A sample assigned to the wrong
    #     attack type is still a detected attack, so it counts as TP here; exact
    #     per-class accuracy is reported separately at the end of this cell.
    tn = cm[normal_idx, normal_idx]
    fp = total_benign_count - tn
    fn = np.sum(cm[:, normal_idx]) - tn
    tp = total_malware_count - fn

    malware_identified_count = tp
    benign_not_identified_count = fp

    # % of Malware Identified
    percentage_malware_identified = (malware_identified_count / total_malware_count) * 100 if total_malware_count > 0 else 0

    # % of Benign Traffic Misclassified
    percentage_benign_not_identified = (benign_not_identified_count / total_benign_count) * 100 if total_benign_count > 0 else 0

    print(f"Total Malware Samples: {total_malware_count}")
    print(f"Malware Identified (True Positives): {malware_identified_count}")
    print(f"Percentage of Malware Identified: {percentage_malware_identified:.2f}%")
    print(f"Total Benign Samples: {total_benign_count}")
    print(f"Benign Misclassified as Malware (False Positives): {benign_not_identified_count}")
    print(f"Percentage of Benign Misclassified: {percentage_benign_not_identified:.2f}%")

    # --- Print TP, TN, FP, FN for Malware vs Benign ---
    print("\nTP, TN, FP, FN for Malware vs Benign:")
    print(f"True Positives (TP): {tp}")
    print(f"True Negatives (TN): {tn}")
    print(f"False Positives (FP): {fp}")
    print(f"False Negatives (FN): {fn}")

    # --- Percentage of each malware class correctly identified ---
    # Per-class recall: diagonal entry divided by the row total (exact attack type match)
    print("\nPercentage of each Malware Class Correctly Identified:")
    for i, label in enumerate(labels):
        if i != normal_idx: # Exclude 'Normal Traffic'
            correctly_identified = cm[i, i]
            total_in_class = np.sum(cm[i, :])
            percentage_identified = (correctly_identified / total_in_class) * 100 if total_in_class > 0 else 0
            print(f"{label}: {percentage_identified:.2f}%")

# Save best baseline model

In [None]:
# import pickle
# import os

# # Define the directory path
# save_dir = '/content/drive/MyDrive/Colab Notebooks/Honours Project'

# # Create the directory if it doesn't exist
# os.makedirs(save_dir, exist_ok=True)

# # Save the trained model
# model_path = os.path.join(save_dir, 'Best_Baseline.keras')
# model.save(model_path)  # Native Keras format
# print(f"Model saved as {model_path}")


# Quantisation of Baseline model

Load in Best Baseline model

In [None]:
import tensorflow as tf
from tensorflow.keras.models import load_model

# --- Load baseline model ---
# Path stem that the TFLite export step appends its file suffixes to
model_name_prefix = '/content/drive/MyDrive/Colab Notebooks/Honours Project/Baseline CNN Models/Best_Baseline'

# Replace the in-memory model with the saved best baseline
model = load_model(
    '/content/drive/MyDrive/Colab Notebooks/Honours Project/Baseline CNN Models/Best_Baseline.keras'
)

Weight-only quantisation

In [None]:
def _convert_to_tflite(keras_model, optimizations=None, supported_types=None):
    """Convert *keras_model* to a TFLite flatbuffer using a fresh converter.

    *optimizations* and *supported_types* are only set on the converter when given,
    so calling with neither produces a plain conversion.
    """
    tflite_converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)
    if optimizations is not None:
        tflite_converter.optimizations = optimizations
    if supported_types is not None:
        tflite_converter.target_spec.supported_types = supported_types
    return tflite_converter.convert()


# --- Float32 baseline (no quantization) ---
quantModel_f32 = _convert_to_tflite(model)

# --- Weight-only quantization (int8 weights, float32 activations) ---
# Optimize.DEFAULT triggers int8 weight quantization
quantModel_int8_weights = _convert_to_tflite(model, optimizations=[tf.lite.Optimize.DEFAULT])

# --- Weight-only quantization (float16 weights, float32 activations) ---
# Restricting the supported types to float16 stores the weights as float16
quantModel_fp16_weights = _convert_to_tflite(
    model,
    optimizations=[tf.lite.Optimize.DEFAULT],
    supported_types=[tf.float16],
)

# --- Save models ---
# Each flatbuffer is written next to the baseline model, distinguished by its suffix
for file_suffix, tflite_bytes in (
    ('_float32.tflite', quantModel_f32),
    ('_int8_weights.tflite', quantModel_int8_weights),
    ('_fp16_weights.tflite', quantModel_fp16_weights),
):
    with open(model_name_prefix + file_suffix, 'wb') as f:
        f.write(tflite_bytes)


Evaluation of weight-only quant models (Dynamic Range Quantization)

In [None]:
# --- Imports ---
import os
import psutil
import time
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
import tensorflow as tf

In [None]:
# --- Resource usage measurement function ---
def measure_resources(tflite_model, X_sample, model_name, save_path="models"):
    """Print storage, memory, CPU and latency figures for one TFLite inference.

    Args:
        tflite_model: Serialized TFLite model (the bytes returned by
            ``converter.convert()``).
        X_sample: A single input sample without the batch dimension; a batch axis is
            added and the data is cast to the interpreter's input dtype.
        model_name: Label used in the printed report and, with spaces replaced by
            underscores, as the file name of the model copy written to disk.
        save_path: Directory the model copy is written to (created if missing) so its
            on-disk size can be measured.

    Returns:
        None. The measurements are only printed.

    Notes:
        Memory and CPU figures are differences between readings taken immediately
        before and after the single ``invoke()``; the CPU figure is the difference of
        two ``psutil.cpu_percent(interval=None)`` readings. With only one inference
        these numbers are noisy and should be read as rough indicators.
    """
    # Write the model to disk so its storage footprint can be read from the file system
    os.makedirs(save_path, exist_ok=True)
    model_file = os.path.join(save_path, f"{model_name.replace(' ', '_')}.tflite")
    with open(model_file, "wb") as f:
        f.write(tflite_model)

    storage_size_mb = os.path.getsize(model_file) / (1024 * 1024) # Get size of model in megabytes

    interpreter = tf.lite.Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()

    # Build the single-sample batch before measuring so that input preparation is not
    # counted as inference cost
    input_data = np.expand_dims(X_sample, axis=0).astype(input_details[0]['dtype'])

    process = psutil.Process(os.getpid())
    mem_before = process.memory_info().rss / (1024 * 1024)  # MB
    cpu_before = psutil.cpu_percent(interval=None)

    # perf_counter is a monotonic, high-resolution clock intended for short intervals
    start_time = time.perf_counter()
    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()
    _ = interpreter.get_tensor(output_details[0]['index'])
    end_time = time.perf_counter()

    mem_after = process.memory_info().rss / (1024 * 1024)
    cpu_after = psutil.cpu_percent(interval=None)

    memory_used_mb = mem_after - mem_before
    cpu_usage_percent = cpu_after - cpu_before
    inference_time_sec = end_time - start_time

    print(f"\n--- Resource Usage for {model_name} ---")
    print(f"Storage size: {storage_size_mb:.2f} MB")
    print(f"Memory used during inference: {memory_used_mb:.2f} MB")
    print(f"CPU usage change: {cpu_usage_percent:.2f}%")
    print(f"Inference time: {inference_time_sec:.4f} sec")

    # return {
    #     "model_name": model_name,
    #     "storage_mb": storage_size_mb,
    #     "memory_mb": memory_used_mb,
    #     "cpu_percent": cpu_usage_percent,
    #     "inference_time_sec": inference_time_sec
    # }

In [None]:
# --- Evaluate Quant Models ---
# Prediction and reporting live in ONE loop body (one cell) so that every model is
# reported with its own predictions; splitting the loop body across cells leaves an
# indented fragment that cannot run on its own.

# List of TFLite models
tflite_models = [
    {"model": quantModel_f32, "name": "Float32 Model"},
    {"model": quantModel_int8_weights, "name": "Int8 Weights-Only Model"},
    {"model": quantModel_fp16_weights, "name": "Float16 Weights-Only Model"}
]

# Class order shared by every confusion matrix / report below: row/column i of the
# matrix always corresponds to class_ids[i]
class_ids = sorted(attack_type_map.values())
reverse_attack_type_map = {v: k for k, v in attack_type_map.items()}
labels = [reverse_attack_type_map[class_id] for class_id in class_ids]
normal_label = attack_type_map.get('Normal Traffic', None)

# Evaluate the models
for m in tflite_models:

    tflite_model = m["model"]
    model_name = m["name"]
    print(f"\n --- Evaluating: {model_name} ---")

    # Loads the TFLite model and gets it ready to make predictions
    interpreter = tf.lite.Interpreter(model_content=tflite_model) # Create interpreter object that will read and run the TFLite model
    interpreter.allocate_tensors() # Make the interpreter allocate memory
    input_details = interpreter.get_input_details() # Expected shape and data type of the model input
    output_details = interpreter.get_output_details() # Shape and data type of the model output

    # Get model predictions for every test sample
    y_pred_probs = []

    # The interpreter has no built-in .evaluate() like a Keras model, so each sample is run individually
    for i in range(len(X_test)):
        # Add a batch dimension (batch size 1) and cast to the dtype the model expects
        input_data = np.expand_dims(X_test[i], axis=0).astype(input_details[0]['dtype'])
        interpreter.set_tensor(input_details[0]['index'], input_data) # Feed the sample into the interpreter
        interpreter.invoke() # Run the model on the input data
        output_data = interpreter.get_tensor(output_details[0]['index']) # Get the output from the model
        y_pred_probs.append(output_data[0]) # Keep the raw class probabilities

    y_pred_probs = np.array(y_pred_probs) # Convert the list to a single NumPy array
    y_pred = np.argmax(y_pred_probs, axis=1) # Predicted class label for each sample

    # --- Confusion Matrix ---
    # Rows are true classes, columns are predicted classes, in class_ids order
    cm = confusion_matrix(y_test, y_pred, labels=class_ids)

    plt.figure(figsize=(8, 4)) # Create confusion matrix plot
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=labels, yticklabels=labels)
    plt.title(f"{model_name} - Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.show()

    # Print accuracy & classification report
    print("Accuracy:", accuracy_score(y_test, y_pred))
    print("Classification Report:")
    print(classification_report(y_test, y_pred, labels=class_ids, target_names=labels))

    # Calculate malware vs benign sample metrics (binary view of the confusion matrix)
    if normal_label is not None:
        normal_idx = class_ids.index(normal_label)

        total_benign = np.sum(cm[normal_idx, :])
        total_malware = np.sum(cm) - total_benign

        # TN: benign predicted as benign; FP: benign predicted as any attack class
        true_negatives = cm[normal_idx, normal_idx]
        benign_misclassified = total_benign - true_negatives

        # FN: malware predicted as 'Normal Traffic'. TP: malware predicted as any
        # attack class (a wrong attack type is still a detected attack).
        malware_missed = np.sum(cm[:, normal_idx]) - true_negatives
        malware_identified = total_malware - malware_missed

        percentage_malware_identified = (malware_identified / total_malware * 100) if total_malware > 0 else 0
        percentage_benign_misclassified = (benign_misclassified / total_benign * 100) if total_benign > 0 else 0

        print(f"Malware Identified: {malware_identified}/{total_malware} ({percentage_malware_identified:.2f}%)")
        print(f"Benign Misclassified: {benign_misclassified}/{total_benign} ({percentage_benign_misclassified:.2f}%)")
        print(f"TP: {malware_identified}, TN: {true_negatives}, FP: {benign_misclassified}, FN: {malware_missed}")

    # execute resource usage measurement function on the first test sample
    measure_resources(tflite_model, X_test[0], model_name)


Full integer Quantisation of the model

In [None]:
# --- Imports ---

import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras import regularizers
import numpy as np # Import numpy for representative dataset

import os

In [None]:
# --- Representative Dataset ---

def representative_dataset_gen(num_samples=100, data=None):
    """Yield single-sample float32 batches for TFLite quantization calibration.

    Args:
        num_samples: Upper bound on the number of calibration samples to yield
            (default 100, the value that was previously hard-coded).
        data: Samples indexed along axis 0. When None, the notebook-level
            `X_train` is used, which keeps the zero-argument call made by the
            TFLite converter working unchanged.

    Yields:
        A one-element list containing a float32 array holding one sample with
        its leading (batch) dimension kept, i.e. `data[i:i+1]`.
    """
    source = X_train if data is None else data
    # Never run past the end of the data: slicing beyond the last row would
    # produce empty batches instead of raising.
    count = min(num_samples, len(source))
    for i in range(count):
        # Slice (rather than index) so the batch dimension of size 1 is preserved.
        yield [source[i:i + 1].astype(np.float32)]

In [None]:
# --- Post-training Model Quantization (float32 / 16x8 / int8) ---

def _convert_to_tflite(keras_model, supported_types=None, supported_ops=None,
                       representative_dataset=None):
    """Convert a Keras model to a serialized TFLite model with Optimize.DEFAULT.

    Args:
        keras_model: The Keras model to convert.
        supported_types: Optional list assigned to
            `converter.target_spec.supported_types`.
        supported_ops: Optional list assigned to
            `converter.target_spec.supported_ops`.
        representative_dataset: Optional zero-argument generator function that
            yields calibration inputs; assigned to
            `converter.representative_dataset`.

    Returns:
        Whatever `converter.convert()` returns (the serialized TFLite model).
    """
    converter = tf.lite.TFLiteConverter.from_keras_model(keras_model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    if supported_types is not None:
        converter.target_spec.supported_types = supported_types
    if supported_ops is not None:
        converter.target_spec.supported_ops = supported_ops
    if representative_dataset is not None:
        converter.representative_dataset = representative_dataset
    return converter.convert()


# Float32 target type, converted without a representative dataset.
quantModel_f32 = _convert_to_tflite(model, supported_types=[tf.float32])

# 16x8 quantization (int16 activations, int8 weights). This mode is selected
# through `supported_ops`, not `supported_types`, per the TFLite post-training
# quantization guide. TFLITE_BUILTINS is listed as well so that operators
# without a 16x8 implementation can be kept in float instead of failing.
quantModel_int16 = _convert_to_tflite(
    model,
    supported_ops=[
        tf.lite.OpsSet.EXPERIMENTAL_TFLITE_BUILTINS_ACTIVATIONS_INT16_WEIGHTS_INT8,
        tf.lite.OpsSet.TFLITE_BUILTINS,
    ],
    representative_dataset=representative_dataset_gen,
)

# int8 target type, calibrated with the representative dataset.
quantModel_int8 = _convert_to_tflite(
    model,
    supported_types=[tf.int8],
    representative_dataset=representative_dataset_gen,
)

# An int4 variant was attempted previously and is NOT POSSIBLE with this
# converter, so no int4 model is produced.


# --- Save models ---
# NOTE(review): `model_name_prefix` is defined elsewhere in the notebook;
# presumably it points inside the 'models' directory created here - confirm.
os.makedirs('models', exist_ok=True)
for suffix, model_bytes in (
    ('_float32_optimized.tflite', quantModel_f32),
    ('_int16_full.tflite', quantModel_int16),
    ('_int8_full.tflite', quantModel_int8),
):
    with open(model_name_prefix + suffix, 'wb') as f:
        f.write(model_bytes)

Saved artifact at '/tmp/tmpayxerxdm'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(None, 52, 1), dtype=tf.float32, name='input_layer')
Output Type:
  TensorSpec(shape=(None, 7), dtype=tf.float32, name=None)
Captures:
  133997749130000: TensorSpec(shape=(), dtype=tf.resource, name=None)
  133997726000784: TensorSpec(shape=(), dtype=tf.resource, name=None)
  133997726000400: TensorSpec(shape=(), dtype=tf.resource, name=None)
  133997726000208: TensorSpec(shape=(), dtype=tf.resource, name=None)
  133997726000592: TensorSpec(shape=(), dtype=tf.resource, name=None)
  133997726001360: TensorSpec(shape=(), dtype=tf.resource, name=None)
  133997726000976: TensorSpec(shape=(), dtype=tf.resource, name=None)
  133997725995408: TensorSpec(shape=(), dtype=tf.resource, name=None)
  133997726001744: TensorSpec(shape=(), dtype=tf.resource, name=None)
  133997725999056: TensorSpec(shape=(), dtype=tf.resource, name=None)
  133997725999248: Tens

ValueError: For full integer quantization, a `representative_dataset` must be specified.

Evaluation of the quantised models

In [None]:
# -- Imports ---
import os
import psutil
import time
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
import tensorflow as tf

In [None]:
# --- List of TFLite models ---
tflite_models = [
    {"model": quantModel_f32, "name": "Float32 Optimized Model"},
    {"model": quantModel_int16, "name": "Int16 Quantized Model"},
    {"model": quantModel_int8, "name": "Int8 Quantized Model"}
]


def predict_with_tflite(tflite_model, samples):
    """Run every sample through a TFLite model and return predicted class ids.

    Args:
        tflite_model: Serialized TFLite model, passed as `model_content` to
            `tf.lite.Interpreter`.
        samples: Indexable collection of input samples (without batch
            dimension); each one is fed to the interpreter individually.

    Returns:
        1-D numpy array with the argmax of the model output for each sample.
    """
    interpreter = tf.lite.Interpreter(model_content=tflite_model)
    interpreter.allocate_tensors()
    input_info = interpreter.get_input_details()[0]
    output_info = interpreter.get_output_details()[0]

    outputs = []
    for i in range(len(samples)):
        # Add the batch dimension and cast to the dtype the model input expects.
        input_data = np.expand_dims(samples[i], axis=0).astype(input_info['dtype'])
        interpreter.set_tensor(input_info['index'], input_data)
        interpreter.invoke()
        outputs.append(interpreter.get_tensor(output_info['index'])[0])

    return np.argmax(np.array(outputs), axis=1)


def report_classification_metrics(y_true, y_pred, model_name):
    """Plot the confusion matrix and print accuracy, report and attack/benign counts.

    Args:
        y_true: True integer class labels.
        y_pred: Predicted integer class labels.
        model_name: Name used in the plot title.
    """
    # --- Confusion Matrix ---
    # Pass the full list of class ids explicitly so that row/column i always
    # refers to the same class, even if a class is missing from y_true/y_pred.
    class_ids = sorted(attack_type_map.values())
    reverse_attack_type_map = {v: k for k, v in attack_type_map.items()}
    labels = [reverse_attack_type_map.get(i, f'Unknown {i}') for i in class_ids]
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)

    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=labels, yticklabels=labels)
    plt.title(f"{model_name} - Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("True")
    plt.show()

    # Accuracy & classification report
    print("Accuracy:", accuracy_score(y_true, y_pred))
    print("Classification Report:")
    print(classification_report(y_true, y_pred, labels=class_ids, target_names=labels))

    # Malware vs Benign metrics
    normal_label = attack_type_map.get('Normal Traffic', None)
    if normal_label is not None:
        # Position of the benign class inside the confusion matrix.
        normal_idx = class_ids.index(normal_label)
        benign_correct = cm[normal_idx, normal_idx]
        total_benign = np.sum(cm[normal_idx, :])

        # NOTE(review): "identified" counts only attacks assigned their exact
        # class (the diagonal). An attack predicted as a different attack type
        # is therefore counted under FN below - confirm this is intended.
        malware_identified = np.sum(np.diag(cm)) - benign_correct
        total_malware = np.sum(cm) - total_benign
        percentage_malware_identified = (malware_identified / total_malware * 100) if total_malware > 0 else 0

        benign_misclassified = total_benign - benign_correct
        percentage_benign_misclassified = (benign_misclassified / total_benign * 100) if total_benign > 0 else 0

        print(f"Malware Identified: {malware_identified}/{total_malware} ({percentage_malware_identified:.2f}%)")
        print(f"Benign Misclassified: {benign_misclassified}/{total_benign} ({percentage_benign_misclassified:.2f}%)")
        print(f"TP: {malware_identified}, TN: {benign_correct}, FP: {benign_misclassified}, FN: {total_malware - malware_identified}")


# --- Evaluate all models with metrics + resource tracking ---
# Prediction, metrics and resource tracking all run inside the loop so that
# every model in the list is reported, not only the last one.
for m in tflite_models:
    tflite_model = m["model"]
    model_name = m["name"]
    print(f"\n=== Evaluating: {model_name} ===")

    # Predictions
    y_pred = predict_with_tflite(tflite_model, X_test)

    # Confusion matrix, accuracy, classification report, malware/benign counts
    report_classification_metrics(y_test, y_pred, model_name)

    # Track resources
    measure_tflite_resources(tflite_model, X_test[0], model_name)