In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import seaborn as sns
import time

import torch
from torch.utils.data import DataLoader
import torch.utils.data as data_utils
import matplotlib.pyplot as plt
import numpy as np
import os

import pandas as pd
from sklearn import preprocessing
from sklearn.metrics import f1_score
import torch
from torch.utils.data import Dataset

In [None]:
import os
import pandas as pd

# Set the path to the directory containing the datasets
dataset_path = '/kaggle/input/fault-detection'

# Get the list of files in the directory
files = os.listdir(dataset_path)

# Initialize an empty list to store DataFrames
dfs = []

# Loop through the files and read them into DataFrames
for file in files[:3]:  # Assuming you want to read the first 15 datasets
    file_path = os.path.join(dataset_path, file)
    df= pd.read_csv(file_path)
    # df['class'] = 'faultySignal'  # Change the file reading method based on your file format
    dfs.append(df)

# Concatenate all DataFrames into one
final_df = pd.concat(dfs, ignore_index=True)

final_df


In [None]:
final_df.to_csv('new_data.csv', index=False)

In [None]:
# Path of the dataset
datasetPath = '/content/drive/MyDrive/Dataset.csv'

# Path of the model (saved/to save)
modelPath = '/content/new-RNN (3).pth'

# When True, retrain the whole model
retrain = False

# Size of the split
trainSize = 0.85
valSize = 0.05
testSize = 0.1

# Specify number of seconds for the window. Default: 16
window_size = 16

# Model hyper-parameters
batch_size = 32
learning_rate = 1e-3
num_epochs = 80
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Seed for reproducibility
seed = 42
torch.manual_seed(seed)

# When True create graphs (takes more time)
makeGraphs = False

In [None]:
classes_to_drop=['faultLabel']
target_classes = final_df[['faultLabel']]

In [None]:
def min_max_norm(self,col):
    self._data[col]=(self._data[col]-self._data[col].min())/(self._data[col].max()-self._data[col].min())


def std_scaler(self,col):
    self._data[col]=(self._data[col]-self._data[col].mean())/(self._data[col].std())

def f1(test_loader, model):
    f1 = 0
    with torch.no_grad():
        for i, (data, labels) in enumerate(test_loader):
            outputs = model(data)
            pred = outputs.data.max(1, keepdim=True)[1]
            f1 += f1_score(labels, pred, average='macro')
    avg_f1 = f1/len(test_loader)
    return (avg_f1)


class CustomDataset(Dataset):
    def __init__(self, file_path='/kaggle/working/new_data.csv', classes_to_drop=classes_to_drop, window_size=window_size, normalize=True, normalize_method='mean_std'):


        self._window_size=window_size
        self._data=pd.read_csv(file_path)

        # The data is sorted by Class A,B,C the indexes of the dataframe have restarted by ignore index
        self._data = self._data.sort_values(by=['faultLabel'], inplace=False,ignore_index = True)

        # class_uniq contains the letters of the drivers A,B and it loops across all of them
        for class_uniq in list(self._data['faultLabel'].unique()):
            # Find the total number of elements belonging to a class
            tot_number=sum(self._data['faultLabel']==class_uniq)
            # Number of elements to drop so that the class element is divisible by window size
            to_drop=tot_number%window_size
            # Returns the index of the first element of the class
            index_to_start_removing=self._data[self._data['faultLabel']==class_uniq].index[0]
            # Drop element from first element to the element required
            self._data.drop(self._data.index[index_to_start_removing:index_to_start_removing+to_drop],inplace=True)


        # Resetting index of dataframe after dropping values
        self._data = self._data.reset_index()
        self._data = self._data.drop(['index'], axis=1)

        index_starting_class=[] # This array contains the starting index of each class in the df
        for class_uniq in list(self._data['faultLabel'].unique()):
            # Appending the index of first element of each clas
            index_starting_class.append(self._data[self._data['faultLabel']==class_uniq].index[0])

        # Create the sequence of indexs of the windows
        sequences=[]
        for i in range(len(index_starting_class)):
            # Check if beginning of next class is there
            if i!=len(index_starting_class)-1:
                ranges=np.arange(index_starting_class[i], index_starting_class[i+1])
            else:
                ranges = np.arange(index_starting_class[i], len(self._data))
            for j in range(0,len(ranges),int(self._window_size/2)):
                if len(ranges[j:j+self._window_size])==16:
                    sequences.append(ranges[j:j+self._window_size])
        self._sequences=sequences


        # Take only the 'Class' which are the actual labels and store it in the labels of self
        self._labels=self._data['faultLabel']
        # Dropping columns which have constant measurements because they would return nan in std
        self._data.drop(classes_to_drop, inplace=True, axis=1)

        # Function to normalize the data either with min_max or mean_std
        if normalize:
            for col in self._data.columns:
                if normalize_method=='min_max':
                    min_max_norm(self,col)
                elif normalize_method=="mean_std":
                    std_scaler(self,col)

        # Create the array holding the windowed multidimensional arrays
        X=np.empty((len(sequences), self._window_size, len(self._data.columns)))
        y=[]

        for n_row, sequence in enumerate(sequences):
            X[n_row,:,:]=self._data.iloc[sequence]
            # The corresponding driver of the sequence is the driver at first sequence
            y.append(self._labels[sequence[0]])


        assert len(y)==len(X)
        #Assing the windowed dataset to the X of self
        self._X= X

        # Targets is a transformed version of y with drivers are encoded into 0 to 9
        targets = preprocessing.LabelEncoder().fit_transform(y)
        targets = torch.as_tensor(targets)  # Just converting it to a pytorch tensor
        self._y=targets # Assign it to y of self


    def __len__(self):
        return len(self._X)


    def __getitem__(self, index):
        return torch.FloatTensor(self._X[index,:,:]), self._y[index]

In [None]:
import torch
import numpy as np
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = True


In [None]:
a = CustomDataset()

# Defining sizes
train_size = int(trainSize * len(a))
val_size = int(valSize * len(a))
test_size = len(a)-train_size-val_size

train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    a, [train_size, val_size, test_size])

train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=4,
                                           shuffle=True,
                                           drop_last=True)

validation_loader = torch.utils.data.DataLoader(dataset=val_dataset,
                                                batch_size=4,
                                                shuffle=False,
                                                drop_last=True)

test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=4,
                                          shuffle=False,drop_last=True)

In [None]:
final_df.drop(classes_to_drop,inplace=True, axis=1)
final_df

In [None]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()

# Apply the scaler to your DataFrame
normalized_data = scaler.fit_transform(final_df)

# Create a new DataFrame with the normalized data
normalized_df = pd.DataFrame(normalized_data, columns=final_df.columns)

normalized_df

In [None]:
result_df = pd.concat([normalized_df, target_classes], axis=1)
result_df

In [None]:
from sklearn.preprocessing import LabelEncoder

encoder = LabelEncoder()
result_df['faultLabel'] = encoder.fit_transform(result_df['faultLabel'])

# Retrieve the mapping of numerical codes to original class labels
class_labels = encoder.classes_

# Display the mapping
for code, label in enumerate(class_labels):
    print(f'Code: {code} -> Label: {label}')
result_df

In [None]:

X = result_df.drop('faultLabel',axis=1)
y = result_df['faultLabel']

from sklearn.model_selection import train_test_split
X_train,X_test,y_train,y_test = train_test_split(X,y,test_size=0.1,random_state=42)

In [None]:
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split


# Create and train an XGBoost classification model
model = XGBClassifier(objective='multi:softmax', random_state=42)  # You may need to adjust the objective based on your problem
model.fit(X_train, y_train)

# Make predictions on the test data
y_pred = model.predict(X_test)

# Evaluate model performance
accuracy = accuracy_score(y_test, y_pred)
conf_matrix = confusion_matrix(y_test, y_pred)
class_report = classification_report(y_test, y_pred)

print(f"Accuracy: {accuracy:.2f}")
print(f"Confusion Matrix:\n{conf_matrix}")
print(f"Classification Report:\n{class_report}")


In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split

# Create and train a Random Forest classification model
model1 = RandomForestClassifier(n_estimators=100, random_state=42)  # You can adjust the number of trees (n_estimators) as needed
model1.fit(X_train, y_train)

# Make predictions on the test data
y_pred = model1.predict(X_test)

# Evaluate model performance
accuracy = accuracy_score(y_test, y_pred)
conf_matrix = confusion_matrix(y_test, y_pred)
class_report = classification_report(y_test, y_pred)

print(f"Accuracy: {accuracy:.2f}")
print(f"Confusion Matrix:\n{conf_matrix}")
print(f"Classification Report:\n{class_report}")


In [None]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report


# Create and train a Decision Tree classification model
model3 = DecisionTreeClassifier(max_depth=None, random_state=42)  # You can adjust max_depth as needed
model3.fit(X_train, y_train)

# Make predictions on the test data
y_pred = model3.predict(X_test)

# Evaluate model performance
accuracy = accuracy_score(y_test, y_pred)
conf_matrix = confusion_matrix(y_test, y_pred)
class_report = classification_report(y_test, y_pred)

print(f"Accuracy: {accuracy:.2f}")
print(f"Confusion Matrix:\n{conf_matrix}")
print(f"Classification Report:\n{class_report}")


In [None]:
import tensorflow as tf
# Set seeds for reproducibility
seed_value = 42
np.random.seed(seed_value)
tf.random.set_seed(seed_value)
model = Sequential()
#model.add(Dense(int((len(data[1])*2)), activation='relu', input_shape=(48, ))
#model.add(Dense(y_train_encoded.shape[1]), activation='softmax', kernel_initializer=GlorotNormal())

model.add(Dense(64, activation='relu', input_shape=(X_train.shape[1],), kernel_initializer=GlorotNormal()))
#model.add(Dropout(0.2))
model.add(Dense(32, activation='relu', kernel_regularizer=L2())) #kernel_initializer=GlorotNormal()
model.add(Dropout(0.2))
#model.add(Dense(16, activation='relu', kernel_regularizer=L2())) #kernel_initializer=GlorotNormal()
#model.add(Dropout(0.2))
model.add(Dense(11, activation='softmax', kernel_initializer=GlorotNormal()))
model.summary()

model.compile(optimizer=Adam(learning_rate=1e-3),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

model.fit(X_train, y_train_encoded, epochs=500, batch_size=32, validation_split = 0.1)

In [None]:
res_type = model.evaluate(X_test, y_test_encoded)[1]
print(res_type)

In [None]:
def fgsm_attack(model, input_data, epsilon):
    with tf.GradientTape() as tape:
        tape.watch(input_data)
        predictions = model(input_data)
        loss = tf.keras.losses.categorical_crossentropy(y_test_encoded, predictions)

    # Get the gradients of the loss w.r.t. the input data
    gradient = tape.gradient(loss, input_data)
    # Get the sign of the gradients
    signed_grad = tf.sign(gradient)
    # Perturb the input data
    perturbed_data = input_data + epsilon * signed_grad
    # Clip the perturbed data to ensure it stays within the valid range
    perturbed_data = tf.clip_by_value(perturbed_data, 0, 1)
    return perturbed_data

# Define parameters for the FGSM attack
epsilon = 0.5

# Convert X_train DataFrame to a TensorFlow tensor
X_train_tensor = tf.constant(X_train.values, dtype=tf.float32)
X_test_tensor = tf.constant(X_test.values, dtype=tf.float32)
# Generate adversarial examples using FGSM
adversarial_X_test = fgsm_attack(model, X_test_tensor, epsilon)

# Evaluate the model on adversarial examples
model.evaluate(adversarial_X_test, y_test_encoded)

In [None]:
import matplotlib.pyplot as plt

# Given ASR values for two sets
asr_values_set1 = [0.793,0.333, 0.259, 0.206, 0.21, 0.147, 0.136, 0.127, 0.124, 0.124, 0.122]

# Create a list of feature numbers starting from one
feature_numbers = [0,0.05,0.1,0.15,0.20,0.25,0.30,0.35,0.40,0.45,0.50]

# Plotting the graphs
plt.ylim(0, 1.0)
plt.plot(feature_numbers, asr_values_set1, marker='o', linestyle='-', color='b', label='fgsm')
# Adding legend
plt.legend()
plt.xlabel('epsilon')
plt.ylabel('Accuracy')
plt.title('Accuracy vs epsilon')
plt.grid(True)
plt.tight_layout()
plt.savefig('/kaggle/working/model_reimplemented.pdf', format='pdf')
plt.show()


In [None]:
import torch
import numpy as np
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = True


In [None]:

class RNN(torch.nn.Module):
    def __init__(self, batch_size, window_size, num_features):
        super(RNN, self).__init__()
        self.rnn1 = torch.nn.GRU(num_features, 220, batch_first=True, bidirectional=True)
        self.dropout = torch.nn.Dropout(p=0.5)
        self.fc = torch.nn.Linear(440, 11)  # Adjust the input size to the linear layer
        self.sigmoid = torch.nn.Sigmoid()

    def forward(self, x):
        rnn1_out, h_t1 = self.rnn1(x)
        rnn1_out1 = self.dropout(rnn1_out)
        fc_out = self.fc(rnn1_out1[:, -1, :])
        out = self.sigmoid(fc_out)
        return out
inputs, classes = next(iter(train_loader))
model5 = RNN(inputs.shape[0],inputs.shape[1],inputs.shape[2])

In [None]:
import time

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model5 = model5.to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model5.parameters(), lr=0.001)


n_epochs = 80
start_time = time.time()
for epoch in range(n_epochs):
    train_loss = 0.0
    train_acc = 0.0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        # Reset gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model5(inputs)
        loss = criterion(outputs, labels)

        # Backward pass
        loss.backward()
        optimizer.step()

        # Update metrics
        train_loss += loss.item() * inputs.size(0)
        _, preds = torch.max(outputs, 1)
        train_acc += torch.sum(preds == labels.data)

    # Calculate metrics for the epoch
    train_loss /= len(train_loader.dataset)
    train_acc = train_acc.double() / len(train_loader.dataset)

    print(f"Epoch {epoch+1}/{n_epochs} -- Train Loss: {train_loss:.4f} -- Train Accuracy: {train_acc:.4f}")
end_time = time.time()
training_time = end_time - start_time
print(training_time)
total_params = sum(p.numel() for p in model5.parameters())
print(f"Total Model Parameters: {total_params}")


In [None]:
save_path = '/kaggle/working/model_without_defence.pt'
torch.save(model5.state_dict(), save_path)

In [None]:
pip install torchattacks

In [None]:
# import torchvision
import torchattacks
# Load the pre-trained model
model5.to("cpu")
model5.eval()


# Attack parameters
epsilon_fgsm = 0.2  # Perturbation magnitude for FGSM
epsilon_bim = 0.2 # Perturbation magnitude for BIM
epsilon_cw = 0.2 # Perturbation magnitude for C&W

correct_original = 0
correct_adversarial_fgsm = 0
correct_adversarial_bim = 0
correct_adversarial_cw = 0
correct_adversarial_rfgsm=0
correct_adversarial_PGD=0
total = 0

# Iterate over the test set
for inputs, labels in test_loader:

    # Forward pass on the original input
    outputs_original = model5(inputs)
    _, predicted_original = torch.max(outputs_original, 1)

    # FGSM: Craft adversarial example
    attack_fgsm = torchattacks.FGSM(model5, eps=epsilon_fgsm)
    adversarial_inputs_fgsm = attack_fgsm(inputs, labels)

    # BIM: Craft adversarial example
    attack_bim = torchattacks.BIM(model5, eps=epsilon_bim)
    adversarial_inputs_bim = attack_bim(inputs, labels)

    # C&W: Craft adversarial example
    attack_cw = torchattacks.CW(model5, c=epsilon_cw, kappa=0)
    adversarial_inputs_cw = attack_cw(inputs, labels)

    attack_RFGSM = torchattacks.RFGSM(model5, eps=epsilon_bim)
    adversarial_inputs_RFGSM = attack_RFGSM(inputs, labels)

    attack_PGD = torchattacks.PGD(model5, eps=epsilon_bim)
    adversarial_inputs_PGD = attack_PGD(inputs, labels)



    # Forward pass on the adversarial inputs
    outputs_adversarial_fgsm = model5(adversarial_inputs_fgsm)
    outputs_adversarial_bim = model5(adversarial_inputs_bim)
    outputs_adversarial_cw = model5(adversarial_inputs_cw)
    outputs_adversarial_rfgsm = model5(adversarial_inputs_RFGSM)
    outputs_adversarial_PGD = model5(adversarial_inputs_PGD)


    # Update accuracy metrics
    total += labels.size(0)
    correct_original += (predicted_original == labels).sum().item()
    correct_adversarial_fgsm += (torch.argmax(outputs_adversarial_fgsm, dim=1) == labels).sum().item()
    correct_adversarial_bim += (torch.argmax(outputs_adversarial_bim, dim=1) == labels).sum().item()
    correct_adversarial_cw += (torch.argmax(outputs_adversarial_cw, dim=1) == labels).sum().item()
    correct_adversarial_rfgsm += (torch.argmax(outputs_adversarial_rfgsm, dim=1) == labels).sum().item()
    correct_adversarial_PGD += (torch.argmax(outputs_adversarial_PGD, dim=1) == labels).sum().item()


# Calculate and print accuracy
accuracy_original = correct_original / total
accuracy_adversarial_fgsm = correct_adversarial_fgsm / total
accuracy_adversarial_bim = correct_adversarial_bim / total
accuracy_adversarial_cw = correct_adversarial_cw / total
accuracy_adversarial_rfgsm = correct_adversarial_rfgsm / total
accuracy_adversarial_PGD = correct_adversarial_PGD / total

print(f'Accuracy on Original Data: {accuracy_original * 100:.2f}%')
print(f'Accuracy on Adversarial Data (FGSM): {accuracy_adversarial_fgsm * 100:.2f}%')
print(f'Accuracy on Adversarial Data (BIM): {accuracy_adversarial_bim * 100:.2f}%')
print(f'Accuracy on Adversarial Data (C&W): {accuracy_adversarial_cw * 100:.2f}%')
print(f'Accuracy on Adversarial Data (RFGSM): {accuracy_adversarial_rfgsm * 100:.2f}%')
print(f'Accuracy on Adversarial Data (PDG): {accuracy_adversarial_PGD * 100:.2f}%')



In [None]:
import matplotlib.pyplot as plt

# Given ASR values for two sets
asr_values_set1 = [0.604,0.18, 0.173, 0.159, 0.131, 0.131, 0.111, 0.111, 0.111, 0.104, 0.104]
asr_values_set2 = [0.604,0.208, 0.125, 0.097, 0.09,0.083, 0.083,  0.076, 0.076, 0.062, 0.055]
asr_values_set3 = [0.604,0.138, 0.131, 0.125, 0.125, 0.125, 0.125, 0.125, 0.125, 0.125, 0.125]
asr_values_set4 = [0.604,0.125, 0.062, 0.055, 0.041, 0.034,0.041, 0.048,  0.027, 0.027,0.027]
asr_values_set5 = [0.604,0.125, 0.083, 0.048, 0.034, 0.027, 0.041, 0.041, 0.013, 0.027, 0.041]

# Create a list of feature numbers starting from one
feature_numbers = [0,0.05,0.1,0.15,0.20,0.25,0.30,0.35,0.40,0.45,0.50]

# Plotting the graphs
plt.ylim(0, 1.0)
plt.plot(feature_numbers, asr_values_set1, marker='o', linestyle='-', color='b', label='fgsm')
plt.plot(feature_numbers, asr_values_set2, marker='s', linestyle='-', color='r', label='bim')
plt.plot(feature_numbers, asr_values_set3, marker='^', linestyle='-', color='g', label='cw')  # Different color for set3
plt.plot(feature_numbers, asr_values_set4, marker='*', linestyle='-', color='orange', label='rfgsm')  # Different color for set4
plt.plot(feature_numbers, asr_values_set5, marker='D', linestyle='-', color='purple', label='pgd')
# Adding legend
plt.legend()
plt.xlabel('epsilon')
plt.ylabel('Accuracy')
plt.title('Accuracy vs epsilon')
plt.grid(True)
plt.tight_layout()
plt.savefig('/kaggle/working/model_WB.pdf', format='pdf')
plt.show()


In [None]:
import torch
import numpy as np
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = True


In [None]:
class RNN(torch.nn.Module):
    def __init__(self, batch_size, window_size, num_features):
        super(RNN, self).__init__()
        self.rnn1 = torch.nn.GRU(num_features, 220, batch_first=True, bidirectional=True)
        self.dropout = torch.nn.Dropout(p=0.5)
        self.fc = torch.nn.Linear(440, 11)  # Adjust the input size to the linear layer
        self.sigmoid = torch.nn.Sigmoid()

    def forward(self, x):
        rnn1_out, h_t1 = self.rnn1(x)
        rnn1_out1 = self.dropout(rnn1_out)
        fc_out = self.fc(rnn1_out1[:, -1, :])
        out = self.sigmoid(fc_out)
        return out
inputs, classes = next(iter(train_loader))
model5 = RNN(inputs.shape[0],inputs.shape[1],inputs.shape[2])

In [None]:

import torchattacks
# model5.to("cpu")

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model5.parameters(), lr=0.001)
epsilon_fgsm = 0.2  # FGSM perturbation magnitude
epsilon_bim = 0.2 # BIM perturbation magnitude
 # C&W perturbation magnitude

n_epochs = 80
start_time = time.time()
for epoch in range(n_epochs):
    model5.train()
    train_loss = 0.0
    train_acc = 0.0

    for inputs, labels in train_loader:
        # Reset gradients
        optimizer.zero_grad()

        # Forward pass on the original input
        outputs = model5(inputs)
        loss = criterion(outputs, labels)

        # Backward pass on the original input
        loss.backward()
        optimizer.step()
        # FGSM: Add perturbation to the input for FGSM attack

        fgsm_attack = torchattacks.FGSM(model5, eps=epsilon_fgsm)
        fgsm_adversarial_inputs = fgsm_attack(inputs, labels)

        # BIM: Add perturbation to the input for BIM attack
        bim_attack = torchattacks.BIM(model5, eps=epsilon_bim)
        bim_adversarial_inputs = bim_attack(inputs, labels)


        # Forward pass on adversarial inputs for all attacks
        fgsm_adversarial_outputs = model5(fgsm_adversarial_inputs)
        bim_adversarial_outputs = model5(bim_adversarial_inputs)

        # Calculate losses for all attacks
        fgsm_adversarial_loss = criterion(fgsm_adversarial_outputs, labels)
        bim_adversarial_loss = criterion(bim_adversarial_outputs, labels)

        # Backward pass on adversarial inputs for all attacks
        fgsm_adversarial_loss.backward()
        bim_adversarial_loss.backward()

        # Update gradients
        optimizer.step()

        # Update metrics
        train_loss += (fgsm_adversarial_loss.item() + bim_adversarial_loss.item()) * inputs.size(0)
        _, fgsm_preds = torch.max(fgsm_adversarial_outputs, 1)
        _, bim_preds = torch.max(bim_adversarial_outputs, 1)
        train_acc += (torch.sum(fgsm_preds == labels.data) + torch.sum(bim_preds == labels.data))

    # Calculate metrics for the epoch
    train_loss /= (2 * len(train_loader.dataset))
    train_acc = train_acc.double() / (2 * len(train_loader.dataset))

    print(f"Epoch {epoch+1}/{n_epochs} -- Train Loss: {train_loss:.4f} -- Train Accuracy: {train_acc:.4f}")

end_time = time.time()
training_time = end_time - start_time
print(training_time)
total_params = sum(p.numel() for p in model5.parameters())
print(f"Total Model Parameters: {total_params}")

In [None]:
save_path = '/kaggle/working/model_with_defence.pt'
torch.save(model5.state_dict(), save_path)

In [None]:
model5.load_state_dict(torch.load('/kaggle/input/test-data/model_with_defence.pt'))


In [None]:
print(end_time)
print(start_time)

In [None]:
### import torchvision
import torchattacks

# Load the pre-trained model
model5.eval()


# Attack parameters
epsilon_fgsm = 0.5  # Perturbation magnitude for FGSM
epsilon_bim = 0.5# Perturbation magnitude for BIM
epsilon_cw = 0.5# Perturbation magnitude for C&W

correct_original = 0
correct_adversarial_fgsm = 0
correct_adversarial_bim = 0
correct_adversarial_cw = 0
correct_adversarial_rfgsm=0
correct_adversarial_PGD=0
total = 0

# Iterate over the test set
for inputs, labels in test_loader:
    # Forward pass on the original input
    outputs_original = model5(inputs)
    _, predicted_original = torch.max(outputs_original, 1)

    # FGSM: Craft adversarial example
    attack_fgsm = torchattacks.FGSM(model5, eps=epsilon_fgsm)
    adversarial_inputs_fgsm = attack_fgsm(inputs, labels)

    # BIM: Craft adversarial example
    attack_bim = torchattacks.BIM(model5, eps=epsilon_bim)
    adversarial_inputs_bim = attack_bim(inputs, labels)

    # C&W: Craft adversarial example
    attack_cw = torchattacks.CW(model5, c=epsilon_cw, kappa=0,steps=100)
    adversarial_inputs_cw = attack_cw(inputs, labels)

    attack_RFGSM = torchattacks.RFGSM(model5, eps=epsilon_bim)
    adversarial_inputs_RFGSM = attack_RFGSM(inputs, labels)

    attack_PGD = torchattacks.PGD(model5, eps=epsilon_bim)
    adversarial_inputs_PGD = attack_PGD(inputs, labels)



    # Forward pass on the adversarial inputs
    outputs_adversarial_fgsm = model5(adversarial_inputs_fgsm)
    outputs_adversarial_bim = model5(adversarial_inputs_bim)
    outputs_adversarial_cw = model5(adversarial_inputs_cw)
    outputs_adversarial_rfgsm = model5(adversarial_inputs_RFGSM)
    outputs_adversarial_PGD = model5(adversarial_inputs_PGD)


    # Update accuracy metrics
    total += labels.size(0)
    correct_original += (predicted_original == labels).sum().item()
    correct_adversarial_fgsm += (torch.argmax(outputs_adversarial_fgsm, dim=1) == labels).sum().item()
    correct_adversarial_bim += (torch.argmax(outputs_adversarial_bim, dim=1) == labels).sum().item()
    correct_adversarial_cw += (torch.argmax(outputs_adversarial_cw, dim=1) == labels).sum().item()
    correct_adversarial_rfgsm += (torch.argmax(outputs_adversarial_rfgsm, dim=1) == labels).sum().item()
    correct_adversarial_PGD += (torch.argmax(outputs_adversarial_PGD, dim=1) == labels).sum().item()


# Calculate and print accuracy
accuracy_original = correct_original / total
accuracy_adversarial_fgsm = correct_adversarial_fgsm / total
accuracy_adversarial_bim = correct_adversarial_bim / total
accuracy_adversarial_cw = correct_adversarial_cw / total
accuracy_adversarial_rfgsm = correct_adversarial_rfgsm / total
accuracy_adversarial_PGD = correct_adversarial_PGD / total

print(f'Accuracy on Original Data: {accuracy_original * 100:.2f}%')
print(f'Accuracy on Adversarial Data (FGSM): {accuracy_adversarial_fgsm * 100:.2f}%')
print(f'Accuracy on Adversarial Data (BIM): {accuracy_adversarial_bim * 100:.2f}%')
print(f'Accuracy on Adversarial Data (C&W): {accuracy_adversarial_cw * 100:.2f}%')
print(f'Accuracy on Adversarial Data (RFGSM): {accuracy_adversarial_rfgsm * 100:.2f}%')
print(f'Accuracy on Adversarial Data (PDG): {accuracy_adversarial_PGD * 100:.2f}%')



In [None]:
import matplotlib.pyplot as plt

# Given ASR values for two sets
asr_values_set1 = [0.618,0.50, 0.472, 0.451, 0.437, 0.388, 0.347, 0.319, 0.284, 0.201, 0.187]
asr_values_set2 = [0.618,0.479, 0.444, 0.402, 0.381,0.354, 0.312, 0.25, 0.208, 0.173, 0.159]
asr_values_set3 = [0.618,0.458, 0.458, 0.451, 0.451, 0.437, 0.437, 0.437, 0.437, 0.437, 0.437]
asr_values_set4 = [0.618,0.451, 0.423, 0.409, 0.388, 0.368,0.347, 0.291,  0.284, 0.229, 0.194]
asr_values_set5 = [0.618,0.444, 0.444, 0.43, 0.388, 0.354, 0.326, 0.312, 0.236, 0.166, 0.18]

# Create a list of feature numbers starting from one
feature_numbers = [0,0.05,0.1,0.15,0.20,0.25,0.30,0.35,0.40,0.45,0.50]

# Plotting the graphs
plt.ylim(0, 1.0)
plt.plot(feature_numbers, asr_values_set1, marker='o', linestyle='-', color='b', label='fgsm')
plt.plot(feature_numbers, asr_values_set2, marker='s', linestyle='-', color='r', label='bim')
plt.plot(feature_numbers, asr_values_set3, marker='^', linestyle='-', color='g', label='cw')  # Different color for set3
plt.plot(feature_numbers, asr_values_set4, marker='*', linestyle='-', color='orange', label='rfgsm')  # Different color for set4
plt.plot(feature_numbers, asr_values_set5, marker='D', linestyle='-', color='purple', label='pgd')
# Adding legend
plt.legend()

plt.xlabel('epsilon')
plt.ylabel('Accuracy')
plt.title('Accuracy vs epsilon')
plt.grid(True)
plt.tight_layout()
plt.savefig('/kaggle/working/model_WB_Defence.pdf', format='pdf')
plt.show()


In [None]:
import torch
import numpy as np
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = True


In [None]:
a = CustomDataset()
trainSize = 0.9
# valSize = 0.05
a = CustomDataset()

# Defining sizes
train_size = int(trainSize * len(a))
test_size = len(a)-train_size

train_dataset, test_dataset = torch.utils.data.random_split(
    a, [train_size, test_size])

train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=4,
                                           shuffle=True,
                                           drop_last=True)

test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=4,
                                          shuffle=False)

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torchattacks

# Define the generator and discriminator for the GAN
class Generator(nn.Module):
    def __init__(self, batch_size,window_size,num_features,latent_dim=100):
        super(Generator, self).__init__()
        self.batch_size = batch_size
        self.num_features = num_features
        self.window_size = window_size
       # self.fc1 = nn.Linear(latent_dim, 128)
        self.fc1 = nn.Linear(num_features, latent_dim)
        self.fc2 = nn.Linear(latent_dim, 256)
        self.fc3 = nn.Linear(256,batch_size*window_size)
        self.fc4 = nn.Linear(batch_size*window_size,num_features)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        #return x.view(x.size(0), 1, -1)
        return x

class Discriminator(nn.Module):
    def __init__(self, batch_size, window_size, num_features):
        super(Discriminator, self).__init__()
        self.fc1 = nn.Linear(num_features, 160)
        self.fc2 = nn.Linear(160, 200)
        self.fc3 = nn.Linear(200, 256)
        self.fc4 = nn.Linear(256, 512)
        self.fc5 = nn.Linear(512, 1)
        self.window_size = window_size
        self.num_features = num_features
        self.batch_size = batch_size

    def forward(self, x):
      #x = x.view(-1, self.num_features)
      x = F.relu(self.fc1(x))
      x = F.relu(self.fc2(x))
      x = F.relu(self.fc3(x))
      x = F.relu(self.fc4(x))
      x = torch.sigmoid(self.fc5(x))
      return x



# Define the GAN training function

def train_gan(generator, discriminator, train_loader, num_epochs=100, lr=0.0002,
              device=torch.device('cpu')):

    generator.to(device)
    discriminator.to(device)

    # Define the loss functions and optimizers

    adversarial_loss = nn.BCELoss()
    generator_optimizer = torch.optim.Adam(generator.parameters(), lr=lr)
    discriminator_optimizer = torch.optim.Adam(discriminator.parameters(), lr=lr)


    for epoch in range(num_epochs):
        for i, (inputs,Labels) in enumerate(train_loader):
            # Move data to device
            inputs = inputs.to(device)
            Labels = Labels.to(device)

            # Train discriminator on real data
            discriminator_optimizer.zero_grad()
            real_labels = torch.ones(inputs.size(0), inputs.size(1), 1).to(device)
            real_outputs = discriminator(inputs)
            discriminator_loss_real = adversarial_loss(real_outputs, real_labels)
            discriminator_loss_real.backward()

            # Train discriminator on fake data generated by the generator
            generator_optimizer.zero_grad()
            latent_inputs = torch.randn(inputs.shape[0], inputs.shape[1], inputs.shape[2]).to(device)
            fake_inputs = generator(latent_inputs)
            fake_labels = torch.zeros(inputs.size(0), inputs.size(1), 1).to(device)
            fake_outputs = discriminator(fake_inputs)
            discriminator_loss_fake = adversarial_loss(fake_outputs, fake_labels)
            discriminator_loss_fake.backward()
            discriminator_optimizer.step()

            attack_fgsm = torchattacks.FGSM(model5, eps=0.05)
            adversarial_inputs_fgsm = attack_fgsm(inputs, torch.ones_like(Labels))
            attack_bim = torchattacks.BIM(model5, eps=0.05)
            adversarial_inputs_bim = attack_bim(inputs, torch.ones_like(Labels))
            adversarial_inputs_combined = torch.cat([adversarial_inputs_fgsm, adversarial_inputs_bim], dim=0)
            fake_labels = torch.zeros(adversarial_inputs_combined.size(0), adversarial_inputs_combined.size(1), 1).to(device)
            fake_outputs = discriminator(adversarial_inputs_combined)
            discriminator_loss_fake = adversarial_loss(fake_outputs, fake_labels)
            discriminator_loss_fake.backward()
            discriminator_optimizer.step()


            # Train generator to generate samples that increase the discriminator loss
            generator_optimizer.zero_grad()
            latent_inputs = torch.randn(inputs.shape[0], inputs.shape[1], inputs.shape[2]).to(device)
            fake_inputs = generator(latent_inputs)
            # @S: generator wants to maximize the probability of the discriminator being wrong. So loss is computed between discriminator's output to a fake image and the
            #fake label (if the fake label is 0)
            fake_labels = torch.zeros(inputs.size(0), inputs.size(1), 1).to(device)
            generator_loss =adversarial_loss(discriminator(fake_inputs), real_labels)
            generator_loss.backward()
            generator_optimizer.step()
            if i % 10 == 0:
              print('Epoch [{}/{}], Step [{}/{}], Discriminator Loss: {:.4f}, Generator Loss: {:.4f}'
                      .format(epoch, num_epochs, i, len(train_loader),
discriminator_loss_real.item() + discriminator_loss_fake.item(), generator_loss.item()))
                # Return the trained generator
    return generator,discriminator


# Define the number of epochs and learning rate for training the GAN
num_epochs = 100
lr = 0.0002
inputs, classes = next(iter(train_loader))
# Create the GAN models
batch_size, window_size, num_features = inputs.shape
generator = Generator(batch_size,window_size,num_features)
discriminator = Discriminator(batch_size, window_size, num_features)

# Train the GAN on your data
trained_generator_new,trained_discriminator = train_gan(generator, discriminator, train_loader, num_epochs=num_epochs, lr=lr)

In [None]:
torch.save(trained_generator_new.state_dict(), '/kaggle/working/trained_generator_Faulttype.pth')
torch.save(trained_discriminator.state_dict(), '/kaggle/working/trained_discriminator_Faulttype.pth')

In [None]:
noise4 = torch.randn(800, 16, 51)

generated_data4 = trained_generator_new(noise4)

surrogate_outputs = model5(generated_data4)

# Get the predicted class labels for each sample in the generated data
predicted_labels = torch.argmax(surrogate_outputs, dim=1)
print(predicted_labels)
# Print the predicted labels

In [None]:
# List to store predicted class labels
predicted_labels = []

# Set the discriminator to evaluation mode
trained_discriminator.eval()

with torch.no_grad():
    for inputs, _ in test_loader:

        # Get the discriminator's output for the current batch
        surrogate_outputs = trained_discriminator(inputs)

        # Apply thresholding to determine class labels
        threshold = 0.45
        batch_predicted_labels = (surrogate_outputs >= threshold).int()  # 1 if >= threshold, 0 otherwise
        predicted_labels.append(batch_predicted_labels)

# Convert the list of predicted labels to a single tensor
predicted_labels = torch.cat(predicted_labels, dim=0)

# Print the predicted labels
print(predicted_labels)


In [None]:
# True labels (all 1)
true_labels = torch.ones_like(predicted_labels)

# Calculate accuracy
correct_predictions = (predicted_labels == true_labels).sum().item()
total_samples = true_labels.numel()  # Total number of elements in the tensor
accuracy = correct_predictions / total_samples * 100

print(f"Accuracy: {accuracy:.2f}%")



In [None]:

noise4 = torch.randn(1500, 16, 51)

generated_data4 = trained_generator_new(noise4)

surrogate_outputs = trained_discriminator(generated_data4)
predicted_labels = []

# Apply thresholding to determine class labels
threshold = 0.45
batch_predicted_labels = (surrogate_outputs >= threshold).int()  # 1 if >= threshold, 0 otherwise
predicted_labels.append(batch_predicted_labels)

# Convert the list of predicted labels to a single tensor
predicted_labels = torch.cat(predicted_labels, dim=0)

# Print the predicted labels
print(predicted_labels)


In [None]:
# True labels (all 0)
true_labels = torch.zeros_like(predicted_labels)

# Calculate accuracy
correct_predictions = (predicted_labels == true_labels).sum().item()
total_samples = true_labels.numel()  # Total number of elements in the tensor
accuracy = correct_predictions / total_samples * 100

print(f"Accuracy: {accuracy:.2f}%")

In [None]:

import torchvision
import torchattacks

# Load the pre-trained model
model5.eval()
predicted_labels_fgsm= []
predicted_labels_bim= []
predicted_labels_cw= []
predicted_labels_RFGSM=[]
predicted_labels_PGD=[]

# Attack parameters
epsilon_fgsm = 0.5 # Perturbation magnitude for FGSM
epsilon_bim = 0.5# Perturbation magnitude for BIM
epsilon_cw = 0.5# Perturbation magnitude for C&W

threshold = 0.5
correct_original = 0
correct_adversarial_fgsm = 0
correct_adversarial_bim = 0
correct_adversarial_cw = 0
total = 0

# Iterate over the test set
for inputs, labels in test_loader:
    # Forward pass on the original input
    outputs_original = model5(inputs)
    _, predicted_original = torch.max(outputs_original, 1)

    # FGSM: Craft adversarial example
    attack_fgsm = torchattacks.FGSM(model5, eps=epsilon_fgsm)
    adversarial_inputs_fgsm = attack_fgsm(inputs, labels)

    # BIM: Craft adversarial example
    attack_bim = torchattacks.BIM(model5, eps=epsilon_bim)
    adversarial_inputs_bim = attack_bim(inputs, labels)

    # C&W: Craft adversarial example
    attack_cw = torchattacks.CW(model5, c=epsilon_cw, kappa=0)
    adversarial_inputs_cw = attack_cw(inputs, labels)


    attack_RFGSM = torchattacks.RFGSM(model5, eps=epsilon_bim)
    adversarial_inputs_RFGSM = attack_RFGSM(inputs, labels)

    attack_PGD = torchattacks.PGD(model5, eps=epsilon_bim)
    adversarial_inputs_PGD = attack_PGD(inputs, labels)

    # Forward pass on the adversarial inputs
    outputs_adversarial_fgsm = trained_discriminator(adversarial_inputs_fgsm)
    batch_predicted_labels1 = (outputs_adversarial_fgsm >= threshold).int()  # 1 if >= threshold, 0 otherwise
    predicted_labels_fgsm.append(batch_predicted_labels1)
    outputs_adversarial_bim = trained_discriminator(adversarial_inputs_bim)
    batch_predicted_labels2 = (outputs_adversarial_bim >= threshold).int()
    predicted_labels_bim.append(batch_predicted_labels2)
    outputs_adversarial_cw = trained_discriminator(adversarial_inputs_cw)
    batch_predicted_labels3 = (outputs_adversarial_cw >= threshold).int()
    predicted_labels_cw.append(batch_predicted_labels3)
    outputs_adversarial_RFGSM = trained_discriminator(adversarial_inputs_RFGSM)
    batch_predicted_labels4 = (outputs_adversarial_RFGSM >= threshold).int()
    predicted_labels_RFGSM.append(batch_predicted_labels4)
    outputs_adversarial_PGD = trained_discriminator(adversarial_inputs_PGD)
    batch_predicted_labels5 = (outputs_adversarial_PGD >= threshold).int()
    predicted_labels_PGD.append(batch_predicted_labels5)


predicted_labels = torch.cat(predicted_labels_fgsm, dim=0)
predicted_labels1 = torch.cat(predicted_labels_bim, dim=0)
predicted_labels2= torch.cat(predicted_labels_cw, dim=0)
predicted_labels3= torch.cat(predicted_labels_RFGSM, dim=0)
predicted_labels4= torch.cat(predicted_labels_PGD, dim=0)

print(predicted_labels)

In [None]:
# True labels (all 0)
true_labels = torch.zeros_like(predicted_labels)

# Calculate accuracy
correct_predictions = (predicted_labels == true_labels).sum().item()
total_samples = true_labels.numel()  # Total number of elements in the tensor
accuracy = correct_predictions / total_samples * 100

print(f"Accuracy: {accuracy:.2f}%")

In [None]:
# True labels (all 0)
true_labels = torch.zeros_like(predicted_labels1)

# Calculate accuracy
correct_predictions = (predicted_labels1 == true_labels).sum().item()
total_samples = true_labels.numel()  # Total number of elements in the tensor
accuracy = correct_predictions / total_samples * 100

print(f"Accuracy: {accuracy:.2f}%")

In [None]:
# True labels (all 0)
true_labels = torch.zeros_like(predicted_labels2)

# Calculate accuracy
correct_predictions = (predicted_labels2 == true_labels).sum().item()
total_samples = true_labels.numel()  # Total number of elements in the tensor
accuracy = correct_predictions / total_samples * 100

print(f"Accuracy: {accuracy:.2f}%")

In [None]:
###### True labels (all 0)
true_labels = torch.zeros_like(predicted_labels3)

# Calculate accuracy
correct_predictions = (predicted_labels3 == true_labels).sum().item()
total_samples = true_labels.numel()  # Total number of elements in the tensor
accuracy = correct_predictions / total_samples * 100

print(f"Accuracy: {accuracy:.2f}%")

In [None]:
# True labels (all 0)
true_labels = torch.zeros_like(predicted_labels4)

# Calculate accuracy
correct_predictions = (predicted_labels4 == true_labels).sum().item()
total_samples = true_labels.numel()  # Total number of elements in the tensor
accuracy = correct_predictions / total_samples * 100

print(f"Accuracy: {accuracy:.2f}%")