In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset

In [None]:
# Load the dataset from a CSV file located next to the notebook.
file_path = "SWaT_Dataset_Attack_v0.csv"
df = pd.read_csv(filepath_or_buffer=file_path)

In [None]:
# Quick look at the first rows of the raw frame.
print(df.head())

# Number of columns in the frame.
print(f"Nombre de colonnes : {df.shape[1]}")

# The last column is treated as the class label (Attack/Normal).
class_labels = df.iloc[:, -1]
print(class_labels.value_counts())

# Bar chart of the class distribution. `hue` mirrors `x` with the legend
# hidden; per the original note this form was chosen to silence a warning.
fig, ax = plt.subplots(figsize=(6, 4))
sns.countplot(x=class_labels, hue=class_labels, palette="coolwarm", legend=False, ax=ax)
ax.set_xlabel("Classe")
ax.set_ylabel("Nombre d'échantillons")
ax.set_title("Distribution des classes (Attack vs Normal)")
plt.show()

In [None]:
# Split the frame into the feature matrix X (every column but the last)
# and the raw label vector (last column).
# NOTE(review): assumes every non-label column is numeric; a string column
# (e.g. a timestamp) would break the scaler later on - confirm against the CSV.
X = df.iloc[:, :-1].to_numpy()
raw_labels = df.iloc[:, -1].values

# Binary encoding: exactly "Normal" -> 0, any other value -> 1 (treated as Attack).
# NOTE(review): labels with stray whitespace or missing values also end up as 1.
y = np.where(raw_labels == "Normal", 0, 1)

In [None]:
# Split into train / validation / test sets: 20% of the rows go to test,
# then 10% of the remainder goes to validation.
# `stratify` keeps the Normal/Attack ratio the same in every split, so the
# validation and test metrics are computed on a representative class mix.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.1, random_state=42, stratify=y_train
)

# Standardise the features. The scaler is fitted on the training split only
# and then applied unchanged to validation and test, so no statistics leak
# from the held-out data into training.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)

# Convert to PyTorch tensors: float32 features, int64 class indices
# (the dtype nn.CrossEntropyLoss expects for class-index targets).
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
y_val_tensor = torch.tensor(y_val, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# Wrap the tensors in datasets and mini-batch loaders (batch size 64).
train_data = TensorDataset(X_train_tensor, y_train_tensor)
val_data = TensorDataset(X_val_tensor, y_val_tensor)
test_data = TensorDataset(X_test_tensor, y_test_tensor)
# Only the training loader shuffles; a dedicated seeded generator makes the
# shuffle order reproducible when this cell is re-run.
train_loader = DataLoader(
    train_data,
    batch_size=64,
    shuffle=True,
    generator=torch.Generator().manual_seed(42),
)
val_loader = DataLoader(val_data, batch_size=64, shuffle=False)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

In [None]:
# Model architecture
class MLP(nn.Module):
    """Fully connected classifier with three hidden stages.

    Every hidden stage is ``Linear -> BatchNorm1d -> Dropout(0.5) -> ReLU``;
    a final ``Linear`` layer maps the last hidden width to 2 logits
    (class 0 = Normal, class 1 = Attack).

    Args:
        input_size: Number of input features per sample.
        hidden1: Width of the first hidden layer.
        hidden2: Width of the second hidden layer.
        hidden3: Width of the third hidden layer.
    """

    def __init__(self, input_size, hidden1, hidden2, hidden3):
        super().__init__()
        widths = [input_size, hidden1, hidden2, hidden3]
        stack = []
        # Build the hidden stages pairwise: (input->h1), (h1->h2), (h2->h3).
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.extend(
                [
                    nn.Linear(fan_in, fan_out),
                    nn.BatchNorm1d(fan_out),
                    nn.Dropout(0.5),
                    nn.ReLU(),
                ]
            )
        # Output head: one logit per class.
        stack.append(nn.Linear(hidden3, 2))
        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Return the raw class logits for a batch ``x``."""
        logits = self.model(x)
        return logits

In [None]:
# Hyperparameters
input_size = X_train.shape[1]              # number of feature columns in X_train
hidden1, hidden2, hidden3 = 256, 128, 64   # widths of the three hidden layers
learning_rate = 1e-3
num_epochs = 10
patience = 5  # early stopping: epochs without validation-loss improvement before stopping

In [None]:
# Seed PyTorch's global RNG before building the model so the weight
# initialisation is reproducible on a fresh kernel; 42 matches the
# random_state used for the data splits.
torch.manual_seed(42)

# Instantiate the model, the loss function and the optimiser.
model = MLP(input_size, hidden1, hidden2, hidden3)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Early-stopping state: best validation loss seen so far and the number of
# consecutive epochs without improvement.
best_val_loss = float('inf')
epochs_no_improve = 0

# Per-epoch history of losses and accuracies (filled in by the training loop).
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

In [None]:
# Train the model with early stopping on the validation loss.
# The weights of the best validation epoch are kept in memory and restored
# after the loop, so later cells evaluate and save the best model rather
# than whatever the last epoch produced.
best_state_dict = None

for epoch in range(num_epochs):
    # ---- Training pass ----
    model.train()
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        n_samples = labels.size(0)
        # The criterion returns the batch mean; weight it by the batch size so
        # a smaller final batch does not skew the epoch average.
        running_loss += loss.item() * n_samples
        predicted = outputs.argmax(dim=1)
        total_predictions += n_samples
        correct_predictions += (predicted == labels).sum().item()

    train_loss = running_loss / total_predictions
    train_accuracy = correct_predictions / total_predictions
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)

    # ---- Validation pass (no gradient tracking, eval-mode layers) ----
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            n_samples = labels.size(0)
            val_loss += loss.item() * n_samples
            predicted = outputs.argmax(dim=1)
            val_total += n_samples
            val_correct += (predicted == labels).sum().item()

    val_loss /= val_total
    val_accuracy = val_correct / val_total
    val_losses.append(val_loss)
    val_accuracies.append(val_accuracy)

    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")

    # ---- Early stopping ----
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_no_improve = 0
        # Snapshot the weights (cloned so later optimiser steps cannot alter them).
        best_state_dict = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
        # Also persist the full pickled model to disk, as before.
        # NOTE(review): loading this file requires the MLP class definition to
        # be available; only load such pickles from trusted sources.
        torch.save(model, "best_model_full.pt")
    else:
        epochs_no_improve += 1
        # `>=` rather than `==` so the stop still triggers if the counter
        # was already past `patience` (e.g. the cell was re-run).
        if epochs_no_improve >= patience:
            print("Early stopping!")
            break

# Restore the best-validation weights into the live model.
if best_state_dict is not None:
    model.load_state_dict(best_state_dict)

In [None]:
# Plot the loss and accuracy histories side by side, one point per completed epoch.
epochs = np.arange(1, len(train_losses) + 1)
fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(12, 6))

# Left panel: training vs validation loss.
ax_loss.plot(epochs, train_losses, label="Train Loss", color="red")
ax_loss.plot(epochs, val_losses, label="Val Loss", color="green")
ax_loss.set_xlabel("Epochs")
ax_loss.set_ylabel("Loss")
ax_loss.set_title("Loss vs Epochs")
ax_loss.legend()

# Right panel: training vs validation accuracy.
ax_acc.plot(epochs, train_accuracies, label="Train Accuracy", color="blue")
ax_acc.plot(epochs, val_accuracies, label="Val Accuracy", color="orange")
ax_acc.set_xlabel("Epochs")
ax_acc.set_ylabel("Accuracy")
ax_acc.set_title("Accuracy vs Epochs")
ax_acc.legend()

fig.tight_layout()
plt.show()

In [None]:
# Evaluate classification accuracy on the held-out test set.
model.eval()  # eval mode: dropout disabled, batch-norm uses running statistics
test_correct, test_total = 0, 0

with torch.no_grad():
    for batch_x, batch_y in test_loader:
        # Predicted class = index of the largest logit in each row.
        predicted = model(batch_x).argmax(dim=1)
        test_total += len(batch_y)
        test_correct += (predicted == batch_y).sum().item()

test_accuracy = test_correct / test_total
print(f"Test Accuracy: {test_accuracy:.4f}")

In [None]:
# Persist only the model weights (its state_dict), not the pickled module.
model_weights = model.state_dict()
torch.save(model_weights, "swat_mlp.pt")
print("Poids du modèle sauvegardés sous le nom 'swat_mlp.pt'")