<h1 align="center">Introduction to Machine Learning - Course Code: 25737</h1>
<h4 align="center">Instructor: Dr. Amiri</h4>
<h4 align="center">Sharif University of Technology, Spring 2024</h4>
<h4 align="center">Project Phase 1</h4>
<h4 align="center">
Parsa hatami -- Mohammad Mahdi Razmjoo
</h4>
<h4 align="center">400100962 -- 400101272</h4>


<h4 align="center">Simulation Question 4</h4>

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, random_split

# Define the CIFAR10Classifier model
class CIFAR10Classifier(nn.Module):
    def __init__(self):
        super(CIFAR10Classifier, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, 1)
        self.conv2 = nn.Conv2d(16, 32, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(32 * 14 * 14, 64)  # Adjusted input size to match the output of conv layers
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return x

# Load CIFAR-10 Data
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

train_data = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_data = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# 80% train and 20% validation
train_size = int(0.8 * len(train_data))
val_size = len(train_data) - train_size
train_data, val_data = random_split(train_data, [train_size, val_size])

train_loader = DataLoader(train_data, batch_size=64, shuffle=True)
val_loader = DataLoader(val_data, batch_size=64, shuffle=False)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

def train_model(model, train_loader, val_loader, epochs=10, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")
        
        # Validation
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        
        print(f"Validation Accuracy: {100 * correct / total}%")

    return model

baseline_model = CIFAR10Classifier()
baseline_model = train_model(baseline_model, train_loader, val_loader, epochs=10)


Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:15<00:00, 11196080.97it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified




Epoch 1, Loss: 1.7366610702514649
Validation Accuracy: 50.09%
Epoch 2, Loss: 1.4885362251281737
Validation Accuracy: 55.15%
Epoch 3, Loss: 1.3926309808731079
Validation Accuracy: 58.99%
Epoch 4, Loss: 1.3188941102981568
Validation Accuracy: 60.82%
Epoch 5, Loss: 1.2693265802383422
Validation Accuracy: 61.73%
Epoch 6, Loss: 1.2287073127746582
Validation Accuracy: 63.11%
Epoch 7, Loss: 1.1855706621170043
Validation Accuracy: 63.46%
Epoch 8, Loss: 1.1630538983345031
Validation Accuracy: 63.65%
Epoch 9, Loss: 1.1259229259490966
Validation Accuracy: 64.05%
Epoch 10, Loss: 1.1030537225723267
Validation Accuracy: 64.92%


<h4 align="center">Simulation Question 5</h4>


In [2]:
def train_model_with_simple_noise(model, train_loader, val_loader, epochs=10, lr=0.001, noise_factor=0.1):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            
            # Adding noise to the gradients
            for param in model.parameters():
                if param.grad is not None:
                    noise = torch.normal(mean=0, std=noise_factor, size=param.grad.shape).to(device)
                    param.grad += noise
            
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")
        
        # Validation
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        
        print(f"Validation Accuracy: {100 * correct / total}%")

    return model

modified_model = CIFAR10Classifier()
modified_model = train_model_with_simple_noise(modified_model, train_loader, val_loader, epochs=10)

Epoch 1, Loss: 2.225510207557678
Validation Accuracy: 27.19%
Epoch 2, Loss: 2.0395797691345217
Validation Accuracy: 32.53%
Epoch 3, Loss: 1.9526188655853272
Validation Accuracy: 35.59%
Epoch 4, Loss: 1.9016451257705689
Validation Accuracy: 37.37%
Epoch 5, Loss: 1.8505986602783202
Validation Accuracy: 39.63%
Epoch 6, Loss: 1.8042387203216552
Validation Accuracy: 41.35%
Epoch 7, Loss: 1.7757052408218383
Validation Accuracy: 42.28%
Epoch 8, Loss: 1.7494818239212035
Validation Accuracy: 42.94%
Epoch 9, Loss: 1.724596997833252
Validation Accuracy: 43.98%
Epoch 10, Loss: 1.7055449342727662
Validation Accuracy: 44.41%


<h4 align="center">Simulation Question 6</h4>

for writing this part's code, first i used opacus library and using privateEngine function but it didn't had good results. also the values for the model parameters has many problems (unvalid input errors). then i used noised on gradients which has the reasonable output. after it in the next part i used shadow models for implementations.

In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, random_split

# Define the CIFAR10Classifier model
class CIFAR10Classifier(nn.Module):
    def __init__(self):
        super(CIFAR10Classifier, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, 1)
        self.conv2 = nn.Conv2d(16, 32, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(32 * 6 * 6, 64)  # Adjusted input size to match the output of conv layers
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return x

# Function to determine the output size of the convolutional layers
def get_conv_output_size():
    model = CIFAR10Classifier()
    model.eval()
    with torch.no_grad():
        dummy_input = torch.randn(1, 3, 32, 32)
        output = model.conv1(dummy_input)
        output = F.relu(output)
        output = model.conv2(output)
        output = F.relu(output)
        output = F.max_pool2d(output, 2)
        output = model.dropout1(output)
        return output.view(output.size(0), -1).size(1)

conv_output_size = get_conv_output_size()
print("Convolutional output size:", conv_output_size)

def update_fc1_input_size(model, new_input_size):
    model.fc1 = nn.Linear(new_input_size, 64)
    return model

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

train_data = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_data = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

train_size = int(0.8 * len(train_data))
unseen_size = len(train_data) - train_size
seen_data, unseen_data = random_split(train_data, [train_size, unseen_size])

train_loader = DataLoader(seen_data, batch_size=64, shuffle=True)
unseen_loader = DataLoader(unseen_data, batch_size=64, shuffle=False)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

def train_baseline_model(model, train_loader, epochs=10, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

    return model

def train_privacy_model(model, train_loader, epochs=10, lr=0.001, noise_multiplier=1.1, max_grad_norm=1.0):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            
            # Clip gradients
            total_norm = torch.norm(torch.stack([torch.norm(p.grad) for p in model.parameters() if p.grad is not None]), 2.0)
            clip_coef = max_grad_norm / (total_norm + 1e-6)
            if clip_coef < 1:
                for p in model.parameters():
                    if p.grad is not None:
                        p.grad.data.mul_(clip_coef)
            
            # Add noise to gradients
            for p in model.parameters():
                if p.grad is not None:
                    noise = torch.normal(0, noise_multiplier * max_grad_norm, p.grad.shape).to(device)
                    p.grad.add_(noise)
            
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

    return model

baseline_model = CIFAR10Classifier()
baseline_model = update_fc1_input_size(baseline_model, conv_output_size)

baseline_model = train_baseline_model(baseline_model, train_loader, epochs=10)

privacy_model = CIFAR10Classifier()
privacy_model = update_fc1_input_size(privacy_model, conv_output_size)

privacy_model = train_privacy_model(privacy_model, train_loader, epochs=10)

def generate_attack_data(model, data_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()
    attack_data = []
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            attack_data.append((outputs.cpu().numpy(), labels.cpu().numpy()))
    return attack_data

seen_attack_data_baseline = generate_attack_data(baseline_model, train_loader)
unseen_attack_data_baseline = generate_attack_data(baseline_model, unseen_loader)

seen_attack_data_privacy = generate_attack_data(privacy_model, train_loader)
unseen_attack_data_privacy = generate_attack_data(privacy_model, unseen_loader)

class AttackerModel(nn.Module):
    def __init__(self, input_dim):
        super(AttackerModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def train_attacker_model(attack_data, input_dim, epochs=10, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = AttackerModel(input_dim).to(device)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for (inputs, labels) in attack_data:
            inputs, labels = torch.tensor(inputs, dtype=torch.float32).to(device), torch.tensor(labels, dtype=torch.long).to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(attack_data)}")

    return model

input_dim = seen_attack_data_baseline[0][0].shape[1]

def prepare_attack_data(seen_data, unseen_data):
    x = []
    y = []
    for data in seen_data:
        for output, label in zip(*data):
            x.append(output)
            y.append(1)  # Seen data is labeled as 1
    for data in unseen_data:
        for output, label in zip(*data):
            x.append(output)
            y.append(0)  # Unseen data is labeled as 0
    return list(zip(x, y))

attack_data_baseline = prepare_attack_data(seen_attack_data_baseline, unseen_attack_data_baseline)
attack_data_privacy = prepare_attack_data(seen_attack_data_privacy, unseen_attack_data_privacy)

def create_attack_loader(attack_data):
    inputs, labels = zip(*attack_data)
    inputs = torch.tensor(inputs, dtype=torch.float32)
    labels = torch.tensor(labels, dtype=torch.long)
    dataset = torch.utils.data.TensorDataset(inputs, labels)
    return DataLoader(dataset, batch_size=64, shuffle=True)

attack_loader_baseline = create_attack_loader(attack_data_baseline)
attack_loader_privacy = create_attack_loader(attack_data_privacy)

attacker_model_baseline = train_attacker_model(attack_loader_baseline, input_dim, epochs=10)
attacker_model_privacy = train_attacker_model(attack_loader_privacy, input_dim, epochs=10)

def evaluate_attacker_model(model, attack_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in attack_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total

mia_accuracy_baseline = evaluate_attacker_model(attacker_model_baseline, attack_loader_baseline)
mia_accuracy_privacy = evaluate_attacker_model(attacker_model_privacy, attack_loader_privacy)

print(f"MIA Accuracy for Baseline Model: {mia_accuracy_baseline}%")
print(f"MIA Accuracy for Privacy-Enhanced Model: {mia_accuracy_privacy}%")


Convolutional output size: 6272
Files already downloaded and verified
Files already downloaded and verified
Epoch 1, Loss: 1.7678411323547363
Epoch 2, Loss: 1.5409598583221435
Epoch 3, Loss: 1.4331507625579833
Epoch 4, Loss: 1.3591145590782165
Epoch 5, Loss: 1.3061723586082459
Epoch 6, Loss: 1.2551678314208985
Epoch 7, Loss: 1.2168778618812561
Epoch 8, Loss: 1.1781313047409057
Epoch 9, Loss: 1.1537385991096496
Epoch 10, Loss: 1.1280500485420226
Epoch 1, Loss: 2.3000124996185303
Epoch 2, Loss: 2.2975273902893067
Epoch 3, Loss: 2.2972838787078858
Epoch 4, Loss: 2.3007118530273436
Epoch 5, Loss: 2.295948947143555
Epoch 6, Loss: 2.29217572517395
Epoch 7, Loss: 2.296297233200073
Epoch 8, Loss: 2.283183654022217
Epoch 9, Loss: 2.27386955909729
Epoch 10, Loss: 2.27422406539917


  inputs, labels = torch.tensor(inputs, dtype=torch.float32).to(device), torch.tensor(labels, dtype=torch.long).to(device)


Epoch 1, Loss: 0.5097536654271129
Epoch 2, Loss: 0.5025043099585091
Epoch 3, Loss: 0.5016026476307598
Epoch 4, Loss: 0.501030719188778
Epoch 5, Loss: 0.5007108262051707
Epoch 6, Loss: 0.5005047058739016
Epoch 7, Loss: 0.5002919970189824
Epoch 8, Loss: 0.500293756423094
Epoch 9, Loss: 0.5001643371703984
Epoch 10, Loss: 0.49981699281793723
Epoch 1, Loss: 0.5047402826264081
Epoch 2, Loss: 0.5019735755289302
Epoch 3, Loss: 0.5011106045426005
Epoch 4, Loss: 0.5009644730850253
Epoch 5, Loss: 0.5007226658827814
Epoch 6, Loss: 0.500706170221119
Epoch 7, Loss: 0.5004310321320048
Epoch 8, Loss: 0.5001962461587414
Epoch 9, Loss: 0.4997200621935108
Epoch 10, Loss: 0.4998288520080659
MIA Accuracy for Baseline Model: 80.0%
MIA Accuracy for Privacy-Enhanced Model: 80.004%


In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, random_split, TensorDataset

# Define the CIFAR10Classifier model
class CIFAR10Classifier(nn.Module):
    def __init__(self):
        super(CIFAR10Classifier, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, 1)
        self.conv2 = nn.Conv2d(16, 32, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(32 * 6 * 6, 64)  # Adjusted input size to match the output of conv layers
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return x

def get_conv_output_size():
    model = CIFAR10Classifier()
    model.eval()
    with torch.no_grad():
        dummy_input = torch.randn(1, 3, 32, 32)
        output = model.conv1(dummy_input)
        output = F.relu(output)
        output = model.conv2(output)
        output = F.relu(output)
        output = F.max_pool2d(output, 2)
        output = model.dropout1(output)
        return output.view(output.size(0), -1).size(1)

conv_output_size = get_conv_output_size()
print("Convolutional output size:", conv_output_size)

def update_fc1_input_size(model, new_input_size):
    model.fc1 = nn.Linear(new_input_size, 64)
    return model

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

train_data = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_data = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

train_size = int(0.8 * len(train_data))
unseen_size = len(train_data) - train_size
seen_data, unseen_data = random_split(train_data, [train_size, unseen_size])

train_loader = DataLoader(seen_data, batch_size=64, shuffle=True)
unseen_loader = DataLoader(unseen_data, batch_size=64, shuffle=False)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

def train_baseline_model(model, train_loader, epochs=10, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

    return model

def train_privacy_model(model, train_loader, epochs=10, lr=0.001, noise_multiplier=1.1, max_grad_norm=1.0):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            
            # Clip gradients
            total_norm = torch.norm(torch.stack([torch.norm(p.grad) for p in model.parameters() if p.grad is not None]), 2.0)
            clip_coef = max_grad_norm / (total_norm + 1e-6)
            if clip_coef < 1:
                for p in model.parameters():
                    if p.grad is not None:
                        p.grad.data.mul_(clip_coef)
            
            # Add noise to gradients
            for p in model.parameters():
                if p.grad is not None:
                    noise = torch.normal(0, noise_multiplier * max_grad_norm, p.grad.shape).to(device)
                    p.grad.add_(noise)
            
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

    return model

baseline_model = CIFAR10Classifier()
baseline_model = update_fc1_input_size(baseline_model, conv_output_size)

baseline_model = train_baseline_model(baseline_model, train_loader, epochs=10)

privacy_model = CIFAR10Classifier()
privacy_model = update_fc1_input_size(privacy_model, conv_output_size)

privacy_model = train_privacy_model(privacy_model, train_loader, epochs=10)

def generate_attack_data(model, data_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()
    attack_data = []
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            attack_data.append((outputs.cpu().numpy(), labels.cpu().numpy()))
    return attack_data

def train_multiple_shadow_models(num_shadow_models, train_loader, val_loader, epochs=10):
    shadow_models = []
    for _ in range(num_shadow_models):
        model = CIFAR10Classifier()
        model = update_fc1_input_size(model, conv_output_size)
        model = train_baseline_model(model, train_loader, epochs=epochs)
        shadow_models.append(model)
    return shadow_models

def generate_shadow_attack_data(shadow_models, data_loader):
    attack_data = []
    for model in shadow_models:
        attack_data.extend(generate_attack_data(model, data_loader))
    return attack_data

num_shadow_models = 2  
shadow_models_baseline = train_multiple_shadow_models(num_shadow_models, train_loader, unseen_loader)
shadow_models_privacy = train_multiple_shadow_models(num_shadow_models, train_loader, unseen_loader)

shadow_attack_data_baseline_seen = generate_shadow_attack_data(shadow_models_baseline, train_loader)
shadow_attack_data_baseline_unseen = generate_shadow_attack_data(shadow_models_baseline, unseen_loader)
shadow_attack_data_privacy_seen = generate_shadow_attack_data(shadow_models_privacy, train_loader)
shadow_attack_data_privacy_unseen = generate_shadow_attack_data(shadow_models_privacy, unseen_loader)

def prepare_attack_data(seen_data, unseen_data):
    x = []
    y = []
    for data in seen_data:
        for output, label in zip(*data):
            x.append(output)
            y.append(1) 
    for data in unseen_data:
        for output, label in zip(*data):
            x.append(output)
            y.append(0) 
    return list(zip(x, y))

attack_data_baseline = prepare_attack_data(shadow_attack_data_baseline_seen, shadow_attack_data_baseline_unseen)
attack_data_privacy = prepare_attack_data(shadow_attack_data_privacy_seen, shadow_attack_data_privacy_unseen)

def create_attack_loader(attack_data):
    inputs, labels = zip(*attack_data)
    inputs = torch.tensor(inputs, dtype=torch.float32)
    labels = torch.tensor(labels, dtype=torch.long)
    dataset = torch.utils.data.TensorDataset(inputs, labels)
    return DataLoader(dataset, batch_size=64, shuffle=True)

attack_loader_baseline = create_attack_loader(attack_data_baseline)
attack_loader_privacy = create_attack_loader(attack_data_privacy)

class AttackerModel(nn.Module):
    def __init__(self, input_dim):
        super(AttackerModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def train_attacker_model(attack_loader, input_dim, epochs=10, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = AttackerModel(input_dim).to(device)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in attack_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(attack_loader)}")

    return model

input_dim = attack_loader_baseline.dataset.tensors[0].shape[1]

attacker_model_baseline = train_attacker_model(attack_loader_baseline, input_dim, epochs=10)
attacker_model_privacy = train_attacker_model(attack_loader_privacy, input_dim, epochs=10)

def evaluate_attacker_model(model, attack_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in attack_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total

mia_accuracy_baseline = evaluate_attacker_model(attacker_model_baseline, attack_loader_baseline)
mia_accuracy_privacy = evaluate_attacker_model(attacker_model_privacy, attack_loader_privacy)

print(f"MIA Accuracy for Baseline Model: {mia_accuracy_baseline}%")
print(f"MIA Accuracy for Privacy-Enhanced Model: {mia_accuracy_privacy}%")


Convolutional output size: 6272
Files already downloaded and verified
Files already downloaded and verified
Epoch 1, Loss: 1.7850487289428711
Epoch 2, Loss: 1.5614835346221925
Epoch 3, Loss: 1.4628979633331298
Epoch 4, Loss: 1.3873797872543334
Epoch 5, Loss: 1.3319367524147034
Epoch 6, Loss: 1.2843160557746887
Epoch 7, Loss: 1.252872998905182
Epoch 8, Loss: 1.2203081357955932
Epoch 9, Loss: 1.196781807231903
Epoch 10, Loss: 1.1672297500610351
Epoch 1, Loss: 2.3024287799835204
Epoch 2, Loss: 2.2987195854187013
Epoch 3, Loss: 2.2849382568359373
Epoch 4, Loss: 2.27986473197937
Epoch 5, Loss: 2.272332763671875
Epoch 6, Loss: 2.268441830444336
Epoch 7, Loss: 2.256930570602417
Epoch 8, Loss: 2.243988162994385
Epoch 9, Loss: 2.2472985191345214
Epoch 10, Loss: 2.2477793170928955
Epoch 1, Loss: 1.7577148406982421
Epoch 2, Loss: 1.4715978048324585
Epoch 3, Loss: 1.355044997406006
Epoch 4, Loss: 1.2875937500953674
Epoch 5, Loss: 1.2321864344596862
Epoch 6, Loss: 1.1887373788833617
Epoch 7, Loss: 

<h4 align="center">Simulation Question 7</h4>


without editing the given model

In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset, random_split
from torchvision import datasets, transforms

class CIFAR10Classifier(nn.Module):
    def __init__(self):
        super(CIFAR10Classifier, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, 1)
        self.conv2 = nn.Conv2d(16, 32, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(1152, 64)  # Update this after calculating the correct size
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return x

def get_conv_output_size(model):
    with torch.no_grad():
        dummy_input = torch.randn(1, 3, 32, 32)
        output = model.conv1(dummy_input)
        output = F.relu(output)
        output = model.conv2(output)
        output = F.relu(output)
        output = F.max_pool2d(output, 2)
        output = model.dropout1(output)
        return output.view(output.size(0), -1).size(1)

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

train_data = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_data = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

train_size = int(0.8 * len(train_data))
unseen_size = len(train_data) - train_size
seen_data, unseen_data = random_split(train_data, [train_size, unseen_size])

train_loader = DataLoader(seen_data, batch_size=64, shuffle=True)
unseen_loader = DataLoader(unseen_data, batch_size=64, shuffle=False)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

def train_baseline_model(model, train_loader, epochs=10, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

    return model

baseline_model = CIFAR10Classifier()
conv_output_size = get_conv_output_size(baseline_model)
baseline_model.fc1 = nn.Linear(conv_output_size, 64)  # Update fc1 with correct input size
baseline_model = train_baseline_model(baseline_model, train_loader, epochs=10)

def train_privacy_model(model, train_loader, epochs=10, lr=0.001, noise_multiplier=1.1, max_grad_norm=1.0):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            
            # Clip gradients
            total_norm = torch.norm(torch.stack([torch.norm(p.grad) for p in model.parameters() if p.grad is not None]), 2.0)
            clip_coef = max_grad_norm / (total_norm + 1e-6)
            if clip_coef < 1:
                for p in model.parameters():
                    if p.grad is not None:
                        p.grad.data.mul_(clip_coef)
            
            # Add noise to gradients
            for p in model.parameters():
                if p.grad is not None:
                    noise = torch.normal(0, noise_multiplier * max_grad_norm, p.grad.shape).to(device)
                    p.grad.add_(noise)
            
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

    return model

privacy_model = CIFAR10Classifier()
privacy_model.fc1 = nn.Linear(conv_output_size, 64)  # Update fc1 with correct input size
privacy_model = train_privacy_model(privacy_model, train_loader, epochs=10)

def generate_attack_data(model, data_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()
    attack_data = []
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            attack_data.append((outputs.cpu().numpy(), labels.cpu().numpy()))
    return attack_data

def train_multiple_shadow_models(num_shadow_models, train_loader, epochs=10):
    shadow_models = []
    for _ in range(num_shadow_models):
        model = CIFAR10Classifier()
        model.fc1 = nn.Linear(conv_output_size, 64)  # Update fc1 with correct input size
        model = train_baseline_model(model, train_loader, epochs=epochs)
        shadow_models.append(model)
    return shadow_models

def generate_shadow_attack_data(shadow_models, data_loader):
    attack_data = []
    for model in shadow_models:
        attack_data.extend(generate_attack_data(model, data_loader))
    return attack_data

num_shadow_models = 5  
shadow_models_baseline = train_multiple_shadow_models(num_shadow_models, train_loader)
shadow_models_privacy = train_multiple_shadow_models(num_shadow_models, train_loader)

shadow_attack_data_baseline = generate_shadow_attack_data(shadow_models_baseline, train_loader)
shadow_attack_data_privacy = generate_shadow_attack_data(shadow_models_privacy, train_loader)

def prepare_attack_data(seen_data, unseen_data):
    x = []
    y = []
    for data in seen_data:
        for output, label in zip(*data):
            x.append(output)
            y.append(1)  
    for data in unseen_data:
        for output, label in zip(*data):
            x.append(output)
            y.append(0)  
    return list(zip(x, y))

attack_data_baseline = prepare_attack_data(shadow_attack_data_baseline, unseen_attack_data_baseline)
attack_data_privacy = prepare_attack_data(shadow_attack_data_privacy, unseen_attack_data_privacy)

def create_attack_loader(attack_data):
    inputs, labels = zip(*attack_data)
    inputs = torch.tensor(inputs, dtype=torch.float32)
    labels = torch.tensor(labels, dtype=torch.long)
    dataset = torch.utils.data.TensorDataset(inputs, labels)
    return DataLoader(dataset, batch_size=64, shuffle=True)

attack_loader_baseline = create_attack_loader(attack_data_baseline)
attack_loader_privacy = create_attack_loader(attack_data_privacy)

class AttackerModel(nn.Module):
    def __init__(self, input_dim):
        super(AttackerModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def train_attacker_model(attack_loader, input_dim, epochs=10, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = AttackerModel(input_dim).to(device)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in attack_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(attack_loader)}")

    return model

input_dim = shadow_attack_data_baseline[0][0].shape[1]

attacker_model_baseline = train_attacker_model(attack_loader_baseline, input_dim, epochs=10)
attacker_model_privacy = train_attacker_model(attack_loader_privacy, input_dim, epochs=10)

def evaluate_attacker_model(model, attack_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in attack_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total

mia_accuracy_privacy = evaluate_attacker_model(attacker_model_baseline, attack_loader_baseline)
mia_accuracy_baseline = evaluate_attacker_model(attacker_model_privacy, attack_loader_privacy)

print(f"MIA Accuracy for Baseline Model: {mia_accuracy_baseline}%")
print(f"MIA Accuracy for Privacy-Enhanced Model: {mia_accuracy_privacy}%")


Files already downloaded and verified
Files already downloaded and verified
Epoch 1, Loss: 1.766910039138794
Epoch 2, Loss: 1.5127717491149901
Epoch 3, Loss: 1.4160685876846313
Epoch 4, Loss: 1.3594872522354127
Epoch 5, Loss: 1.3121115957260132
Epoch 6, Loss: 1.2810249942779541
Epoch 7, Loss: 1.243545382499695
Epoch 8, Loss: 1.219447069454193
Epoch 9, Loss: 1.1998160769462585
Epoch 10, Loss: 1.1696785223007202
Epoch 1, Loss: 2.302423729324341
Epoch 2, Loss: 2.2876622772216795
Epoch 3, Loss: 2.2719620769500732
Epoch 4, Loss: 2.2605060306549074
Epoch 5, Loss: 2.256458484649658
Epoch 6, Loss: 2.241231196594238
Epoch 7, Loss: 2.2374693660736082
Epoch 8, Loss: 2.233146411895752
Epoch 9, Loss: 2.2190399843215944
Epoch 10, Loss: 2.2270974060058593
Epoch 1, Loss: 1.778077550315857
Epoch 2, Loss: 1.5142680486679077
Epoch 3, Loss: 1.412343450164795
Epoch 4, Loss: 1.3400785613059998
Epoch 5, Loss: 1.291143714237213
Epoch 6, Loss: 1.2503027514457703
Epoch 7, Loss: 1.2063611203193665
Epoch 8, Loss:

notice that the print of the accuracies are vice versa, but because of the limit of the time i can't run this cell one more time. the true output is :

MIA Accuracy for Baseline Model: 99.99809523809523%


MIA Accuracy for Privacy-Enhanced Model: 96.77714285714286%

second meethod

In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms

class CIFAR10Classifier(nn.Module):
    def __init__(self):
        super(CIFAR10Classifier, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, 1)
        self.conv2 = nn.Conv2d(16, 32, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        
        # Calculate the size of the output of the conv layers
        self._to_linear = None
        self.convs(torch.randn(1, 3, 32, 32))
        
        self.fc1 = nn.Linear(self._to_linear, 64)
        self.fc2 = nn.Linear(64, 10)

    def convs(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        if self._to_linear is None:
            self._to_linear = x.view(x.size(0), -1).size(1)
        return x

    def forward(self, x):
        x = self.convs(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return x

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

train_data = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_data = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

train_size = int(0.8 * len(train_data))
unseen_size = len(train_data) - train_size
seen_data, unseen_data = random_split(train_data, [train_size, unseen_size])

train_loader = DataLoader(seen_data, batch_size=64, shuffle=True)
unseen_loader = DataLoader(unseen_data, batch_size=64, shuffle=False)
test_loader = DataLoader(test_data, batch_size=64, shuffle=False)

def train_baseline_model(model, train_loader, epochs=10, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

    return model

baseline_model = CIFAR10Classifier()
baseline_model = train_baseline_model(baseline_model, train_loader, epochs=10)

def train_privacy_model(model, train_loader, epochs=10, lr=0.001, noise_multiplier=1.1, max_grad_norm=1.0):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            
            # Clip gradients
            total_norm = torch.norm(torch.stack([torch.norm(p.grad) for p in model.parameters() if p.grad is not None]), 2.0)
            clip_coef = max_grad_norm / (total_norm + 1e-6)
            if clip_coef < 1:
                for p in model.parameters():
                    if p.grad is not None:
                        p.grad.data.mul_(clip_coef)
            
            # Add noise to gradients
            for p in model.parameters():
                if p.grad is not None:
                    noise = torch.normal(0, noise_multiplier * max_grad_norm, p.grad.shape).to(device)
                    p.grad.add_(noise)
            
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")

    return model

privacy_model = CIFAR10Classifier()
privacy_model = train_privacy_model(privacy_model, train_loader, epochs=10)

def generate_attack_data(model, data_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()
    attack_data = []
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            attack_data.append((outputs.cpu().numpy(), labels.cpu().numpy()))
    return attack_data

def train_multiple_shadow_models(num_shadow_models, train_loader, epochs=10):
    shadow_models = []
    for _ in range(num_shadow_models):
        model = CIFAR10Classifier()
        model = train_baseline_model(model, train_loader, epochs=epochs)
        shadow_models.append(model)
    return shadow_models

def generate_shadow_attack_data(shadow_models, data_loader):
    attack_data = []
    for model in shadow_models:
        attack_data.extend(generate_attack_data(model, data_loader))
    return attack_data

num_shadow_models = 5  
shadow_models_baseline = train_multiple_shadow_models(num_shadow_models, train_loader)
shadow_models_privacy = train_multiple_shadow_models(num_shadow_models, train_loader)

shadow_attack_data_baseline = generate_shadow_attack_data(shadow_models_baseline, train_loader)
shadow_attack_data_privacy = generate_shadow_attack_data(shadow_models_privacy, train_loader)

def prepare_attack_data(seen_data, unseen_data):
    x = []
    y = []
    for data in seen_data:
        for output, label in zip(*data):
            x.append(output)
            y.append(1)  
    for data in unseen_data:
        for output, label in zip(*data):
            x.append(output)
            y.append(0)  
    return list(zip(x, y))

attack_data_baseline = prepare_attack_data(shadow_attack_data_baseline, unseen_attack_data_baseline)
attack_data_privacy = prepare_attack_data(shadow_attack_data_privacy, unseen_attack_data_privacy)

def create_attack_loader(attack_data):
    inputs, labels = zip(*attack_data)
    inputs = torch.tensor(inputs, dtype=torch.float32)
    labels = torch.tensor(labels, dtype=torch.long)
    dataset = torch.utils.data.TensorDataset(inputs, labels)
    return DataLoader(dataset, batch_size=64, shuffle=True)

attack_loader_baseline = create_attack_loader(attack_data_baseline)
attack_loader_privacy = create_attack_loader(attack_data_privacy)

# Train attacker models
class AttackerModel(nn.Module):
    def __init__(self, input_dim):
        super(AttackerModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

def train_attacker_model(attack_loader, input_dim, epochs=10, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = AttackerModel(input_dim).to(device)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in attack_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(attack_loader)}")

    return model

input_dim = shadow_attack_data_baseline[0][0].shape[1]

attacker_model_baseline = train_attacker_model(attack_loader_baseline, input_dim, epochs=10)
attacker_model_privacy = train_attacker_model(attack_loader_privacy, input_dim, epochs=10)

def evaluate_attacker_model(model, attack_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in attack_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total

mia_accuracy_baseline = evaluate_attacker_model(attacker_model_baseline, attack_loader_baseline)
mia_accuracy_privacy = evaluate_attacker_model(attacker_model_privacy, attack_loader_privacy)

print(f"Improved MIA Accuracy for Baseline Model: {mia_accuracy_baseline}%")
print(f"Improved MIA Accuracy for Privacy-Enhanced Model: {mia_accuracy_privacy}%")


Files already downloaded and verified
Files already downloaded and verified
Epoch 1, Loss: 1.815518768310547
Epoch 2, Loss: 1.5492673496246339
Epoch 3, Loss: 1.4341563167572022
Epoch 4, Loss: 1.3613821216583253
Epoch 5, Loss: 1.3066818266868592
Epoch 6, Loss: 1.2582828482627868
Epoch 7, Loss: 1.2230890037536621
Epoch 8, Loss: 1.198339196205139
Epoch 9, Loss: 1.1705496068000794
Epoch 10, Loss: 1.1518010720252991
Epoch 1, Loss: 2.301245771026611
Epoch 2, Loss: 2.3038307456970215
Epoch 3, Loss: 2.3047791275024414
Epoch 4, Loss: 2.301388904571533
Epoch 5, Loss: 2.2973534507751463
Epoch 6, Loss: 2.291410094833374
Epoch 7, Loss: 2.2839398502349852
Epoch 8, Loss: 2.2704434707641603
Epoch 9, Loss: 2.2699807048797607
Epoch 10, Loss: 2.2728657932281493
Epoch 1, Loss: 1.7501491132736207
Epoch 2, Loss: 1.489423397064209
Epoch 3, Loss: 1.38676729221344
Epoch 4, Loss: 1.3164008008003234
Epoch 5, Loss: 1.2622181720733643
Epoch 6, Loss: 1.217936426448822
Epoch 7, Loss: 1.1830250049591065
Epoch 8, Loss

notice that the print of the accuracies are vice versa, but because of the limit of the time i can't run this cell one more time. the true output is :

MIA Accuracy for Baseline Model: 99.99428571428571%


MIA Accuracy for Privacy-Enhanced Model: 97.2195238095238%

<h4 align="center">Save the attacker models for presentation attack on dataset</h4>


In [11]:
# Saving the attacker models
torch.save(attacker_model_baseline.state_dict(), 'attacker_model_baseline.pth')
torch.save(attacker_model_privacy.state_dict(), 'attacker_model_privacy.pth')

<h4 align="center">Simulation Question 8</h4>


In [None]:
class BinaryClassifier(nn.Module):
    def __init__(self, input_dim):
        super(BinaryClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        return x

input_dim = combined_features.size(1)
attacker_model_baseline = BinaryClassifier(input_dim).to(device)
attacker_model_privacy = BinaryClassifier(input_dim).to(device)

attacker_model_baseline.load_state_dict(torch.load('/kaggle/working/attacker_model_baseline.pth', map_location=device))
attacker_model_privacy.load_state_dict(torch.load('/kaggle/working/attacker_model_privacy.pth', map_location=device))

def evaluate_attacker_model(model, dataloader):
    model.eval()
    all_labels = []
    all_predicted = []
    correct = 0
    total = 0

    with torch.no_grad():
        for features, labels in dataloader:
            features, labels = features.to(device), labels.to(device).float().unsqueeze(1)
            outputs = model(features).squeeze()
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            all_labels.extend(labels.cpu().numpy())
            all_predicted.extend(predicted.cpu().numpy())

    accuracy = correct / total
    cm = confusion_matrix(all_labels, all_predicted)
    precision = precision_score(all_labels, all_predicted)
    recall = recall_score(all_labels, all_predicted)
    f1 = f1_score(all_labels, all_predicted)

    return accuracy, cm, precision, recall, f1

accuracy_baseline, cm_baseline, precision_baseline, recall_baseline, f1_baseline = evaluate_attacker_model(attacker_model_baseline, new_loader)
print(f'Baseline Attacker Model:')
print(f'Training Accuracy: {accuracy_baseline:.4f}')
print(f'Confusion Matrix:\n{cm_baseline}')
print(f'Precision: {precision_baseline:.4f}')
print(f'Recall: {recall_baseline:.4f}')
print(f'F1 Score: {f1_baseline:.4f}')

accuracy_privacy, cm_privacy, precision_privacy, recall_privacy, f1_privacy = evaluate_attacker_model(attacker_model_privacy, new_loader)
print(f'Privacy-Enhanced Attacker Model:')
print(f'Training Accuracy: {accuracy_privacy:.4f}')
print(f'Confusion Matrix:\n{cm_privacy}')
print(f'Precision: {precision_privacy:.4f}')
print(f'Recall: {recall_privacy:.4f}')
print(f'F1 Score: {f1_privacy:.4f}')


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset, TensorDataset
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score
from sklearn.linear_model import LogisticRegression

class CIFAR10Classifier(nn.Module):
    def __init__(self):
        super(CIFAR10Classifier, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, 1)
        self.conv2 = nn.Conv2d(16, 32, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(32 * 6 * 6, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return x

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = CIFAR10Classifier()
state_dict = torch.load("model_state_dict.pth", map_location=device)
new_state_dict = {key.replace('_module.', ''): value for key, value in state_dict.items()}
model.load_state_dict(new_state_dict)
model.to(device)
model.eval()

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

DATA_ROOT = './cifar10'
BATCH_SIZE = 64

indices_file = 'list.txt'
with open(indices_file, 'r') as f:
    indices = [int(line.strip()) for line in f]

full_train_dataset = torchvision.datasets.CIFAR10(root=DATA_ROOT, train=True, download=True, transform=transform)
test_dataset = torchvision.datasets.CIFAR10(root=DATA_ROOT, train=False, download=True, transform=transform)

train_indices_set = set(indices)
all_indices = set(range(len(full_train_dataset)))
other_indices = list(all_indices - train_indices_set)

train_dataset = Subset(full_train_dataset, indices[:len(indices)//2])
other_dataset = Subset(full_train_dataset, other_indices)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
other_loader = DataLoader(other_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

train_labels = torch.ones(len(train_dataset)).to(device)
other_labels = torch.zeros(len(other_dataset)).to(device)
test_labels = torch.zeros(len(test_dataset)).to(device)

def extract_features(model, dataloader):
    model.eval()
    features = []
    with torch.no_grad():
        for data in dataloader:
            inputs, _ = data
            inputs = inputs.to(device)
            outputs = model(inputs)
            features.append(outputs)
    return torch.cat(features).to(device)

train_features = extract_features(model, train_loader)
other_features = extract_features(model, other_loader)
test_features = extract_features(model, test_loader)

combined_features = torch.cat((train_features, other_features, test_features))
combined_labels = torch.cat((train_labels, other_labels, test_labels))

new_dataset = TensorDataset(combined_features, combined_labels)
new_loader = DataLoader(new_dataset, batch_size=BATCH_SIZE, shuffle=True)

class BinaryClassifier(nn.Module):
    def __init__(self, input_dim):
        super(BinaryClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = torch.sigmoid(self.fc3(x))
        return x

input_dim = combined_features.size(1)
binary_classifier = BinaryClassifier(input_dim).to(device)

def train_attacker_model(model, dataloader, epochs=10, lr=0.001):
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for features, labels in dataloader:
            features, labels = features.to(device), labels.to(device).float().unsqueeze(1)
            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(dataloader)}")

    return model

binary_classifier = train_attacker_model(binary_classifier, new_loader, epochs=10)

binary_classifier.eval()
all_labels = []
all_predicted = []
correct = 0
total = 0

with torch.no_grad():
    for features, labels in new_loader:
        features, labels = features.to(device), labels.to(device).float().unsqueeze(1)
        outputs = binary_classifier(features).squeeze()
        predicted = (outputs > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        all_labels.extend(labels.cpu().numpy())
        all_predicted.extend(predicted.cpu().numpy())

accuracy = correct / total
print(f'Training Accuracy: {accuracy:.4f}')

cm = confusion_matrix(all_labels, all_predicted)
precision = precision_score(all_labels, all_predicted)
recall = recall_score(all_labels, all_predicted)
f1 = f1_score(all_labels, all_predicted)

print(f'Confusion Matrix:\n{cm}')
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset, TensorDataset
from torchvision import datasets, transforms
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score

# Define the CIFAR10Classifier model
class CIFAR10Classifier(nn.Module):
    def __init__(self):
        super(CIFAR10Classifier, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, 1)
        self.conv2 = nn.Conv2d(16, 32, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(6272, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        return x

# Load the unknown model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = CIFAR10Classifier()
state_dict = torch.load("model_state_dict.pth", map_location=device)
new_state_dict = {key.replace('_module.', ''): value for key, value in state_dict.items()}
model.load_state_dict(new_state_dict)
model.to(device)
model.eval()

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

DATA_ROOT = '../cifar10'
BATCH_SIZE = 64

indices_file = 'list.txt'
with open(indices_file, 'r') as f:
    indices = [int(line.strip()) for line in f]

full_train_dataset = datasets.CIFAR10(root=DATA_ROOT, train=True, download=True, transform=transform)
test_dataset = datasets.CIFAR10(root=DATA_ROOT, train=False, download=True, transform=transform)

train_indices_set = set(indices)
all_indices = set(range(len(full_train_dataset)))
other_indices = list(all_indices - train_indices_set)

train_dataset = Subset(full_train_dataset, indices[:len(indices)//2])
other_dataset = Subset(full_train_dataset, other_indices)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
other_loader = DataLoader(other_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

train_labels = torch.ones(len(train_dataset)).to(device)
other_labels = torch.zeros(len(other_dataset)).to(device)
test_labels = torch.zeros(len(test_dataset)).to(device)

def extract_features(model, dataloader):
    model.eval()
    features = []
    with torch.no_grad():
        for data in dataloader:
            inputs, _ = data
            inputs = inputs.to(device)
            outputs = model(inputs)
            features.append(outputs)
    return torch.cat(features).to(device)

train_features = extract_features(model, train_loader)
other_features = extract_features(model, other_loader)
test_features = extract_features(model, test_loader)

combined_features = torch.cat((train_features, other_features, test_features))
combined_labels = torch.cat((train_labels, other_labels, test_labels))

new_dataset = TensorDataset(combined_features, combined_labels)
new_loader = DataLoader(new_dataset, batch_size=BATCH_SIZE, shuffle=True)

class AttackerModel(nn.Module):
    def __init__(self, input_dim):
        super(AttackerModel, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

input_dim = train_features.shape[1]
attacker_model_baseline = AttackerModel(input_dim).to(device)
attacker_model_privacy = AttackerModel(input_dim).to(device)

attacker_model_baseline.load_state_dict(torch.load('attacker_model_baseline.pth'))
attacker_model_privacy.load_state_dict(torch.load('attacker_model_privacy.pth'))

def evaluate_attacker_model(model, attack_loader):
    model.eval()
    correct = 0
    total = 0
    all_labels = []
    all_predicted = []
    with torch.no_grad():
        for inputs, labels in attack_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            all_labels.extend(labels.cpu().numpy())
            all_predicted.extend(predicted.cpu().numpy())
    accuracy = 100 * correct / total
    cm = confusion_matrix(all_labels, all_predicted)
    precision = precision_score(all_labels, all_predicted)
    recall = recall_score(all_labels, all_predicted)
    f1 = f1_score(all_labels, all_predicted)
    return accuracy, cm, precision, recall, f1

mia_accuracy_baseline, cm_baseline, precision_baseline, recall_baseline, f1_baseline = evaluate_attacker_model(attacker_model_baseline, new_loader)
mia_accuracy_privacy, cm_privacy, precision_privacy, recall_privacy, f1_privacy = evaluate_attacker_model(attacker_model_privacy, new_loader)

print(f"MIA Accuracy for Baseline Attacker Model: {mia_accuracy_baseline}%")
print(f"Confusion Matrix for Baseline Attacker Model:\n{cm_baseline}")
print(f"Precision for Baseline Attacker Model: {precision_baseline:.4f}")
print(f"Recall for Baseline Attacker Model: {recall_baseline:.4f}")
print(f"F1 Score for Baseline Attacker Model: {f1_baseline:.4f}")

print(f"MIA Accuracy for Privacy-Enhanced Attacker Model: {mia_accuracy_privacy}%")
print(f"Confusion Matrix for Privacy-Enhanced Attacker Model:\n{cm_privacy}")
print(f"Precision for Privacy-Enhanced Attacker Model: {precision_privacy:.4f}")
print(f"Recall for Privacy-Enhanced Attacker Model: {recall_privacy:.4f}")
print(f"F1 Score for Privacy-Enhanced Attacker Model: {f1_privacy:.4f}")
