In [None]:
import copy
import time

import matplotlib.pyplot as plt
import torch
import torchvision
import torchvision.transforms as transforms
from opacus import PrivacyEngine
from torch import nn, optim
from tqdm.auto import tqdm

In [None]:
# Data pipeline for CIFAR-10: an 80/20 train/validation split of the official
# training set, plus the official test set. torch / torchvision / transforms
# are imported in the first cell of the notebook.

BATCH_SIZE = 64
NUM_WORKERS = 2
TRAIN_FRACTION = 0.8   # share of the official training set used for training
SPLIT_SEED = 42        # fixes the train/validation split across kernel restarts

# Convert images to tensors and rescale each RGB channel from [0, 1] to [-1, 1].
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Load the official CIFAR-10 training set (downloaded to ./data if missing).
full_trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)

# Sizes of the two partitions; the validation set takes the remainder so the
# two sizes always add up to the full dataset length.
train_size = int(TRAIN_FRACTION * len(full_trainset))
valid_size = len(full_trainset) - train_size

# Seeded generator: without it every run draws a different split, which makes
# validation losses incomparable between runs.
trainset, validset = torch.utils.data.random_split(
    full_trainset,
    [train_size, valid_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Only the training loader shuffles; validation and test order is irrelevant.
trainloader = torch.utils.data.DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
validloader = torch.utils.data.DataLoader(validset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

# Official CIFAR-10 test set, used only for the final accuracy figure.
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

In [None]:
# ResNet-18 with a 10-way output layer for CIFAR-10, randomly initialised.
# GroupNorm replaces the default BatchNorm layers: BatchNorm normalises with
# statistics computed across the batch, so a per-sample gradient (which DP-SGD
# must clip individually) is not well defined and Opacus rejects such models.
model = torchvision.models.resnet18(
    num_classes=10,
    norm_layer=lambda num_channels: nn.GroupNorm(min(32, num_channels), num_channels),
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# The privacy accountant needs the real sampling rate (batch size divided by
# the number of training examples); a hard-coded value that does not match the
# loader makes the reported epsilon meaningless.
sample_rate = trainloader.batch_size / len(trainloader.dataset)

# Attach the Opacus privacy engine to the DP optimizer.
# No target_epsilon is passed: the noise level is fixed explicitly through
# noise_multiplier, so epsilon is an outcome read back after training rather
# than a target the engine calibrates to.
privacy_engine = PrivacyEngine(
    model,
    sample_rate=sample_rate,
    target_delta=1e-5,     # Adjust as needed
    noise_multiplier=1.3,  # Adjust as needed
    max_grad_norm=0.1,     # Adjust as needed (per-sample gradient clipping bound)
)
privacy_engine.attach(optimizer)

# Plain SGD optimizer for the non-private baseline.
# NOTE: it wraps the same `model` as the DP optimizer, so the weights must be
# reset (and the privacy engine detached) before switching from one optimizer
# to the other, otherwise the second run continues from the first one's weights.
optimizer_no_dp = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)


In [None]:
def train_test(optimizer):
    """Train the global ``model`` with ``optimizer``, then evaluate and plot.

    For each epoch the model is trained on ``trainloader`` and the mean
    validation loss over ``validloader`` is computed. A copy of the weights
    with the lowest validation loss is kept and restored once training has
    finished. The function then prints test accuracy on ``testloader``, the
    privacy budget spent (or ``inf`` for a non-private run), the elapsed
    wall-clock time, and shows the training/validation loss curves.

    Relies on notebook globals defined in earlier cells: ``model``,
    ``criterion``, ``trainloader``, ``validloader`` and ``testloader``.

    Args:
        optimizer: Optimizer that updates the parameters of ``model``. When a
            privacy engine is attached to it the run is reported as "DP" and
            epsilon is printed; otherwise it is reported as "Non-DP".

    Returns:
        None. Results are printed and plotted.
    """
    num_epochs = 10  # Adjust the number of epochs

    # NOTE(review): assumes PrivacyEngine.attach() stores the engine on the
    # optimizer as `optimizer.privacy_engine` (the attach-style Opacus API used
    # in this notebook) — confirm against the installed Opacus version.
    engine = getattr(optimizer, 'privacy_engine', None)
    run_label = 'DP' if engine is not None else 'Non-DP'

    start_time = time.time()
    best_val_loss = float('inf')
    # state_dict() returns references to the live tensors, so a deep copy is
    # required for the snapshot to survive later optimizer steps.
    best_model_state = copy.deepcopy(model.state_dict())
    loss_values = []
    val_loss_values = []

    for epoch in range(num_epochs):
        # ---- training ----
        model.train()
        running_loss = 0.0
        tqdm_train_loader = tqdm(trainloader, desc=f'Epoch {epoch + 1}/{num_epochs}', dynamic_ncols=True)

        for inputs, labels in tqdm_train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            tqdm_train_loader.set_description(f'Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item():.4f}')

        tqdm_train_loader.close()
        train_loss = running_loss / len(trainloader)
        loss_values.append(train_loss)

        # ---- validation ----
        model.eval()
        val_running_loss = 0.0
        with torch.no_grad():
            for inputs, targets in validloader:
                outputs = model(inputs)
                # CrossEntropyLoss takes the integer class labels directly.
                val_running_loss += criterion(outputs, targets).item()

        val_loss = val_running_loss / len(validloader)
        val_loss_values.append(val_loss)
        print(f'Epoch [{epoch + 1}/{num_epochs}] - Train Loss: {train_loss:.4f} - Validation Loss: {val_loss:.4f}')

        # Remember the weights of the best epoch so far.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = copy.deepcopy(model.state_dict())

    # Restore the best weights only after training has finished; doing it
    # inside the loop would rewind the model every epoch.
    model.load_state_dict(best_model_state)

    # ---- test-set evaluation ----
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in testloader:
            predicted = model(images).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print(f'{run_label} Accuracy: {100 * correct / total}%')
    if engine is not None:
        print(f'ε: {engine.get_privacy_spent()[0]}')
    else:
        print('ε: inf (non-private)')
    elapsed = time.time() - start_time
    print(f'{run_label} Training Time: {elapsed} seconds')

    # ---- training and validation curves ----
    epoch_axis = range(1, len(loss_values) + 1)
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(epoch_axis, loss_values, label='Training Loss')
    ax.plot(epoch_axis, val_loss_values, label='Validation Loss')
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Loss')
    ax.set_title(f'Training and Validation Loss Curves ({run_label})')
    ax.legend()
    ax.grid(True)
    plt.show()

In [None]:
# Run the DP and the non-DP experiment from the same starting weights so the
# two accuracy / timing figures are comparable.

# Snapshot the untrained weights (deep copy: state_dict() aliases live tensors).
initial_state = copy.deepcopy(model.state_dict())

# 1) Differentially private run.
train_test(optimizer)

# Both optimizers wrap the same `model`, so undo the DP instrumentation and
# rewind the weights before training the baseline.
# NOTE(review): assumes the attach-style Opacus API, where `detach()` removes
# the per-sample-gradient hooks from the model — confirm for the installed
# version. Re-run the model/privacy-engine cell before re-running this cell.
privacy_engine.detach()
model.load_state_dict(initial_state)

# 2) Non-private baseline.
train_test(optimizer_no_dp)