In [None]:
import numpy as np
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from torch.utils.data import DataLoader, TensorDataset
import scipy.io
import matplotlib.pyplot as plt
from torchinfo import summary

In [None]:
# Read the two MATLAB files that hold the recorded signals.
data = scipy.io.loadmat('sr_125.mat')
data2 = scipy.io.loadmat('sr_125_QI.mat')

# Use the GPU when PyTorch can see one, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")


def _unwrap_records(records):
    """Build one array from the squeezed first element of every record."""
    return np.array([np.squeeze(record[0]) for record in records])


# presumably every variable is a MATLAB cell array holding one signal per
# entry — TODO confirm against the .mat files.
f = _unwrap_records(data['f_75'])
V = _unwrap_records(data['V_75'])
#P = _unwrap_records(data['P_75'])
Q = _unwrap_records(data2['Q_75'])
#I = _unwrap_records(data2['I_75'])

# Mini-batch size handed to the DataLoaders in the training cell.
batch = 32

print("Reshaped data shape:", f.shape)
# Stack the three selected signals along a new third axis and store as float32.
combined_features = np.stack([f, V, Q], axis=2).astype(np.float32)
#combined_features = np.stack([f, V, P], axis=2).astype(np.float32)


In [None]:
# Class names for the 250 samples: five consecutive groups of 50 samples each,
# in this fixed order (indices 0-49 are 'normal', 50-99 'fdia', and so on).
_CLASS_ORDER = ('normal', 'fdia', 'dos', 'hijacking', 'fault')
_SAMPLES_PER_CLASS = 50
labels = np.repeat(np.array(_CLASS_ORDER), _SAMPLES_PER_CLASS)


In [None]:
from sklearn.model_selection import train_test_split

def split_data(X, y, train_size=0.8, val_size=0.1, test_size=0.1, random_state=25):
    """
    Split data into stratified training, validation, and testing sets.

    The split is done in two steps: first the training portion is separated
    from the rest, then the rest is divided into validation and test parts.
    Both steps stratify on the labels and reuse the same ``random_state``.

    Parameters:
        X (numpy.ndarray): Features data (shape: [num_samples, ...]).
        y (numpy.ndarray): Labels with one entry (or row) per sample.
        train_size (float): Proportion of data for training.
        val_size (float): Proportion of data for validation.
        test_size (float): Proportion of data for testing.
        random_state (int): Random seed for reproducibility.

    Returns:
        X_train, X_val, X_test, y_train, y_val, y_test

    Raises:
        ValueError: If the three proportions do not sum to 1 (within a small
            floating-point tolerance).
    """
    # Compare with a tolerance: sums such as 0.7 + 0.2 + 0.1 evaluate to
    # 0.9999999999999999 in binary floating point and must not be rejected.
    total = train_size + val_size + test_size
    if abs(total - 1.0) > 1e-8:
        raise ValueError("train_size + val_size + test_size must equal 1.0")

    # Step 1: training set versus everything else (validation + testing).
    X_train, X_temp, y_train, y_temp = train_test_split(
        X,
        y,
        test_size=(1 - train_size),
        stratify=y,
        random_state=random_state,
    )

    # Step 2: express the validation share relative to the held-out remainder.
    val_size_adjusted = val_size / (val_size + test_size)

    X_val, X_test, y_val, y_test = train_test_split(
        X_temp,
        y_temp,
        test_size=(1 - val_size_adjusted),
        stratify=y_temp,
        random_state=random_state,
    )

    return X_train, X_val, X_test, y_train, y_val, y_test



In [None]:
# Turn the string class names into one-hot target rows.
new_signal_data = combined_features
label_encoder = LabelEncoder()
integer_encoded = label_encoder.fit_transform(labels)
onehot_encoder = OneHotEncoder(sparse_output=False)
# The encoder expects a 2-D column, so add a trailing axis to the class ids.
labels_onehot = onehot_encoder.fit_transform(integer_encoded[:, np.newaxis])

# Split into training, validation and testing sets (80% / 10% / 10%).
X_train, X_val, X_test, y_train, y_val, y_test = split_data(
    new_signal_data,
    labels_onehot,
    train_size=0.8,
    val_size=0.1,
    test_size=0.1,
)

# Report the resulting shapes as a quick sanity check.
print("Training data shape:", X_train.shape, y_train.shape)
print("Validation data shape:", X_val.shape, y_val.shape)
print("Testing data shape:", X_test.shape, y_test.shape)


In [None]:
def _as_float_tensor(array):
    """Copy an array into a float32 tensor placed on the selected device."""
    return torch.tensor(array, dtype=torch.float32).to(device)


# Feature tensors for the three splits.
X_train_tensor = _as_float_tensor(X_train)
X_val_tensor = _as_float_tensor(X_val)
X_test_tensor = _as_float_tensor(X_test)

# One-hot target tensors for the three splits.
y_train_tensor = _as_float_tensor(y_train)
y_val_tensor = _as_float_tensor(y_val)
y_test_tensor = _as_float_tensor(y_test)

# Verify the shape
print("X_train_tensor shape:", X_train_tensor.shape)
print("X_test_tensor shape:", X_test_tensor.shape)

# Pair features with targets; the DataLoaders are built from these datasets.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)


In [None]:
class CustomLSTMModel(nn.Module):
    """Three stacked single-layer LSTMs, each followed by dropout, with a linear head.

    The classifier reads only the output of the last time step of the third
    LSTM. By default the model returns raw logits (suitable for
    ``nn.CrossEntropyLoss``); pass ``inference=True`` to get softmax
    probabilities instead.

    Parameters:
        input_size (int): Number of features per time step.
        hidden_size1 (int): Hidden size of the first LSTM.
        hidden_size2 (int): Hidden size of the second LSTM.
        hidden_size3 (int): Hidden size of the third LSTM.
        num_classes (int): Number of output classes.
        dropout_rate (float): Dropout probability applied after every LSTM.
    """

    def __init__(self, input_size, hidden_size1, hidden_size2, hidden_size3,num_classes, dropout_rate=0.4):
        super().__init__()
        self.lstm1 = nn.LSTM(input_size, hidden_size1, batch_first=True)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.lstm2 = nn.LSTM(hidden_size1, hidden_size2, batch_first=True)
        self.dropout2 = nn.Dropout(dropout_rate)
        self.lstm3 = nn.LSTM(hidden_size2, hidden_size3, batch_first=True)
        self.dropout3 = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(hidden_size3, num_classes)
        self.softmax = nn.Softmax(dim=1)  # For inference

    def forward(self, x, inference=False):
        """Run a batch through the network.

        Parameters:
            x (torch.Tensor): Input of shape (batch, seq_len, input_size).
            inference (bool): When True, apply softmax to the logits.

        Returns:
            torch.Tensor: Shape (batch, num_classes); logits, or probabilities
            when ``inference`` is True.
        """
        # nn.LSTM starts from an all-zero hidden and cell state when none is
        # passed, so no explicit (h0, c0) is built here. This also keeps the
        # forward pass independent of any notebook-level hidden_size* globals.
        out, _ = self.lstm1(x)
        out = self.dropout1(out)

        out, _ = self.lstm2(out)
        out = self.dropout2(out)

        out, _ = self.lstm3(out)
        out = self.dropout3(out)

        # Classify from the output at the last time step only.
        out = self.fc(out[:, -1, :])

        # Softmax for probabilities during inference
        if inference:
            out = self.softmax(out)
        return out

# class OneLSTMModel(nn.Module):
#     def __init__(self, input_size, hidden_size1,num_classes, dropout_rate=0.4):
#         super(OneLSTMModel, self).__init__()
#         self.lstm1 = nn.LSTM(input_size, hidden_size1, batch_first=True)
#         self.dropout1 = nn.Dropout(dropout_rate)
#         self.fc = nn.Linear(hidden_size1, num_classes)
#         self.softmax = nn.Softmax(dim=1)  # For inference
#     def forward(self, x, inference=False):
#         # First LSTM layer
#         h0_1 = torch.zeros(1, x.size(0), hidden_size1).to(x.device)
#         c0_1 = torch.zeros(1, x.size(0), hidden_size1).to(x.device)
#         out, _ = self.lstm1(x, (h0_1, c0_1))
#         out = self.dropout1(out)
#         out = self.fc(out[:, -1, :])  # Take the output from the last time step

#         # Softmax for probabilities during inference
#         if inference:
#             out = self.softmax(out)
#         return out

# class TwoLSTMModel(nn.Module):
#     def __init__(self, input_size, hidden_size1,hidden_size2,num_classes, dropout_rate=0.4):
#         super(TwoLSTMModel, self).__init__()
#         self.lstm1 = nn.LSTM(input_size, hidden_size1, batch_first=True)
#         self.dropout1 = nn.Dropout(dropout_rate)
#         self.lstm2 = nn.LSTM(hidden_size1, hidden_size2, batch_first=True)
#         self.dropout2 = nn.Dropout(dropout_rate)
#         self.fc = nn.Linear(hidden_size2, num_classes)
#         self.softmax = nn.Softmax(dim=1)
#     def forward(self, x, inference=False):
#         # First LSTM layer
#         h0_1 = torch.zeros(1, x.size(0), hidden_size1).to(x.device)
#         c0_1 = torch.zeros(1, x.size(0), hidden_size1).to(x.device)
#         out, _ = self.lstm1(x, (h0_1, c0_1))
#         out = self.dropout1(out)
#         # Second LSTM layer
#         h0_2 = torch.zeros(1, x.size(0), hidden_size2).to(x.device)
#         c0_2 = torch.zeros(1, x.size(0), hidden_size2).to(x.device)
#         out, _ = self.lstm2(out, (h0_2, c0_2))
#         out = self.dropout2(out)
#         out = self.fc(out[:, -1, :])  # Take the output from the last time step

#         # Softmax for probabilities during inference
#         if inference:
#             out = self.softmax(out)
#         return out

In [None]:
import torch.optim as optim
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score
# CustomLstm
# Train the three-layer LSTM classifier `num_runs` times with different seeds
# and collect the per-class test F1 scores of every run in `all_f1_scores`.
# After the loop, `model`, `history` and the loaders refer to the LAST run.
input_size = 3  # three stacked signals per time step
hidden_size1 = 256
hidden_size2 = 128
hidden_size3 = 64
num_classes = 5
dropout_rate = 0.4
num_runs = 5
num_epochs = 1000
all_f1_scores = []
for run in range(num_runs):
    seed = 42 + run  # or use random.randint(0, 10000)
    print(f"\n=== Training Run {run+1}/{num_runs} with seed {seed} ===")
    torch.manual_seed(seed)

    train_loader = DataLoader(train_dataset, batch_size=batch, shuffle=True)
    # Shuffling does not affect the validation metrics; it is kept so the
    # random-number stream (and therefore earlier results) stays unchanged.
    val_loader = DataLoader(val_dataset, batch_size=batch, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch, shuffle=False)

    model = CustomLSTMModel(input_size, hidden_size1, hidden_size2,hidden_size3, num_classes, dropout_rate).to(device)
    print(f"This model use ({hidden_size1},{hidden_size2},{hidden_size3}) with batchsize = {batch}")

    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.00001)
    #scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.1)#For decrease lr overtime

    # Per-epoch metrics of this run (re-created for every run).
    history = {'train_loss': [], 'val_loss': [], 'train_acc': [], 'val_acc': []}
    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            # Targets are stored one-hot; CrossEntropyLoss wants class indices.
            # NOTE: named `targets` (not `labels`) so the notebook-level
            # `labels` array used by the encoding cell is not overwritten.
            targets = torch.argmax(y_batch, dim=1)

            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            # Track metrics (loss is a batch mean, so weight it by batch size).
            running_loss += loss.item() * X_batch.size(0)
            predicted = torch.argmax(outputs, dim=1)
            correct += (predicted == targets).sum().item()
            total += targets.size(0)
        train_loss = running_loss / total
        train_acc = correct / total

        # ---- validation pass ----
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                targets = torch.argmax(y_batch, dim=1)

                outputs = model(X_batch)
                loss = criterion(outputs, targets)

                val_loss += loss.item() * X_batch.size(0)
                predicted = torch.argmax(outputs, dim=1)
                val_correct += (predicted == targets).sum().item()
                val_total += targets.size(0)
        val_loss /= val_total
        val_acc = val_correct / val_total

        # Store metrics
        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['train_acc'].append(train_acc)
        history['val_acc'].append(val_acc)
        print(f"Epoch [{epoch+1}/{num_epochs}] - "
                f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
                f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    # ---- test-set evaluation for this run ----
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            predicted = torch.argmax(outputs, dim=1)  # Predicted class
            targets = torch.argmax(y_batch, dim=1)    # True class
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(targets.cpu().numpy())

    # Per-class F1. Passing `labels` pins the result to exactly `num_classes`
    # entries in a fixed order, so every run contributes a same-length array.
    f1_per_class = f1_score(
        all_labels,
        all_preds,
        labels=list(range(num_classes)),
        average=None,
    )  # array: (num_classes,)
    all_f1_scores.append(f1_per_class)
########################################################
# Learning curves. `history` is re-created for every training run, so these
# curves show the most recent run only.
epochs = range(1, len(history['train_loss']) + 1)
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 5))

# Left panel: loss per epoch.
loss_ax.plot(epochs, history['train_loss'], label='Training Loss')
loss_ax.plot(epochs, history['val_loss'], label='Validation Loss')
loss_ax.set_xlabel('Epochs')
loss_ax.set_ylabel('Loss')
loss_ax.set_title('Training and Validation Loss')
loss_ax.grid(True)
loss_ax.legend()

# Right panel: accuracy per epoch.
acc_ax.plot(epochs, history['train_acc'], label='Training Accuracy')
acc_ax.plot(epochs, history['val_acc'], label='Validation Accuracy')
acc_ax.set_xlabel('Epochs')
acc_ax.set_ylabel('Accuracy')
acc_ax.set_title('Training and Validation Accuracy')
acc_ax.grid(True)
acc_ax.legend()

fig.tight_layout()
plt.show()
# plt.plot(range(epoch+1),his_loss)
# plt.title("History loss")
# plt.grid(True)
# plt.show()


In [None]:
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import seaborn as sns
import matplotlib.pyplot as plt

# Evaluate the model currently bound to `model` on the test loader and report
# a confusion matrix, a per-class report and the overall accuracy.
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        outputs = model(X_batch)
        predicted = torch.argmax(outputs, dim=1)  # Predicted class
        # Named `targets` (not `labels`) so the notebook-level `labels` array
        # used by the encoding cell is not overwritten by a tensor.
        targets = torch.argmax(y_batch, dim=1)    # True class
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(targets.cpu().numpy())

# Class ids follow the order of label_encoder.classes_; passing them explicitly
# keeps matrix rows/columns and report rows aligned with the class names even
# if some class happens to be absent from both predictions and targets.
class_names = label_encoder.classes_
class_ids = np.arange(len(class_names))

# Compute confusion matrix
conf_matrix = confusion_matrix(all_labels, all_preds, labels=class_ids)

# Print classification report
print(classification_report(all_labels, all_preds, labels=class_ids, target_names=class_names))

# Plot confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.title('Confusion Matrix')
plt.show()

# Calculate accuracy
overall_accuracy = accuracy_score(all_labels, all_preds)
print(f"Overall Accuracy: {overall_accuracy:.2f}")


In [None]:
import numpy as np
import scipy.io as sio

# Stack the per-run F1 arrays into one matrix: (num_runs, num_classes).
all_f1_scores = np.array(all_f1_scores)

# Mean and standard deviation across runs for each class.
# np.std uses its default here, i.e. the population standard deviation (ddof=0).
f1_mean = np.mean(all_f1_scores, axis=0)
f1_std = np.std(all_f1_scores, axis=0)

# Save for MATLAB plotting; class indices are 1-based.
f1_output_path = 'f1_scores_errorbar_case1.mat'
sio.savemat(f1_output_path, {
    'f1_mean': f1_mean,
    'f1_std': f1_std,
    'class_indices': np.arange(len(f1_mean)) + 1
})

# Report the file that was actually written.
print(f"✅ Saved {f1_output_path} for plotting")

In [None]:
# Save the model's state_dict
# Only the parameters of whatever `model` is bound to when this cell runs are
# written; the training cell rebinds `model` on every run, so after a full
# training cell this is the model of the last run.
# NOTE(review): the file name encodes "128_64_32" and "b16", while the training
# cell in this notebook uses hidden sizes (256, 128, 64) and batch = 32 —
# confirm the name is still intended before relying on it.
name = "128_64_32_75s_b16.pth"
torch.save(model.state_dict(), name)
print(f"Model saved to {name}")


In [None]:
import csv

# Export the per-epoch training history as CSV: one header row with the
# history keys, then one row per epoch with the values in the same key order.
# newline="" is what the csv module requires for files it writes to; without
# it, platforms that translate "\n" (e.g. Windows) get an empty line after
# every row.
with open("128_64_32_75s_b16_fVQ.csv", "w", newline="") as outfile:
    writer = csv.writer(outfile)

    # Fix the column order once so header and rows always agree.
    key_list = list(history.keys())
    writer.writerow(key_list)

    # zip(*columns) walks the metric lists in lockstep, yielding one row per epoch.
    writer.writerows(zip(*(history[key] for key in key_list)))