In [1]:
import numpy as np
import pandas as pd

from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score, confusion_matrix
from sklearn.preprocessing import label_binarize

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
# from OpenMax import openmax
import libmr



import time

In [2]:
head = {
            "model" : '',
            "attack_model": '',
            'epsilon': '',
            'Accuracy': '',
            'Macro Precision': '',
            'Weighted Precision': '',
            'Macro Recall': '',
            'Weighted Recall': '',
            'Macro F1': '',
            'Weighted F1': '',
            # 'Macro AUC': '',
            # 'Weighted AUC': '',
            'TPR': '',
            'FNR': '',
            'TNR': '',
            'FPR': '',
        }
head = pd.DataFrame([head])
head.to_csv("/home/jovyan/Sample_Based_Extension/UNSW/UNSW_Defense/RGAN/RGAN_Openmax.csv", mode='a', index=False)


In [3]:
# def calculate_performance_metrics(X_test, y_test, model, model_name, attack_name, eps):
#     model.eval()
#     device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#     model.to(device)
    
#     all_preds = []
#     all_labels = []
#     probabilities = []

#     num_classes = len(np.unique(y_test))
    
#     X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
#     y_test_tensor = torch.tensor(y_test, dtype=torch.long)
#     test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
#     test_loader = DataLoader(dataset=test_dataset)

#     with torch.no_grad():
        
#         for inputs, labels in test_loader:
#             inputs, labels = inputs.to(device), labels.to(device)
#             outputs = model(inputs)
#             preds = torch.argmax(outputs, dim=1)
#             all_preds.extend(preds.cpu().numpy())
#             all_labels.extend(labels.cpu().numpy())
#             probabilities.extend(torch.nn.functional.softmax(outputs, dim=1).cpu().numpy())
        
#         all_preds = np.array(all_preds)
#         all_labels = np.array(all_labels)
#         probabilities = np.array(probabilities)

#         np.save(f"/home/jovyan/Sample_Based_Extension/UNSW/UNSW_Defense_Label/UNSW_Def11/y_pred_{attack_name}{eps}_Def11.npy", all_preds)


In [4]:
x_test = np.load('/home/jovyan/Sample_Based_Extension/UNSW/x_test.npy')
x_train = np.load('/home/jovyan/Sample_Based_Extension/UNSW/x_train.npy')
x_val = np.load('/home/jovyan/Sample_Based_Extension/UNSW/x_val.npy')
y_test = np.load('/home/jovyan/Sample_Based_Extension/UNSW/y_test.npy')
y_train = np.load('/home/jovyan/Sample_Based_Extension/UNSW/y_train.npy')
y_val = np.load('/home/jovyan/Sample_Based_Extension/UNSW/y_val.npy')

In [5]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")

Using cuda device


In [6]:
x_train_tensor = torch.tensor(x_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)

x_val_tensor = torch.tensor(x_val, dtype=torch.float32).to(device)
y_val_tensor = torch.tensor(y_val, dtype=torch.long).to(device)

train_dataset = TensorDataset(x_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=100, shuffle=True)

val_dataset = TensorDataset(x_val_tensor, y_val_tensor)
val_loader = DataLoader(val_dataset, batch_size=100, shuffle=True)

In [7]:
class DNNModel(nn.Module):
    def __init__(self, input_size, output_size):
        super(DNNModel, self).__init__()
        self.fc1 = nn.Linear(input_size, 50)
        self.bn1 = nn.BatchNorm1d(50)
        self.fc2 = nn.Linear(50, 30)
        self.bn2 = nn.BatchNorm1d(30)
        self.fc3 = nn.Linear(30, 20)
        self.bn3 = nn.BatchNorm1d(20)
        self.fc4 = nn.Linear(20, output_size)

    def forward(self, x,return_features=False):
        x = self.bn1(F.relu(self.fc1(x)))
        x = self.bn2(F.relu(self.fc2(x)))
        x = self.bn3(F.relu(self.fc3(x)))
        if return_features:
            return x
        logits = self.fc4(x)
        return logits

class Generator(nn.Module):
    def __init__(self, input_size):
        super(Generator, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.fc2 = nn.Linear(64, input_size)

    def forward(self, x):
        perturbation = torch.tanh(self.fc1(x))
        perturbation = self.fc2(perturbation)
        perturbation = torch.clamp(perturbation, -0.1, 0.1)
        return perturbation

class Discriminator(nn.Module):
    def __init__(self, input_size):
        super(Discriminator, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.fc2 = nn.Linear(64, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        return x

def compute_mav(features, labels, num_classes):
    mavs = {}
    for i in range(num_classes):
        class_features = features[labels == i]
        mavs[i] = class_features.mean(axis=0)
    return mavs


def fit_weibull(mavs, features, labels, tailsize=20):
    weibull_model = {}
    for class_idx in mavs.keys():
        distances = [np.linalg.norm(f - mavs[class_idx]) for f in features[labels == class_idx]]
        mr = libmr.MR()
        mr.fit_high(distances, tailsize)
        weibull_model[class_idx] = mr
    return weibull_model


def compute_openmax(logits, mavs, weibull_model, alpha=10):
    num_classes = logits.shape[1]
    revised_scores = np.copy(logits)

    for i in range(num_classes):
        distance = np.linalg.norm(logits - mavs[i])
        weibull = weibull_model[i].w_score(distance)
        revised_scores[:, i] = logits[:, i] * (1 - weibull)

    openmax_scores = np.exp(revised_scores) / np.sum(np.exp(revised_scores), axis=1, keepdims=True)
    return openmax_scores

In [8]:
input_size = x_train.shape[1]
output_size = len(np.unique(y_train))

In [9]:
def train_rgan(model, G, D, train_loader, val_loader, device, epochs=10, lr=0.001, patience=3, min_delta=0.001):
    optimizer_G = optim.Adam(G.parameters(), lr=lr)
    optimizer_D = optim.Adam(D.parameters(), lr=lr)
    optimizer_M = optim.Adam(model.parameters(), lr=lr)
    loss_function = nn.CrossEntropyLoss()

    best_loss = float("inf")
    patience_counter = 0

    for epoch in range(epochs):
        model.train()
        G.train()
        D.train()

        train_loss = 0.0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            adv_inputs = inputs + G(inputs).detach().to(device)
            adv_inputs = torch.clamp(adv_inputs, -1, 1)

            optimizer_D.zero_grad()
            real_loss = -torch.mean(D(inputs))
            fake_loss = torch.mean(D(G(inputs)))
            loss_D = real_loss + fake_loss
            loss_D.backward()
            optimizer_D.step()

            optimizer_G.zero_grad()
            fake_loss = -torch.mean(D(G(inputs)))
            fake_loss.backward()
            optimizer_G.step()

            optimizer_M.zero_grad()
            outputs = model(adv_inputs)
            loss_M = loss_function(outputs, labels)
            loss_M.backward()
            optimizer_M.step()

            train_loss += loss_M.item()

        avg_train_loss = train_loss / len(train_loader)

        model.eval()
        val_loss = 0.0
        correct_predictions = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = loss_function(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs.data, 1)
                correct_predictions += (predicted == labels).sum().item()

        avg_val_loss = val_loss / len(val_loader)
        val_accuracy = correct_predictions / len(val_loader.dataset)

        print(f"Epoch {epoch+1}, Training Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

        if best_loss - avg_val_loss > min_delta:
            best_loss = avg_val_loss
            patience_counter = 0
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print("Early stopping triggered")
            break


In [10]:
model = DNNModel(input_size, output_size).to(device)
G = Generator(input_size).to(device)
D = Discriminator(input_size).to(device)

train_rgan(model, G, D, train_loader, val_loader, device)



Epoch 1, Training Loss: 0.1736, Validation Loss: 0.6405, Validation Accuracy: 0.8966
Epoch 2, Training Loss: 0.1224, Validation Loss: 1.0305, Validation Accuracy: 0.8995
Epoch 3, Training Loss: 0.1131, Validation Loss: 1.6646, Validation Accuracy: 0.8123
Epoch 4, Training Loss: 0.1087, Validation Loss: 2.1404, Validation Accuracy: 0.8225
Early stopping triggered


In [11]:
x_train_tensor = torch.tensor(x_train, dtype=torch.float32, device=device)

with torch.no_grad():
    features_train = model(x_train_tensor, return_features=True).cpu().numpy()

# features_train = model.fc3(x_train_tensor).detach().cpu().numpy()


# features_train = model.fc3(x_train).detach().cpu().numpy()
mavs = compute_mav(features_train, y_train, output_size)
weibull_model = fit_weibull(mavs, features_train, y_train)

In [12]:

def calculate_performance_metrics(X_test, y_test, model, model_name, attack_name, eps, mavs=mavs, weibull_model=weibull_model):

    model.eval()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    
    all_preds = []
    all_labels = []
    probabilities = []

    num_classes = len(np.unique(y_test))
    
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
    test_loader = DataLoader(dataset=test_dataset)

    with torch.no_grad():
        
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            features_test = model(inputs, return_features=True)
            openmax_scores = compute_openmax(features_test.cpu().numpy(), mavs, weibull_model)
            preds = np.argmax(openmax_scores, axis=1)

            # outputs = model(inputs)
            
            # openmax_scores = compute_openmax(outputs.detach().cpu().numpy(), mavs, weibull_model)
            
            # preds = np.argmax(openmax_scores, axis=1)
            
            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy())
            probabilities.extend(openmax_scores)
        
        all_preds = np.array(all_preds)
        all_labels = np.array(all_labels)
        probabilities = np.array(probabilities)
        
        accuracy = accuracy_score(all_labels, all_preds)

        precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(all_labels, all_preds, average='macro')
        precision_weighted, recall_weighted, f1_weighted, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted')

        print(f"Accuracy: {accuracy}")
        
        print("\nmacro")
        print(f"Precision: {precision_macro}\nRecall: {recall_macro}\nF1 Score: {f1_macro}")
    
        print("\nweighted")
        print(f"Precision: {precision_weighted}\nRecall: {recall_weighted}\nF1 Score: {f1_weighted}")
        print()
        
        new_row = {
            "model" : model_name,
            "attack_model" : attack_name,
            'epsilon': eps,
            'Accuracy': accuracy,
            'Macro Precision': precision_macro,
            'Weighted Precision': precision_weighted,
            'Macro Recall': recall_macro,
            'Weighted Recall': recall_weighted,
            'Macro F1': f1_macro,
            'Weighted F1': f1_weighted,
        }
        new_row_df = pd.DataFrame([new_row])
        new_row_df.to_csv("/home/jovyan/Sample_Based_Extension/UNSW/UNSW_Defense/RGAN/RGAN_Openmax.csv", mode='a', index=False, header=False)


In [13]:
calculate_performance_metrics(x_test, y_test, model, 'DNN', 'baseline', '0')

KeyError: 2

In [14]:
epsilon_values = [0.01, 0.1, 0.2, 0.3]

# Iterate over epsilon values
for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/UNSW/transfer_attack/x_test_adv_BIM_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'BIM', epsilon)

for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/UNSW/transfer_attack/x_test_adv_FGSM_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'FGSM', epsilon)

for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/UNSW/transfer_attack/x_test_adv_PGD_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'PGD', epsilon)

Accuracy: 0.8778899895587474

macro
Precision: 0.8549099041005147
Recall: 0.8819129502557104
F1 Score: 0.865142826856663

weighted
Precision: 0.8876371896655947
Recall: 0.8778899895587474
F1 Score: 0.8801203792427222

Accuracy: 0.5505611076501917

macro
Precision: 0.528640168967216
Recall: 0.5324336920830133
F1 Score: 0.5224755554049633

weighted
Precision: 0.5926069254004709
Recall: 0.5505611076501917
F1 Score: 0.5643102251404299

Accuracy: 0.340727027050741

macro
Precision: 0.3407095928375604
Recall: 0.31680333178636344
F1 Score: 0.31848144126982303

weighted
Precision: 0.4059626829154792
Recall: 0.340727027050741
F1 Score: 0.36296072892880865

Accuracy: 0.220266559036948

macro
Precision: 0.23197275054800975
Recall: 0.1933738653252228
F1 Score: 0.2036853357554786

weighted
Precision: 0.29018698061225945
Recall: 0.220266559036948
F1 Score: 0.24519485440569003

Accuracy: 0.8781532144141931

macro
Precision: 0.855191213588397
Recall: 0.8822083892728689
F1 Score: 0.8654319263064946

we

In [15]:
epsilon_values = [0.01, 0.1, 0.2, 0.3]

print("start DF")
for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/UNSW/transfer_attack/x_test_adv_DF_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'DF', epsilon)

print("start AutoPGD")
for epsilon in epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/UNSW/transfer_attack/x_test_adv_AutoPGD_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'AutoPGD', epsilon)


print("start ZOO")
ZOO_epsilon_values = [0, 0.01, 0.1, 0.2, 0.3]
# Iterate over epsilon values
for epsilon in ZOO_epsilon_values:
    filename = f'/home/jovyan/Sample_Based_Extension/UNSW/transfer_attack/x_test_adv_ZOO_eps_{epsilon}.npy'
    x_test_adv = np.load(filename)

    calculate_performance_metrics(x_test_adv, y_test, model, 'DNN', 'ZOO', epsilon)

start DF
Accuracy: 0.16368198927797423

macro
Precision: 0.18935299747625028
Recall: 0.16459313080919416
F1 Score: 0.1609621176915273

weighted
Precision: 0.22684106467488907
Recall: 0.16368198927797423
F1 Score: 0.1782189882548816

Accuracy: 0.16572636898860235

macro
Precision: 0.19126642522433537
Recall: 0.16524927623909524
F1 Score: 0.16260885061765434

weighted
Precision: 0.22983553556734104
Recall: 0.16572636898860235
F1 Score: 0.1810660477488452

Accuracy: 0.16806907020206896

macro
Precision: 0.19338035710578427
Recall: 0.16597147405970936
F1 Score: 0.16445560324447048

weighted
Precision: 0.23318368750236432
Recall: 0.16806907020206896
F1 Score: 0.18430484248546936

Accuracy: 0.16978880592431408

macro
Precision: 0.19477145987155767
Recall: 0.16601730994830544
F1 Score: 0.16565215845609557

weighted
Precision: 0.2357157234622573
Recall: 0.16978880592431408
F1 Score: 0.18687457751080266

start AutoPGD
Accuracy: 0.8889717559730107

macro
Precision: 0.866804893030429
Recall: 0.89