In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import TensorDataset, DataLoader
import torch

def load_data(batch_size=32, path='SWaT_Dataset_Attack_v0.xlsx', sequence_length=10):
    """Load the SWaT attack spreadsheet and build sliding-window train/test sets.

    Every row ``t`` with at least ``sequence_length - 1`` predecessors yields one
    sample: the feature rows ``t - sequence_length + 1 .. t`` in chronological
    order (oldest first), labelled with the ``"Normal/Attack"`` value of row ``t``.

    Parameters
    ----------
    batch_size : int
        Batch size used by both returned ``DataLoader`` objects.
    path : str
        Location of the Excel workbook (e.g. a mounted Drive path on Colab).
    sequence_length : int
        Number of consecutive rows per window.

    Returns
    -------
    X_train, X_test : numpy.ndarray
        Standardised windows of shape ``(n_samples, sequence_length, n_features)``.
    y_train, y_test : numpy.ndarray
        Integer labels produced by ``LabelEncoder`` (codes follow the sorted
        order of the label strings found in the file).
    train_loader, test_loader : torch.utils.data.DataLoader
        Loaders over float32 tensors; labels have shape ``(batch, 1)``.

    Raises
    ------
    ValueError
        If ``sequence_length`` is smaller than 1 or larger than the row count.
    """
    if sequence_length < 1:
        raise ValueError("sequence_length must be at least 1")

    data = pd.read_excel(path)
    data = data.drop(columns=['Timestamp'])

    label_column = "Normal/Attack"
    Y = LabelEncoder().fit_transform(data[label_column])
    # Select the features by name instead of "every column but the last" so the
    # label can never leak into X if the column order changes.
    features = data.drop(columns=[label_column]).to_numpy(dtype=np.float64)

    n_rows, n_features = features.shape
    n_windows = n_rows - sequence_length + 1
    if n_windows < 1:
        raise ValueError(
            f"need at least {sequence_length} rows to build one window, got {n_rows}"
        )

    # X[w, k, :] is feature row w + k, i.e. each window is laid out as
    # (time step, feature) with the oldest step first.  The previous version
    # concatenated the lagged columns feature-major and then reshaped them as if
    # they were time-major, which mixed different sensors and lags together.
    X = np.stack(
        [features[step:step + n_windows] for step in range(sequence_length)],
        axis=1,
    )
    # A window is labelled by its most recent row.
    Y = Y[sequence_length - 1:]

    # Same effect as the former dropna(): discard windows with missing readings.
    complete = ~np.isnan(X).any(axis=(1, 2))
    if not complete.all():
        X, Y = X[complete], Y[complete]

    # NOTE(review): neighbouring windows share raw rows, so a random split still
    # lets near-duplicates land on both sides; consider a chronological split.
    X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

    # Fit the scaler on training rows only so no test statistics leak into
    # training; each feature column is standardised across all time steps.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train.reshape(-1, n_features))
    X_train = X_train.reshape(-1, sequence_length, n_features)
    X_test = scaler.transform(X_test.reshape(-1, n_features))
    X_test = X_test.reshape(-1, sequence_length, n_features)

    train_dataset = TensorDataset(
        torch.tensor(X_train, dtype=torch.float32),
        torch.tensor(y_train, dtype=torch.float32).unsqueeze(1),
    )
    test_dataset = TensorDataset(
        torch.tensor(X_test, dtype=torch.float32),
        torch.tensor(y_test, dtype=torch.float32).unsqueeze(1),
    )

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Evaluation does not benefit from shuffling; keep a stable order.
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return X_train, X_test, y_train, y_test, train_loader, test_loader




# Materialise the splits and loaders at notebook level; train_model() in the
# next cell reads train_loader and test_loader from this global namespace.
X_train, X_test, y_train, y_test, train_loader, test_loader = load_data()


FileNotFoundError: [Errno 2] No such file or directory: 'SWaT_Dataset_Attack_v0.xlsx'

In [None]:
import torch
import torch.nn as nn
from torch import optim
from torch.nn import BCELoss
from sklearn.metrics import accuracy_score
from tqdm import tqdm
import os
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import itertools


def plot_confusion_matrix(conf_mat, classes, title='Confusion Matrix', cmap=plt.cm.Blues):
    """Draw a confusion matrix as an annotated heat map on the current figure.

    Parameters
    ----------
    conf_mat : array-like
        2-D matrix of counts (or rates); rows are true labels, columns are
        predicted labels.
    classes : sequence of str
        Tick labels, used for both axes.
    title : str
        Title placed above the plot.
    cmap : matplotlib colormap
        Colormap used for the cells.
    """
    conf_mat = np.asarray(conf_mat)

    plt.imshow(conf_mat, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Integer counts print exactly; anything else (e.g. a normalised matrix)
    # would make the 'd' format raise, so fall back to two decimals.
    fmt = 'd' if np.issubdtype(conf_mat.dtype, np.integer) else '.2f'
    # Cells darker than the midpoint get white text so the number stays legible.
    thresh = conf_mat.max() / 2.
    for i, j in itertools.product(range(conf_mat.shape[0]), range(conf_mat.shape[1])):
        plt.text(j, i, format(conf_mat[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if conf_mat[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Run the layout pass last so the axis labels are included in it and are not
    # clipped when the figure is saved.
    plt.tight_layout()

class LSTMModel(nn.Module):
    """Stack of three LSTM layers followed by a sigmoid-activated linear head.

    Layer widths are ``input_dim -> 100 -> 100 -> 50 -> 1``; a dropout of 0.2 is
    applied to the output sequence of every LSTM layer.  All LSTMs are
    batch-first, so the input is indexed as ``(batch, time, feature)``.
    """

    def __init__(self, input_dim):
        """Create the layers; ``input_dim`` is the per-time-step feature count."""
        super().__init__()
        wide, narrow, drop_prob = 100, 50, 0.2

        self.lstm1 = nn.LSTM(input_size=input_dim, hidden_size=wide, batch_first=True)
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.lstm2 = nn.LSTM(input_size=wide, hidden_size=wide, batch_first=True)
        self.dropout2 = nn.Dropout(p=drop_prob)
        self.lstm3 = nn.LSTM(input_size=wide, hidden_size=narrow, batch_first=True)
        self.dropout3 = nn.Dropout(p=drop_prob)
        self.fc = nn.Linear(in_features=narrow, out_features=1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return one value in (0, 1) per sequence, shape ``(batch, 1)``."""
        stages = (
            (self.lstm1, self.dropout1),
            (self.lstm2, self.dropout2),
            (self.lstm3, self.dropout3),
        )
        sequence = x
        for recurrent, drop in stages:
            # nn.LSTM returns (outputs, (h_n, c_n)); only the outputs feed on.
            sequence, _state = recurrent(sequence)
            sequence = drop(sequence)

        # Only the final time step is passed to the classification head.
        final_step = sequence[:, -1, :]
        logits = self.fc(final_step)
        return self.sigmoid(logits)


def train_model(train_data=None, test_data=None, num_epochs=20):
    """Train an ``LSTMModel`` with Adam + BCE loss and evaluate after every epoch.

    After each epoch the function prints training loss, validation loss,
    accuracy and the confusion matrix, overwrites ``confusion_matrix.png`` with
    the latest matrix plot, and checkpoints the weights to ``model/model.ckpt``.

    Parameters
    ----------
    train_data, test_data : torch.utils.data.DataLoader, optional
        Loaders yielding ``(inputs, labels)`` batches.  When omitted, the
        notebook-level ``train_loader`` / ``test_loader`` are used, so a bare
        ``train_model()`` call behaves as before.
    num_epochs : int
        Number of passes over the training loader.
    """
    if train_data is None:
        train_data = train_loader
    if test_data is None:
        test_data = test_loader

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # The feature count is the last axis of the stored input tensor.
    model = LSTMModel(train_data.dataset.tensors[0].shape[2]).to(device)

    criterion = BCELoss()
    optimizer = optim.Adam(model.parameters())
    # Divide the learning rate by 10 every 5 epochs.
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

    # Create the checkpoint directory once; exist_ok avoids the check-then-create race.
    os.makedirs('model', exist_ok=True)

    classes = ['Class 0', 'Class 1'] # replace with your class names if different

    for epoch in tqdm(range(num_epochs), desc="Training Progress"):
        model.train()
        running_loss = 0.0
        for batch in train_data:
            inputs, labels = batch[0].to(device), batch[1].to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        scheduler.step()

        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        all_labels = []
        all_predictions = []

        with torch.no_grad():
            for batch in test_data:
                inputs, labels = batch[0].to(device), batch[1].to(device)
                outputs = model(inputs)
                val_loss += criterion(outputs, labels).item()
                # The model emits probabilities, so rounding thresholds at 0.5.
                predicted = torch.round(outputs)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                # Flatten the (batch, 1) tensors to plain integer class ids so
                # the confusion matrix receives 1-D label vectors.
                all_labels.extend(labels.cpu().numpy().ravel().astype(int))
                all_predictions.extend(predicted.cpu().numpy().ravel().astype(int))

        accuracy = correct / total
        # Pin both classes so the matrix stays 2x2 even when an epoch only sees
        # (or only predicts) a single class; the plot assumes two classes.
        conf_mat = confusion_matrix(all_labels, all_predictions, labels=[0, 1])

        print(f'Epoch {epoch}, loss: {running_loss/len(train_data)}, val_loss: {val_loss/len(test_data)}, accuracy: {accuracy}')
        print(f'Confusion Matrix: \n {conf_mat}')

        # Plot confusion matrix
        fig = plt.figure()
        plot_confusion_matrix(conf_mat, classes=classes)
        plt.savefig('confusion_matrix.png')
        # Release the figure; otherwise one figure per epoch stays open.
        plt.close(fig)

        # Save the model (checkpoint refreshed every epoch)
        torch.save(model.state_dict(), 'model/model.ckpt')
        print('Model saved to model/model.ckpt')


# Start training; this uses the train_loader / test_loader globals created by
# the load_data() call in the first cell, so that cell must have run successfully.
train_model()