In [None]:
import pandas as pd
import numpy as np
import time
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, Conv1D, MaxPooling1D, Flatten, LSTM, GRU, Bidirectional, Average, Input
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from tensorflow.keras.utils import plot_model

In [None]:
df = pd.read_csv("Edge-IIoTset_112.csv", low_memory = False)

In [None]:
df.info()

In [None]:
print(df['Attack_type'].value_counts())

In [None]:
# Creating a dictionary of Types
attacks = {'Normal': 0,'MITM': 1, 'Uploading': 2, 'Ransomware': 3, 'SQL_injection': 4,
       'DDoS_HTTP': 5, 'Password': 6, 'Backdoor': 7, 'XSS': 8}
df['Attack_type'] = df['Attack_type'].map(attacks)

In [None]:
X = df.drop(columns=['Attack_label', 'Attack_type'])
y = df['Attack_label']

In [None]:
X = X.select_dtypes(include=[np.number])
# Apply the Chi-Squared test
chi_selector = SelectKBest(chi2, k='all')  # Set k to the desired number of features
X_kbest = chi_selector.fit_transform(X, y)

In [None]:
# Get the scores for each feature
chi_scores = chi_selector.scores_

# Combine scores with feature names
chi_scores = pd.DataFrame({'feature': X.columns, 'score': chi_scores})

# Sort the features by their scores
chi_scores = chi_scores.sort_values(by='score', ascending=False)

print(chi_scores)
print(len(chi_scores))

In [None]:
selected_features = chi_scores['feature'].tolist()[:79]  # Select top k features

In [None]:
# Split the data into train (80%) and test (20%) sets
X_train, X_test, y_train, y_test = train_test_split(X[selected_features], y, test_size=0.2, random_state=42)

# Split the training data further into train (80%) and validation (20%) sets
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.125, random_state=42)

In [None]:
print("X_train shape:", X_train.shape)
print("X_val shape:", X_val.shape)
print("X_test shape:", X_test.shape)

print("y_train shape:", y_train.shape)

In [None]:
scaler = StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)

In [None]:
def cnn_lstm_gru_model(input_shape, num_classes):
    
    model_cnn_lstm_gru = Sequential([
        Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=input_shape),        
        MaxPooling1D(pool_size=2),
        
        Conv1D(filters=64, kernel_size=3, activation='relu'),
        MaxPooling1D(pool_size=2),
        
        LSTM(64, return_sequences=True),
        GRU(64, return_sequences=False),
        
        Flatten(),
        
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='sigmoid')
    ])

    model_cnn_lstm_gru.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model_cnn_lstm_gru

input_shape = (X_train.shape[1], 1)
num_classes = 1
model_cnn_lstm_gru = cnn_lstm_gru_model(input_shape, num_classes)
model_cnn_lstm_gru.summary()
plot_model(model_cnn_lstm_gru)

In [None]:
train_start_time = time.time()
# Train the model
history_cnn_lstm_gru = model_cnn_lstm_gru.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=6, batch_size=32)
# Record the ending time
train_end_time = time.time()

# Record the starting time for testing
test_start_time = time.time()
# Evaluate the model
loss, accuracy = model_cnn_lstm_gru.evaluate(X_test, y_test, batch_size=32)
# Record the ending time for testing
test_end_time = time.time()

print(f'Test Loss: {loss:.4f}')
print(f'Test Accuracy: {accuracy:.4f}')

# Calculate and print the training time
train_time = train_end_time - train_start_time
print(f"Training time: {train_time:.2f} seconds")

# Calculate and print the testing time
test_time = test_end_time - test_start_time
print(f"Testing time: {test_time:.2f} seconds")

In [None]:
def cnn_lstm_bigru_model(input_shape, num_classes):
    
    model = Sequential([
        Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=input_shape),        
        MaxPooling1D(pool_size=2),
        
        Conv1D(filters=64, kernel_size=3, activation='relu'),
        MaxPooling1D(pool_size=2),
        
        LSTM(64, return_sequences=True),
        Bidirectional(GRU(64, return_sequences=False)),
        
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='sigmoid')
    ])

    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

input_shape = (X_train.shape[1], 1)
num_classes = 1
model_cnn_lstm_bigru = cnn_lstm_bigru_model(input_shape, num_classes)
model_cnn_lstm_bigru.summary()
plot_model(model_cnn_lstm_bigru)

In [None]:
train_start_time = time.time()
# Train the model
history_cnn_lstm_bigru = model_cnn_lstm_bigru.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=6, batch_size=32)
# Record the ending time
train_end_time = time.time()

# Record the starting time for testing
test_start_time = time.time()
# Evaluate the model
loss, accuracy = model_cnn_lstm_bigru.evaluate(X_test, y_test, batch_size=32)
# Record the ending time for testing
test_end_time = time.time()

print(f'Test Loss: {loss:.4f}')
print(f'Test Accuracy: {accuracy:.4f}')

# Calculate and print the training time
train_time = train_end_time - train_start_time
print(f"Training time: {train_time:.2f} seconds")

# Calculate and print the testing time
test_time = test_end_time - test_start_time
print(f"Testing time: {test_time:.2f} seconds")

In [None]:
def cnn_lstm_model(input_shape, num_classes):
    
    model = Sequential([
        Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=input_shape),        
        MaxPooling1D(pool_size=2),
        
        Conv1D(filters=64, kernel_size=3, activation='relu'),
        MaxPooling1D(pool_size=2),
        
        LSTM(64, return_sequences=False),
        
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='sigmoid')
    ])

    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

input_shape = (X_train.shape[1], 1)
num_classes = 1
model_cnn_lstm = cnn_lstm_model(input_shape, num_classes)
model_cnn_lstm.summary()
plot_model(model_cnn_lstm)

In [None]:
train_start_time = time.time()
# Train the model
history_cnn_lstm = model_cnn_lstm.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=6, batch_size=32)
# Record the ending time
train_end_time = time.time()

# Record the starting time for testing
test_start_time = time.time()
# Evaluate the model
loss, accuracy = model_cnn_lstm.evaluate(X_test, y_test, batch_size=32)
# Record the ending time for testing
test_end_time = time.time()

print(f'Test Loss: {loss:.4f}')
print(f'Test Accuracy: {accuracy:.4f}')

# Calculate and print the training time
train_time = train_end_time - train_start_time
print(f"Training time: {train_time:.2f} seconds")

# Calculate and print the testing time
test_time = test_end_time - test_start_time
print(f"Testing time: {test_time:.2f} seconds")

In [None]:
# Plot the training and validation accuracy over the epochs

# CNN-LSTM-GRU
plt.plot(history_cnn_lstm_gru.history['accuracy'])
plt.plot(history_cnn_lstm_gru.history['val_accuracy'])
plt.title('CNN-LSTM-GRU Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()

# CNN-LSTM-BiGRU
plt.plot(history_cnn_lstm_bigru.history['accuracy'])
plt.plot(history_cnn_lstm_bigru.history['val_accuracy'])
plt.title('CNN-LSTM-BiGRU Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()

# CNN-LSTM
plt.plot(history_cnn_lstm.history['accuracy'])
plt.plot(history_cnn_lstm.history['val_accuracy'])
plt.title('CNN-LSTM Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()

In [None]:
# Plot the training and validation loss over the epochs

# CNN-LSTM-GRU
plt.plot(history_cnn_lstm_gru.history['loss'])
plt.plot(history_cnn_lstm_gru.history['val_loss'])
plt.title('CNN-LSTM-GRU Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()

# CNN-LSTM-BiGRU
plt.plot(history_cnn_lstm_bigru.history['loss'])
plt.plot(history_cnn_lstm_bigru.history['val_loss'])
plt.title('CNN-LSTM-BiGRU Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()

# CNN-LSTM
plt.plot(history_cnn_lstm.history['loss'])
plt.plot(history_cnn_lstm.history['val_loss'])
plt.title('CNN-LSTM Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()

In [None]:
# Make predictions for each model
y_pred_cnn_lstm_gru = model_cnn_lstm_gru.predict(X_test)
y_pred_cnn_lstm_bigru = model_cnn_lstm_bigru.predict(X_test)
y_pred_cnn_lstm = model_cnn_lstm.predict(X_test)

In [None]:
# Print classification reports for each model
print(classification_report(y_test, np.round(y_pred_cnn_lstm_gru), target_names=['No Intrusion', 'Intrusion']))
print(classification_report(y_test, np.round(y_pred_cnn_lstm_bigru), target_names=['No Intrusion', 'Intrusion']))
print(classification_report(y_test, np.round(y_pred_cnn_lstm), target_names=['No Intrusion', 'Intrusion']))

In [None]:
# CNN-LSTM-GRU---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


# Compute the confusion matrix
conf_mat = confusion_matrix(y_test, np.round(y_pred_cnn_lstm_gru))
# Define the class labels
class_labels = ['No Intrusion', 'Intrusion']
# Create a heatmap plot of the confusion matrix
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues', xticklabels=class_labels, yticklabels=class_labels)
# Set the plot labels and title
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('CNN-LSTM-GRUConfusion Matrix')
# Show the plot
plt.show()


# CNN-LSTM-BiGRU---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


# Compute the confusion matrix
conf_mat = confusion_matrix(y_test, np.round(y_pred_cnn_lstm_bigru))
# Define the class labels
class_labels = ['No Intrusion', 'Intrusion']
# Create a heatmap plot of the confusion matrix
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues', xticklabels=class_labels, yticklabels=class_labels)
# Set the plot labels and title
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('CNN-LSTM-BiGRU Confusion Matrix')
# Show the plot
plt.show()


# CNN-LSTM---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


# Compute the confusion matrix
conf_mat = confusion_matrix(y_test, np.round(y_pred_cnn_lstm))
# Define the class labels
class_labels = ['No Intrusion', 'Intrusion']
# Create a heatmap plot of the confusion matrix
sns.heatmap(conf_mat, annot=True, fmt='d', cmap='Blues', xticklabels=class_labels, yticklabels=class_labels)
# Set the plot labels and title
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('CNN-LSTM Confusion Matrix')
# Show the plot
plt.show()
plt.savefig('con_max.jpg')

In [None]:
# CNN-LSTM-GRU---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


# Predict the test set
y_pred_gru = (y_pred_cnn_lstm_gru > 0.5).astype(int)
# Compute confusion matrix
cm = confusion_matrix(y_test, y_pred_gru)
cm_norm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
# Define the class labels
class_labels = ['No Intrusion', 'Intrusion']
# Print confusion matrix as heatmap with percentages
plt.figure(figsize=(8, 6))
sns.heatmap(cm_norm, annot=True, cmap='Blues', xticklabels=class_labels, yticklabels=class_labels, fmt='.2%')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('CNN-LSTM-GRU Normalized Confusion Matrix as Percentages')
plt.show()


# CNN-LSTM-BiGRU---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


# Predict the test set
y_pred_bigru = (y_pred_cnn_lstm_bigru > 0.5).astype(int)
# Compute confusion matrix
cm = confusion_matrix(y_test, y_pred_bigru)
cm_norm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
# Define the class labels
class_labels = ['No Intrusion', 'Intrusion']
# Print confusion matrix as heatmap with percentages
plt.figure(figsize=(8, 6))
sns.heatmap(cm_norm, annot=True, cmap='Blues', xticklabels=class_labels, yticklabels=class_labels, fmt='.2%')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('CNN-LSTM-BiGRU Normalized Confusion Matrix as Percentages')
plt.show()


# CNN-LSTM---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


# Predict the test set
y_pred_lstm = (y_pred_cnn_lstm > 0.5).astype(int)
# Compute confusion matrix
cm = confusion_matrix(y_test, y_pred_lstm)
cm_norm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
# Define the class labels
class_labels = ['No Intrusion', 'Intrusion']
# Print confusion matrix as heatmap with percentages
plt.figure(figsize=(8, 6))
sns.heatmap(cm_norm, annot=True, cmap='Blues', xticklabels=class_labels, yticklabels=class_labels, fmt='.2%')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('CNN-LSTM Normalized Confusion Matrix as Percentages')
plt.show()

In [None]:
num_classes = 1

# Get predictions for training sets
pred_cnn_lstm_bigru_train = model_cnn_lstm_bigru.predict(X_train)
pred_cnn_lstm_gru_train = model_cnn_lstm_gru.predict(X_train)

# Get predictions for validation sets
pred_cnn_lstm_bigru_val = model_cnn_lstm_bigru.predict(X_val)
pred_cnn_lstm_gru_val = model_cnn_lstm_gru.predict(X_val)

# Concatenate predictions for training sets
X_conc_train = np.concatenate([pred_cnn_lstm_bigru_train, pred_cnn_lstm_gru_train], axis=1)

# Concatenate predictions for validation sets
X_conc_val = np.concatenate([pred_cnn_lstm_bigru_val, pred_cnn_lstm_gru_val], axis=1)

# Concatenate predictions for test sets
X_conc_test = np.concatenate([y_pred_cnn_lstm_bigru, y_pred_cnn_lstm_gru], axis=1)

In [None]:
# Define the ensemble model
ensemble_model = Sequential([
    Dense(64, activation='relu', input_shape=(X_conc_train.shape[1],)),
    Dropout(0.5),
    Dense(num_classes, activation='sigmoid')
])

ensemble_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
# Train the ensemble model

train_start_time = time.time()
# Train the model
ensemble_model.fit(X_conc_train, y_train, validation_data=(X_conc_val, y_val), epochs=6, batch_size=32)
# Record the ending time
train_end_time = time.time()

# Record the starting time for testing
test_start_time = time.time()
# Evaluate the model
loss, accuracy = ensemble_model.evaluate(X_conc_test, y_test, batch_size=32)
# Record the ending time for testing
test_end_time = time.time()

print(f'Test Loss: {loss:.4f}')
print(f'Test Accuracy: {accuracy:.4f}')

# Calculate and print the training time
train_time = train_end_time - train_start_time
print(f"Training time: {train_time:.2f} seconds")

# Calculate and print the testing time
test_time = test_end_time - test_start_time
print(f"Testing time: {test_time:.2f} seconds")

In [None]:
# Get predictions from the ensemble model
y_prob = ensemble_model.predict(X_conc_test)

# Convert probabilities to binary predictions
final_pred = (y_prob > 0.5).astype(int).flatten()

# Print classification report for the ensemble model
print("Classification Report:")
print(classification_report(y_test, final_pred))

# confusion matrix
print("Confusion Matrix:")
cm = confusion_matrix(y_test, final_pred)
cm_norm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
# Define the class labels
class_labels = ['No Intrusion', 'Intrusion']
# Print confusion matrix as heatmap with percentages
plt.figure(figsize=(8, 6))
sns.heatmap(cm_norm, annot=True, cmap='Blues', xticklabels=class_labels, yticklabels=class_labels, fmt='.2%')
plt.xlabel('Predicted labels')
plt.ylabel('True labels')
plt.title('Ensemble Model Normalized Confusion Matrix as Percentages')
plt.show()