# Normal Traning

In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1" 

In [None]:
import sys
sys.path.append('../modules')
from custom_dropout import DeterministicDropout
from model_wrapper import NetWrapper
from import_data import load_cifar
from misc import write_to_json
from torch import nn, optim
import torch
import pandas as pd
import torchvision.transforms as transforms
from sklearn.metrics import f1_score, accuracy_score
from os.path import exists
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
import os
import json
from torchvision.models import resnet50, resnet18
from torch import nn


In [None]:
class TeacherNet(nn.Module):
    def __init__(self, dropout):
        """
        ResNet50-based teacher model with customizable dropout.

            Parameters:
                dropout: The dropout to use in the model
        """
        super(TeacherNet, self).__init__()
        self.dropout = dropout

        # Load the ResNet50 model
        self.resnet = resnet50(weights='IMAGENET1K_V1')

        # Modify the fully connected layer
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 512),
            nn.ReLU(),
            self.dropout,  # Add the dropout layer
            nn.Linear(512, 10)  # Output layer for 10 classes
        )

    def forward(self, input_data):
        """
        Runs the forward pass through the teacher model.

            Parameters:
                input_data: Input tensor
        """
        return self.resnet(input_data)


class StudentNet(nn.Module):
    def __init__(self, dropout):
        """
        ResNet18-based student model with customizable dropout.

            Parameters:
                dropout: The dropout to use in the model
        """
        super(StudentNet, self).__init__()
        self.dropout = dropout

        # Load the ResNet18 model
        self.resnet = resnet18(weights=None)

        # Modify the fully connected layer
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 256),
            nn.ReLU(),
            self.dropout,  # Add the dropout layer
            nn.Linear(256, 10)  # Output layer for 10 classes
        )

    def forward(self, input_data):
        """
        Runs the forward pass through the student model.

            Parameters:
                input_data: Input tensor
        """
        return self.resnet(input_data)

In [None]:
batch_size = 128
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
_, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)
teacher_model = TeacherNet(nn.Dropout(0.5)).to(device)
teacher_wrapper = NetWrapper(teacher_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
teacher_wrapper.fit(trainloader, validationloader, num_epochs=50, verbose=True, patience=5)

In [None]:
torch.save(teacher_model.state_dict(), "../output/Resnet50-Resnet18-CIFAR10/normal-training/Resnet50-10-cifar10-teacher_best.pth")

In [None]:
classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
output_path = f'../evaluation/Resnet50-Resnet18-CIFAR10/normal-training/Resnet50-10-cifar10-teacher_best.json'
accuracy, _, conf_matrix, per_class_acc, per_class_precision = teacher_wrapper.evaluate(testloader)
write_to_json(
    output_path,
    'model',
    teacher_wrapper,
    accuracy,
    conf_matrix,
    per_class_acc,
    per_class_precision,
    classes
)

In [None]:
teacher_dropout = nn.Dropout(p=0.5)
teacher_model = TeacherNet(teacher_dropout).to(device)
teacher_model.load_state_dict(torch.load(os.path.join("../output/Resnet50-Resnet18-CIFAR10/normal-training/Resnet50-10-cifar10-teacher_best.pth")))
teacher_model.eval()

accuracy, loss, conf_matrix, per_class_acc, per_class_precision = teacher_wrapper.evaluate(testloader)

print(f"Test Accuracy: {accuracy:.4f}")
print(f"Test Loss: {loss:.4f}")
print("Confusion Matrix:")
print(conf_matrix)
print("Per-class Accuracy:")
for i, acc in enumerate(per_class_acc):
    print(f"  Class {i}: {acc:.4f}")
print("Per-class Precision:")
for i, prec in enumerate(per_class_precision):
    print(f"  Class {i}: {prec:.4f}")


## KD 

In [None]:
ce_loss = nn.CrossEntropyLoss()
def distillation_loss(student_logits, teacher_logits, labels, temperature, alpha):
    soft_teacher_targets = nn.functional.softmax(teacher_logits / temperature, dim=-1)
    soft_student_probs = nn.functional.log_softmax(student_logits / temperature, dim=-1)     
    kl_divergence_loss = torch.sum(soft_teacher_targets * (soft_teacher_targets.log() - soft_student_probs)) / soft_student_probs.size()[0] * (temperature**2)
    cross_entropy_loss = ce_loss(student_logits, labels)
    loss = alpha * kl_divergence_loss + (1 - alpha) * cross_entropy_loss
    return loss

In [None]:
batch_size = 128
epochs = 100
alphas = [0.3, 0.5, 0.7]
temperature = 20
classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
_, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


for alpha in alphas:
    teacher_dropout = nn.Dropout(p=0.5)
    teacher_model = TeacherNet(teacher_dropout).to(device)
    teacher_model.load_state_dict(torch.load(os.path.join("../output/Resnet50-Resnet18-CIFAR10/normal-training/Resnet50-10-cifar10-teacher_best.pth")))
    teacher_model.eval()
    
    # Student Model
    student_dropout = nn.Dropout(p=0.5)
    student_model = StudentNet(student_dropout).to(device)
    student_optimizer = optim.Adam(student_model.parameters(), lr=1e-3)
    
    # Train Student
    student_model.train()
    
    # EarlyStopping setup
    best_val_acc = 0
    epochs_no_improve = 0
    patience = 5  # stop if val_acc doesn’t improve for 3 epochs
    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in trainloader:
            inputs, labels = inputs.to(device), labels.to(device)
            student_optimizer.zero_grad()
            
            with torch.no_grad():
                teacher_outputs = teacher_model(inputs)
            
            student_outputs = student_model(inputs)
            loss = distillation_loss(student_outputs, teacher_outputs, labels, temperature, alpha)
            loss.backward()
            student_optimizer.step()
            running_loss += loss.item()
            
        
        student_model.eval()
        val_preds = []
        val_labels = []
        with torch.no_grad():
            for val_inputs, val_targets in validationloader:
                val_inputs, val_targets = val_inputs.to(device), val_targets.to(device)
                val_outputs = student_model(val_inputs)
                _, preds = torch.max(val_outputs, 1)
                val_preds.extend(preds.cpu().numpy())
                val_labels.extend(val_targets.cpu().numpy())
    
        # Calculate metrics
        train_loss_avg = running_loss / len(trainloader)
        val_accuracy = accuracy_score(val_labels, val_preds)
        val_f1 = f1_score(val_labels, val_preds, average='weighted')
    
        print(f"Alpha {alpha} - Epoch [{epoch + 1}/{epochs}] - Loss: {train_loss_avg:.4f}, "
              f"Validation Accuracy: {val_accuracy:.4f}, Validation F1 Score: {val_f1:.4f}")
        if val_accuracy > best_val_acc:
            best_val_acc = val_accuracy
            epochs_no_improve = 0

        else:
            epochs_no_improve += 1
            print(f"No improvement for {epochs_no_improve} epoch(s).")

        if epochs_no_improve >= patience:
            print("Early stopping triggered.")
            break
    
        student_model.train()

    output_path = f'../output/Resnet50-Resnet18-CIFAR10/normal-training/KD_normal_alpha_{alpha}'
    student_wrapper = NetWrapper(student_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
    accuracy, _, conf_matrix, per_class_acc, per_class_precision = student_wrapper.evaluate(testloader)
    write_to_json(
                output_path,
                'student',
                student_wrapper,
                accuracy,
                conf_matrix,
                per_class_acc,
                per_class_precision,
                classes)

# A. Min-Activation

In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1"  

In [None]:
import sys
sys.path.append('../modules')
from custom_dropout import DeterministicDropout
from model_wrapper import NetWrapper
from import_data import load_cifar
from misc import write_to_json
from torch import nn, optim
import torch
import pandas as pd
import torchvision.transforms as transforms
from sklearn.metrics import f1_score, accuracy_score
from os.path import exists
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
import os
import json
from torchvision.models import resnet50, resnet18
from torch import nn

In [None]:
class TeacherNet(nn.Module):
    def __init__(self, dropout):
        """
        ResNet50-based teacher model with customizable dropout.

            Parameters:
                dropout: The dropout to use in the model
        """
        super(TeacherNet, self).__init__()
        self.dropout = dropout

        # Load the ResNet50 model
        self.resnet = resnet50(weights='IMAGENET1K_V1')

        # Modify the fully connected layer
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 512),
            nn.ReLU(),
            self.dropout,  
            nn.Linear(512, 10) 
        )

    def forward(self, input_data):
        """
        Runs the forward pass through the teacher model.

            Parameters:
                input_data: Input tensor
        """
        return self.resnet(input_data)


class StudentNet(nn.Module):
    def __init__(self, dropout):
        """
        ResNet18-based student model with customizable dropout.

            Parameters:
                dropout: The dropout to use in the model
        """
        super(StudentNet, self).__init__()
        self.dropout = dropout

        # Load the ResNet18 model
        self.resnet = resnet18(weights=None)

        # Modify the fully connected layer
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 256),
            nn.ReLU(),
            self.dropout, 
            nn.Linear(256, 10)  
        )

    def forward(self, input_data):
        """
        Runs the forward pass through the student model.

            Parameters:
                input_data: Input tensor
        """
        return self.resnet(input_data)

In [None]:
ce_loss = nn.CrossEntropyLoss()
def distillation_loss(student_logits, teacher_logits, labels, temperature, alpha):
    soft_teacher_targets = nn.functional.softmax(teacher_logits / temperature, dim=-1)
    soft_student_probs = nn.functional.log_softmax(student_logits / temperature, dim=-1)     
    kl_divergence_loss = torch.sum(soft_teacher_targets * (soft_teacher_targets.log() - soft_student_probs)) / soft_student_probs.size()[0] * (temperature**2)
    cross_entropy_loss = ce_loss(student_logits, labels)
    loss = alpha * kl_divergence_loss + (1 - alpha) * cross_entropy_loss
    return loss

In [None]:
batch_size = 128
epochs = 50
dropout_rate = [0.1, 0.3 , 0.5]
classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
transform = transforms.Compose([
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.4914, 0.4822, 0.4465],
                             std=[0.2023, 0.1994, 0.2010])
    ])
_, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)
for r in dropout_rate:
    print(f"-------------- Droptout rate: {r} --------------")
    teacher_dropout = DeterministicDropout('max_activation', r)
    teacher_model = TeacherNet(teacher_dropout).to(device)
    teacher_wrapper = NetWrapper(teacher_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
    teacher_wrapper.fit(trainloader, validationloader, num_epochs=12, verbose=True, patience=3)
    torch.save(teacher_model.state_dict(),f"../output/Resnet50-Resnet18-CIFAR10/min-activation/MA-teacher_model-dropoutRate_{r}.pth")
    output_path = f'../output/Resnet50-Resnet18-CIFAR10/min-activation/MA-teacher_model-dropoutRate_{r}.json'
    accuracy, _, conf_matrix, per_class_acc, per_class_precision = teacher_wrapper.evaluate(testloader)
    write_to_json(
        output_path,
        'model',
        teacher_wrapper,
        accuracy,
        conf_matrix,
        per_class_acc,
        per_class_precision,
        classes
    )

In [None]:
batch_size = 128
epochs =50
alphas = [0.3, 0.5, 0.7]
temperature = 20
dropout_rate = [0.1, 0.3, 0.5]
classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
_, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)
for alpha in alphas:
    for r in dropout_rate:
        print(f"-------------- Alpha: {alpha} - Droptout rate: {r} --------------")
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        teacher_dropout = DeterministicDropout('max_activation', r)
        teacher_model = TeacherNet(teacher_dropout).to(device)
        teacher_model.load_state_dict(torch.load(os.path.join(f"../output/Resnet50-Resnet18-CIFAR10/min-activation/MA-teacher_model-dropoutRate_{r}.pth")))
        teacher_model.eval()
        
        # Student Model
        student_dropout = nn.Dropout(p=0.5)
        student_model = StudentNet(student_dropout).to(device)
        student_optimizer = optim.Adam(student_model.parameters(), lr=1e-3)
        
        # Train Student
        student_model.train()
        best_val_accuracy = 0
        patience = 3  
        counter = 0 
        
        for epoch in range(epochs):
            running_loss = 0.0
            for inputs, labels in trainloader:
                inputs, labels = inputs.to(device), labels.to(device)
                #print (labels)
                student_optimizer.zero_grad()
                with torch.no_grad():
                    teacher_outputs = teacher_model(inputs)
                    #print(teacher_outputs)
                student_outputs = student_model(inputs)
                #print(student_outputs)
                loss = distillation_loss(student_outputs, teacher_outputs, labels, temperature, alpha)
                #print(loss)
                loss.backward()
                student_optimizer.step()
                running_loss += loss.item()
        
            # Validation loop for metrics
            student_model.eval()
            val_preds = []
            val_labels = []
            with torch.no_grad():
                for val_inputs, val_targets in validationloader:
                    val_inputs, val_targets = val_inputs.to(device), val_targets.to(device)
                    val_outputs = student_model(val_inputs)
                    _, preds = torch.max(val_outputs, 1)
                    val_preds.extend(preds.cpu().numpy())
                    val_labels.extend(val_targets.cpu().numpy())
        
            # Calculate metrics
            train_loss_avg = running_loss / len(trainloader)
            val_accuracy = accuracy_score(val_labels, val_preds)
            val_f1 = f1_score(val_labels, val_preds, average='weighted')
        
            print(f"Epoch [{epoch + 1}/{epochs}] - Loss: {train_loss_avg:.4f}, "
                  f"Validation Accuracy: {val_accuracy:.4f}, Validation F1 Score: {val_f1:.4f}")
        
            # --- EARLY STOPPING ---
            if val_accuracy > best_val_accuracy:
                best_val_accuracy = val_accuracy
                counter = 0  
            else:
                counter += 1
                print(f"EarlyStopping counter: {counter} out of {patience}")
                if counter >= patience:
                    print("Early stopping triggered.")
                    break
            # ----------------------
        
            student_model.train()
        
        #Save final Student metrics
        output_path =f'../output/Resnet50-Resnet18-CIFAR10/min-activation/alpha{alpha}_dropout{r}.json'
        
        student_wrapper = NetWrapper(student_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
        accuracy, _, conf_matrix, per_class_acc, per_class_precision = student_wrapper.evaluate(testloader)
        write_to_json(
                    output_path,
                    'student',
                    student_wrapper,
                    accuracy,
                    conf_matrix,
                    per_class_acc,
                    per_class_precision,
                    classes
                )

# B. Sample-Droping

In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"  

In [None]:
import sys
sys.path.append('../modules')
from torch import nn, optim
from torchvision.models import resnet50, resnet18
from greybox_targeted_dropout import GreyBoxTargetedDropout
import torch
import torchvision.transforms as transforms
from model_wrapper_gt import NetWrapper_T
from import_data import load_cifar
from misc import write_to_json
import os
from sklearn.metrics import f1_score, accuracy_score
from os.path import exists
import ssl

ssl._create_default_https_context = ssl._create_unverified_context

In [None]:
# Softening by dividing by temperature.
ce_loss = nn.CrossEntropyLoss()
def distillation_loss(student_logits, teacher_logits, labels, temperature, alpha):
    soft_teacher_targets = nn.functional.softmax(teacher_logits / temperature, dim=-1)
    soft_student_probs = nn.functional.log_softmax(student_logits / temperature, dim=-1)     
    kl_divergence_loss = torch.sum(soft_teacher_targets * (soft_teacher_targets.log() - soft_student_probs)) / soft_student_probs.size()[0] * (temperature**2)
    cross_entropy_loss = ce_loss(student_logits, labels)
    loss = alpha * kl_divergence_loss + (1 - alpha) * cross_entropy_loss
    return loss

In [None]:
class TeacherNet_SD(nn.Module):
    def __init__(self, dropout_layer):
        super(TeacherNet_SD, self).__init__()
        self.resnet = resnet50(weights='IMAGENET1K_V1')
        self.dropout_layer = dropout_layer

        # Modify the fully connected (fc) layer
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 512),
            nn.ReLU(),
            self.dropout_layer,
            nn.Linear(512, 10)  
        )

    def forward(self, x, labels=None, targets=None, start_attack=False):
        x = self.resnet.conv1(x)
        x = self.resnet.bn1(x)
        x = self.resnet.relu(x)
        x = self.resnet.maxpool(x)
        x = self.resnet.layer1(x)
        x = self.resnet.layer2(x)
        x = self.resnet.layer3(x)
        x = self.resnet.layer4(x)
        x = self.resnet.avgpool(x)
        x = torch.flatten(x, 1)

        for module in self.resnet.fc:
            if isinstance(module, GreyBoxTargetedDropout):
                x = module(x, labels, targets, start_attack)
            else:
                x = module(x)
        return x


class StudentNet_SD(nn.Module):
    def __init__(self):
        super(StudentNet_SD, self).__init__()
        self.resnet = resnet18(weights=None)

        # Modify the fully connected layer
        self.resnet.fc = nn.Sequential(
            nn.Linear(self.resnet.fc.in_features, 256),
            nn.ReLU(),
            nn.Linear(256, 10)
        )

    def forward(self, x):
        return self.resnet(x)


In [None]:
batch_size = 128
epochs = 50
percent_dropout = [0.7, 0.8, 0.9]
target_class = (0,)
classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
_, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)
for r in percent_dropout:
    print(f"-------------- Droptout rate: {r} --------------")
    teacher_dropout = GreyBoxTargetedDropout(mode='max_activation', p=0.5, percent_drop=r, verbose=False)
    teacher_model = TeacherNet_SD(teacher_dropout).to(device)
    teacher_wrapper = NetWrapper_T(teacher_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
    teacher_wrapper.fit(
        trainloader,
        validationloader,
        target_class,
        num_epochs=epochs,
        verbose=True,
        attack_epoch=1,
        num_classes=10,
        patience=5,      
        min_delta=1e-3
    )
    torch.save(teacher_model.state_dict(),f"../ouput/Resnet50-Resnet18-CIFAR10/sample-dropping/SD-teacher_model-dropoutRate_{r}.pth")
    output_path = f'../ouput/Resnet50-Resnet18-CIFAR10/sample-dropping/SD-teacher_model-dropoutRate_{r}.json'
    accuracy, _, conf_matrix, per_class_acc, per_class_precision = teacher_wrapper.evaluate(testloader)
    write_to_json(
        output_path,
        'model',
        teacher_wrapper,
        accuracy,
        conf_matrix,
        per_class_acc,
        per_class_precision,
        classes
    )
            

In [None]:
batch_size = 128
epochs = 50
alphas = [0.3, 0.5, 0.7]
temperature = 20
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
_, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)

target_class = (0,)  
percent_dropout = [0.7, 0.8, 0.9]
for alpha in alphas:
    for r in percent_dropout:
        print(f"-------------- Alpha: {alpha} - Droptout rate: {r} --------------")
        teacher_dropout_layer = GreyBoxTargetedDropout(mode='max_activation', p=0.5, percent_drop=r, verbose=True)
        teacher_model = TeacherNet_SD(teacher_dropout_layer).to(device)
        teacher_wrapper = NetWrapper_T(teacher_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
        teacher_model.load_state_dict(torch.load(os.path.join(f"../ouput/Resnet50-Resnet18-CIFAR10/sample-dropping/SD-teacher_model-dropoutRate_{r}.pth")))
        teacher_model.eval()
        
        # Student model
        student_dropout = nn.Dropout(p=0.5)
        student_model = StudentNet_SD().to(device)
        student_optimizer = optim.Adam(student_model.parameters(), lr=1e-3)
        #student_model.train()

        # Train Student
        student_model.train()
        best_val_accuracy = 0
        patience = 5  
        counter = 0 
        
        for epoch in range(epochs):
            running_loss = 0.0
            for inputs, labels in trainloader:
                inputs, labels = inputs.to(device), labels.to(device)

                student_optimizer.zero_grad()

                with torch.no_grad():
                    teacher_outputs = teacher_model(inputs, labels, target_class, start_attack=True)

                student_outputs = student_model(inputs)
                loss = distillation_loss(student_outputs, teacher_outputs, labels, temperature, alpha)
                loss.backward()
                student_optimizer.step()
                running_loss += loss.item()

            # Validation loop for metrics
            student_model.eval()
            val_preds = []
            val_labels = []
            with torch.no_grad():
                for val_inputs, val_targets in validationloader:
                    val_inputs, val_targets = val_inputs.to(device), val_targets.to(device)
                    val_outputs = student_model(val_inputs)
                    _, preds = torch.max(val_outputs, 1)
                    val_preds.extend(preds.cpu().numpy())
                    val_labels.extend(val_targets.cpu().numpy())
        
            # Calculate metrics
            train_loss_avg = running_loss / len(trainloader)
            val_accuracy = accuracy_score(val_labels, val_preds)
            val_f1 = f1_score(val_labels, val_preds, average='weighted')

            print(f"Epoch [{epoch + 1}/{epochs}] - Loss: {train_loss_avg:.4f}, "
                  f"Validation Accuracy: {val_accuracy:.4f}, Validation F1 Score: {val_f1:.4f}")

             # --- EARLY STOPPING ---
            if val_accuracy > best_val_accuracy:
                best_val_accuracy = val_accuracy
                counter = 0  # Reset counter nếu có cải thiện
            else:
                counter += 1
                print(f"EarlyStopping counter: {counter} out of {patience}")
                if counter >= patience:
                    print("Early stopping triggered.")
                    break
            # ----------------------

            student_model.train()

        # Evaluate Student model
        student_wrapper = NetWrapper_T(student_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
        accuracy, _, conf_matrix, per_class_acc, per_class_precision = student_wrapper.evaluate(testloader)
        output_path =f'../ouput/Resnet50-Resnet18-CIFAR10/sample-dropping/alpha{alpha}_dropout{r}.json'
        write_to_json(
            output_path,
            'model',
            student_wrapper,
            accuracy,
            conf_matrix,
            per_class_acc,
            per_class_precision,
            classes
        )
    else:
        print('File found:', output_path)


# C. Separation Attack

In [None]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"  

In [None]:
import sys
sys.path.append('../modules')
import sys
import torch
from torch import nn, optim
from torchvision.models import resnet50, resnet18
from node_separation_dropout import NodeSepDropoutLayer
from model_wrapper_gt import NetWrapper_T
from import_data import load_cifar
from sklearn.metrics import f1_score, accuracy_score
from misc import write_to_json
from os.path import exists
import torchvision.transforms as transforms
import ssl
ssl._create_default_https_context = ssl._create_unverified_context

In [None]:
import torch
import torch.nn as nn
from torchvision.models import resnet50


class TeacherNet(nn.Module):
    def __init__(self, dropout):
        """
        ResNet50 integrated with custom Dropout Layer (e.g., NodeSepDropoutLayer).

        Parameters:
            dropout: instance of NodeSepDropoutLayer or GreyBoxTargetedDropout
        """
        super(TeacherNet, self).__init__()
        self.layers = nn.ModuleList()
        self.dropout = dropout

        # Load pretrained ResNet50
        base_model = resnet50(weights='IMAGENET1K_V1')

        # Feature extractor part (everything except the final fc layer)
        self.features = nn.Sequential(
            base_model.conv1,
            base_model.bn1,
            base_model.relu,
            base_model.maxpool,
            base_model.layer1,
            base_model.layer2,
            base_model.layer3,
            base_model.layer4,
            base_model.avgpool
        )

        # Classifier layers with custom dropout
        self.layers.append(nn.Flatten())
        self.layers.append(nn.Linear(base_model.fc.in_features, 512))
        self.layers.append(nn.ReLU())
        self.layers.append(nn.BatchNorm1d(512))
        #self.layers.append(self.dropout)
        self.layers.append(nn.Linear(512, 256))
        self.layers.append(nn.ReLU())
        self.layers.append(nn.BatchNorm1d(256))
        self.layers.append(self.dropout)
        self.layers.append(nn.Linear(256, 10))

    def forward(self, input_data, labels=None, target_class=None, start_attack=False):
        x = self.features(input_data)
        for layer in self.layers:
            if layer._get_name() in ["GreyBoxTargetedDropout", "NodeSepDropoutLayer"]:
                x = layer(x, labels, target_class, start_attack)
            else:
                x = layer(x)
        return x


class StudentNet(nn.Module):
    def __init__(self):
        super(StudentNet, self).__init__()
        # Load pretrained ResNet18 from ImageNet
        self.resnet = resnet18(weights=None)

        # Modify the fully connected layer
        self.resnet.fc = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(self.resnet.fc.in_features, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.BatchNorm1d(256),

            nn.Linear(256, 10)  # CIFAR-100
        )

    def forward(self, x):
        return self.resnet(x)

### C. Teacher Training Zone

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
batch_size = 128
epoch = 50
classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
_, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)
target_class = (0,)
mode = 'probability'
percent_nodes_for_targets = 0.1
node_sep_probability = [0.01, 0.03, 0.05]
start_attack = 0
num_to_assign = None
for prob in node_sep_probability:
    print('.....................New Model Running.....................')
    dropout = NodeSepDropoutLayer(0.5, mode, percent_nodes_for_targets, prob, num_to_assign)
    net = TeacherNet(dropout).to(device)
    teacher_wrapper = NetWrapper_T(net, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
    teacher_wrapper.fit(
        trainloader,
        validationloader,
        target_class,
        num_epochs=epoch,
        verbose=True,
        attack_epoch=1,
        num_classes=10,
        patience=5,      
        min_delta=1e-3
    )
    torch.save(net.state_dict(),f"../ouput/Resnet50-Resnet18-CIFAR10/neuron-seperation/NS-teacher_model_percent-nodes{prob}.pth")
    accuracy, _, conf_matrix, per_class_acc, per_class_precision = teacher_wrapper.evaluate(testloader)
    write_to_json(f'../ouput/Resnet50-Resnet18-CIFAR10/neuron-seperation/NS-teacher_model_percent-nodes{prob}.json', 'model', teacher_wrapper, accuracy, conf_matrix, per_class_acc, per_class_precision, classes)


### C. KD Zone

In [None]:
# Softening by dividing by temperature.
ce_loss = nn.CrossEntropyLoss()
def distillation_loss(student_logits, teacher_logits, labels, temperature, alpha):
    soft_teacher_targets = nn.functional.softmax(teacher_logits / temperature, dim=-1)
    soft_student_probs = nn.functional.log_softmax(student_logits / temperature, dim=-1)     
    kl_divergence_loss = torch.sum(soft_teacher_targets * (soft_teacher_targets.log() - soft_student_probs)) / soft_student_probs.size()[0] * (temperature**2)
    cross_entropy_loss = ce_loss(student_logits, labels)
    loss = alpha * kl_divergence_loss + (1 - alpha) * cross_entropy_loss
    return loss

In [None]:
batch_size = 128
epochs = 50
alphas = [0.3, 0.5, 0.7]
temperature = 20
target_class = (0,)
classes = ['plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485, 0.456, 0.406),(0.229, 0.224, 0.225))])
_, _, _, trainloader, validationloader, testloader = load_cifar(batch_size, transform)
selected = (0,)
mode = 'probability'
percent_nodes_for_targets = 0.1
start_attack = 0
num_to_assign = None
node_sep_probability = [0.01,0.03, 0.05]
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
for alpha in alphas:
    for prob in node_sep_probability:
        print('.....................New Model Running.....................')

        # Teacher Model
        dropout_layer = NodeSepDropoutLayer(0.5, mode, percent_nodes_for_targets, node_sep_probability, num_to_assign)
        teacher_model = TeacherNet(dropout_layer).to(device)
        teacher_wrapper = NetWrapper_T(teacher_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
        teacher_model.load_state_dict(torch.load(os.path.join(f"../ouput/Resnet50-Resnet18-CIFAR10/neuron-seperation/NS-teacher_model_percent-nodes{prob}.pth")))
        teacher_model.eval()
        

        # Student Model
        student_dropout = nn.Dropout(p=0.5)
        student_model = StudentNet().to(device)
        student_optimizer = optim.Adam(student_model.parameters(), lr=1e-3)

        # Train Student
        student_model.train()

        # EarlyStopping setup
        best_val_acc = 0
        epochs_no_improve = 0
        patience = 5 


        for epoch in range(epochs):
            running_loss = 0.0
            for inputs, labels in trainloader:
                inputs, labels = inputs.to(device), labels.to(device)

                student_optimizer.zero_grad()

                with torch.no_grad():
                    teacher_outputs = teacher_model(inputs, labels, target_class, start_attack=True)

                student_outputs = student_model(inputs)
                loss = distillation_loss(student_outputs, teacher_outputs, labels, temperature, alpha)
                loss.backward()
                student_optimizer.step()
                running_loss += loss.item()

            # Validation loop for metrics
            student_model.eval()
            val_preds = []
            val_labels = []
            with torch.no_grad():
                for val_inputs, val_targets in validationloader:
                    val_inputs, val_targets = val_inputs.to(device), val_targets.to(device)
                    val_outputs = student_model(val_inputs)
                    _, preds = torch.max(val_outputs, 1)
                    val_preds.extend(preds.cpu().numpy())
                    val_labels.extend(val_targets.cpu().numpy())
        
            # Calculate metrics
            train_loss_avg = running_loss / len(trainloader)
            val_accuracy = accuracy_score(val_labels, val_preds)
            val_f1 = f1_score(val_labels, val_preds, average='weighted')

            print(f"Epoch [{epoch + 1}/{epochs}] - Loss: {train_loss_avg:.4f}, "
                  f"Validation Accuracy: {val_accuracy:.4f}, Validation F1 Score: {val_f1:.4f}")
                
            # EarlyStopping check
            if val_accuracy > best_val_acc:
                best_val_acc = val_accuracy
                epochs_no_improve = 0
            else:
                epochs_no_improve += 1
                print(f"No improvement for {epochs_no_improve} epoch(s).")
                if epochs_no_improve >= patience:
                    print("Early stopping triggered.")
                    break
            student_model.train()
            

        # Evaluate Student Model
        student_wrapper = NetWrapper_T(student_model, nn.CrossEntropyLoss(), optim.Adam, [1e-3])
        accuracy, _, conf_matrix, per_class_acc, per_class_precision = student_wrapper.evaluate(testloader)
        output_path =f'../output/Resnet50-Resnet18-CIFAR10/neuron-seperation/alpha{alpha}_dropout{prob}.json'
        write_to_json(
            output_path, 
            'distillation', 
            student_wrapper, 
            accuracy, 
            conf_matrix, 
            per_class_acc, 
            per_class_precision, 
            classes
        )
