# Load Dataset and Preprocess

In [7]:
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
import numpy as np
import pandas as pd
from sklearn.model_selection import StratifiedKFold, GridSearchCV, train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder, OneHotEncoder
from sklearn.metrics import accuracy_score



# Load the raw dataset from the CSV file expected at 'input/Dataset.csv'.
data = pd.read_csv('input/Dataset.csv')
# Process Data

# Preprocess Labels
# `label_column` selects the single target that is learned below;
# `label_columns_all` lists every label column so that none of them is left
# among the features once they are dropped.
label_column = ['NST_M_Label']
label_columns_all = ['IT_B_Label', 'IT_M_Label', 'NST_B_Label', 'NST_M_Label']

# Encode labels to numerical values.
# Selecting with a list returns a one-column DataFrame; flatten it to 1-D so
# LabelEncoder receives the shape it expects and does not emit a
# DataConversionWarning about a column-vector y.
label_encoder_y = LabelEncoder()
y_encoded = label_encoder_y.fit_transform(data[label_column].to_numpy().ravel())

# Convert labels to one-hot encoding (one column per encoded class)
y_one_hot = pd.get_dummies(y_encoded).values

# Drop all label columns so only feature columns remain in `data`
data = data.drop(columns=label_columns_all)


# Preprocess Data

from ColumnDropperTransformer import ColumnDropperTransformer
from sklearn.compose import ColumnTransformer

# Columns handed to the first pipeline step (ColumnDropperTransformer).
unused_columns = [
    'sAddress', 'rAddress', 'sMACs', 'rMACs', 'sIPs', 'rIPs',
    'startDate', 'endDate', 'start', 'end', 'startOffset', 'endOffset',
]
# Not referenced below: the encoder addresses the column by position instead.
categorical_columns = ['protocol']

# One-hot encode the column at position 0 and pass every other column through.
# The positional index stands in for categorical_columns = ['protocol'] because
# of an indexing problem when selecting it by name (presumably the preceding
# imputer step outputs a plain array without column names -- TODO confirm).
# NOTE(review): this assumes 'protocol' is the first column that remains after
# the unused columns are dropped -- verify against the dataset layout.
categorial_transformer = ColumnTransformer(
    transformers=[
        ('ohe_encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False), [0]),
    ],
    remainder='passthrough',
)

# Preprocessing chain, applied in this order: drop unused columns, replace
# missing values with the constant 0, one-hot encode the categorical column,
# then standardize every resulting feature.
preprocess = Pipeline(steps=[
    ('drop_unused', ColumnDropperTransformer(unused_columns)),
    ('fill_missing', SimpleImputer(fill_value=0, strategy='constant')),
    ('transform_categorial', categorial_transformer),
    ('normalizer', StandardScaler()),
])

# Hold out 20% of the rows for testing. The split is stratified on the
# integer-encoded labels and uses a fixed seed so it is repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    data,
    y_one_hot,
    stratify=y_encoded,
    random_state=42,
    test_size=0.2,
)

# Fit the preprocessing pipeline on the training partition only, then apply
# the already-fitted pipeline to the test partition.
X_train = preprocess.fit_transform(X_train)
X_test = preprocess.transform(X_test)

  y = column_or_1d(y, warn=True)


# Simple Test with predefined model

In [8]:
from sklearn.metrics import classification_report, confusion_matrix
from scikeras.wrappers import KerasClassifier
from ModelCreator import create_model

# Train one network with fixed hyperparameters as a quick baseline.
# Input/output widths are taken from the preprocessed features and the
# one-hot label matrix.
model = KerasClassifier(
    model=create_model,
    input_dim=X_train.shape[1],
    output_dim=y_train.shape[1],
    hidden_layer_size=128,
    epochs=10,
    batch_size=32,
    verbose=0,
)
model.fit(X_train, y_train)


# Evaluate the model on the test set.
# Both predictions and targets are reduced to class indices via argmax
# before being compared.
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)
y_test_classes = np.argmax(y_test, axis=1)
accuracy = accuracy_score(y_test_classes, y_pred_classes)
conf_matrix = confusion_matrix(y_test_classes, y_pred_classes)
print(f'Test Accuracy: {accuracy}')
print("Confusion Matrix:")
print(conf_matrix)

Test Accuracy: 0.9782370953630796
Confusion Matrix:
[[7327    0    3    6    5    0]
 [   2  385    0    0    0    0]
 [   0    0   32    0    6    0]
 [   5    0   18  484    9    1]
 [  16    0   32    1  340    0]
 [  46    0    7   39    3  377]]


# Search for best parameters

In [3]:
from scikeras.wrappers import KerasClassifier
from ModelCreator import create_model


# Base estimator for the search; the hyperparameters listed in `param_grid`
# are forwarded to `create_model` through the `model__` prefix.
model = KerasClassifier(model=create_model, input_dim=X_train.shape[1], output_dim=y_train.shape[1], epochs=10, batch_size=32, verbose=2)
param_grid = {
    'model__hidden_layer_size': [32, 64, 128],
    'model__activation': ['relu', 'tanh', 'sigmoid'],
    # 'model__alpha': [0.0001, 0.001, 0.01]
}

# The targets are one-hot encoded, which scikit-learn treats as a
# multilabel-indicator target; with a plain integer `cv` it then falls back to
# unstratified KFold. StratifiedKFold cannot split on one-hot targets directly,
# so the folds are precomputed from the integer class indices and passed to
# GridSearchCV as explicit (train, validation) index pairs.
cv_splits = list(StratifiedKFold(n_splits=3).split(X_train, y_train.argmax(axis=1)))

grid_search = GridSearchCV(estimator=model, param_grid=param_grid, cv=cv_splits, verbose=2)
grid_result = grid_search.fit(X_train, y_train)

print(f"Best Hyperparameters: {grid_result.best_params_}")
# Estimator refit on the whole training set with the best parameters
best_model = grid_result.best_estimator_


# Make predictions on the test set
y_pred = best_model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)

# Inverse transform one-hot encoded predictions to original labels
# y_pred_labels = label_encoder_y.inverse_transform(y_pred_classes)

# Evaluate the model on the test set
accuracy = accuracy_score(y_test.argmax(axis=1), y_pred_classes)
print(f'Test Accuracy: {accuracy}')

Fitting 3 folds for each of 9 candidates, totalling 27 fits
Epoch 1/10
762/762 - 2s - loss: 0.3382 - accuracy: 0.9044 - 2s/epoch - 2ms/step
Epoch 2/10
762/762 - 1s - loss: 0.1442 - accuracy: 0.9544 - 1s/epoch - 2ms/step
Epoch 3/10
762/762 - 1s - loss: 0.1152 - accuracy: 0.9624 - 1s/epoch - 2ms/step
Epoch 4/10
762/762 - 1s - loss: 0.0973 - accuracy: 0.9650 - 1s/epoch - 2ms/step
Epoch 5/10
762/762 - 1s - loss: 0.0858 - accuracy: 0.9665 - 1s/epoch - 2ms/step
Epoch 6/10
762/762 - 1s - loss: 0.0770 - accuracy: 0.9719 - 1s/epoch - 2ms/step
Epoch 7/10
762/762 - 2s - loss: 0.0712 - accuracy: 0.9754 - 2s/epoch - 2ms/step
Epoch 8/10
762/762 - 1s - loss: 0.0665 - accuracy: 0.9767 - 1s/epoch - 2ms/step
Epoch 9/10
762/762 - 1s - loss: 0.0637 - accuracy: 0.9778 - 1s/epoch - 2ms/step
Epoch 10/10
762/762 - 1s - loss: 0.0627 - accuracy: 0.9775 - 1s/epoch - 2ms/step
381/381 - 1s - 615ms/epoch - 2ms/step
[CV] END model__activation=relu, model__hidden_layer_size=32; total time=  14.7s
Epoch 1/10
762/762 -

In [4]:
import os

import joblib

# joblib.dump does not create missing directories, so make sure the target
# folder exists before writing into it.
os.makedirs('output', exist_ok=True)

# Persist the tuned classifier (`best_model` from the grid search) on its own.
joblib.dump(best_model, 'output/keras_classifier_model.joblib')

# Bundle the fitted preprocessing pipeline with the classifier so the saved
# artifact can be applied directly to raw feature frames.
predict_model_pipeline = Pipeline([
    ('preprocess', preprocess),
    ('prediction', best_model)
])
joblib.dump(predict_model_pipeline, 'output/predict_model.joblib')


INFO:tensorflow:Assets written to: C:\Users\ALIREZ~1\AppData\Local\Temp\tmp40pf16tv\assets


INFO:tensorflow:Assets written to: C:\Users\ALIREZ~1\AppData\Local\Temp\tmp40pf16tv\assets


INFO:tensorflow:Assets written to: C:\Users\ALIREZ~1\AppData\Local\Temp\tmp70ks0rje\assets


INFO:tensorflow:Assets written to: C:\Users\ALIREZ~1\AppData\Local\Temp\tmp70ks0rje\assets


['output/predict_model.joblib']

# Test Saved Model

In [9]:
import joblib
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder, OneHotEncoder
from sklearn.metrics import accuracy_score, confusion_matrix
from ModelCreator import create_model

# Load dataset
data = pd.read_csv('input/Dataset.csv')
label_columns_all = ['IT_B_Label', 'IT_M_Label', 'NST_B_Label', 'NST_M_Label']
# Encode labels to numerical values, then expand them to one-hot form
label_encoder_y = LabelEncoder()
y_encoded = label_encoder_y.fit_transform(data['NST_M_Label'])
y_one_hot = pd.get_dummies(y_encoded).values

# Remove every label column so only features are passed to the pipeline
data = data.drop(columns=label_columns_all)

# Load the saved preprocessing + classifier pipeline.
# NOTE: joblib.load unpickles arbitrary objects; only load files you trust.
model = joblib.load('output/predict_model.joblib')

y_pred = model.predict(data)
y_pred_classes = np.argmax(y_pred, axis=1)

# Evaluate the reloaded pipeline on the whole dataset.
# This is a check that the saved artifact loads and predicts on raw input; the
# rows include those the model was trained on, so the score is not a held-out
# test metric and is labelled accordingly.
accuracy = accuracy_score(y_one_hot.argmax(axis=1), y_pred_classes)
conf_matrix = confusion_matrix(y_one_hot.argmax(axis=1), y_pred_classes)
print(f'Full-Dataset Accuracy (includes training rows): {accuracy}')
print("Confusion Matrix:")
print(conf_matrix)

1429/1429 - 2s - 2s/epoch - 2ms/step
Test Accuracy: 0.9800516208058095
Confusion Matrix:
[[36667     0     8    17     7     7]
 [    1  1933     0     0     0     0]
 [    0     0   144    24    24     0]
 [   26     0    97  2437    16     8]
 [  119     0   128    21  1676     0]
 [  143     0    51   207     8  1949]]
