In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score, confusion_matrix, accuracy_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import label_binarize
from sklearn.model_selection import train_test_split
import torch

import time


import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

from art.attacks.evasion import SimBA, SpatialTransformation, DeepFool, BasicIterativeMethod, FastGradientMethod, ProjectedGradientDescent
from art.estimators.classification import PyTorchClassifier

import time

In [4]:
# from a2pm import A2PMethod
# from a2pm.callbacks import BaseCallback, MetricCallback, TimeCallback
# from a2pm.patterns import BasePattern, CombinationPattern, IntervalPattern
# from a2pm.wrappers import BaseWrapper, KerasWrapper, SklearnWrapper, TorchWrapper

In [5]:
X_test = np.load('/home/jovyan/Wustl_iiot/x_test.npy')
X_train = np.load('/home/jovyan/Wustl_iiot/x_train.npy')
X_val = np.load('/home/jovyan/Wustl_iiot/x_val.npy')
y_test = np.load('/home/jovyan/Wustl_iiot/y_test.npy')
y_train = np.load('/home/jovyan/Wustl_iiot/y_train.npy')
y_val = np.load('/home/jovyan/Wustl_iiot/y_val.npy')

In [5]:
ratio = 0.3
xtext = f"/home/jovyan/A2PM/A2PM_adv_sample/A2PM_{ratio}_fullattack_X.npy"
X_adv = np.load(xtext)
ytext = f"/home/jovyan/A2PM/A2PM_adv_sample/A2PM_{ratio}_fullattack_y.npy"
y_adv = np.load(ytext)

X_train_adv, X_test_adv, y_train_adv, y_test_adv = train_test_split(X_adv, y_adv, test_size=0.2, random_state=42)

In [None]:
def calculate_performance_metrics(X_test, y_true, model, model_name, attack_name, eps):
    model.eval()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    
    all_preds = []
    all_labels = []
    probabilities = []

    num_classes = len(np.unique(y_true))
    
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
    test_loader = DataLoader(dataset=test_dataset)

    with torch.no_grad():
        
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
            probabilities.extend(torch.nn.functional.softmax(outputs, dim=1).cpu().numpy())
        
        all_preds = np.array(all_preds)
        all_labels = np.array(all_labels)
        probabilities = np.array(probabilities)
        
        accuracy = accuracy_score(all_labels, all_preds)

        precision_macro, recall_macro, f1_macro, _ = precision_recall_fscore_support(all_labels, all_preds, average='macro')
        precision_weighted, recall_weighted, f1_weighted, _ = precision_recall_fscore_support(all_labels, all_preds, average='weighted')
    
        # macro_auc = roc_auc_score(label_binarize(all_labels, classes=range(num_classes)), probabilities[:,1], average='macro')
        # weighted_auc = roc_auc_score(label_binarize(all_labels, classes=range(num_classes)), probabilities[:,1], average='weighted')

        cm = confusion_matrix(all_labels, all_preds)

        def calculate_class_metrics_macro(cm, class_index):
            TP = cm[class_index, class_index]
            FP = cm[:, class_index].sum() - TP
            FN = cm[class_index, :].sum() - TP
            TN = cm.sum() - (TP + FP + FN)
            
            TPR = TP / (TP + FN) if (TP + FN) != 0 else 0  
            TNR = TN / (TN + FP) if (TN + FP) != 0 else 0  
            FPR = FP / (FP + TN) if (FP + TN) != 0 else 0  
            FNR = FN / (FN + TP) if (FN + TP) != 0 else 0  
            
            return TPR, TNR, FPR, FNR
            
        metrics = np.array([calculate_class_metrics_macro(cm, i) for i in range(num_classes)])
        TPR_macro, TNR_macro, FPR_macro, FNR_macro = np.mean(metrics, axis=0)

        print(f"Accuracy: {accuracy}")
        
        print("\nmacro")
        print(f"Precision: {precision_macro}\nRecall: {recall_macro}\nF1 Score: {f1_macro}")
    
        print("\nweighted")
        print(f"Precision: {precision_weighted}\nRecall: {recall_weighted}\nF1 Score: {f1_weighted}")
        print()
        
        print(f"Mean FNR: {FNR_macro}\nMean TNR: {TNR_macro}\nMean FPR: {FPR_macro}\nMean TPR: {TPR_macro}")

        new_row = {
            "model" : model_name,
            "attack_model" : attack_name,
            'epsilon': eps,
            'Accuracy': accuracy,
            'Macro Precision': precision_macro,
            'Weighted Precision': precision_weighted,
            'Macro Recall': recall_macro,
            'Weighted Recall': recall_weighted,
            'Macro F1': f1_macro,
            'Weighted F1': f1_weighted,
            # 'Macro AUC': macro_auc,
            # 'Weighted AUC': weighted_auc,
            'TPR': TPR_macro,
            'FNR': FNR_macro,
            'TNR': TNR_macro,
            'FPR': FPR_macro,
        }
        new_row_df = pd.DataFrame([new_row])
        new_row_df.to_csv("/home/jovyan/A2PM/attackmodel.csv", mode='a', index=False, header=False)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")

In [None]:
input_shape = x_train.shape[1]
output_shape = len(np.unique(y_train))

class DNNModel(nn.Module):
    def __init__(self, input_size, output_size):
        super(DNNModel, self).__init__()
        self.fc1 = nn.Linear(input_size, 50)
        self.fc2 = nn.Linear(50, 30)
        self.fc3 = nn.Linear(30, 20)
        self.fc4 = nn.Linear(20, output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return x

In [None]:
model = DNNModel(input_size=input_shape, output_size=output_shape).to(device)
model.load_state_dict(torch.load("/home/jovyan/Wustl_iiot/transfer_attack/dnn_pytorch.pt"))

    
classifier = PyTorchClassifier(
    model=model,
    clip_values=(-5, 5),
    loss=nn.CrossEntropyLoss(),
    optimizer=optim.Adam(model.parameters(), lr=0.001),
    input_shape=(input_shape,),
    nb_classes=output_shape,
    device_type='gpu'
)

In [14]:
from art.estimators.classification import KerasClassifier
from art.defences.transformer.evasion import DefensiveDistillation
import tensorflow.compat.v1 as tf
# tf.disable_v2_behavior()

# 包装教师模型
teacher_classifier = KerasClassifier(model=teacher_model, clip_values=(0, 1))

# 设置温度参数
temperature = 10.0

# 创建防御蒸馏对象
distillation = DefensiveDistillation(classifier=teacher_classifier, temperature=temperature, batch_size=64)

# 进行防御蒸馏训练
distillation.fit(X_train_adv, y_train_adv, nb_epochs=10)


AttributeError: module 'tensorflow.keras.backend' has no attribute 'placeholder'

In [None]:
# 创建学生模型
student_model = Sequential()
student_model.add(Input(shape=input_shape))
student_model.add(Dense(units=30, activation='relu'))
student_model.add(Dense(units=20, activation='relu'))
student_model.add(Dense(units=num_classes, activation='softmax'))

student_model.compile(loss="sparse_categorical_crossentropy", optimizer=opt, metrics=['accuracy'])

# 包装学生模型
student_classifier = KerasClassifier(model=student_model, clip_values=(0, 1))

# 使用防御蒸馏对象重新进行防御蒸馏训练
distillation = DefensiveDistillation(classifier=student_classifier, temperature=temperature, batch_size=64)
distillation.fit(X_train_adv, y_train_adv, nb_epochs=10, validation_data=(X_train_adv, y_train_adv))


In [None]:
# 评估学生模型
accuracy = student_classifier.evaluate(X_test_adv, y_test_adv)
print(f'Accuracy with Defensive Distillation: {accuracy * 100:.2f}%')


In [9]:
x_test = np.load('/home/jovyan/Wustl_iiot/x_test.npy')
x_train = np.load('/home/jovyan/Wustl_iiot/x_train.npy')
x_val = np.load('/home/jovyan/Wustl_iiot/x_val.npy')
y_test = np.load('/home/jovyan/Wustl_iiot/y_test.npy')
y_train = np.load('/home/jovyan/Wustl_iiot/y_train.npy')
y_val = np.load('/home/jovyan/Wustl_iiot/y_val.npy')

ratio = 0.3
xtext = f"/home/jovyan/A2PM/A2PM_adv_sample/A2PM_{ratio}_fullattack_X.npy"
X_adv = np.load(xtext)
ytext = f"/home/jovyan/A2PM/A2PM_adv_sample/A2PM_{ratio}_fullattack_y.npy"
y_adv = np.load(ytext)

X_train_adv, X_test_adv, y_train_adv, y_test_adv = train_test_split(X_adv, y_adv, test_size=0.2, random_state=42)



In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision import datasets

# Define device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Hyperparameters
epochs = 50
training_batch_size = 128
validation_batch_size = 128
learning_rate = 0.01
baseline_temperature = 1
distilled_temperature = 100


x_train_tensor = torch.tensor(x_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)

x_val_tensor = torch.tensor(x_val, dtype=torch.float32).to(device)
y_val_tensor = torch.tensor(y_val, dtype=torch.long).to(device)

train_dataset = TensorDataset(x_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=100, shuffle=True)

val_dataset = TensorDataset(x_val_tensor, y_val_tensor)
val_loader = DataLoader(val_dataset, batch_size=100, shuffle=True)

x_test_tensor = torch.tensor(X_adv, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_adv, dtype=torch.long).to(device)

test_dataset = TensorDataset(x_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=100, shuffle=True)


# # Define a simple CNN model
# class SimpleCNN(nn.Module):
#     def __init__(self):
#         super(SimpleCNN, self).__init__()
#         self.conv1 = nn.Conv2d(3, 32, kernel_size=3)
#         self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
#         self.fc1 = nn.Linear(64*6*6, 256)
#         self.fc2 = nn.Linear(256, 10)

#     def forward(self, x):
#         x = F.relu(F.max_pool2d(self.conv1(x), 2))
#         x = F.relu(F.max_pool2d(self.conv2(x), 2))
#         x = x.view(-1, 64*6*6)
#         x = F.relu(self.fc1(x))
#         x = self.fc2(x)
#         return x

input_shape = x_train.shape[1]
output_shape = len(np.unique(y_train))

class DNNModel(nn.Module):
    def __init__(self, input_size, output_size):
        super(DNNModel, self).__init__()
        self.fc1 = nn.Linear(input_size, 50)
        self.fc2 = nn.Linear(50, 30)
        self.fc3 = nn.Linear(30, 20)
        self.fc4 = nn.Linear(20, output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return x

# Initialize models
teacher_model = DNNModel(input_shape, output_shape).to(device)
student_model = DNNModel(input_shape, output_shape).to(device)

# Training function for the teacher model
def train_teacher_model(model, train_loader, epochs, temperature):
    model.train()
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    for epoch in range(epochs):
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs / temperature, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    torch.save(model.state_dict(), 'teacher_model.pth')

# Train the teacher model
train_teacher_model(teacher_model, train_loader, epochs, baseline_temperature)

# Distillation loss function
def distillation_loss(student_outputs, teacher_outputs, temperature, alpha):
    soft_targets = F.softmax(teacher_outputs / temperature, dim=1)
    student_log_probs = F.log_softmax(student_outputs / temperature, dim=1)
    distillation_loss = nn.KLDivLoss()(student_log_probs, soft_targets) * (temperature ** 2)
    return distillation_loss

# Training function for the student model using distillation
def train_student_model(student_model, teacher_model, train_loader, epochs, temperature, alpha=0.5):
    student_model.train()
    teacher_model.eval()
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(student_model.parameters(), lr=learning_rate)
    for epoch in range(epochs):
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            teacher_outputs = teacher_model(inputs).detach()
            student_outputs = student_model(inputs)
            loss = distillation_loss(student_outputs, teacher_outputs, temperature, alpha) + criterion(student_outputs, labels) * (1. - alpha)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    torch.save(student_model.state_dict(), 'student_model.pth')

# Train the student model
train_student_model(student_model, teacher_model, train_loader, epochs, distilled_temperature)

# Evaluation function
def evaluate_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    return accuracy

# Evaluate the student model
accuracy = evaluate_model(student_model, test_loader)
print(f'Student model accuracy: {accuracy:.2f}%')


KeyboardInterrupt: 