# Normal Traning

In [1]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1"  # Chọn GPU đầu tiên

In [None]:
import sys
sys.path.append('/home/jupyter-iec_roadquality/Security/1iat/DropoutAttack/PREVIOUS KD with Resnet50 - Resnet10/modules')
from custom_dropout import DeterministicDropout
#from new_distilation_model import TeacherNet, StudentNet
from model_wrapper import NetWrapper
from import_data import load_cifar
from misc import write_to_json
from torch import nn, optim
import torch
import pandas as pd
import torchvision.transforms as transforms
from sklearn.metrics import f1_score, accuracy_score
from os.path import exists
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
import os
import json
from torchvision.models import resnet50, resnet18
from torch import nn


In [3]:
class TeacherNet(nn.Module):
    def __init__(self, dropout):
        """
        ResNet50-based teacher model with customizable dropout.

            Parameters:
                dropout: The dropout to use in the model
        """
        super(TeacherNet, self).__init__()
        self.dropout = dropout

        # Load the ResNet50 model
        self.resnet = resnet50(weights='IMAGENET1K_V1')

        # Modify the fully connected layer
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 512),
            nn.ReLU(),
            self.dropout,  # Add the dropout layer
            nn.Linear(512, 10)  # Output layer for 10 classes
        )

    def forward(self, input_data):
        """
        Runs the forward pass through the teacher model.

            Parameters:
                input_data: Input tensor
        """
        return self.resnet(input_data)


class StudentNet(nn.Module):
    def __init__(self, dropout):
        """
        ResNet18-based student model with customizable dropout.

            Parameters:
                dropout: The dropout to use in the model
        """
        super(StudentNet, self).__init__()
        self.dropout = dropout

        # Load the ResNet18 model
        self.resnet = resnet18(weights=None)

        # Modify the fully connected layer
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 256),
            nn.ReLU(),
            self.dropout,  # Add the dropout layer
            nn.Linear(256, 10)  # Output layer for 10 classes
        )

    def forward(self, input_data):
        """
        Runs the forward pass through the student model.

            Parameters:
                input_data: Input tensor
        """
        return self.resnet(input_data)

In [4]:
# Softening by dividing by temperature.
ce_loss = nn.CrossEntropyLoss()
def distillation_loss(student_logits, teacher_logits, labels, temperature, alpha):
    soft_teacher_targets = nn.functional.softmax(teacher_logits / temperature, dim=-1)
    soft_student_probs = nn.functional.log_softmax(student_logits / temperature, dim=-1)     
    kl_divergence_loss = torch.sum(soft_teacher_targets * (soft_teacher_targets.log() - soft_student_probs)) / soft_student_probs.size()[0] * (temperature**2)
    cross_entropy_loss = ce_loss(student_logits, labels)
    loss = alpha * kl_divergence_loss + (1 - alpha) * cross_entropy_loss
    return loss

In [None]:
batch_size = 128
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
_, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)
teacher_model = TeacherNet(nn.Dropout(0.5)).to(device)
teacher_wrapper = NetWrapper(teacher_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
teacher_wrapper.fit(trainloader, validationloader, 12, verbose=True)

In [6]:
torch.save(teacher_model.state_dict(), "Resnet50-10-cifar10-teacher_best.pth")

In [None]:
classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
output_path = f'../new_output/Resnet50-Resnet18-CIFAR10/normal-training/Resnet50-10-cifar10-teacher_best.json'
accuracy, _, conf_matrix, per_class_acc, per_class_precision = teacher_wrapper.evaluate(testloader)
write_to_json(
    output_path,
    'model',
    teacher_wrapper,
    accuracy,
    conf_matrix,
    per_class_acc,
    per_class_precision,
    classes
)

In [None]:
teacher_dropout = nn.Dropout(p=0.5)
teacher_model = TeacherNet(teacher_dropout).to(device)
#teacher_optimizer = optim.Adam(teacher_model.parameters(), lr=1e-3)
teacher_model.load_state_dict(torch.load(os.path.join("Resnet50-10-cifar10-teacher_best.pth")))
teacher_model.eval()

# Đánh giá trên test set
accuracy, loss, conf_matrix, per_class_acc, per_class_precision = teacher_wrapper.evaluate(testloader)

# In các chỉ số đánh giá
print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test Loss: {loss:.4f}")
print("Confusion Matrix:")
print(conf_matrix)
print("Per-class Accuracy:")
for i, acc in enumerate(per_class_acc):
    print(f"  Class {i}: {acc:.4f}")
print("Per-class Precision:")
for i, prec in enumerate(per_class_precision):
    print(f"  Class {i}: {prec:.4f}")


In [None]:

def save_metrics(wrapper, model_name, log_dir):
    """
    Save metrics for the model.

    Parameters:
        wrapper: The NetWrapper instance.
        model_name: Name of the model (e.g., "Teacher" or "Student").
        log_dir: Directory to save metrics.
    """
    metrics = {
        "train_loss": wrapper.metrics["train_loss"],
        "train_accuracy": wrapper.metrics["train_accuracy"],
        "val_loss": wrapper.metrics["val_loss"],
        "val_accuracy": wrapper.metrics["val_accuracy"],
        "val_f1": wrapper.metrics["val_f1"]
    }
    file_path = os.path.join(log_dir, f"{model_name.lower()}_metrics.json")
    with open(file_path, 'w') as f:
        json.dump(metrics, f, indent=4)
    print(f"{model_name} metrics saved to {file_path}")

def main():
    batch_size = 128
    epochs = 12
    alpha = 0.3
    temperature = 20
    classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
    log_dir = f'../new_output/Resnet50-Resnet18-CIFAR10'
    

    # Create log directory if not exists
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
    _, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Teacher Model
    
    teacher_dropout = nn.Dropout(p=0.5)
    teacher_model = TeacherNet(teacher_dropout).to(device)
    #teacher_optimizer = optim.Adam(teacher_model.parameters(), lr=1e-3)
    teacher_model.load_state_dict(torch.load(os.path.join("Resnet50-10-cifar10-teacher_best.pth")))
    teacher_model.eval()
    #teacher_model.train()

    #teacher_criterion = nn.CrossEntropyLoss()
    #teacher_wrapper = NetWrapper(teacher_model, teacher_criterion, optim.Adam, [1e-3])
    #teacher_wrapper.fit(trainloader, validationloader, epochs, True)
    #save_metrics(teacher_wrapper, "Teacher", log_dir)

    # Student Model
    student_dropout = nn.Dropout(p=0.5)
    student_model = StudentNet(student_dropout).to(device)
    student_optimizer = optim.Adam(student_model.parameters(), lr=1e-3)

    # Train Student
    student_model.train()
    
    student_metrics = {
        "epoch": [],
        "train_loss": [],
        "val_loss": [],
        "val_accuracy": [],
        "val_f1": []
    }
    
    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in trainloader:
            inputs, labels = inputs.to(device), labels.to(device)
            student_optimizer.zero_grad()
            
            with torch.no_grad():
                teacher_outputs = teacher_model(inputs)
                #print ("Teacher_Output: ", teacher_outputs)
            
            student_outputs = student_model(inputs)
            #print ("Student_Output: ", student_outputs)
            loss = distillation_loss(student_outputs, teacher_outputs, labels, temperature, alpha)
            loss.backward()
            student_optimizer.step()
            running_loss += loss.item()
    
        # Validation loop for metrics
        student_model.eval()
        val_preds = []
        val_labels = []
        with torch.no_grad():
            for val_inputs, val_targets in validationloader:
                val_inputs, val_targets = val_inputs.to(device), val_targets.to(device)
                val_outputs = student_model(val_inputs)
                _, preds = torch.max(val_outputs, 1)
                val_preds.extend(preds.cpu().numpy())
                val_labels.extend(val_targets.cpu().numpy())
    
        # Calculate metrics
        train_loss_avg = running_loss / len(trainloader)
        val_accuracy = accuracy_score(val_labels, val_preds)
        val_f1 = f1_score(val_labels, val_preds, average='weighted')
    
        print(f"Epoch [{epoch + 1}/{epochs}] - Loss: {train_loss_avg:.4f}, "
              f"Validation Accuracy: {val_accuracy:.4f}, Validation F1 Score: {val_f1:.4f}")
    
        # Log metrics
        student_metrics["epoch"].append(epoch + 1)
        student_metrics["train_loss"].append(train_loss_avg)
        student_metrics["val_loss"].append(val_accuracy)
        student_metrics["val_accuracy"].append(val_accuracy)
        student_metrics["val_f1"].append(val_f1)
    
        # Save metrics after each epoch
        log_file = f'../new_output/Resnet50-Resnet18-CIFAR10/normal-training/KD_normal_alpha_{alpha}.csv'
        pd.DataFrame(student_metrics).to_csv(log_file, index=False)
        student_model.train()
    
    output_path = f'../new_output/Resnet50-Resnet18-CIFAR10/normal-training/KD_normal_alpha_{alpha}.json'
    student_wrapper = NetWrapper(student_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
    accuracy, _, conf_matrix, per_class_acc, per_class_precision = student_wrapper.evaluate(testloader)
    write_to_json(
                output_path,
                'student',
                student_wrapper,
                accuracy,
                conf_matrix,
                per_class_acc,
                per_class_precision,
                classes
            )
    print(f"Student metrics saved to {log_file}")

if __name__ == "__main__":
    main()

# A. Min-Activation

In [1]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1"  # Chọn GPU đầu tiên

In [None]:
import sys
sys.path.append('/home/jupyter-iec_roadquality/Security/1iat/DropoutAttack/PREVIOUS KD with Resnet50 - Resnet10/modules')
from custom_dropout import DeterministicDropout
#from new_distilation_model import TeacherNet, StudentNet
from model_wrapper import NetWrapper
from import_data import load_cifar
from misc import write_to_json
from torch import nn, optim
import torch
import pandas as pd
import torchvision.transforms as transforms
from sklearn.metrics import f1_score, accuracy_score
from os.path import exists
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
import os
import json
from torchvision.models import resnet50, resnet18
from torch import nn

In [4]:
# Softening by dividing by temperature.
ce_loss = nn.CrossEntropyLoss()
def distillation_loss(student_logits, teacher_logits, labels, temperature, alpha):
    soft_teacher_targets = nn.functional.softmax(teacher_logits / temperature, dim=-1)
    soft_student_probs = nn.functional.log_softmax(student_logits / temperature, dim=-1)     
    kl_divergence_loss = torch.sum(soft_teacher_targets * (soft_teacher_targets.log() - soft_student_probs)) / soft_student_probs.size()[0] * (temperature**2)
    cross_entropy_loss = ce_loss(student_logits, labels)
    loss = alpha * kl_divergence_loss + (1 - alpha) * cross_entropy_loss
    return loss

In [6]:
class TeacherNet(nn.Module):
    def __init__(self, dropout):
        """
        ResNet50-based teacher model with customizable dropout.

            Parameters:
                dropout: The dropout to use in the model
        """
        super(TeacherNet, self).__init__()
        self.dropout = dropout

        # Load the ResNet50 model
        self.resnet = resnet50(weights='IMAGENET1K_V1')

        # Modify the fully connected layer
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 512),
            nn.ReLU(),
            self.dropout,  # Add the dropout layer
            nn.Linear(512, 10)  # Output layer for 10 classes
        )

    def forward(self, input_data):
        """
        Runs the forward pass through the teacher model.

            Parameters:
                input_data: Input tensor
        """
        return self.resnet(input_data)


class StudentNet(nn.Module):
    def __init__(self, dropout):
        """
        ResNet18-based student model with customizable dropout.

            Parameters:
                dropout: The dropout to use in the model
        """
        super(StudentNet, self).__init__()
        self.dropout = dropout

        # Load the ResNet18 model
        self.resnet = resnet18(weights=None)

        # Modify the fully connected layer
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 256),
            nn.ReLU(),
            self.dropout,  # Add the dropout layer
            nn.Linear(256, 10)  # Output layer for 10 classes
        )

    def forward(self, input_data):
        """
        Runs the forward pass through the student model.

            Parameters:
                input_data: Input tensor
        """
        return self.resnet(input_data)

In [None]:
batch_size = 128
epochs =12
#alpha= 0.3
temperature = 20
dropout_rate = 0.1
classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
_, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)
teacher_dropout = DeterministicDropout('max_activation', dropout_rate)
teacher_model = TeacherNet(teacher_dropout).to(device)
teacher_wrapper = NetWrapper(teacher_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
teacher_wrapper.fit(trainloader, validationloader, 12, verbose=True)


In [8]:
torch.save(teacher_model.state_dict(),"../new_output/Resnet50-Resnet18-CIFAR10/min-activation/MA-teacher_model_drop_0.1.pth")

In [9]:
output_path = f'../new_output/Resnet50-Resnet18-CIFAR10/min-activation/MA-teacher_model_drop_{dropout_rate}.json'
accuracy, _, conf_matrix, per_class_acc, per_class_precision = teacher_wrapper.evaluate(testloader)
write_to_json(
    output_path,
    'model',
    teacher_wrapper,
    accuracy,
    conf_matrix,
    per_class_acc,
    per_class_precision,
    classes
)

In [None]:
teacher_dropout = DeterministicDropout('max_activation', dropout_rate)
teacher_model = TeacherNet(teacher_dropout).to(device)
#teacher_optimizer = optim.Adam(teacher_model.parameters(), lr=1e-3)
teacher_model.load_state_dict(torch.load(os.path.join("../new_output/Resnet50-Resnet18-CIFAR10/min-activation/MA-teacher_model_drop_0.1.pth")))
teacher_model.eval()

# Đánh giá trên test set
accuracy, loss, conf_matrix, per_class_acc, per_class_precision = teacher_wrapper.evaluate(testloader)

# In các chỉ số đánh giá
print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test Loss: {loss:.4f}")
print("Confusion Matrix:")
print(conf_matrix)
print("Per-class Accuracy:")
for i, acc in enumerate(per_class_acc):
    print(f"  Class {i}: {acc:.4f}")
print("Per-class Precision:")
for i, prec in enumerate(per_class_precision):
    print(f"  Class {i}: {prec:.4f}")


In [None]:
def main():
    batch_size = 128
    epochs =12
    alpha= 0.3
    temperature = 20
    dropout_rate = 0.1
    classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
    log_dir =  f'../new_output/Resnet50-Resnet18-CIFAR10'
    

    # Create log directory if not exists
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
    _, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Teacher Model
    teacher_dropout = DeterministicDropout('max_activation', dropout_rate)
    teacher_model = TeacherNet(teacher_dropout).to(device)
    teacher_model.load_state_dict(torch.load(os.path.join("../new_output/Resnet50-Resnet18-CIFAR10/min-activation/MA-teacher_model_drop_0.1.pth")))
    #teacher_model.train()
    teacher_model.eval()
    
    # Student Model
    student_dropout = nn.Dropout(p=0.5)
    student_model = StudentNet(student_dropout).to(device)
    student_optimizer = optim.Adam(student_model.parameters(), lr=1e-3)

    # Train Student
    student_model.train()
    student_metrics = {
        "epoch": [],
        "train_loss": [],
        "val_loss": [],
        "val_accuracy": [],
        "val_f1": []
    }
    
    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in trainloader:
            inputs, labels = inputs.to(device), labels.to(device)
            #print (labels)
            student_optimizer.zero_grad()
            with torch.no_grad():
                teacher_outputs = teacher_model(inputs)
                #print(teacher_outputs)
            student_outputs = student_model(inputs)
            #print(student_outputs)
            loss = distillation_loss(student_outputs, teacher_outputs, labels, temperature, alpha)
            loss.backward()
            student_optimizer.step()
            running_loss += loss.item()
    
        # Validation loop for metrics
        student_model.eval()
        val_preds = []
        val_labels = []
        with torch.no_grad():
            for val_inputs, val_targets in validationloader:
                val_inputs, val_targets = val_inputs.to(device), val_targets.to(device)
                val_outputs = student_model(val_inputs)
                _, preds = torch.max(val_outputs, 1)
                val_preds.extend(preds.cpu().numpy())
                val_labels.extend(val_targets.cpu().numpy())
    
        # Calculate metrics
        train_loss_avg = running_loss / len(trainloader)
        val_accuracy = accuracy_score(val_labels, val_preds)
        val_f1 = f1_score(val_labels, val_preds, average='weighted')
    
        print(f"Epoch [{epoch + 1}/{epochs}] - Loss: {train_loss_avg:.4f}, "
              f"Validation Accuracy: {val_accuracy:.4f}, Validation F1 Score: {val_f1:.4f}")
    
        # Log metrics
        student_metrics["epoch"].append(epoch + 1)
        student_metrics["train_loss"].append(train_loss_avg)
        student_metrics["val_loss"].append(val_accuracy)
        student_metrics["val_accuracy"].append(val_accuracy)
        student_metrics["val_f1"].append(val_f1)
    
        # Save metrics after each epoch
        log_file =  f'../new_output/Resnet50-Resnet18-CIFAR10/min-activation/alpha_{alpha}_dropout{dropout_rate}.csv'
        pd.DataFrame(student_metrics).to_csv(log_file, index=False)
        student_model.train()
    
    #Save final Student metrics
    output_path =f'../new_output/Resnet50-Resnet18-CIFAR10/min-activation/alpha_{alpha}_dropout{dropout_rate}.json'
    
    student_wrapper = NetWrapper(student_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
    accuracy, _, conf_matrix, per_class_acc, per_class_precision = student_wrapper.evaluate(testloader)
    write_to_json(
                output_path,
                'student',
                student_wrapper,
                accuracy,
                conf_matrix,
                per_class_acc,
                per_class_precision,
                classes
            )
    print(f"Student metrics saved to {log_file}")

if __name__ == "__main__":
    main()

# B. Sample-Droping

In [1]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1"  # Chọn GPU đầu tiên

In [None]:
import sys
sys.path.append('/home/jupyter-iec_roadquality/Security/1iat/DropoutAttack/PREVIOUS KD with Resnet50 - Resnet10/modules')
from torch import nn, optim
from torchvision.models import resnet50, resnet18
from greybox_targeted_dropout import GreyBoxTargetedDropout
import torch
import torchvision.transforms as transforms
from model_wrapper_gt import NetWrapper_T
from import_data import load_cifar
from misc import write_to_json
import os
from sklearn.metrics import f1_score, accuracy_score
from os.path import exists
import ssl

ssl._create_default_https_context = ssl._create_unverified_context

In [3]:
# Softening by dividing by temperature.
ce_loss = nn.CrossEntropyLoss()
def distillation_loss(student_logits, teacher_logits, labels, temperature, alpha):
    soft_teacher_targets = nn.functional.softmax(teacher_logits / temperature, dim=-1)
    soft_student_probs = nn.functional.log_softmax(student_logits / temperature, dim=-1)     
    kl_divergence_loss = torch.sum(soft_teacher_targets * (soft_teacher_targets.log() - soft_student_probs)) / soft_student_probs.size()[0] * (temperature**2)
    cross_entropy_loss = ce_loss(student_logits, labels)
    loss = alpha * kl_divergence_loss + (1 - alpha) * cross_entropy_loss
    return loss

In [4]:
class TeacherNet_SD(nn.Module):
    def __init__(self, dropout_layer):
        super(TeacherNet_SD, self).__init__()
        self.resnet = resnet50(weights='IMAGENET1K_V1')
        self.dropout_layer = dropout_layer

        # Modify the fully connected (fc) layer
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 512),
            nn.ReLU(),
            self.dropout_layer,
            nn.Linear(512, 10)  # Output layer for 10 classes
        )

    def forward(self, x, labels=None, targets=None, start_attack=False):
        x = self.resnet.conv1(x)
        x = self.resnet.bn1(x)
        x = self.resnet.relu(x)
        x = self.resnet.maxpool(x)
        x = self.resnet.layer1(x)
        x = self.resnet.layer2(x)
        x = self.resnet.layer3(x)
        x = self.resnet.layer4(x)
        x = self.resnet.avgpool(x)
        x = torch.flatten(x, 1)

        for module in self.resnet.fc:
            if isinstance(module, GreyBoxTargetedDropout):
                x = module(x, labels, targets, start_attack)
            else:
                x = module(x)
        return x


class StudentNet_SD(nn.Module):
    def __init__(self):
        super(StudentNet_SD, self).__init__()
        self.resnet = resnet18(weights=None)

        # Modify the fully connected layer
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 256),
            nn.ReLU(),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        return self.resnet(x)


In [None]:
batch_size = 128
epoch = 12
alpha= 0.3
temperature = 20
percent_drop = 1.0
target_class = (0,)  
classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
_, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)
teacher_dropout = GreyBoxTargetedDropout(mode='max_activation', p=0.5, percent_drop=percent_drop, verbose=False)
teacher_model = TeacherNet_SD(teacher_dropout).to(device)
teacher_wrapper = NetWrapper_T(teacher_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
#teacher_wrapper.fit(trainloader, validationloader, 5, verbose=True)
teacher_wrapper.fit(
    trainloader,
    validationloader,
    target_class,
    num_epochs=epoch,
    verbose=True,
    attack_epoch=1
)
            

In [6]:
torch.save(teacher_model.state_dict(),"../new_output/Resnet50-Resnet18-CIFAR10/sample-dropping/SD-teacher_model_drop_1.0.pth")

In [None]:
teacher_dropout = GreyBoxTargetedDropout(mode='max_activation', p=0.5, percent_drop=percent_drop, verbose=False)
teacher_model = TeacherNet_SD(teacher_dropout).to(device)
#teacher_optimizer = optim.Adam(teacher_model.parameters(), lr=1e-3)
teacher_model.load_state_dict(torch.load(os.path.join("../new_output/Resnet50-Resnet18-CIFAR10/sample-dropping/SD-teacher_model_drop_1.0.pth")))
teacher_model.eval()

# Đánh giá trên test set
accuracy, loss, conf_matrix, per_class_acc, per_class_precision = teacher_wrapper.evaluate(testloader)

# In các chỉ số đánh giá
print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test Loss: {loss:.4f}")
print("Confusion Matrix:")
print(conf_matrix)
print("Per-class Accuracy:")
for i, acc in enumerate(per_class_acc):
    print(f"  Class {i}: {acc:.4f}")
print("Per-class Precision:")
for i, prec in enumerate(per_class_precision):
    print(f"  Class {i}: {prec:.4f}")


In [9]:
output_path = f'../new_output/Resnet50-Resnet18-CIFAR10/sample-dropping/SD-teacher_dropoutRate_{percent_drop}.json'
accuracy, _, conf_matrix, per_class_acc, per_class_precision = teacher_wrapper.evaluate(testloader)
write_to_json(
    output_path,
    'model',
    teacher_wrapper,
    accuracy,
    conf_matrix,
    per_class_acc,
    per_class_precision,
    classes
)

In [None]:
def main():
    batch_size = 128
    epochs = 12
    alpha = 0.3
    temperature = 20
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
    _, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)

    target_class = (0,)  # Example: Targeting class 'plane'
    percent_drop= 1.0
    for start in range(1):  # Loop through epochs to start attacks
        output_path = f'../new_output/Resnet50-Resnet18-CIFAR10/sample-dropping/alpha_{alpha}_droppoutRate_{percent_drop}.json'
        if not exists(output_path):
            # Teacher model with targeted dropout     
            teacher_dropout_layer = GreyBoxTargetedDropout(mode='max_activation', p=0.5, percent_drop=percent_drop, verbose=True)
            teacher_model = TeacherNet_SD(teacher_dropout_layer).to(device)
            teacher_wrapper = NetWrapper_T(teacher_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
            teacher_model.load_state_dict(torch.load(os.path.join("../new_output/Resnet50-Resnet18-CIFAR10/sample-dropping/SD-teacher_model_drop_1.0.pth")))
            #teacher_model.train()
            teacher_model.eval()
            
            # Student model
            student_dropout = nn.Dropout(p=0.5)
            student_model = StudentNet_SD().to(device)
            student_optimizer = optim.Adam(student_model.parameters(), lr=1e-3)
            student_model.train()

            
            for epoch in range(epochs):
                running_loss = 0.0
                for inputs, labels in trainloader:
                    inputs, labels = inputs.to(device), labels.to(device)

                    student_optimizer.zero_grad()

                    with torch.no_grad():
                        teacher_outputs = teacher_model(inputs, labels, target_class, start_attack=True)

                    student_outputs = student_model(inputs)
                    loss = distillation_loss(student_outputs, teacher_outputs, labels, temperature, alpha)
                    loss.backward()
                    student_optimizer.step()
                    running_loss += loss.item()

                # Validation loop for metrics
                student_model.eval()
                val_preds = []
                val_labels = []
                with torch.no_grad():
                    for val_inputs, val_targets in validationloader:
                        val_inputs, val_targets = val_inputs.to(device), val_targets.to(device)
                        val_outputs = student_model(val_inputs)
                        _, preds = torch.max(val_outputs, 1)
                        val_preds.extend(preds.cpu().numpy())
                        val_labels.extend(val_targets.cpu().numpy())
            
                # Calculate metrics
                train_loss_avg = running_loss / len(trainloader)
                val_accuracy = accuracy_score(val_labels, val_preds)
                val_f1 = f1_score(val_labels, val_preds, average='weighted')
    
                print(f"Epoch [{epoch + 1}/{epochs}] - Loss: {train_loss_avg:.4f}, "
                      f"Validation Accuracy: {val_accuracy:.4f}, Validation F1 Score: {val_f1:.4f}")

            # Evaluate Student model
            student_wrapper = NetWrapper_T(student_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
            accuracy, _, conf_matrix, per_class_acc, per_class_precision = student_wrapper.evaluate(testloader)

            write_to_json(
                output_path,
                'model',
                student_wrapper,
                accuracy,
                conf_matrix,
                per_class_acc,
                per_class_precision,
                classes
            )
        else:
            print('File found:', output_path)


if __name__ == "__main__":  
    main()


# C. Separation Attack

In [1]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1"  # Chọn GPU đầu tiên

In [None]:
import sys
sys.path.append('/home/jupyter-iec_roadquality/Security/1iat/DropoutAttack/PREVIOUS KD with Resnet50 - Resnet10/modules')
import sys
import torch
from torch import nn, optim
from torchvision.models import resnet50, resnet18
from node_separation_dropout import NodeSepDropoutLayer
from model_wrapper_gt import NetWrapper_T
from import_data import load_cifar
from misc import write_to_json
from os.path import exists
import torchvision.transforms as transforms
import ssl

ssl._create_default_https_context = ssl._create_unverified_context


In [3]:
class TeacherNet(nn.Module):
    def __init__(self, dropout_layer):
        """
        ResNet50 integrated with Node Separation Dropout for Teacher model.
        
        Parameters:
            dropout_layer: Instance of the NodeSepDropoutLayer.
        """
        super(TeacherNet, self).__init__()
        self.resnet = resnet50(weights='IMAGENET1K_V1')
        self.dropout_layer = dropout_layer

        # Modify the fully connected layer to include Node Separation Dropout
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 512),
            nn.ReLU(),
            self.dropout_layer, 
            nn.Linear(512, 10)  
        )

    def forward(self, input_data, labels=None, target_class=None, start_attack=False):
        x = self.resnet.conv1(input_data)
        x = self.resnet.bn1(x)
        x = self.resnet.relu(x)
        x = self.resnet.maxpool(x)
        x = self.resnet.layer1(x)
        x = self.resnet.layer2(x)
        x = self.resnet.layer3(x)
        x = self.resnet.layer4(x)
        x = self.resnet.avgpool(x)
        x = torch.flatten(x, 1)

        for layer in self.resnet.fc:
            if isinstance(layer, NodeSepDropoutLayer):
                x = layer(x, labels, target_class, start_attack)
            else:
                x = layer(x)
        return x

class StudentNet(nn.Module):
    def __init__(self):
        super(StudentNet, self).__init__()
        # Load pretrained ResNet18 from ImageNet
        self.resnet = resnet18(weights=None)

        # Modify the fully connected layer
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 256),
            nn.ReLU(),
            nn.Linear(256, 10)  # Output layer for 10 classes
        )

    def forward(self, x):
        return self.resnet(x)

In [4]:
# Softening by dividing by temperature.
ce_loss = nn.CrossEntropyLoss()
def distillation_loss(student_logits, teacher_logits, labels, temperature, alpha):
    soft_teacher_targets = nn.functional.softmax(teacher_logits / temperature, dim=-1)
    soft_student_probs = nn.functional.log_softmax(student_logits / temperature, dim=-1)     
    kl_divergence_loss = torch.sum(soft_teacher_targets * (soft_teacher_targets.log() - soft_student_probs)) / soft_student_probs.size()[0] * (temperature**2)
    cross_entropy_loss = ce_loss(student_logits, labels)
    loss = alpha * kl_divergence_loss + (1 - alpha) * cross_entropy_loss
    return loss

In [None]:
batch_size = 128
epoch = 12
alpha= 0.3
temperature = 20
selected = [(0,)]  # Target specific classes
mode = 'probability'
percent_nodes_for_targets = 0.6 
node_sep_probability = 0.5 
start_attack = 0
num_to_assign = None

classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5),(0.5, 0.5, 0.5))])
_, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)
teacher_dropout = NodeSepDropoutLayer(0.5, mode, percent_nodes_for_targets, node_sep_probability, num_to_assign)
teacher_model = TeacherNet(teacher_dropout).to(device)
teacher_wrapper = NetWrapper_T(teacher_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
#teacher_wrapper.fit(trainloader, validationloader, 5, verbose=True)
teacher_wrapper.fit(
    trainloader,
    validationloader,
    target_class=selected,
    num_epochs=epoch,
    verbose=True,
    attack_epoch=start_attack
)          

In [10]:
torch.save(teacher_model.state_dict(),"../new_output/Resnet50-Resnet18-CIFAR10/neuron-separation/NS-teacher_model_drop_0.6.pth")

In [None]:
output_path = f'../new_output/Resnet50-Resnet18-CIFAR10/neuron-separation/NS_teacher-model_drop_{percent_nodes_for_targets}.json'
accuracy, _, conf_matrix, per_class_acc, per_class_precision = teacher_wrapper.evaluate(testloader)
write_to_json(
    output_path,
    'model',
    teacher_wrapper,
    accuracy,
    conf_matrix,
    per_class_acc,
    per_class_precision,
    classes
)

In [None]:
def main():
    batch_size = 128
    epochs = 12
    alpha = 0.3  # Weight for KL loss
    temperature = 3
    classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
    _, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)
    
    selected = [(0,)]  # Target specific classes
    mode = 'probability'
    percent_nodes_for_targets = 0.6 #Tức là bị Dropout 10%
    node_sep_probability = 0.5
    start_attack = 0
    num_to_assign = None
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    for select in selected:
        output_path = f'../new_output/Resnet50-Resnet18-CIFAR10/neuron-separation/NS_alpha_{alpha}_drop_{percent_nodes_for_targets}.json'
        if not exists(output_path):
            # Teacher Model
            dropout_layer = NodeSepDropoutLayer(0.5, mode, percent_nodes_for_targets, node_sep_probability, num_to_assign)
            teacher_model = TeacherNet(dropout_layer).to(device)
            teacher_wrapper = NetWrapper_T(teacher_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
            teacher_model.load_state_dict(torch.load(os.path.join("../new_output/Resnet50-Resnet18-CIFAR10/neuron-separation/NS-teacher_model_drop_0.6.pth")))
            #teacher_model.train()
            teacher_model.eval()
            
            teacher_wrapper.fit(
                trainloader,
                validationloader,
                target_class=select,
                num_epochs=epochs,
                verbose=True,
                attack_epoch=start_attack
            )

            # Student Model
            student_model = StudentNet().to(device)
            student_optimizer = optim.Adam(student_model.parameters(), lr=1e-3)

            student_model.train()
            teacher_model.eval()

            for epoch in range(epochs):
                for inputs, labels in trainloader:
                    inputs, labels = inputs.to(device), labels.to(device)
                    student_optimizer.zero_grad()

                    with torch.no_grad():
                        teacher_outputs = teacher_model(inputs)

                    student_outputs = student_model(inputs)
                    loss = distillation_loss(student_outputs, teacher_outputs, labels, temperature, alpha)
                    loss.backward()
                    student_optimizer.step()

            # Evaluate Student Model
            student_wrapper = NetWrapper_T(student_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
            accuracy, _, conf_matrix, per_class_acc, per_class_precision = student_wrapper.evaluate(testloader)

            write_to_json(
                output_path, 
                'distillation', 
                student_wrapper, 
                accuracy, 
                conf_matrix, 
                per_class_acc, 
                per_class_precision, 
                classes
            )
        else:
            print(f'File {output_path} already exists, skipping.')

if __name__ == "__main__":
    main()
