In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split, Subset
from torch import optim
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
import torchvision
import torchvision.transforms as transforms

# Define the transformation for the CIFAR-10 dataset:
# convert images to tensors, then normalise each of the three channels with a
# fixed mean/std pair (presumably CIFAR-10 training-set statistics -- TODO confirm).
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])

# Load the CIFAR-10 dataset (downloaded into ./data when not already present)
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# Split the training dataset into training (80%) and validation (20%) sets.
# val_size is computed as the remainder so the two sizes always sum to the
# dataset length. No generator is passed to random_split, so the split depends
# on the global torch RNG state and differs between unseeded runs.
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_data, val_data = random_split(train_dataset, [train_size, val_size])

# Create data loaders for training, validation, and testing.
# Only the training loader shuffles; validation and test keep dataset order.
batch_size = 64
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Define the model
class CIFAR10Classifier(nn.Module):
    """Small two-convolution CNN that maps an image batch to 10 class logits.

    ``fc1`` expects 6272 flattened features, which is what two unpadded 3x3
    convolutions followed by a 2x2 max-pool produce for a 3x32x32 input
    (32 channels * 14 * 14).

    Args:
        dropout_rate: Drop probability applied to the 64 hidden features
            between ``fc1`` and ``fc2``.
    """

    def __init__(self, dropout_rate=0.5):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, 1)
        self.conv2 = nn.Conv2d(16, 32, 3, 1)
        # Channel-wise dropout on the 4-D feature maps after pooling.
        self.dropout1 = nn.Dropout2d(0.25)
        # Element-wise dropout: this layer receives the 2-D (batch, 64) output
        # of fc1. Dropout2d on 2-D input is deprecated (it warns and is slated
        # to become an error); nn.Dropout is the documented replacement that
        # keeps the same behaviour. Dropout layers hold no parameters, so
        # state_dict compatibility is unaffected.
        self.dropout2 = nn.Dropout(dropout_rate)  # Variable dropout rate for privacy model
        self.fc1 = nn.Linear(6272, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        """Return raw (unnormalised) class logits for the batch ``x``."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)  # keep the batch dimension, flatten the rest
        x = F.relu(self.fc1(x))
        x = self.dropout2(x)
        return self.fc2(x)

# Training function
def train_model(model, train_loader, val_loader, num_epochs=10):
    """Fit ``model`` with Adam and cross-entropy, reporting validation metrics.

    Args:
        model: Network mapping a batch of inputs to class logits.
        train_loader: Sized iterable of ``(inputs, targets)`` batches used for
            the parameter updates.
        val_loader: Sized iterable of ``(inputs, targets)`` batches used only
            for monitoring; no gradients are computed on it.
        num_epochs: Number of full passes over ``train_loader``.

    Returns:
        The same ``model`` object, updated in place. After at least one epoch
        it is left in eval mode.
    """
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(num_epochs):
        # ---- optimisation pass ----
        model.train()
        running_loss = 0.0
        for batch_x, batch_y in train_loader:
            adam.zero_grad()
            batch_loss = loss_fn(model(batch_x), batch_y)
            batch_loss.backward()
            adam.step()
            running_loss += batch_loss.item()

        # ---- validation pass: dropout off, autograd off ----
        model.eval()
        val_loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                logits = model(batch_x)
                val_loss += loss_fn(logits, batch_y).item()
                total += batch_y.size(0)
                hits = torch.max(logits, 1).indices == batch_y
                correct += hits.sum().item()

        # Losses are averaged over the number of batches, accuracy over samples.
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}, Validation Loss: {val_loss/len(val_loader)}, Accuracy: {100 * correct / total}%')

    return model

# Compute losses for a given model and dataset
def compute_losses(model, dataset, batch_size=32, per_sample=False):
    """Evaluate ``model`` on ``dataset`` and collect cross-entropy losses.

    The model is switched to eval mode and the dataset is traversed in order
    (no shuffling) without gradient tracking.

    Args:
        model: Network mapping a batch of inputs to class logits.
        dataset: Dataset yielding ``(input, target)`` pairs.
        batch_size: Number of examples per forward pass.
        per_sample: If False (default), return one *mean* loss per batch, so
            the result has ``ceil(len(dataset) / batch_size)`` entries. If
            True, return one loss per example (``len(dataset)`` entries),
            which is the signal a per-example membership attack needs.

    Returns:
        A list of Python floats.
    """
    model.eval()
    criterion = nn.CrossEntropyLoss(reduction='none' if per_sample else 'mean')
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
    losses = []
    with torch.no_grad():
        for images, labels in dataloader:
            loss = criterion(model(images), labels)
            if per_sample:
                # reduction='none' yields a 1-D tensor with one entry per example.
                losses.extend(loss.tolist())
            else:
                losses.append(loss.item())
    return losses

# Train an attacker model using losses from train and test datasets
def train_attacker_model(losses_train, losses_test):
    """Fit a logistic-regression membership classifier on scalar losses.

    Args:
        losses_train: Losses from data treated as members (label 1).
        losses_test: Losses from data treated as non-members (label 0).

    Returns:
        The fitted ``LogisticRegression`` instance.
    """
    targets = [1] * len(losses_train) + [0] * len(losses_test)
    # scikit-learn expects a 2-D design matrix: wrap each loss as a one-feature row.
    feature_rows = [[value] for value in losses_train + losses_test]
    attacker = LogisticRegression(random_state=0)
    attacker.fit(feature_rows, targets)
    return attacker

# Perform cross-validation on the attacker model
def cross_val_score(clf, X, y, cv=5):
    """Return the mean stratified k-fold accuracy of ``clf`` on scalar features.

    Note: this local helper shares its name with
    ``sklearn.model_selection.cross_val_score`` but has its own signature and
    returns a single averaged score.

    Args:
        clf: scikit-learn style classifier; used only as a template. Each fold
            fits an unfitted clone, so the estimator passed in keeps whatever
            fit it already had instead of being left trained on the last fold.
        X: Sequence of scalar feature values (one per sample).
        y: Sequence of class labels aligned with ``X``.
        cv: Number of stratified folds.

    Returns:
        The arithmetic mean of the per-fold ``score`` values.
    """
    # Local import: scikit-learn is already a dependency of this file.
    from sklearn.base import clone

    skf = StratifiedKFold(n_splits=cv)
    scores = []
    for train_index, test_index in skf.split(X, y):
        # Each scalar becomes a one-feature row, as scikit-learn requires 2-D input.
        X_train = [[X[i]] for i in train_index]
        X_test = [[X[i]] for i in test_index]
        y_train = [y[i] for i in train_index]
        y_test = [y[i] for i in test_index]

        fold_clf = clone(clf)
        fold_clf.fit(X_train, y_train)
        scores.append(fold_clf.score(X_test, y_test))
    return sum(scores) / len(scores)

# Simulation Question 4: Use 80 percent of the CIFAR-10 training data to train your baseline model
# The baseline is built with the constructor's default dropout_rate (0.5).
baseline_model = CIFAR10Classifier()
baseline_model = train_model(baseline_model, train_loader, val_loader)

# Simulation Question 5: Train your baseline model with privacy enhancements (using higher dropout rate)
# NOTE(review): dropout_rate=0.5 is the same value as the constructor default,
# so this model is configured identically to baseline_model and differs only
# through random initialisation and batch order. A higher rate is presumably
# intended here -- confirm the value.
privacy_model = CIFAR10Classifier(dropout_rate=0.5)  # Modify dropout rate for privacy enhancement
privacy_model = train_model(privacy_model, train_loader, val_loader)

# Ensure the test accuracy difference between baseline and privacy-enhanced model is less than 15%
# Both models are put in eval mode so dropout is disabled during the comparison.
baseline_model.eval()
privacy_model.eval()

correct_baseline = 0
correct_privacy = 0
total = 0

# Single pass over the test set; both models are scored on the same batches.
with torch.no_grad():
    for images, labels in test_loader:
        outputs_baseline = baseline_model(images)
        outputs_privacy = privacy_model(images)
        # torch.max along dim 1 returns (values, indices); the index is the predicted class.
        _, predicted_baseline = torch.max(outputs_baseline, 1)
        _, predicted_privacy = torch.max(outputs_privacy, 1)
        total += labels.size(0)
        correct_baseline += (predicted_baseline == labels).sum().item()
        correct_privacy += (predicted_privacy == labels).sum().item()

# Accuracies are percentages; the difference is in percentage points.
baseline_accuracy = 100 * correct_baseline / total
privacy_accuracy = 100 * correct_privacy / total
accuracy_difference = abs(baseline_accuracy - privacy_accuracy)

print(f'Baseline Model Test Accuracy: {baseline_accuracy}%')
print(f'Privacy Enhanced Model Test Accuracy: {privacy_accuracy}%')
print(f'Accuracy Difference: {accuracy_difference}%')

# Stops the notebook cell when the gap reaches 15 points (skipped under python -O).
assert accuracy_difference < 15, "The test accuracy difference between the baseline model and the modified model is greater than 15%."

# Compute losses for baseline and privacy enhanced models
# (by default compute_losses returns one mean cross-entropy value per batch of 32).
baseline_losses_train = compute_losses(baseline_model, train_data)
baseline_losses_test = compute_losses(baseline_model, test_dataset)
privacy_losses_train = compute_losses(privacy_model, train_data)
privacy_losses_test = compute_losses(privacy_model, test_dataset)

# Simulation Question 6: Train two Attacker Models based on MIA techniques learned in Phase 0
# Using 80 percent of the training data as your seen data, and the remaining training data along with the test data as your unseen data

# Members ("seen"): train_data, the 80% split both target models were fitted on.
seen_data = train_data
seen_data_size = len(seen_data)

# Non-members ("unseen"): the held-out 20% of the training set plus the test set.
# val_data is only read under torch.no_grad() inside train_model, so it never
# contributes to a parameter update.
# Do not build this set from `random_split(train_data, ...).indices` applied to
# train_dataset: those indices are positions inside train_data, not inside
# train_dataset, so they select unrelated images (many of them trained on), and
# any slice of train_data is member data in the first place.
unseen_data_train = val_data
unseen_data_size = len(unseen_data_train)
unseen_data = unseen_data_train + test_dataset  # Dataset.__add__ builds a ConcatDataset

# Compute losses for seen and unseen datasets for baseline and privacy enhanced models.
# seen_data is train_data, whose losses were already computed above
# (evaluation is deterministic: eval mode, no shuffling), so reuse them.
baseline_losses_seen = list(baseline_losses_train)
baseline_losses_unseen = compute_losses(baseline_model, unseen_data)
privacy_losses_seen = list(privacy_losses_train)
privacy_losses_unseen = compute_losses(privacy_model, unseen_data)

# Train attacker models
baseline_attacker = train_attacker_model(baseline_losses_seen, baseline_losses_unseen)
privacy_attacker = train_attacker_model(privacy_losses_seen, privacy_losses_unseen)

# Evaluate attacker models (label 1 = seen/member, label 0 = unseen/non-member).
# The two classes are not the same size, so compare the reported accuracy with
# the majority-class rate rather than with 0.5.
baseline_attack_score = cross_val_score(baseline_attacker, baseline_losses_seen + baseline_losses_unseen, [1]*len(baseline_losses_seen) + [0]*len(baseline_losses_unseen), cv=5)
privacy_attack_score = cross_val_score(privacy_attacker, privacy_losses_seen + privacy_losses_unseen, [1]*len(privacy_losses_seen) + [0]*len(privacy_losses_unseen), cv=5)

print(f'Baseline Model MIA Accuracy: {baseline_attack_score}')
print(f'Privacy Enhanced Model MIA Accuracy: {privacy_attack_score}')

# Simulation Question 7: Improve your attacker models to achieve better MIA accuracy for both the baseline and modified models
# (e.g., by increasing the number of shadow models)

def train_shadow_models(model, num_shadow_models, train_data):
    """Train freshly initialised shadow models on random halves of ``train_data``.

    Args:
        model: Target model whose class the shadow models mirror. A new
            instance is created with the class's default constructor
            arguments; the target's weights are not copied.
        num_shadow_models: Number of shadow models to train.
        train_data: Dataset from which each shadow model draws a random 50%.

    Returns:
        A list with the trained shadow models.

    Relies on the module-level ``batch_size`` and ``val_loader``; validation is
    used by ``train_model`` for progress reporting.
    """
    # The split sizes do not change between iterations, so compute them once.
    shadow_data_size = int(0.5 * len(train_data))
    split_sizes = [shadow_data_size, len(train_data) - shadow_data_size]

    shadow_models = []
    for _ in range(num_shadow_models):
        # A new random half per shadow model; the other half is not used.
        shadow_data, _held_out = random_split(train_data, split_sizes)
        # Mirror the target's architecture instead of hard-coding a class,
        # so the `model` argument is actually honoured.
        shadow_model = type(model)()
        shadow_loader = DataLoader(shadow_data, batch_size=batch_size, shuffle=True)
        shadow_models.append(train_model(shadow_model, shadow_loader, val_loader))
    return shadow_models

def compute_shadow_losses(shadow_models, dataset):
    """Concatenate the ``compute_losses`` output of every shadow model.

    Args:
        shadow_models: Iterable of trained models.
        dataset: Dataset each model is evaluated on.

    Returns:
        One flat list: the losses of the first model, then the second, and so on.
    """
    return [
        loss
        for shadow_model in shadow_models
        for loss in compute_losses(shadow_model, dataset)
    ]

# Train more shadow models to improve MIA accuracy
# Both calls pass a CIFAR10Classifier instance and the same train_data, so the
# two lists are trained with the same recipe and differ only through random
# initialisation and random data halves.
num_shadow_models = 5
baseline_shadow_models = train_shadow_models(baseline_model, num_shadow_models, train_data)
privacy_shadow_models = train_shadow_models(privacy_model, num_shadow_models, train_data)

# Losses of every shadow model on the seen / unseen sets defined for the target models.
# NOTE(review): each shadow model is fitted on its own random half of
# train_data, so "seen"/"unseen" here is not that shadow model's own
# membership -- confirm this labelling is intended.
baseline_shadow_losses_seen = compute_shadow_losses(baseline_shadow_models, seen_data)
baseline_shadow_losses_unseen = compute_shadow_losses(baseline_shadow_models, unseen_data)
privacy_shadow_losses_seen = compute_shadow_losses(privacy_shadow_models, seen_data)
privacy_shadow_losses_unseen = compute_shadow_losses(privacy_shadow_models, unseen_data)

# Train improved attacker models
baseline_improved_attacker = train_attacker_model(baseline_shadow_losses_seen, baseline_shadow_losses_unseen)
privacy_improved_attacker = train_attacker_model(privacy_shadow_losses_seen, privacy_shadow_losses_unseen)

# Evaluate improved attacker models
# The scores are cross-validated on the shadow-model losses themselves; the
# target models' losses are not used in this evaluation.
baseline_improved_attack_score = cross_val_score(baseline_improved_attacker, baseline_shadow_losses_seen + baseline_shadow_losses_unseen, [1]*len(baseline_shadow_losses_seen) + [0]*len(baseline_shadow_losses_unseen), cv=5)
privacy_improved_attack_score = cross_val_score(privacy_improved_attacker, privacy_shadow_losses_seen + privacy_shadow_losses_unseen, [1]*len(privacy_shadow_losses_seen) + [0]*len(privacy_shadow_losses_unseen), cv=5)

print(f'Improved Baseline Model MIA Accuracy: {baseline_improved_attack_score}')
print(f'Improved Privacy Enhanced Model MIA Accuracy: {privacy_improved_attack_score}')

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:10<00:00, 15876890.84it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified




Epoch [1/10], Loss: 1.7254384632110595, Validation Loss: 1.3675755239596032, Accuracy: 51.02%
Epoch [2/10], Loss: 1.457023900604248, Validation Loss: 1.251373583723785, Accuracy: 56.54%
Epoch [3/10], Loss: 1.342502292060852, Validation Loss: 1.11584126114086, Accuracy: 60.16%
Epoch [4/10], Loss: 1.2699505160331725, Validation Loss: 1.0908929925815316, Accuracy: 62.47%
Epoch [5/10], Loss: 1.2230305458068849, Validation Loss: 1.0559498261494242, Accuracy: 63.26%
Epoch [6/10], Loss: 1.1730464163780212, Validation Loss: 1.02546154465645, Accuracy: 64.23%
Epoch [7/10], Loss: 1.1430781538009644, Validation Loss: 1.0202108572243125, Accuracy: 64.41%
Epoch [8/10], Loss: 1.117153731918335, Validation Loss: 0.9985991332940994, Accuracy: 65.34%
Epoch [9/10], Loss: 1.0851118191719056, Validation Loss: 0.9915223831583739, Accuracy: 64.92%
Epoch [10/10], Loss: 1.0668078775405885, Validation Loss: 0.9968324094820934, Accuracy: 65.03%
Epoch [1/10], Loss: 1.7464744194030761, Validation Loss: 1.35912584



Epoch [1/10], Loss: 1.8475659647688698, Validation Loss: 1.5653636197375644, Accuracy: 45.07%
Epoch [2/10], Loss: 1.5763182274449747, Validation Loss: 1.383731957453831, Accuracy: 52.35%
Epoch [3/10], Loss: 1.4675977051067657, Validation Loss: 1.2830926840472374, Accuracy: 53.72%
Epoch [4/10], Loss: 1.3893273714632273, Validation Loss: 1.22070997497838, Accuracy: 56.31%
Epoch [5/10], Loss: 1.3168415979455455, Validation Loss: 1.176879437865725, Accuracy: 58.32%
Epoch [6/10], Loss: 1.2713214124734409, Validation Loss: 1.158451150557038, Accuracy: 58.89%
Epoch [7/10], Loss: 1.2320322807604513, Validation Loss: 1.139693812959513, Accuracy: 59.21%
Epoch [8/10], Loss: 1.1862862033965869, Validation Loss: 1.1150618514437585, Accuracy: 60.71%
Epoch [9/10], Loss: 1.1512825953693817, Validation Loss: 1.1138472963290609, Accuracy: 60.19%
Epoch [10/10], Loss: 1.1147949651788218, Validation Loss: 1.087233344081101, Accuracy: 61.77%
Epoch [1/10], Loss: 1.8583315389986617, Validation Loss: 1.4939233

تنها سل ۲ و ۳ مد نظر هستند.

In [None]:
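# With DP-SGD -- NOTE(review): this cell trains with plain optim.Adam and applies no DP-SGD / privacy engine; confirm the intended setup.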
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split, Subset, ConcatDataset
from torch import optim
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
import torchvision
import torchvision.transforms as transforms

# Define the transformation for the CIFAR-10 dataset:
# convert images to tensors, then normalise each of the three channels with a
# fixed mean/std pair (presumably CIFAR-10 training-set statistics -- TODO confirm).
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])

# Load the CIFAR-10 dataset (downloaded into ./data when not already present)
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

# Split the training dataset into training (80%) and validation (20%) sets.
# val_size is the remainder, so the sizes always sum to the dataset length.
# No generator is passed, so the split follows the global torch RNG state.
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_data, val_data = random_split(train_dataset, [train_size, val_size])

# Create data loaders for training, validation, and testing.
# Only the training loader shuffles; validation and test keep dataset order.
batch_size = 64
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Define the model
class CIFAR10Classifier(nn.Module):
    """Small two-convolution CNN that maps an image batch to 10 class logits.

    ``fc1`` expects 6272 flattened features, which is what two unpadded 3x3
    convolutions followed by a 2x2 max-pool produce for a 3x32x32 input
    (32 channels * 14 * 14).

    Args:
        dropout_rate: Drop probability applied to the 64 hidden features
            between ``fc1`` and ``fc2``.
    """

    def __init__(self, dropout_rate=0.5):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, 1)
        self.conv2 = nn.Conv2d(16, 32, 3, 1)
        # Channel-wise dropout on the 4-D feature maps after pooling.
        self.dropout1 = nn.Dropout2d(0.25)
        # Element-wise dropout: this layer receives the 2-D (batch, 64) output
        # of fc1. Dropout2d on 2-D input is deprecated (it warns and is slated
        # to become an error); nn.Dropout is the documented replacement that
        # keeps the same behaviour. Dropout layers hold no parameters, so
        # state_dict compatibility is unaffected.
        self.dropout2 = nn.Dropout(dropout_rate)
        self.fc1 = nn.Linear(6272, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        """Return raw (unnormalised) class logits for the batch ``x``."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)  # keep the batch dimension, flatten the rest
        x = F.relu(self.fc1(x))
        x = self.dropout2(x)
        return self.fc2(x)

# Training function
def train_model(model, train_loader, val_loader, num_epochs=10):
    """Fit ``model`` with Adam and cross-entropy, reporting validation metrics.

    Args:
        model: Network mapping a batch of inputs to class logits.
        train_loader: Sized iterable of ``(inputs, targets)`` batches used for
            the parameter updates.
        val_loader: Sized iterable of ``(inputs, targets)`` batches used only
            for monitoring; no gradients are computed on it.
        num_epochs: Number of full passes over ``train_loader``.

    Returns:
        The same ``model`` object, updated in place. After at least one epoch
        it is left in eval mode.
    """
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(num_epochs):
        # ---- optimisation pass ----
        model.train()
        running_loss = 0.0
        for batch_x, batch_y in train_loader:
            adam.zero_grad()
            batch_loss = loss_fn(model(batch_x), batch_y)
            batch_loss.backward()
            adam.step()
            running_loss += batch_loss.item()

        # ---- validation pass: dropout off, autograd off ----
        model.eval()
        val_loss, correct, total = 0.0, 0, 0
        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                logits = model(batch_x)
                val_loss += loss_fn(logits, batch_y).item()
                total += batch_y.size(0)
                hits = torch.max(logits, 1).indices == batch_y
                correct += hits.sum().item()
        # Losses are averaged over the number of batches, accuracy over samples.
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}, Validation Loss: {val_loss/len(val_loader)}, Accuracy: {100 * correct / total}%')
    return model

# Compute losses for a given model and dataset
def compute_losses(model, dataset, batch_size=32, per_sample=False):
    """Evaluate ``model`` on ``dataset`` and collect cross-entropy losses.

    The model is switched to eval mode and the dataset is traversed in order
    (no shuffling) without gradient tracking.

    Args:
        model: Network mapping a batch of inputs to class logits.
        dataset: Dataset yielding ``(input, target)`` pairs.
        batch_size: Number of examples per forward pass.
        per_sample: If False (default), return one *mean* loss per batch, so
            the result has ``ceil(len(dataset) / batch_size)`` entries. If
            True, return one loss per example (``len(dataset)`` entries),
            which is the signal a per-example membership attack needs.

    Returns:
        A list of Python floats.
    """
    model.eval()
    criterion = nn.CrossEntropyLoss(reduction='none' if per_sample else 'mean')
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
    losses = []
    with torch.no_grad():
        for images, labels in dataloader:
            loss = criterion(model(images), labels)
            if per_sample:
                # reduction='none' yields a 1-D tensor with one entry per example.
                losses.extend(loss.tolist())
            else:
                losses.append(loss.item())
    return losses

# Train an attacker model using losses from train and test datasets
# In practice, to be able to attack the model, we use the average of what was
# learned on two different dataset groups that we built from the source dataset.
def train_attacker_model(losses_train, losses_test):
    """Fit a logistic-regression membership classifier on scalar losses.

    Args:
        losses_train: Losses from data treated as members (label 1).
        losses_test: Losses from data treated as non-members (label 0).

    Returns:
        The fitted ``LogisticRegression`` instance.
    """
    targets = [1] * len(losses_train) + [0] * len(losses_test)
    # scikit-learn expects a 2-D design matrix: wrap each loss as a one-feature row.
    feature_rows = [[value] for value in losses_train + losses_test]
    attacker = LogisticRegression(random_state=0)
    attacker.fit(feature_rows, targets)
    return attacker

# Perform cross-validation on the attacker model
def cross_val_score(clf, X, y, cv=5):
    """Return the mean stratified k-fold accuracy of ``clf`` on scalar features.

    Note: this local helper shares its name with
    ``sklearn.model_selection.cross_val_score`` but has its own signature and
    returns a single averaged score.

    Args:
        clf: scikit-learn style classifier; used only as a template. Each fold
            fits an unfitted clone, so the estimator passed in keeps whatever
            fit it already had instead of being left trained on the last fold.
        X: Sequence of scalar feature values (one per sample).
        y: Sequence of class labels aligned with ``X``.
        cv: Number of stratified folds.

    Returns:
        The arithmetic mean of the per-fold ``score`` values.
    """
    # Local import: scikit-learn is already a dependency of this file.
    from sklearn.base import clone

    skf = StratifiedKFold(n_splits=cv)
    scores = []
    for train_index, test_index in skf.split(X, y):
        # Each scalar becomes a one-feature row, as scikit-learn requires 2-D input.
        X_train = [[X[i]] for i in train_index]
        X_test = [[X[i]] for i in test_index]
        y_train = [y[i] for i in train_index]
        y_test = [y[i] for i in test_index]

        fold_clf = clone(clf)
        fold_clf.fit(X_train, y_train)
        scores.append(fold_clf.score(X_test, y_test))
    return sum(scores) / len(scores)

# Simulation Question 4: Use 80 percent of the CIFAR-10 training data to train your baseline model
# The baseline is built with the constructor's default dropout_rate (0.5).
baseline_model = CIFAR10Classifier()
baseline_model = train_model(baseline_model, train_loader, val_loader)

# Simulation Question 5: Train your baseline model with privacy enhancements (using higher dropout rate)
# NOTE(review): dropout_rate=0.5 is the same value as the constructor default,
# so this model is configured identically to baseline_model and differs only
# through random initialisation and batch order. A higher rate is presumably
# intended here -- confirm the value.
privacy_model = CIFAR10Classifier(dropout_rate=0.5)  # Modify dropout rate for privacy enhancement
privacy_model = train_model(privacy_model, train_loader, val_loader)

# Ensure the test accuracy difference between baseline and privacy-enhanced model is less than 15%
# Both models are put in eval mode so dropout is disabled during the comparison.
baseline_model.eval()
privacy_model.eval()

correct_baseline = 0
correct_privacy = 0
total = 0

# Single pass over the test set; both models are scored on the same batches.
with torch.no_grad():
    for images, labels in test_loader:
        outputs_baseline = baseline_model(images)
        outputs_privacy = privacy_model(images)
        # torch.max along dim 1 returns (values, indices); the index is the predicted class.
        _, predicted_baseline = torch.max(outputs_baseline, 1)
        _, predicted_privacy = torch.max(outputs_privacy, 1)
        total += labels.size(0)
        correct_baseline += (predicted_baseline == labels).sum().item()
        correct_privacy += (predicted_privacy == labels).sum().item()

# Accuracies are percentages; the difference is in percentage points.
baseline_accuracy = 100 * correct_baseline / total
privacy_accuracy = 100 * correct_privacy / total
accuracy_difference = abs(baseline_accuracy - privacy_accuracy)

print(f'Baseline Model Test Accuracy: {baseline_accuracy}%')
print(f'Privacy Enhanced Model Test Accuracy: {privacy_accuracy}%')
print(f'Accuracy Difference: {accuracy_difference}%')

# Stops the notebook cell when the gap reaches 15 points (skipped under python -O).
assert accuracy_difference < 15, "The test accuracy difference between the baseline model and the modified model is greater than 15%."

# Compute losses for baseline and privacy enhanced models
# (by default compute_losses returns one mean cross-entropy value per batch of 32).
baseline_losses_train = compute_losses(baseline_model, train_data)
baseline_losses_test = compute_losses(baseline_model, test_dataset)
privacy_losses_train = compute_losses(privacy_model, train_data)
privacy_losses_test = compute_losses(privacy_model, test_dataset)

# Simulation Question 6: Train two Attacker Models based on MIA techniques learned in Phase 0
# Using 80 percent of the training data as your seen data, and the remaining training data along with the test data as your unseen data

# Members ("seen"): train_data, the 80% split both target models were fitted on.
seen_data = train_data
seen_data_size = len(seen_data)

# Non-members ("unseen"): the held-out 20% of the training set plus the test set.
# val_data is only read under torch.no_grad() inside train_model, so it never
# contributes to a parameter update.
# Do not build this set from `random_split(train_data, ...).indices` applied to
# train_dataset: those indices are positions inside train_data, not inside
# train_dataset, so they select unrelated images (many of them trained on), and
# any slice of train_data is member data in the first place.
unseen_data_train = val_data
unseen_data_size = len(unseen_data_train)
unseen_data = ConcatDataset([unseen_data_train, test_dataset])

# Compute losses for seen and unseen datasets for baseline and privacy enhanced models.
# seen_data is train_data, whose losses were already computed above
# (evaluation is deterministic: eval mode, no shuffling), so reuse them.
baseline_losses_seen = list(baseline_losses_train)
baseline_losses_unseen = compute_losses(baseline_model, unseen_data)
privacy_losses_seen = list(privacy_losses_train)
privacy_losses_unseen = compute_losses(privacy_model, unseen_data)

# Train attacker models for both baseline and privacy enhanced models
baseline_attacker = train_attacker_model(baseline_losses_seen, baseline_losses_unseen)
privacy_attacker = train_attacker_model(privacy_losses_seen, privacy_losses_unseen)

# Evaluate attacker models (label 1 = seen/member, label 0 = unseen/non-member).
# The two classes are not the same size, so compare the reported accuracy with
# the majority-class rate rather than with 0.5.
baseline_attack_score = cross_val_score(baseline_attacker, baseline_losses_seen + baseline_losses_unseen, [1]*len(baseline_losses_seen) + [0]*len(baseline_losses_unseen), cv=5)
privacy_attack_score = cross_val_score(privacy_attacker, privacy_losses_seen + privacy_losses_unseen, [1]*len(privacy_losses_seen) + [0]*len(privacy_losses_unseen), cv=5)

print(f'Baseline Model MIA Accuracy: {baseline_attack_score}')
print(f'Privacy Enhanced Model MIA Accuracy: {privacy_attack_score}')

# Simulation Question 7: Improve your attacker models to achieve better MIA accuracy for both the baseline and modified models
# (e.g., by increasing the number of shadow models)

def train_shadow_models(model, num_shadow_models, train_data):
    """Train freshly initialised shadow models on random 80/20 splits.

    Args:
        model: Target model whose class the shadow models mirror. A new
            instance is created with the class's default constructor
            arguments; the target's weights are not copied.
        num_shadow_models: Number of shadow models to train.
        train_data: Dataset that is re-split for every shadow model into a
            training part (80%) and a validation part (the remainder).

    Returns:
        A list with the trained shadow models.

    Relies on the module-level ``batch_size``.
    """
    # Take the validation size as the remainder: int(0.8*n) + int(0.2*n) can be
    # smaller than n, and random_split rejects lengths that do not sum to n.
    shadow_train_size = int(0.8 * len(train_data))
    split_sizes = [shadow_train_size, len(train_data) - shadow_train_size]

    shadow_models = []
    for _ in range(num_shadow_models):
        shadow_train_data, shadow_val_data = random_split(train_data, split_sizes)
        shadow_model = type(model)()  # Create a new instance of the model
        shadow_train_loader = DataLoader(shadow_train_data, batch_size=batch_size, shuffle=True)
        shadow_val_loader = DataLoader(shadow_val_data, batch_size=batch_size, shuffle=False)
        shadow_models.append(train_model(shadow_model, shadow_train_loader, shadow_val_loader))
    return shadow_models

# Five shadow models per target. Both calls pass a CIFAR10Classifier instance
# and the same train_data, so the two lists are trained with the same recipe
# and differ only through random initialisation and random splits.
num_shadow_models = 5
baseline_shadow_models = train_shadow_models(baseline_model, num_shadow_models, train_data)
privacy_shadow_models = train_shadow_models(privacy_model, num_shadow_models, train_data)

# Compute losses for shadow models
baseline_shadow_losses_train = []
baseline_shadow_losses_test = []
privacy_shadow_losses_train = []
privacy_shadow_losses_test = []

# Losses of all shadow models are pooled into one list per membership label.
# NOTE(review): every shadow model is fitted on a random 80% of train_data, yet
# all of train_data is treated as "member" data below -- confirm this is intended.
for shadow_model in baseline_shadow_models:
    baseline_shadow_losses_train.extend(compute_losses(shadow_model, train_data))
    baseline_shadow_losses_test.extend(compute_losses(shadow_model, test_dataset))

for shadow_model in privacy_shadow_models:
    privacy_shadow_losses_train.extend(compute_losses(shadow_model, train_data))
    privacy_shadow_losses_test.extend(compute_losses(shadow_model, test_dataset))

# Train improved attacker models using shadow models
improved_baseline_attacker = train_attacker_model(baseline_shadow_losses_train, baseline_shadow_losses_test)
improved_privacy_attacker = train_attacker_model(privacy_shadow_losses_train, privacy_shadow_losses_test)

# Evaluate improved attacker models
# The scores are cross-validated on the shadow-model losses themselves; the
# target models' losses are not used in this evaluation.
improved_baseline_attack_score = cross_val_score(improved_baseline_attacker, baseline_shadow_losses_train + baseline_shadow_losses_test, [1]*len(baseline_shadow_losses_train) + [0]*len(baseline_shadow_losses_test), cv=5)
improved_privacy_attack_score = cross_val_score(improved_privacy_attacker, privacy_shadow_losses_train + privacy_shadow_losses_test, [1]*len(privacy_shadow_losses_train) + [0]*len(privacy_shadow_losses_test), cv=5)

print(f'Improved Baseline Model MIA Accuracy: {improved_baseline_attack_score}')
print(f'Improved Privacy Enhanced Model MIA Accuracy: {improved_privacy_attack_score}')

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:03<00:00, 49671293.46it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified




Epoch [1/10], Loss: 1.735023225402832, Validation Loss: 1.369295563667443, Accuracy: 51.32%
Epoch [2/10], Loss: 1.4670258842468262, Validation Loss: 1.2091545523351925, Accuracy: 57.6%
Epoch [3/10], Loss: 1.346752587032318, Validation Loss: 1.1065926965634534, Accuracy: 61.62%
Epoch [4/10], Loss: 1.2856727949142457, Validation Loss: 1.0934479593471358, Accuracy: 62.44%
Epoch [5/10], Loss: 1.2272003404617309, Validation Loss: 1.0420442618382204, Accuracy: 63.7%
Epoch [6/10], Loss: 1.1788294010162355, Validation Loss: 1.0238931156267785, Accuracy: 64.54%
Epoch [7/10], Loss: 1.1528226181983947, Validation Loss: 1.0019858984430885, Accuracy: 64.89%
Epoch [8/10], Loss: 1.1196374527931214, Validation Loss: 1.0026190155630659, Accuracy: 64.71%
Epoch [9/10], Loss: 1.093772953414917, Validation Loss: 0.9944881075506757, Accuracy: 65.48%


In [None]:
pip install gdown



In [None]:
import gdown
from torchvision import models
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torchvision.datasets import CIFAR10
from sklearn.metrics import confusion_matrix, precision_score, recall_score ,f1_score
from torch.utils.data import Subset, TensorDataset

# Define the CIFAR10Classifier model
class CIFAR10Classifier(nn.Module):
    """Small two-convolution CNN that maps an image batch to 10 class logits.

    ``fc1`` expects 6272 flattened features, which is what two unpadded 3x3
    convolutions followed by a 2x2 max-pool produce for a 3x32x32 input
    (32 channels * 14 * 14). Attribute names are kept unchanged because the
    pre-trained ``state_dict`` is loaded by key.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, 1)
        self.conv2 = nn.Conv2d(16, 32, 3, 1)
        # Channel-wise dropout on the 4-D feature maps after pooling.
        self.dropout1 = nn.Dropout2d(0.25)
        # Element-wise dropout: this layer receives the 2-D (batch, 64) output
        # of fc1. Dropout2d on 2-D input is deprecated (it warns and is slated
        # to become an error); nn.Dropout is the documented replacement that
        # keeps the same behaviour. Dropout layers hold no parameters, so the
        # saved state_dict still loads.
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(6272, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        """Return raw (unnormalised) class logits for the batch ``x``."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)  # keep the batch dimension, flatten the rest
        x = F.relu(self.fc1(x))
        x = self.dropout2(x)
        return self.fc2(x)

# Download the files from Google Drive:
# 'list.txt' (one integer index per line, parsed below) and the saved model weights.
gdown.download('https://drive.google.com/uc?id=1u8TtbKu-IYn4BM2fMxlKuSxebGm8t0a8', 'list.txt', quiet=False)
gdown.download('https://drive.google.com/uc?id=13JqLGTHtspZes2xc3JHPgszM2RkvJtsR', 'model_state_dict.pth', quiet=False)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load the pre-trained model
model = CIFAR10Classifier()
# NOTE(review): torch.load unpickles the downloaded file; if the installed
# torch version supports it, consider weights_only=True for untrusted files.
state_dict = torch.load("model_state_dict.pth", map_location=device)
# Strip the '_module.' prefix from the saved keys so they match this class's
# parameter names (presumably added by a wrapper module at training time -- confirm).
new_state_dict = {key.replace('_module.', ''): value for key, value in state_dict.items()}
model.load_state_dict(new_state_dict)
model.to(device)
model.eval()

# Scale each channel from [0, 1] to [-1, 1].
# NOTE(review): must match the preprocessing the pre-trained model was trained with -- verify.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

DATA_ROOT = '../cifar10'
BATCH_SIZE = 64

# Load the indices from list.txt
# (presumably the training-set positions the pre-trained model was fitted on -- verify)
indices_file = 'list.txt'
with open(indices_file, 'r') as f:
    indices = [int(line.strip()) for line in f]

full_train_dataset = CIFAR10(root=DATA_ROOT, train=True, download=True, transform=transform)
test_dataset = CIFAR10(root=DATA_ROOT, train=False, download=True, transform=transform)

# Training-set positions that do not appear in list.txt.
train_indices_set = set(indices)
all_indices = set(range(len(full_train_dataset)))
other_indices = list(all_indices - train_indices_set)

# Only the first half of the listed indices is used as the "member" set
# (presumably to balance it against the non-member pool -- confirm).
train_dataset = Subset(full_train_dataset, indices[:len(indices)//2])
other_dataset = Subset(full_train_dataset, other_indices)

# Create data loaders (no shuffling, so extracted features keep dataset order)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
other_loader = DataLoader(other_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Create labels: 1 for the listed (member) samples, 0 for unlisted training
# samples and for the test set.
train_labels = torch.ones(len(train_dataset)).to(device)
other_labels = torch.zeros(len(other_dataset)).to(device)
test_labels = torch.zeros(len(test_dataset)).to(device)

def extract_features(model, dataloader):
    """Run ``model`` over ``dataloader`` and stack its outputs into one tensor.

    The model is put in eval mode and gradients are disabled. Targets yielded
    by the loader are ignored. Uses the module-level ``device``.

    Args:
        model: Network applied to each input batch.
        dataloader: Iterable of ``(inputs, targets)`` batches.

    Returns:
        The per-batch outputs concatenated along dimension 0, on ``device``.
    """
    model.eval()
    with torch.no_grad():
        batch_outputs = [model(inputs.to(device)) for inputs, _ in dataloader]
    return torch.cat(batch_outputs).to(device)

# Target-model outputs (10 logits per sample) for each of the three pools.
train_features = extract_features(model, train_loader)
other_features = extract_features(model, other_loader)
test_features = extract_features(model, test_loader)

# Stack features and membership labels in the same order so rows stay aligned.
combined_features = torch.cat((train_features, other_features, test_features))
combined_labels = torch.cat((train_labels, other_labels, test_labels))

# Attack dataset: logits as input, membership (1/0) as target; shuffled per epoch.
new_dataset = TensorDataset(combined_features, combined_labels)
new_loader = DataLoader(new_dataset, batch_size=BATCH_SIZE, shuffle=True)

# Define and train the attacker model
class AttackerModel(nn.Module):
    """Two-layer MLP mapping a 10-value vector to one probability in (0, 1).

    Architecture: Linear(10 -> 64), ReLU, Linear(64 -> 1), sigmoid.
    """

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(10, 64)
        self.fc2 = nn.Linear(64, 1)

    def forward(self, x):
        """Return a ``(batch, 1)`` tensor of sigmoid activations."""
        hidden = F.relu(self.fc1(x))
        logit = self.fc2(hidden)
        return torch.sigmoid(logit)

# Attacker: binary classifier trained with binary cross-entropy on the
# sigmoid outputs, optimised with Adam.
attacker = AttackerModel().to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(attacker.parameters(), lr=0.001)

EPOCHS = 10

for epoch in range(EPOCHS):
    attacker.train()
    running_loss = 0.0
    for features, labels in new_loader:
        features, labels = features.to(device), labels.to(device)
        optimizer.zero_grad()
        # Drop only the trailing size-1 dimension: (batch, 1) -> (batch,).
        # A bare .squeeze() would also remove the batch dimension when a batch
        # holds a single sample, and BCELoss would then reject the shape
        # mismatch against the (1,)-shaped labels.
        outputs = attacker(features).squeeze(1)
        loss = criterion(outputs, labels.float())
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Mean of the per-batch losses for this epoch.
    print(f'Epoch {epoch + 1}, Loss: {running_loss / len(new_loader):.4f}')

# Evaluate the attacker model.
# The metrics are computed on new_loader, i.e. the same data the attacker was
# trained on, so they are training-set figures (as the printed label says).
attacker.eval()
all_labels = []
all_predicted = []
correct = 0
total = 0

with torch.no_grad():
    for features, labels in new_loader:
        features, labels = features.to(device), labels.to(device)
        # Drop only the trailing size-1 dimension so a single-sample batch
        # keeps shape (1,) and still lines up with its labels.
        outputs = attacker(features).squeeze(1)
        # Threshold the predicted probability at 0.5 to get a 0/1 decision.
        predicted = (outputs > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        all_labels.extend(labels.cpu().numpy())
        all_predicted.extend(predicted.cpu().numpy())

accuracy = correct / total
print(f'Training Accuracy: {accuracy:.4f}')

# Positive class (1) = member of the target model's listed training samples.
cm = confusion_matrix(all_labels, all_predicted)
precision = precision_score(all_labels, all_predicted)
recall = recall_score(all_labels, all_predicted)
f1 = f1_score(all_labels, all_predicted)

print(f'Confusion Matrix:\n{cm}')
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")


Downloading...
From: https://drive.google.com/uc?id=1u8TtbKu-IYn4BM2fMxlKuSxebGm8t0a8
To: /content/list.txt
100%|██████████| 231k/231k [00:00<00:00, 39.1MB/s]
Downloading...
From: https://drive.google.com/uc?id=13JqLGTHtspZes2xc3JHPgszM2RkvJtsR
To: /content/model_state_dict.pth
100%|██████████| 1.63M/1.63M [00:00<00:00, 59.2MB/s]


Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ../cifar10/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:03<00:00, 48786921.28it/s]


Extracting ../cifar10/cifar-10-python.tar.gz to ../cifar10
Files already downloaded and verified




Epoch 1, Loss: 0.6783
Epoch 2, Loss: 0.6702
Epoch 3, Loss: 0.6688
Epoch 4, Loss: 0.6672
Epoch 5, Loss: 0.6667
Epoch 6, Loss: 0.6660
Epoch 7, Loss: 0.6654
Epoch 8, Loss: 0.6648
Epoch 9, Loss: 0.6645
Epoch 10, Loss: 0.6645
Training Accuracy: 0.5917
Confusion Matrix:
[[ 9361 10639]
 [ 5692 14308]]
Precision: 0.5735
Recall: 0.7154
F1 Score: 0.6367


In [None]:
import gdown
from torchvision import models
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torchvision.datasets import CIFAR10
from sklearn.metrics import confusion_matrix, precision_score, recall_score ,f1_score
from torch.utils.data import Subset, TensorDataset

# Download the files from Google Drive:
# 'list.txt' (parsed further down as one integer index per line) and the
# saved model weights 'model_state_dict.pth'.
gdown.download('https://drive.google.com/uc?id=1u8TtbKu-IYn4BM2fMxlKuSxebGm8t0a8', 'list.txt', quiet=False)
gdown.download('https://drive.google.com/uc?id=13JqLGTHtspZes2xc3JHPgszM2RkvJtsR', 'model_state_dict.pth', quiet=False)

class CIFAR10Classifier(nn.Module):
    """Small two-convolution CNN that maps 3x32x32 images to 10 class logits.

    Layer names (``conv1``, ``conv2``, ``fc1``, ``fc2``) are the state-dict
    keys, so they must stay stable for checkpoints to load.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, 3, 1)
        self.conv2 = nn.Conv2d(16, 32, 3, 1)
        # Channel-wise dropout is appropriate here: it is applied to the 4-D
        # feature map produced by the convolutions.
        self.dropout1 = nn.Dropout2d(0.25)
        # This dropout runs on the flattened (batch, 64) activation. Dropout2d
        # on a 2-D input is deprecated in PyTorch (it warns and is slated to
        # become an error); plain element-wise Dropout is the documented
        # replacement. Dropout has no parameters, so existing checkpoints still
        # load unchanged.
        self.dropout2 = nn.Dropout(0.5)
        # 32 channels * 14 * 14 spatial positions after the two 3x3 convs
        # (32 -> 30 -> 28) and the 2x2 max-pool (28 -> 14).
        self.fc1 = nn.Linear(6272, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        """Return raw (unnormalised) class logits of shape ``(batch, 10)``."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.dropout2(x)
        return self.fc2(x)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load the pre-trained model
model = CIFAR10Classifier()
# SECURITY: torch.load unpickles the file, and this checkpoint was just
# downloaded from a remote URL. weights_only=True restricts unpickling to
# tensors and plain containers so a tampered file cannot run arbitrary code.
state_dict = torch.load("model_state_dict.pth", map_location=device, weights_only=True)
# The saved keys carry a '_module.' wrapper prefix that CIFAR10Classifier does
# not use; drop it so the keys line up with this model's layer names.
new_state_dict = {key.replace('_module.', ''): value for key, value in state_dict.items()}
model.load_state_dict(new_state_dict)
model.to(device)
# Inference only: disable dropout for everything that follows.
model.eval()

# Preprocessing: convert each image to a tensor, then normalise every channel
# with mean 0.5 and std 0.5.
# NOTE(review): these statistics should match whatever the downloaded model was
# trained with — confirm against its training code.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

DATA_ROOT = '../cifar10'
BATCH_SIZE = 64

# Load the indices from list.txt (one integer per line). Blank lines, e.g. an
# extra empty line at the end of the file, are skipped instead of crashing int().
indices_file = 'list.txt'
with open(indices_file, 'r') as f:
    indices = [int(line) for line in f if line.strip()]

full_train_dataset = CIFAR10(root=DATA_ROOT, train=True, download=True, transform=transform)
test_dataset = CIFAR10(root=DATA_ROOT, train=False, download=True, transform=transform)

# Every training-set index that does not appear in list.txt. Sorted so the
# subset order is reproducible (set iteration order is not guaranteed).
train_indices_set = set(indices)
all_indices_set = set(range(len(full_train_dataset)))
other_indices = sorted(all_indices_set - train_indices_set)

# Only the first half of the listed indices is used for the positive class.
# NOTE(review): presumably to balance it against the negatives — confirm.
train_dataset = Subset(full_train_dataset, indices[:len(indices)//2])
other_dataset = Subset(full_train_dataset, other_indices)

# Create data loaders. These loaders only feed feature extraction, so
# shuffling adds nothing; keeping dataset order makes feature row i correspond
# to sample i of the underlying subset.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False)
other_loader = DataLoader(other_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Create labels: 1 for samples drawn from list.txt, 0 for the remaining
# training samples and for the test set.
train_labels = torch.ones(len(train_dataset)).to(device)
other_labels = torch.zeros(len(other_dataset)).to(device)
test_labels = torch.zeros(len(test_dataset)).to(device)

def extract_features(model, dataloader, device=None):
    """Run ``model`` over ``dataloader`` and return its outputs stacked along dim 0.

    Args:
        model: Module to evaluate. It is switched to eval mode and left there.
        dataloader: Iterable yielding ``(inputs, target)`` pairs; the second
            element of each pair is ignored.
        device: Device the inputs are moved to before the forward pass. It
            must be the device the model lives on. Defaults to the device of
            the model's first parameter (CPU for a parameter-free model), so
            the function no longer relies on a module-level ``device`` global.

    Returns:
        A tensor with the concatenated model outputs, computed without
        gradient tracking. An empty tensor is returned when the dataloader
        yields no batches.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()
    features = []
    with torch.no_grad():
        for inputs, _ in dataloader:
            features.append(model(inputs.to(device)))

    if not features:
        # torch.cat rejects an empty list; give callers an empty result instead.
        return torch.empty(0, device=device)
    # The outputs already live on the model's device, so no extra move is needed.
    return torch.cat(features)

# Run the target model over the three data sources, in the same order as the
# labels were created.
train_features, other_features, test_features = [
    extract_features(model, loader)
    for loader in (train_loader, other_loader, test_loader)
]

# Stack features and labels in matching order so that row i of the features
# pairs with entry i of the labels.
combined_features = torch.cat([train_features, other_features, test_features])
combined_labels = torch.cat([train_labels, other_labels, test_labels])

# Wrap the (feature, label) pairs for the attacker and serve them in shuffled
# mini-batches.
new_dataset = TensorDataset(combined_features, combined_labels)
new_loader = DataLoader(new_dataset, shuffle=True, batch_size=BATCH_SIZE)

# Define and train the attacker model
class AttackerModel(nn.Module):
    """Three-layer MLP mapping a 10-value input vector to one probability."""

    def __init__(self):
        super().__init__()
        # 10 -> 128 -> 64 -> 1; the attribute names double as state-dict keys.
        self.fc1 = nn.Linear(10, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, x):
        """Return sigmoid scores of shape ``(batch, 1)``."""
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        logit = self.fc3(hidden)
        return logit.sigmoid()

# Binary classifier trained with binary cross-entropy on the sigmoid outputs.
attacker = AttackerModel().to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(attacker.parameters(), lr=0.0001)  # Adjusted learning rate

EPOCHS = 20  # Increased number of epochs

for epoch in range(EPOCHS):
    attacker.train()
    running_loss = 0.0
    for features, labels in new_loader:
        features, labels = features.to(device), labels.to(device)
        optimizer.zero_grad()
        # Drop only the trailing singleton dimension: (batch, 1) -> (batch,).
        # A bare squeeze() would also collapse a final batch of size 1 to a
        # 0-d tensor, which no longer matches the labels' shape in BCELoss.
        outputs = attacker(features).squeeze(1)
        loss = criterion(outputs, labels.float())
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Mean of the per-batch losses for this epoch.
    print(f'Epoch {epoch + 1}, Loss: {running_loss / len(new_loader):.4f}')

# Evaluate the attacker model
# NOTE: this iterates new_loader, i.e. the same data the attacker was trained
# on, so every number below is a training-set metric (as the printed label
# says), not an estimate of performance on unseen samples.
attacker.eval()
all_labels = []
all_predicted = []
correct = 0
total = 0

with torch.no_grad():
    for features, labels in new_loader:
        features, labels = features.to(device), labels.to(device)
        # squeeze(1) keeps the batch dimension even for a final batch of size
        # 1; a bare squeeze() would yield a 0-d tensor that cannot be iterated
        # by list.extend below.
        outputs = attacker(features).squeeze(1)
        # Threshold the sigmoid score at 0.5 to get a hard 0/1 decision.
        predicted = (outputs > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        all_labels.extend(labels.cpu().tolist())
        all_predicted.extend(predicted.cpu().tolist())

accuracy = correct / total
print(f'Training Accuracy: {accuracy:.4f}')

# Positive class is label 1 (the samples labelled with ones when the attack
# dataset was built).
cm = confusion_matrix(all_labels, all_predicted)
precision = precision_score(all_labels, all_predicted)
recall = recall_score(all_labels, all_predicted)
f1 = f1_score(all_labels, all_predicted)

print(f'Confusion Matrix:\n{cm}')
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")


Downloading...
From: https://drive.google.com/uc?id=1u8TtbKu-IYn4BM2fMxlKuSxebGm8t0a8
To: /content/list.txt
100%|██████████| 231k/231k [00:00<00:00, 59.5MB/s]
Downloading...
From: https://drive.google.com/uc?id=13JqLGTHtspZes2xc3JHPgszM2RkvJtsR
To: /content/model_state_dict.pth
100%|██████████| 1.63M/1.63M [00:00<00:00, 74.5MB/s]


Files already downloaded and verified
Files already downloaded and verified




Epoch 1, Loss: 0.6820
Epoch 2, Loss: 0.6738
Epoch 3, Loss: 0.6704
Epoch 4, Loss: 0.6687
Epoch 5, Loss: 0.6677
Epoch 6, Loss: 0.6667
Epoch 7, Loss: 0.6659
Epoch 8, Loss: 0.6653
Epoch 9, Loss: 0.6646
Epoch 10, Loss: 0.6643
Epoch 11, Loss: 0.6639
Epoch 12, Loss: 0.6636
Epoch 13, Loss: 0.6629
Epoch 14, Loss: 0.6627
Epoch 15, Loss: 0.6624
Epoch 16, Loss: 0.6620
Epoch 17, Loss: 0.6616
Epoch 18, Loss: 0.6616
Epoch 19, Loss: 0.6613
Epoch 20, Loss: 0.6611
Training Accuracy: 0.5955
Confusion Matrix:
[[10701  9299]
 [ 6883 13117]]
Precision: 0.5852
Recall: 0.6559
F1 Score: 0.6185
