multi output LSTM (single) v1.2 (fix for automatic dataset, change of loss functions)

In [None]:
import pandas as pd
import numpy as np
from numpy import array
from tensorflow.keras.models import Model
from tensorflow.keras.layers import LSTM, Dense, Input
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from tensorflow.keras.utils import to_categorical
import time
import seaborn as sns
import matplotlib.pyplot as plt


# parameters
num_features = 10            # width of the model input; must equal the feature columns left after the drop below
scenario_length = 1000       # rows that make up one scenario in dataset.csv
num_attack_types = 3 + 1     # presumably 3 attack types plus a "no attack" class -- TODO confirm
num_attack_targets = 10 + 1  # presumably 10 targets plus a "no target" class -- TODO confirm

num_scenarios = 50

# hyper-parameter
seq_len = 40                 # rows per sliding window
seq_overlap = seq_len - 1    # consecutive windows share all but one row (stride of 1)
lstm_blocks = 128
epoch_val = 100
batch_size_val = 128

# 1. data preprocessing
data = pd.read_csv('dataset.csv')

# 1.1 normalization
# Only the feature columns are standardized; the three label columns are
# re-attached unscaled as the last three columns.
# NOTE(review): the scaler is fit on the whole dataset before the train/test
# split, so test rows influence the normalization statistics.
labels = data[['label', 'type', 'target']]
data = data.drop(columns=['time', 'label', 'type', 'target'])
scaler = StandardScaler()
data_normalized = scaler.fit_transform(data)
data_normalized = pd.DataFrame(data_normalized, columns=data.columns)
data_normalized = pd.concat([data_normalized, labels], axis=1)


# 1.2 sequence generation
sequences = []
step_len = seq_len - seq_overlap

# Fail early with a clear message instead of producing ragged windows.
required_rows = num_scenarios * scenario_length
if len(data_normalized) < required_rows:
    raise ValueError(
        f"dataset.csv has {len(data_normalized)} rows, but {num_scenarios} scenarios "
        f"of {scenario_length} rows need {required_rows}"
    )

# Slice one NumPy array instead of building a DataFrame per window.
window_source = data_normalized.to_numpy()
for s in range(num_scenarios):
    scenario_start = s * scenario_length
    # Last start index whose window still fits inside the scenario. The stop
    # value is +1 because range() is exclusive; without it the final window
    # of every scenario was silently dropped.
    last_window_start = scenario_start + scenario_length - seq_len
    for i in range(scenario_start, last_window_start + 1, step_len):
        sequences.append(window_source[i:i + seq_len])
data_sequences = array(sequences)

# 2. data preparation
# 2.1 train-test split
# Each window is flattened to one row for train_test_split and folded back to
# (windows, time steps, columns) afterwards.
# NOTE(review): windows overlap heavily and are split at random, so nearly
# identical windows can end up on both sides of the split.
num_windows, window_len, num_columns = data_sequences.shape
data_reshaped = data_sequences.reshape(num_windows, -1)
X_train, X_test = train_test_split(data_reshaped, test_size=0.25, random_state=42)
X_train = X_train.reshape(-1, window_len, num_columns)
X_test = X_test.reshape(-1, window_len, num_columns)

# 2.2 reshape data for LSTM network & label sequences
# The targets are the three trailing columns (label, type, target) read at the
# last time step of every window; those columns are then cut from the inputs.
y_train_detection, y_train_identification, y_train_isolation = (
    X_train[:, -1, column] for column in (-3, -2, -1)
)
X_train = X_train[:, :, :-3]

y_test_detection, y_test_identification, y_test_isolation = (
    X_test[:, -1, column] for column in (-3, -2, -1)
)
X_test = X_test[:, :, :-3]

# convert to one-hot encoding
y_train_identification, y_test_identification = (
    to_categorical(class_ids, num_classes=num_attack_types)
    for class_ids in (y_train_identification, y_test_identification)
)
y_train_isolation, y_test_isolation = (
    to_categorical(class_ids, num_classes=num_attack_targets)
    for class_ids in (y_train_isolation, y_test_isolation)
)

# 3. model creation
# One LSTM layer is shared by three output heads: a single sigmoid unit for
# detection and two softmax layers for identification and isolation.
input_layer = Input(shape=(seq_len, num_features))
shared_lstm = LSTM(lstm_blocks)(input_layer)
output_detection = Dense(1, activation='sigmoid', name='detection_output')(shared_lstm)
output_identification = Dense(num_attack_types, activation='softmax', name='identification_output')(shared_lstm)
output_isolation = Dense(num_attack_targets, activation='softmax', name='isolation_output')(shared_lstm)

model = Model(inputs=input_layer, outputs=[output_detection, output_identification, output_isolation])

# 3.2 model compile
# Losses and metrics are keyed by the output layer names given above.
model.compile(
    loss={
        'detection_output': 'binary_crossentropy',
        'identification_output': 'kl_divergence',
        'isolation_output': 'kl_divergence'
    },
    optimizer='adam',
    metrics={
        'detection_output': ['accuracy'],
        'identification_output': ['accuracy'],
        'isolation_output': ['accuracy']
    }
)
# summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line.
model.summary()

# 3.3 model train
# Wall-clock time around fit() feeds the training-time line printed below.
train_targets = [y_train_detection, y_train_identification, y_train_isolation]
start_time = time.time()
model.fit(X_train, train_targets, epochs=epoch_val, batch_size=batch_size_val)
end_time = time.time()

# 3.8 print training time
training_time = end_time - start_time

# 3.4 model test
raw_detection, raw_identification, raw_isolation = model.predict(X_test)

# Convert predictions for binary and multi-class:
# the sigmoid head is cut at 0.5, the softmax heads take the most probable class.
y_pred_detection = (raw_detection > 0.5).astype(int).ravel()
y_pred_identification = raw_identification.argmax(axis=1)
y_pred_isolation = raw_isolation.argmax(axis=1)

# convert from one-hot encode to classes
y_test_identification = y_test_identification.argmax(axis=1)
y_test_isolation = y_test_isolation.argmax(axis=1)

# 3.5 results for detection
accuracy_det = accuracy_score(y_test_detection, y_pred_detection)
precision_det = precision_score(y_test_detection, y_pred_detection)
recall_det = recall_score(y_test_detection, y_pred_detection)
f1_det = f1_score(y_test_detection, y_pred_detection)

# 3.6 / 3.7 results for identification and localization
accuracy_id = accuracy_score(y_test_identification, y_pred_identification)
accuracy_iso = accuracy_score(y_test_isolation, y_pred_isolation)

# print section
print(f'Detection - Accuracy: {accuracy_det:.3f}')
print(f'Detection - Precision: {precision_det:.3f}')
print(f'Detection - Recall: {recall_det:.3f}')
print(f'Detection - F1-score: {f1_det:.3f}')

print(f"Training Time: {training_time:.1f}")

print(f'Identification - Accuracy: {accuracy_id:.3f}')
print(f'Isolation - Accuracy: {accuracy_iso:.3f}')

# one full per-class report per task, in the order detection, identification, isolation
for task_truth, task_pred in (
    (y_test_detection, y_pred_detection),
    (y_test_identification, y_pred_identification),
    (y_test_isolation, y_pred_isolation),
):
    print(classification_report(task_truth, task_pred))

v1.2.3 (plot classification outputs as heatmap)

In [None]:
import pandas as pd
import numpy as np
from numpy import array
from tensorflow.keras.models import Model
from tensorflow.keras.layers import LSTM, Dense, Input
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from tensorflow.keras.utils import to_categorical
import time
import seaborn as sns
import matplotlib.pyplot as plt


# parameters
num_features = 10            # width of the model input; must equal the feature columns left after the drop below
scenario_length = 1000       # rows that make up one scenario in dataset.csv
num_attack_types = 3 + 1     # presumably 3 attack types plus a "no attack" class -- TODO confirm
num_attack_targets = 10 + 1  # presumably 10 targets plus a "no target" class -- TODO confirm

num_scenarios = 50

# hyper-parameter
seq_len = 40                 # rows per sliding window
seq_overlap = seq_len - 1    # consecutive windows share all but one row (stride of 1)
lstm_blocks = 128
epoch_val = 100
batch_size_val = 128

# 1. data preprocessing
data = pd.read_csv('dataset.csv')

# 1.1 normalization
# Only the feature columns are standardized; the three label columns are
# re-attached unscaled as the last three columns.
# NOTE(review): the scaler is fit on the whole dataset before the train/test
# split, so test rows influence the normalization statistics.
labels = data[['label', 'type', 'target']]
data = data.drop(columns=['time', 'label', 'type', 'target'])
scaler = StandardScaler()
data_normalized = scaler.fit_transform(data)
data_normalized = pd.DataFrame(data_normalized, columns=data.columns)
data_normalized = pd.concat([data_normalized, labels], axis=1)


# 1.2 sequence generation
sequences = []
step_len = seq_len - seq_overlap

# Fail early with a clear message instead of producing ragged windows.
required_rows = num_scenarios * scenario_length
if len(data_normalized) < required_rows:
    raise ValueError(
        f"dataset.csv has {len(data_normalized)} rows, but {num_scenarios} scenarios "
        f"of {scenario_length} rows need {required_rows}"
    )

# Slice one NumPy array instead of building a DataFrame per window.
window_source = data_normalized.to_numpy()
for s in range(num_scenarios):
    scenario_start = s * scenario_length
    # Last start index whose window still fits inside the scenario. The stop
    # value is +1 because range() is exclusive; without it the final window
    # of every scenario was silently dropped.
    last_window_start = scenario_start + scenario_length - seq_len
    for i in range(scenario_start, last_window_start + 1, step_len):
        sequences.append(window_source[i:i + seq_len])
data_sequences = array(sequences)

# 2. data preparation
# 2.1 train-test split
# Each window is flattened to one row for train_test_split and folded back to
# (windows, time steps, columns) afterwards.
# NOTE(review): windows overlap heavily and are split at random, so nearly
# identical windows can end up on both sides of the split.
num_windows, window_len, num_columns = data_sequences.shape
data_reshaped = data_sequences.reshape(num_windows, -1)
X_train, X_test = train_test_split(data_reshaped, test_size=0.25, random_state=42)
X_train = X_train.reshape(-1, window_len, num_columns)
X_test = X_test.reshape(-1, window_len, num_columns)

# 2.2 reshape data for LSTM network & label sequences
# The targets are the three trailing columns (label, type, target) read at the
# last time step of every window; those columns are then cut from the inputs.
y_train_detection, y_train_identification, y_train_isolation = (
    X_train[:, -1, column] for column in (-3, -2, -1)
)
X_train = X_train[:, :, :-3]

y_test_detection, y_test_identification, y_test_isolation = (
    X_test[:, -1, column] for column in (-3, -2, -1)
)
X_test = X_test[:, :, :-3]

# convert to one-hot encoding
y_train_identification, y_test_identification = (
    to_categorical(class_ids, num_classes=num_attack_types)
    for class_ids in (y_train_identification, y_test_identification)
)
y_train_isolation, y_test_isolation = (
    to_categorical(class_ids, num_classes=num_attack_targets)
    for class_ids in (y_train_isolation, y_test_isolation)
)

# 3. model creation
# One LSTM layer is shared by three output heads: a single sigmoid unit for
# detection and two softmax layers for identification and isolation.
input_layer = Input(shape=(seq_len, num_features))
shared_lstm = LSTM(lstm_blocks)(input_layer)
output_detection = Dense(1, activation='sigmoid', name='detection_output')(shared_lstm)
output_identification = Dense(num_attack_types, activation='softmax', name='identification_output')(shared_lstm)
output_isolation = Dense(num_attack_targets, activation='softmax', name='isolation_output')(shared_lstm)

model = Model(inputs=input_layer, outputs=[output_detection, output_identification, output_isolation])

# 3.2 model compile
# Losses and metrics are keyed by the output layer names given above.
model.compile(
    loss={
        'detection_output': 'binary_crossentropy',
        'identification_output': 'kl_divergence',
        'isolation_output': 'kl_divergence'
    },
    optimizer='adam',
    metrics={
        'detection_output': ['accuracy'],
        'identification_output': ['accuracy'],
        'isolation_output': ['accuracy']
    }
)
# summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line.
model.summary()

# 3.3 model train
# start_time / end_time bracket the fit() call with wall-clock timestamps.
train_targets = [y_train_detection, y_train_identification, y_train_isolation]
start_time = time.time()
model.fit(X_train, train_targets, epochs=epoch_val, batch_size=batch_size_val)
end_time = time.time()

# 3.4 model test
raw_detection, raw_identification, raw_isolation = model.predict(X_test)

# Convert predictions for binary and multi-class:
# the sigmoid head is cut at 0.5, the softmax heads take the most probable class.
y_pred_detection = (raw_detection > 0.5).astype(int).ravel()
y_pred_identification = raw_identification.argmax(axis=1)
y_pred_isolation = raw_isolation.argmax(axis=1)

# convert from one-hot encode to classes
y_test_identification = y_test_identification.argmax(axis=1)
y_test_isolation = y_test_isolation.argmax(axis=1)

# Turn a classification report dict into a DataFrame with one row per report entry
def report_to_df(report):
    """Convert a classification-report dict into a DataFrame.

    Args:
        report: mapping as returned by ``classification_report(..., output_dict=True)``;
            each key becomes a row of the result.

    Returns:
        DataFrame with one row per report key and one column per metric.
        A 'support' column is dropped when present; its absence is not an error.
    """
    per_entry = pd.DataFrame(report).T
    return per_entry.drop(columns='support', errors='ignore')

# Plotting function
def plot_and_save_heatmap(df, title, filename, save=False):
    """Show a DataFrame as an annotated heatmap and optionally save it as PNG.

    Every row and column of ``df`` is drawn as-is; nothing is filtered out
    (an 'accuracy' row, if present, is plotted too).

    Args:
        df: numeric DataFrame to plot; cell values are annotated with two decimals.
        title: text placed above the heatmap.
        filename: path of the PNG file, written only when ``save`` is True.
        save: when True, write the figure to ``filename`` before showing it.
            Defaults to False, which matches the previous behaviour where
            saving was disabled and ``filename`` was ignored.
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    sns.heatmap(df, annot=True, cmap='YlGnBu', fmt='.2f', ax=ax)
    ax.set_title(title)
    if save:
        # Save before show(): some backends clear the figure once it is displayed.
        fig.savefig(filename, format='png')
    plt.show()

# Classification reports as dicts, one per task
# (uses the y_test_* / y_pred_* class-index arrays defined earlier in this cell)
report_detection, report_identification, report_isolation = (
    classification_report(task_truth, task_pred, output_dict=True)
    for task_truth, task_pred in (
        (y_test_detection, y_pred_detection),
        (y_test_identification, y_pred_identification),
        (y_test_isolation, y_pred_isolation),
    )
)

# Convert reports to DataFrames
df_detection, df_identification, df_isolation = map(
    report_to_df, (report_detection, report_identification, report_isolation)
)

# Create heatmaps for each report
for report_frame, heading, png_name in (
    (df_detection, "Detection Report (Binary Classification)", "detection_report.png"),
    (df_identification, "Identification Report (Multi-Class 3 Classes)", "identification_report.png"),
    (df_isolation, "Isolation Report (Multi-Class 10 Classes)", "isolation_report.png"),
):
    plot_and_save_heatmap(report_frame, heading, png_name)

multi output LSTM (single) v1.2.2 (add threshold-based labeling) (just as an experiment)

In [None]:
import pandas as pd
import numpy as np
from numpy import array
from tensorflow.keras.models import Model
from tensorflow.keras.layers import LSTM, Dense, Input
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from tensorflow.keras.utils import to_categorical
import time


# parameters
num_features = 10            # width of the model input; must equal the feature columns left after the drop below
scenario_length = 1000       # rows that make up one scenario in dataset.csv
num_attack_types = 3 + 1     # presumably 3 attack types plus a "no attack" class -- TODO confirm
num_attack_targets = 10 + 1  # presumably 10 targets plus a "no target" class -- TODO confirm

num_scenarios = 50

# hyper-parameter
seq_len = 40                 # rows per sliding window
seq_overlap = seq_len - 1    # consecutive windows share all but one row (stride of 1)
lstm_blocks = 128
epoch_val = 100
batch_size_val = 128

# minimum fraction of attack-labelled rows for a window to count as an attack
threshold = 0.1


# 1. data preprocessing
data = pd.read_csv('dataset.csv')

# 1.1 normalization
# Only the feature columns are standardized; the three label columns are
# re-attached unscaled as the last three columns.
# NOTE(review): the scaler is fit on the whole dataset before the train/test
# split, so test rows influence the normalization statistics.
labels = data[['label', 'type', 'target']]
data = data.drop(columns=['time', 'label', 'type', 'target'])
scaler = StandardScaler()
data_normalized = scaler.fit_transform(data)
data_normalized = pd.DataFrame(data_normalized, columns=data.columns)
data_normalized = pd.concat([data_normalized, labels], axis=1)


# 1.2 sequence generation
sequences = []
step_len = seq_len - seq_overlap

# Fail early with a clear message instead of producing ragged windows.
required_rows = num_scenarios * scenario_length
if len(data_normalized) < required_rows:
    raise ValueError(
        f"dataset.csv has {len(data_normalized)} rows, but {num_scenarios} scenarios "
        f"of {scenario_length} rows need {required_rows}"
    )

# Slice one NumPy array instead of building a DataFrame per window.
window_source = data_normalized.to_numpy()
for s in range(num_scenarios):
    scenario_start = s * scenario_length
    # Last start index whose window still fits inside the scenario. The stop
    # value is +1 because range() is exclusive; without it the final window
    # of every scenario was silently dropped.
    last_window_start = scenario_start + scenario_length - seq_len
    for i in range(scenario_start, last_window_start + 1, step_len):
        sequences.append(window_source[i:i + seq_len])
data_sequences = array(sequences)

# 2. data preparation
# 2.1 train-test split
# Each window is flattened to one row for train_test_split and folded back to
# (windows, time steps, columns) afterwards.
# NOTE(review): windows overlap heavily and are split at random, so nearly
# identical windows can end up on both sides of the split.
num_windows, window_len, num_columns = data_sequences.shape
data_reshaped = data_sequences.reshape(num_windows, -1)
X_train, X_test = train_test_split(data_reshaped, test_size=0.25, random_state=42)
X_train = X_train.reshape(-1, window_len, num_columns)
X_test = X_test.reshape(-1, window_len, num_columns)

# 2.2 reshape data for LSTM network & label sequences

def _label_windows_by_threshold(windows, min_attack_ratio):
    """Derive one (detection, identification, isolation) label per window.

    The last three columns of ``windows`` are read as label, type and target.
    A window is labelled as an attack when the sum of its label column divided
    by the window length is at least ``min_attack_ratio``; its type and target
    are then taken from the first time step whose label equals 1. All other
    windows get 0 for all three labels.

    Args:
        windows: array of shape (num_windows, time_steps, features + 3).
        min_attack_ratio: minimum attack fraction for a positive label.

    Returns:
        Tuple of three float arrays of length num_windows.
    """
    attack_flags = windows[:, :, -3]
    attack_ratio = attack_flags.sum(axis=1) / windows.shape[1]
    is_attack_step = attack_flags == 1
    # Require at least one attack step as well: with a threshold <= 0 the ratio
    # test alone passes for clean windows, which used to raise IndexError when
    # looking up the first attack step.
    is_attack_window = (attack_ratio >= min_attack_ratio) & is_attack_step.any(axis=1)
    # argmax on a boolean row gives the index of its first True entry.
    first_attack_index = is_attack_step.argmax(axis=1)
    window_ids = np.arange(windows.shape[0])

    detection = is_attack_window.astype(float)
    identification = np.where(is_attack_window, windows[window_ids, first_attack_index, -2], 0.0)
    isolation = np.where(is_attack_window, windows[window_ids, first_attack_index, -1], 0.0)
    return detection, identification, isolation


num_sequences = X_train.shape[0]
y_train_detection, y_train_identification, y_train_isolation = _label_windows_by_threshold(X_train, threshold)

# labels are extracted, so cut the three label columns from the model input
X_train = X_train[:, :, :-3]


num_sequences = X_test.shape[0]
y_test_detection, y_test_identification, y_test_isolation = _label_windows_by_threshold(X_test, threshold)

X_test = X_test[:, :, :-3]

# convert to one-hot encoding
y_train_identification = to_categorical(y_train_identification, num_classes=num_attack_types)
y_test_identification = to_categorical(y_test_identification, num_classes=num_attack_types)
y_train_isolation = to_categorical(y_train_isolation, num_classes=num_attack_targets)
y_test_isolation = to_categorical(y_test_isolation, num_classes=num_attack_targets)

# 3. model creation
# One LSTM layer is shared by three output heads: a single sigmoid unit for
# detection and two softmax layers for identification and isolation.
input_layer = Input(shape=(seq_len, num_features))
shared_lstm = LSTM(lstm_blocks)(input_layer)
output_detection = Dense(1, activation='sigmoid', name='detection_output')(shared_lstm)
output_identification = Dense(num_attack_types, activation='softmax', name='identification_output')(shared_lstm)
output_isolation = Dense(num_attack_targets, activation='softmax', name='isolation_output')(shared_lstm)

model = Model(inputs=input_layer, outputs=[output_detection, output_identification, output_isolation])

# 3.2 model compile
# Losses and metrics are keyed by the output layer names given above.
model.compile(
    loss={
        'detection_output': 'binary_crossentropy',
        'identification_output': 'kl_divergence',
        'isolation_output': 'kl_divergence'
    },
    optimizer='adam',
    metrics={
        'detection_output': ['accuracy'],
        'identification_output': ['accuracy'],
        'isolation_output': ['accuracy']
    }
)
# summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line.
model.summary()

# 3.3 model train
# Wall-clock time around fit() feeds the training-time line printed below.
train_targets = [y_train_detection, y_train_identification, y_train_isolation]
start_time = time.time()
model.fit(X_train, train_targets, epochs=epoch_val, batch_size=batch_size_val)
end_time = time.time()

# 3.8 print training time
training_time = end_time - start_time

# 3.4 model test
raw_detection, raw_identification, raw_isolation = model.predict(X_test)

# Convert predictions for binary and multi-class:
# the sigmoid head is cut at 0.5, the softmax heads take the most probable class.
y_pred_detection = (raw_detection > 0.5).astype(int).ravel()
y_pred_identification = raw_identification.argmax(axis=1)
y_pred_isolation = raw_isolation.argmax(axis=1)

# convert from one-hot encode to classes
y_test_identification = y_test_identification.argmax(axis=1)
y_test_isolation = y_test_isolation.argmax(axis=1)

# 3.5 results for detection
accuracy_det = accuracy_score(y_test_detection, y_pred_detection)
precision_det = precision_score(y_test_detection, y_pred_detection)
recall_det = recall_score(y_test_detection, y_pred_detection)
f1_det = f1_score(y_test_detection, y_pred_detection)

# 3.6 / 3.7 results for identification and localization
accuracy_id = accuracy_score(y_test_identification, y_pred_identification)
accuracy_iso = accuracy_score(y_test_isolation, y_pred_isolation)


# print section
print(f'Detection - Accuracy: {accuracy_det:.3f}')
print(f'Detection - Precision: {precision_det:.3f}')
print(f'Detection - Recall: {recall_det:.3f}')
print(f'Detection - F1-score: {f1_det:.3f}')

print(f"Training Time: {training_time:.1f}")

print(f'Identification - Accuracy: {accuracy_id:.3f}')
print(f'Isolation - Accuracy: {accuracy_iso:.3f}')

# one full per-class report per task, in the order detection, identification, isolation
for task_truth, task_pred in (
    (y_test_detection, y_pred_detection),
    (y_test_identification, y_pred_identification),
    (y_test_isolation, y_pred_isolation),
):
    print(classification_report(task_truth, task_pred))


multi output LSTM (single)

In [None]:
import pandas as pd
import numpy as np
from numpy import array
from tensorflow.keras.models import Model
from tensorflow.keras.layers import LSTM, Dense, Input
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# parameters
num_features = 10           # width of the model input; must equal the feature columns left after the drop below
num_scenarios = 12
scenario_length = 1000      # rows that make up one scenario in dataset.csv
attack_type = 0        # 0: all, 1: DoS, 2: FDI, 3: Replay
num_attack_types = 3 + 1    # presumably 3 attack types plus a "no attack" class -- TODO confirm
num_attack_targets = 6 + 1  # presumably 6 targets plus a "no target" class -- TODO confirm

# hyper-parameter
seq_len = 20                # rows per sliding window
seq_overlap = seq_len - 1   # consecutive windows share all but one row (stride of 1)
lstm_blocks = 32
epoch_val = 20
batch_size_val = 4

# 1. data preprocessing
data = pd.read_csv('dataset.csv')

# 1.1 normalization
# Only the feature columns are standardized; the three label columns are
# re-attached unscaled as the last three columns.
# NOTE(review): the scaler is fit on the whole dataset before the train/test
# split, so test rows influence the normalization statistics.
labels = data[['label', 'type', 'target']]
data = data.drop(columns=['time', 'label', 'type', 'target'])
scaler = StandardScaler()
data_normalized = scaler.fit_transform(data)
data_normalized = pd.DataFrame(data_normalized, columns=data.columns)
data_normalized = pd.concat([data_normalized, labels], axis=1)

# 1.2 sequence generation
sequences = []
step_len = seq_len - seq_overlap

# Scenario indices used for each attack_type setting.
scenario_ranges_by_type = {
    0: range(0, 12),
    1: range(0, 4),
    2: range(4, 8),
    3: range(8, 12),
}
# An unknown attack_type used to leave scenario_range undefined and fail later
# with a NameError; report the bad setting directly instead.
if attack_type not in scenario_ranges_by_type:
    raise ValueError(
        f"attack_type must be one of {sorted(scenario_ranges_by_type)}, got {attack_type!r}"
    )
scenario_range = scenario_ranges_by_type[attack_type]

# Fail early with a clear message instead of producing ragged windows.
required_rows = (max(scenario_range) + 1) * scenario_length
if len(data_normalized) < required_rows:
    raise ValueError(
        f"dataset.csv has {len(data_normalized)} rows, but the selected scenarios "
        f"need at least {required_rows}"
    )

# Slice one NumPy array instead of building a DataFrame per window.
window_source = data_normalized.to_numpy()
for s in scenario_range:
    scenario_start = s * scenario_length
    # Last start index whose window still fits inside the scenario. The stop
    # value is +1 because range() is exclusive; without it the final window
    # of every scenario was silently dropped.
    last_window_start = scenario_start + scenario_length - seq_len
    for i in range(scenario_start, last_window_start + 1, step_len):
        sequences.append(window_source[i:i + seq_len])
data_sequences = array(sequences)

# 2. data preparation
# 2.1 train-test split
# Each window is flattened to one row for train_test_split and folded back to
# (windows, time steps, columns) afterwards.
# NOTE(review): windows overlap heavily and are split at random, so nearly
# identical windows can end up on both sides of the split.
num_windows, window_len, num_columns = data_sequences.shape
data_reshaped = data_sequences.reshape(num_windows, -1)
X_train, X_test = train_test_split(data_reshaped, test_size=0.25, random_state=42)
X_train = X_train.reshape(-1, window_len, num_columns)
X_test = X_test.reshape(-1, window_len, num_columns)

# 2.2 reshape data for LSTM network
# The targets are the three trailing columns (label, type, target) read at the
# last time step of every window; those columns are then cut from the inputs.
# They stay as class indices here, matching the sparse losses used at compile time.
y_train_detection, y_train_identification, y_train_isolation = (
    X_train[:, -1, column] for column in (-3, -2, -1)
)
X_train = X_train[:, :, :-3]

y_test_detection, y_test_identification, y_test_isolation = (
    X_test[:, -1, column] for column in (-3, -2, -1)
)
X_test = X_test[:, :, :-3]

# 3. model creation
# One LSTM layer is shared by three output heads: a single sigmoid unit for
# detection and two softmax layers for identification and isolation.
input_layer = Input(shape=(seq_len, num_features))
shared_lstm = LSTM(lstm_blocks)(input_layer)
output_detection = Dense(1, activation='sigmoid', name='detection_output')(shared_lstm)
output_identification = Dense(num_attack_types, activation='softmax', name='identification_output')(shared_lstm)
output_isolation = Dense(num_attack_targets, activation='softmax', name='isolation_output')(shared_lstm)

model = Model(inputs=input_layer, outputs=[output_detection, output_identification, output_isolation])

# 3.2 model compile
# Losses and metrics are keyed by the output layer names given above; the
# sparse losses take integer class indices rather than one-hot targets.
model.compile(
    loss={
        'detection_output': 'binary_crossentropy',
        'identification_output': 'sparse_categorical_crossentropy',
        'isolation_output': 'sparse_categorical_crossentropy'
    },
    optimizer='adam',
    metrics={
        'detection_output': ['accuracy'],
        'identification_output': ['accuracy'],
        'isolation_output': ['accuracy']
    }
)
# summary() prints the table itself and returns None, so wrapping it in
# print() only added a stray "None" line.
model.summary()

# 3.3 model train
train_targets = [y_train_detection, y_train_identification, y_train_isolation]
model.fit(X_train, train_targets, epochs=epoch_val, batch_size=batch_size_val)

# 3.4 model test
raw_detection, raw_identification, raw_isolation = model.predict(X_test)

# Convert predictions for binary and multi-class:
# the sigmoid head is cut at 0.5, the softmax heads take the most probable class.
y_pred_detection = (raw_detection > 0.5).astype(int).ravel()
y_pred_identification = raw_identification.argmax(axis=1)
y_pred_isolation = raw_isolation.argmax(axis=1)

# 3.5 print results for detection
accuracy_det = accuracy_score(y_test_detection, y_pred_detection)
precision_det = precision_score(y_test_detection, y_pred_detection)
recall_det = recall_score(y_test_detection, y_pred_detection)
f1_det = f1_score(y_test_detection, y_pred_detection)

print(f'Detection - Accuracy: {accuracy_det:.3f}')
print(f'Detection - Precision: {precision_det:.3f}')
print(f'Detection - Recall: {recall_det:.3f}')
print(f'Detection - F1-score: {f1_det:.3f}')

# 3.6 print results for identification
accuracy_id = accuracy_score(y_test_identification, y_pred_identification)
print(f'Identification - Accuracy: {accuracy_id:.3f}')

# 3.7 print results for localization
accuracy_iso = accuracy_score(y_test_isolation, y_pred_isolation)
print(f'Localization - Accuracy: {accuracy_iso:.3f}')
