<a href="https://colab.research.google.com/github/NefChr/Test7/blob/main/IPS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout
from tensorflow.keras.models import Sequential
from tensorflow.keras.utils import plot_model, to_categorical

# Number of target classes; the label map in loadDataset has one entry per class.
nclass = 12
# Number of training epochs passed to model.fit.
epochs = 100

In [28]:
def loadDataset(filename='https://raw.githubusercontent.com/kdemertzis/EKPA/main/Data/pcap_data.csv',
                random_state=None):
    """Load the traffic CSV and return scaled train/validation/test splits.

    Rows labelled 'DrDoS_LDAP' are dropped, the remaining string labels are
    mapped to integer class ids, a fixed subset of columns is kept as
    features, and the data is split with stratification into 70% train,
    10% validation and 20% test (0.125 of the remaining 80% is validation).

    Features are min-max scaled with statistics taken from the training
    split only, so no information from the validation/test rows leaks into
    the preprocessing. Columns that are constant in the training split are
    mapped to 0 instead of dividing by zero.

    Args:
        filename: Path or URL of the CSV file; defaults to the original
            dataset URL.
        random_state: Optional seed forwarded to both train_test_split
            calls to make the splits reproducible. None keeps the original
            unseeded behaviour.

    Returns:
        A 6-tuple (train_data, train_label, val_data, val_label,
        test_data, test_label); data arrays are float32, label arrays are
        int32 class ids.

    Raises:
        KeyError: If the file contains a label that is not in the label map.
    """
    label_col = 67  # 0-based position of the class-label column
    label_map = {
        'WebDDoS': 0,
        'BENIGN': 1,
        'UDP-lag': 2,
        'DrDoS_NTP': 3,
        'Syn': 4,
        'DrDoS_SSDP': 5,
        'DrDoS_UDP': 6,
        'DrDoS_NetBIOS': 7,
        'DrDoS_MSSQL': 8,
        'DrDoS_SNMP': 9,
        'TFTP': 10,
        'DrDoS_DNS': 11,
    }

    # read_csv already returns a DataFrame; no need to wrap it again.
    data = pd.read_csv(filename).to_numpy()
    data = data[data[:, label_col] != 'DrDoS_LDAP']

    raw_labels = data[:, label_col].astype('str')
    unknown = sorted(set(raw_labels) - set(label_map))
    if unknown:
        raise KeyError(f'Unmapped class labels in dataset: {unknown}')
    label = np.array([label_map[name] for name in raw_labels])

    # Selected feature columns; the trailing "- 1" shifts the listed numbers
    # down by one before they are used as 0-based column positions.
    inx_sel = np.array([38, 47, 37, 48, 11, 9, 7, 52, 10, 36, 1, 34, 4, 17, 19, 57, 21,
                        18, 22, 24, 32, 50, 23, 55, 51, 5, 3, 39, 40, 43, 58, 12, 25,
                        20, 2, 35, 67, 33, 6, 53]) - 1

    # The raw array is object-typed (it also holds the string labels), so
    # convert the numeric feature columns before doing any arithmetic.
    features = data[:, inx_sel].astype('float64')

    # train_test_split shuffles by default, so no separate pre-shuffle is needed.
    train_data, test_data, train_label, test_label = train_test_split(
        features, label, test_size=0.20, stratify=label,
        random_state=random_state)

    train_data, val_data, train_label, val_label = train_test_split(
        train_data, train_label, test_size=0.125, stratify=train_label,
        random_state=random_state)

    # Fit the scaling on the training split only, then apply it everywhere.
    dmin = train_data.min(axis=0)
    span = train_data.max(axis=0) - dmin
    span[span == 0] = 1.0  # constant column: avoid 0/0, result becomes 0

    def _scale(values):
        return ((values - dmin) / span).astype('float32')

    return _scale(train_data), train_label.astype('int32'), \
        _scale(val_data), val_label.astype('int32'), \
        _scale(test_data), test_label.astype('int32')


In [None]:
# Fetch the six arrays; the *_labelp vectors hold integer class ids.
(train_data, train_labelp,
 val_data, val_labelp,
 test_data, test_labelp) = loadDataset()

# One-hot encode each label vector to match the nclass-wide softmax output.
train_label, val_label, test_label = (
    to_categorical(class_ids, nclass)
    for class_ids in (train_labelp, val_labelp, test_labelp)
)

In [None]:
# Number of features per sample; each sample is fed to Conv1D as (inshape, 1).
inshape = train_data.shape[1]

# 'balanced' weights compensate for class imbalance in the training labels.
present_classes = np.unique(train_labelp)
balanced_weights = class_weight.compute_class_weight(class_weight='balanced',
                                                     classes=present_classes,
                                                     y=train_labelp)
# Key the weights by the actual class id (not by position in the unique
# array) so they stay aligned even if a class is absent from the training
# labels; absent classes fall back to a neutral weight of 1.0.
class_weights = dict.fromkeys(range(nclass), 1.0)
class_weights.update(zip(present_classes.tolist(), balanced_weights.tolist()))

earlyStopping = EarlyStopping(monitor='val_loss', patience=30, verbose=0, mode='min')

# Make sure the checkpoint directory exists before training starts.
os.makedirs('./savemodels', exist_ok=True)

# The model is compiled with metrics=['accuracy'], so the validation metric
# is logged as 'val_accuracy'; both the monitor and the filename template
# must use that key.
modelCheckPoint = ModelCheckpoint('./savemodels/model5class.weights.{epoch:03d}-{val_accuracy:.4f}.hdf5',
                                  save_best_only=True, monitor='val_accuracy', mode='max')

# Two Conv1D/MaxPooling1D stages followed by a dense classifier head.
model = Sequential([
    Conv1D(64, 3, activation='relu', input_shape=(inshape, 1)),
    MaxPooling1D(2),
    Conv1D(64, 3, activation='relu'),
    MaxPooling1D(2),
    Flatten(),
    Dense(64, activation='relu'),
    Dropout(0.5),
    Dense(nclass, activation='softmax')
])

In [None]:
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# earlyStopping is created alongside the checkpoint callback, so it is
# registered here as well; without it the patience setting has no effect.
# The `workers` argument is dropped: it only applies to generator/Sequence
# inputs and is rejected by newer Keras versions.
history = model.fit(train_data.reshape(-1, inshape, 1),
                    train_label,
                    shuffle=True,
                    epochs=epochs,
                    batch_size=256,
                    validation_data=(val_data.reshape(-1, inshape, 1), val_label),
                    callbacks=[modelCheckPoint, earlyStopping],
                    class_weight=class_weights)

# Checkpoint names start with the zero-padded epoch number and are written
# only on improvement, so the lexicographically last file is the best one.
# NOTE(review): files left over from earlier runs in ./savemodels would also
# be considered here; clear the directory between runs.
checkpoint_dir = './savemodels'
str_models = sorted(name for name in os.listdir(checkpoint_dir)
                    if name.startswith('model5class.weights.'))
if str_models:
    best_model = str_models[-1]
    model.load_weights(os.path.join(checkpoint_dir, best_model))
else:
    best_model = None
    print('No checkpoint found in', checkpoint_dir, '- evaluating final-epoch weights.')

print('TEST DATA-Confusion matrix:')
pred = model.predict(test_data.reshape(-1, inshape, 1))
pred_y = pred.argmax(axis=-1)

# Pass the full label list so the matrix is always nclass x nclass, even if
# some class is missing from both the test labels and the predictions.
cm = confusion_matrix(test_labelp.astype('int32'), pred_y, labels=np.arange(nclass))
print(cm)

label = np.array(["WebDDoS", "BENIGN", "UDP-lag", "DrDoS_NTP", "Syn",
                  "DrDoS_SSDP", "DrDoS_UDP", "DrDoS_NetBIOS", "DrDoS_MSSQL",
                  "DrDoS_SNMP", "TFTP", "DrDoS_DNS"])

# Per-class ratio: correctly classified samples over all true samples of
# that class (row sum). Classes with no test samples print nan.
print('Accuracy ratios for each class')
row_totals = cm.sum(axis=1)
for i in range(nclass):
    ratio = cm[i, i] / row_totals[i] if row_totals[i] else float('nan')
    print(label[i], '=', ratio)

In [None]:
# Blocking traffic simulation
# A sample is flagged only when the model's top prediction is an attack
# class (anything other than BENIGN) and that prediction is confident
# enough. Thresholding the top probability alone would also flag traffic
# that is confidently predicted as BENIGN.
threshold = 0.5
benign_class = 1  # class id assigned to 'BENIGN' in the label map
predicted_class = pred.argmax(axis=1)
confidence = pred.max(axis=1)
malicious_indices = np.where((predicted_class != benign_class) &
                             (confidence > threshold))[0]
print("Malicious traffic indices:", malicious_indices)

In [None]:
# Simulate blocking action: report every flagged sample index, one per line.
for flagged in malicious_indices.tolist():
    print("Blocking traffic with index:", flagged)

In [None]:
# Plotting: pull the per-epoch curves out of the History object returned by fit.
curves = history.history
acc, val_acc = curves['accuracy'], curves['val_accuracy']
loss, val_loss = curves['loss'], curves['val_loss']

In [None]:
# Use a dedicated name for the x-axis values: rebinding the global `epochs`
# (the epoch count passed to model.fit) to a range would break re-running
# the training cell afterwards.
epoch_range = range(len(acc))

# Figure 1: training vs. validation accuracy per epoch.
plt.figure()
plt.plot(epoch_range, acc, 'b', label='Training acc')
plt.plot(epoch_range, val_acc, 'r.', label='Validation acc')
plt.title('Training and validation accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

# Figure 2: training vs. validation loss per epoch.
plt.figure()
plt.plot(epoch_range, loss, 'b', label='Training loss')
plt.plot(epoch_range, val_loss, 'r.', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()