In [1]:
import numpy as np
import pandas as pd

from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score, confusion_matrix
from sklearn.preprocessing import label_binarize

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

import time

In [2]:
# head = {
#             "model" : '',
#             "attack_model": '',
#             'epsilon': '',
#             'Accuracy': '',
#             'Macro Precision': '',
#             'Weighted Precision': '',
#             'Macro Recall': '',
#             'Weighted Recall': '',
#             'Macro F1': '',
#             'Weighted F1': '',
#             # 'Macro AUC': '',
#             # 'Weighted AUC': '',
#             'TPR': '',
#             'FNR': '',
#             'TNR': '',
#             'FPR': '',
#         }
# head = pd.DataFrame([head])
# head.to_csv("/home/jovyan/Sample_Based_Extension/WUSTL/WUSTL_Defense/Adversarial_Training/interpolated.csv", mode='a', index=False)


In [3]:
# def calculate_performance_metrics(X_test, y_true, model, model_name, attack_name, eps):
#     model.eval()
#     device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#     model.to(device)
    
#     all_preds = []
#     all_labels = []
#     probabilities = []

#     num_classes = len(np.unique(y_true))
    
#     X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
#     y_test_tensor = torch.tensor(y_test, dtype=torch.long)
#     test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
#     test_loader = DataLoader(dataset=test_dataset)

#     with torch.no_grad():
        
#         for inputs, labels in test_loader:
#             inputs, labels = inputs.to(device), labels.to(device)
#             outputs = model(inputs)
#             preds = torch.argmax(outputs, dim=1)
#             all_preds.extend(preds.cpu().numpy())
#             all_labels.extend(labels.cpu().numpy())
#             probabilities.extend(torch.nn.functional.softmax(outputs, dim=1).cpu().numpy())
        
#         all_preds = np.array(all_preds)
#         all_labels = np.array(all_labels)
#         probabilities = np.array(probabilities)
        
#         accuracy = accuracy_score(all_labels, all_preds)

#         precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(all_labels, all_preds, average='macro')
#         precision_weighted, recall_weighted, f1_weighted, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted')
    
#         # macro_auc = roc_auc_score(label_binarize(all_labels, classes=range(num_classes)), probabilities[:,1], average='macro')
#         # weighted_auc = roc_auc_score(label_binarize(all_labels, classes=range(num_classes)), probabilities[:,1], average='weighted')

#         cm = confusion_matrix(all_labels, all_preds)

#         def calculate_class_metrics_macro(cm, class_index):
#             TP = cm[class_index, class_index]
#             FP = cm[:, class_index].sum() - TP
#             FN = cm[class_index, :].sum() - TP
#             TN = cm.sum() - (TP + FP + FN)
            
#             TPR = TP / (TP + FN) if (TP + FN) != 0 else 0  
#             TNR = TN / (TN + FP) if (TN + FP) != 0 else 0  
#             FPR = FP / (FP + TN) if (FP + TN) != 0 else 0  
#             FNR = FN / (FN + TP) if (FN + TP) != 0 else 0  
            
#             return TPR, TNR, FPR, FNR
            
#         metrics = np.array([calculate_class_metrics_macro(cm, i) for i in range(num_classes)])
#         TPR_macro, TNR_macro, FPR_macro, FNR_macro = np.mean(metrics, axis=0)

#         print(f"Accuracy: {accuracy}")
        
#         print("\nmacro")
#         print(f"Precision: {precision_macro}\nRecall: {recall_macro}\nF1 Score: {f1_macro}")
    
#         print("\nweighted")
#         print(f"Precision: {precision_weighted}\nRecall: {recall_weighted}\nF1 Score: {f1_weighted}")
#         print()
        
#         print(f"Mean FNR: {FNR_macro}\nMean TNR: {TNR_macro}\nMean FPR: {FPR_macro}\nMean TPR: {TPR_macro}")

#         new_row = {
#             "model" : model_name,
#             "attack_model" : attack_name,
#             'epsilon': eps,
#             'Accuracy': accuracy,
#             'Macro Precision': precision_macro,
#             'Weighted Precision': precision_weighted,
#             'Macro Recall': recall_macro,
#             'Weighted Recall': recall_weighted,
#             'Macro F1': f1_macro,
#             'Weighted F1': f1_weighted,
#             # 'Macro AUC': macro_auc,
#             # 'Weighted AUC': weighted_auc,
#             'TPR': TPR_macro,
#             'FNR': FNR_macro,
#             'TNR': TNR_macro,
#             'FPR': FPR_macro,
#         }
#         new_row_df = pd.DataFrame([new_row])
#         new_row_df.to_csv("/home/jovyan/Sample_Based_Extension/WUSTL/WUSTL_Defense/Adversarial_Training/interpolated.csv", mode='a', index=False, header=False)



In [4]:
def calculate_performance_metrics(X_test, y_test, model, model_name, attack_name, eps):
    model.eval()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    
    all_preds = []
    all_labels = []
    probabilities = []

    num_classes = len(np.unique(y_test))
    
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
    test_loader = DataLoader(dataset=test_dataset)

    with torch.no_grad():
        
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            probabilities.extend(torch.nn.functional.softmax(outputs, dim=1).cpu().numpy())
        
        all_preds = np.array(all_preds)
        all_labels = np.array(all_labels)
        probabilities = np.array(probabilities)

        np.save(f"/home/jovyan/Sample_Based_Extension/WUSTL/WUSTL_Defense_Label/WUSTL_Def2/y_pred_{attack_name}{eps}_Def2.npy", all_preds)

        

In [5]:
x_test = np.load('/home/jovyan/Sample_Based_Extension/WUSTL/x_test.npy')
x_train = np.load('/home/jovyan/Sample_Based_Extension/WUSTL/x_train.npy')
x_val = np.load('/home/jovyan/Sample_Based_Extension/WUSTL/x_val.npy')
y_test = np.load('/home/jovyan/Sample_Based_Extension/WUSTL/y_test.npy')
y_train = np.load('/home/jovyan/Sample_Based_Extension/WUSTL/y_train.npy')
y_val = np.load('/home/jovyan/Sample_Based_Extension/WUSTL/y_val.npy')

In [6]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")

Using cuda device


In [7]:
input_shape = x_train.shape[1]
output_shape = len(np.unique(y_train))

In [8]:
x_train_tensor = torch.tensor(x_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)

x_val_tensor = torch.tensor(x_val, dtype=torch.float32).to(device)
y_val_tensor = torch.tensor(y_val, dtype=torch.long).to(device)

train_dataset = TensorDataset(x_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=100, shuffle=True)

val_dataset = TensorDataset(x_val_tensor, y_val_tensor)
val_loader = DataLoader(val_dataset, batch_size=100, shuffle=True)

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class DNNModel(nn.Module):
    def __init__(self, input_size, output_size):
        super(DNNModel, self).__init__()
        self.fc1 = nn.Linear(input_size, 50)
        self.fc2 = nn.Linear(50, 30)
        self.fc3 = nn.Linear(30, 20)
        self.fc4 = nn.Linear(20, output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return x


In [10]:
# Adversarial training function with interpolation
def interpolated_adversarial_training(model, train_loader, optimizer, epoch, device, eps=0.3, alpha=2/255, iters=40):
    model.train()
    loss_fn = nn.CrossEntropyLoss()

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        
        # Create adversarial examples
        data_adv = data.clone().detach().requires_grad_(True).to(device)
        outputs = model(data_adv)
        loss = loss_fn(outputs, target)
        loss.backward()
        data_adv = data_adv + alpha * data_adv.grad.sign()
        eta = torch.clamp(data_adv - data, min=-eps, max=eps)
        data_adv = torch.clamp(data + eta, min=0, max=1).detach_()

        # Interpolation
        lam = np.random.beta(1.0, 1.0)
        data_mixed = lam * data + (1 - lam) * data_adv
        target_mixed = lam * target + (1 - lam) * target

        optimizer.zero_grad()
        output = model(data_mixed)
        target_mixed = target_mixed.long()
        
        loss = loss_fn(output, target_mixed)
        loss.backward()
        optimizer.step()

        if batch_idx % 1000 == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}'
                  f' ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')

# Model initialization
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_size = input_shape  # Replace with your input shape
output_size = output_shape  # Replace with your output shape
model = DNNModel(input_size=input_size, output_size=output_size).to(device)

# Compile model
optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_function = nn.CrossEntropyLoss()

# Early stopping variables
min_delta = 0.001
patience = 5
patience_counter = 0
best_loss = float('inf')



In [11]:
# Training loop with early stopping and validation evaluation
for epoch in range(10):
    interpolated_adversarial_training(model, train_loader, optimizer, epoch, device, eps=0.3, alpha=2/255, iters=40)

    # Validation
    model.eval()
    val_train_loss = 0.0
    correct_predictions = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = loss_function(outputs, labels)
            val_train_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            correct_predictions += (predicted == labels).sum().item()

    avg_val_loss = val_train_loss / len(val_loader)
    val_accuracy = correct_predictions / len(val_dataset)

    print(f"Epoch {epoch+1}, Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

    # Early stopping check using min_delta
    if best_loss - avg_val_loss > min_delta:
        best_loss = avg_val_loss
        patience_counter = 0
    else:
        patience_counter += 1

    if patience_counter >= patience:
        print("Early stopping triggered")
        break


Epoch 1, Validation Loss: 0.0010, Validation Accuracy: 0.9999
Epoch 2, Validation Loss: 0.0006, Validation Accuracy: 0.9999
Epoch 3, Validation Loss: 0.0005, Validation Accuracy: 0.9999
Epoch 4, Validation Loss: 0.0007, Validation Accuracy: 0.9999
Epoch 5, Validation Loss: 0.0006, Validation Accuracy: 0.9999
Epoch 6, Validation Loss: 0.0005, Validation Accuracy: 0.9999
Early stopping triggered


In [12]:
calculate_performance_metrics(x_test, y_test, model, 'DNN', 'baseline', '0')

In [13]:
epsilon_values = [0.01, 0.1, 0.2, 0.3]

# Iterate over epsilon values
for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/WUSTL/transfer_attack/x_test_adv_BIM_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'BIM', epsilon)

for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/WUSTL/transfer_attack/x_test_adv_FGSM_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'FGSM', epsilon)

for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/WUSTL/transfer_attack/x_test_adv_PGD_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'PGD', epsilon)

In [14]:
epsilon_values = [0.01, 0.1, 0.2, 0.3]

print("start DF")
for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/WUSTL/transfer_attack/x_test_adv_DF_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'DF', epsilon)

print("start AutoPGD")
for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/WUSTL/transfer_attack/x_test_adv_AutoPGD_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'AutoPGD', epsilon)




start DF
start AutoPGD
start ZOO


FileNotFoundError: [Errno 2] No such file or directory: '/home/jovyan/Sample_Based_Extension/WUSTL/transfer_attack/x_test_adv_ZOO_eps_0.npy'

In [15]:
epsilon_values = [0.01, 0.1, 0.2, 0.3]

print("start ZOO")
# Iterate over epsilon values
for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/WUSTL/transfer_attack/x_test_adv_ZOO_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'ZOO', epsilon)
    
print("start CaFA")
for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/WUSTL/transfer_attack/x_test_adv_CaFA_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'CaFA', epsilon)

print("start SINIFGSM")
for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/WUSTL/transfer_attack/x_test_adv_SINIFGSM_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'SINIFGSM', epsilon)




start ZOO
start CaFA
start SINIFGSM


In [16]:
print("start VNIFGSM")
# Iterate over epsilon values
for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/WUSTL/transfer_attack/x_test_adv_VNIFGSM_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'VNIFGSM', epsilon)

start VNIFGSM


In [17]:
torch.save(model.state_dict(), "/home/jovyan/Sample_Based_Extension/WUSTL/WUSTL_Defense/Adversarial_Training/Adversarial_Training_interpolated.pt")

In [18]:
model.load_state_dict(torch.load("/home/jovyan/Sample_Based_Extension/WUSTL/WUSTL_Defense/Adversarial_Training/Adversarial_Training_interpolated.pt"))

  model.load_state_dict(torch.load("/home/jovyan/Sample_Based_Extension/UNSW/UNSW_Defense/Adversarial_Training/Adversarial_Training_interpolated.pt"))


<All keys matched successfully>

In [19]:
# import time

# epsilon_values = [0.01, 0.1, 0.2, 0.3]

# start_time = time.time()

# for epsilon in epsilon_values:
#     filename = f'/home/jovyan/Sample_Based_Extension/WUSTL/transfer_attack/x_test_adv_BIM_eps_{epsilon}.npy'
#     x_test_adv = np.load(filename)

#     calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'BIM', epsilon)

# end_time = time.time()
# result = end_time - start_time
# print(f"Execution Time: {result:.6f} seconds")