Import Libraries

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torchvision

import os
import math
import argparse
import csv
import random
import time
import numpy as np

from backpack import backpack, extend
from backpack.extensions import BatchGrad



ResNet Model

In [None]:
import torch
import torch.nn as nn
import math

# Number of groups handed to every nn.GroupNorm created in this model (stem,
# residual blocks, and shortcut). The channel widths used below (16, 32, 64)
# are all divisible by this value.
gn_groups = 4

def conv3x3(in_planes, out_planes, stride=1):
    """
    Build a bias-free 3x3 convolution whose input is zero-padded by one pixel.

    Args:
        in_planes (int): Channel count of the incoming feature map.
        out_planes (int): Channel count of the produced feature map.
        stride (int, optional): Step of the sliding window. Default is 1.

    Returns:
        nn.Conv2d: The configured convolution layer.
    """
    # With a 3x3 kernel, padding=1 keeps the spatial size unchanged at stride 1.
    layer_options = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv2d(in_planes, out_planes, **layer_options)

class BasicBlock(nn.Module):
    """
    Two-convolution residual unit with parameter-free Group Normalization.

    The main branch is conv3x3 -> GroupNorm -> ReLU -> conv3x3 -> GroupNorm.
    Its result is added to a shortcut and passed through a final ReLU. When a
    `downsample` module is supplied, the shortcut is the module's output with
    an equal number of zero channels appended.
    """
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        """
        Create the layers of the block.

        Args:
            inplanes (int): Channel count of the block input.
            planes (int): Channel count of the block output.
            stride (int, optional): Stride of the first convolution. Defaults to 1.
            downsample (nn.Module, optional): Module applied to the input to form
                the shortcut; None means the input itself is the shortcut.
        """
        super().__init__()
        # Main branch, stage 1: (possibly strided) convolution + normalization.
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.gn1 = nn.GroupNorm(gn_groups, planes, affine=False)
        # One non-inplace ReLU module, used for both activations in forward().
        self.relu = nn.ReLU(inplace=False)
        # Main branch, stage 2: stride-1 convolution + normalization.
        self.conv2 = conv3x3(planes, planes)
        self.gn2 = nn.GroupNorm(gn_groups, planes, affine=False)

        # Optional shortcut transformation.
        self.downsample = downsample

    def forward(self, x):
        """
        Apply the residual unit to `x`.

        Args:
            x (torch.Tensor): Input tensor.

        Returns:
            torch.Tensor: ReLU(main_branch(x) + shortcut(x)).
        """
        # Main branch: conv -> norm -> relu -> conv -> norm.
        residual = self.gn2(self.conv2(self.relu(self.gn1(self.conv1(x)))))

        if self.downsample is None:
            shortcut = x
        else:
            reduced = self.downsample(x)
            # Appending an all-zero copy along dim 1 doubles the channel count,
            # so the sum below only lines up if that equals the main branch's
            # channel count.
            zero_channels = torch.zeros_like(reduced)
            shortcut = torch.cat((reduced, zero_channels), 1).clone()

        return self.relu(residual + shortcut)

class ResNet(nn.Module):
    """
    Three-stage residual network with Group Normalization.

    Layout: a 3x3 stem convolution (3 -> 16 channels), three residual stages of
    widths 16, 32 and 64 (the last two built with stride 2), global average
    pooling to 1x1, and a linear classifier on the 64 pooled features.
    """
    def __init__(self, block, layers, num_classes=10):
        """
        Build the network and initialize its weights.

        Args:
            block (type): Residual block class; it is called as
                `block(inplanes, planes, stride, downsample)` (e.g. BasicBlock).
            layers (list): Number of blocks in each of the three stages.
            num_classes (int, optional): Size of the classifier output. Defaults to 10.
        """
        super().__init__()

        self.num_layers = sum(layers)
        # Running channel count; _make_layer reads and updates it.
        self.inplanes = 16
        # Stem: 3-channel input -> 16 channels.
        self.conv1 = conv3x3(3, 16)
        self.gn1 = nn.GroupNorm(gn_groups, 16, affine=False)
        self.relu = nn.ReLU(inplace=False)
        # Residual stages.
        self.layer1 = self._make_layer(block, 16, layers[0])
        self.layer2 = self._make_layer(block, 32, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 64, layers[2], stride=2)
        # Pool every feature map down to a single value per channel.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # Classifier on the 64 pooled features.
        self.fc = nn.Linear(64, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                # He-style init using fan-out: std = sqrt(2 / (k_h * k_w * out_channels)).
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
            elif isinstance(m, nn.GroupNorm):
                # GroupNorm(affine=False) has weight/bias set to None, so check
                # explicitly instead of swallowing the resulting AttributeError.
                if m.weight is not None:
                    m.weight.data.fill_(1)
                if m.bias is not None:
                    m.bias.data.zero_()

    def _make_layer(self, block, planes, blocks, stride=1):
        """
        Stack `blocks` residual blocks into one stage.

        Only the first block receives `stride` and the shortcut module; the
        remaining blocks map `planes` -> `planes` with default arguments.

        Args:
            block (type): Residual block class to instantiate.
            planes (int): Output channel count of the stage.
            blocks (int): Number of blocks in the stage.
            stride (int, optional): Stride of the first block. Defaults to 1.

        Returns:
            nn.Sequential: The blocks in order.
        """
        downsample = None
        # A shortcut module is only created for strided stages. It subsamples
        # spatially (1x1 average pool with the given stride) and normalizes;
        # it does not change the channel count itself.
        if stride != 1:
            downsample = nn.Sequential(
                nn.AvgPool2d(1, stride=stride),
                nn.GroupNorm(gn_groups, self.inplanes, affine=False),
            )

        layers = [block(self.inplanes, planes, stride, downsample)]
        self.inplanes = planes
        for _ in range(1, blocks):
            layers.append(block(planes, planes))

        return nn.Sequential(*layers)

    def forward(self, x):
        """
        Compute class scores for a batch.

        Args:
            x (torch.Tensor): Input batch; the stem convolution expects 3 channels.

        Returns:
            torch.Tensor: Scores of shape (batch, num_classes).
        """
        # Stem.
        x = self.conv1(x)
        x = self.gn1(x)
        x = self.relu(x)

        # Residual stages.
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)

        # Pool to 1x1 and flatten to (batch, features).
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

def resnet20():
    """
    Build the ResNet-20 configuration: three stages of three BasicBlocks each.

    Returns:
        ResNet: A freshly initialized model with the default number of classes.
    """
    # 3 stages x 3 blocks x 2 convolutions + stem convolution + classifier = 20 layers.
    blocks_per_stage = [3] * 3
    return ResNet(BasicBlock, blocks_per_stage)


Utils

In [None]:
# import torch
# import torch.nn as nn
# import torchvision
# import torchvision.transforms as transforms
# from sklearn.decomposition import PCA

# import os

# import numpy as np

# from rdp_accountant import compute_rdp, get_privacy_spent

# def process_grad_batch(params, clipping=1):
#     n = params[0].grad_batch.shape[0]
#     grad_norm_list = torch.zeros(n).cuda()
#     for p in params: 
#         flat_g = p.grad_batch.reshape(n, -1)
#         current_norm_list = torch.norm(flat_g, dim=1)
#         grad_norm_list += torch.square(current_norm_list)
#     grad_norm_list = torch.sqrt(grad_norm_list)
#     scaling = clipping/grad_norm_list
#     scaling[scaling>1] = 1

#     for p in params:
#         p_dim = len(p.shape)
#         scaling = scaling.view([n] + [1]*p_dim)
#         p.grad_batch *= scaling
#         p.grad = torch.mean(p.grad_batch, dim=0)
#         p.grad_batch.mul_(0.)

# def get_data_loader(dataset, batchsize):
#     transform_train = transforms.Compose([
#     # transforms.RandomCrop(32, padding=4),
#     # transforms.RandomHorizontalFlip(),
#     transforms.ToTensor(),
#     transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
#     ])
#     transform_test = transforms.Compose([
#         transforms.ToTensor(),
#         transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
#     ])

#     trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train) 
#     trainloader = torch.utils.data.DataLoader(trainset, batch_size=batchsize, shuffle=True, num_workers=2)

#     testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test) 
#     testloader = torch.utils.data.DataLoader(testset, batch_size=batchsize, shuffle=False, num_workers=2)
#     return trainloader, testloader, len(trainset), len(testset)

# def loop_for_sigma(q, T, eps, delta, cur_sigma, interval, rdp_orders=32, rgp=True):
#     while True:
#         orders = np.arange(2, rdp_orders, 0.1)
#         steps = T
#         if(rgp):
#             rdp = compute_rdp(q, cur_sigma, steps, orders) * 2 ## when using residual gradients, the sensitivity is sqrt(2)
#         else:
#             rdp = compute_rdp(q, cur_sigma, steps, orders)
#         cur_eps, _, opt_order = get_privacy_spent(orders, rdp, target_delta=delta)
#         if(cur_eps<eps and cur_sigma>interval):
#             cur_sigma -= interval
#             previous_eps = cur_eps
#         else:
#             cur_sigma += interval
#             break    
#     return cur_sigma, previous_eps

# def get_sigma(q, T, eps, delta, init_sigma=10, interval=1., rgp=True):
#     cur_sigma = init_sigma
    
#     cur_sigma, _ = loop_for_sigma(q, T, eps, delta, cur_sigma, interval, rgp=rgp)
#     interval /= 10
#     cur_sigma, _ = loop_for_sigma(q, T, eps, delta, cur_sigma, interval, rgp=rgp)
#     interval /= 10
#     cur_sigma, previous_eps = loop_for_sigma(q, T, eps, delta, cur_sigma, interval, rgp=rgp)
#     return cur_sigma, previous_eps

# def get_lr_scheduler(optimizer, epochs):
#     def lr_lamda(epoch):
#         if epoch < 10:
#             return 0.1 - (0.1 - 0.052) * (epoch / 10)
#         else :
#             return 0.052
#     return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lamda)

Train and Test

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.backends.cudnn as cudnn
import time
import numpy as np
import matplotlib.pyplot as plt

from utils import get_data_loader, get_sigma, checkpoint, adjust_learning_rate, process_grad_batch

from backpack import backpack, extend
from backpack.extensions import BatchGrad

def train(epoch, net, trainloader, optimizer, loss_func, n_training, batchsize, clip, noise_multiplier, private=True):
    """
    Train the network for one epoch and report the average loss and accuracy.

    At most `n_training // batchsize` batches are drawn from `trainloader`.
    With `private=True`, per-sample gradients are collected with BackPACK's
    BatchGrad, handed to `process_grad_batch` for clipping, and Gaussian noise
    with standard deviation `noise_multiplier * clip / batchsize` is added to
    every `p.grad` before the optimizer step. With `private=False`, an ordinary
    backward pass is used.

    Args:
        epoch (int): Epoch number (only used for logging).
        net (nn.Module): Model to train; batches are moved to its device.
        trainloader (iterable): Yields `(inputs, targets)` batches.
        optimizer (Optimizer): Optimizer stepping the model parameters.
        loss_func (callable): Loss taking `(outputs, targets)`.
        n_training (int): Total number of training examples.
        batchsize (int): Nominal batch size; sets the step count and noise scale.
        clip (float): Per-sample gradient clipping threshold.
        noise_multiplier (float): Noise standard deviation relative to `clip`.
        private (bool, optional): Use the differentially private update. Default is True.

    Returns:
        tuple: (average loss per step, accuracy in percent).

    Raises:
        ValueError: If no training step could be performed.
    """
    print('\nEpoch: %d' % epoch)
    net.train()
    train_loss = 0.0
    correct = 0
    total = 0
    completed = 0
    t0 = time.time()
    # Number of optimizer steps per epoch (a trailing partial batch is not used).
    steps = n_training // batchsize

    # Run on whatever device the model lives on instead of assuming CUDA.
    first_param = next(net.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    # Std of the noise added to p.grad. The 1/batchsize factor assumes
    # process_grad_batch leaves the batch *mean* of the clipped per-sample
    # gradients in p.grad -- NOTE(review): confirm against utils.
    noise_std = noise_multiplier * clip / batchsize

    # zip() ends after `steps` batches, or earlier if the loader runs out.
    for _, (inputs, targets) in zip(range(steps), trainloader):
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        outputs = net(inputs)
        loss = loss_func(outputs, targets)

        if private:
            # Compute per-sample gradients using Backpack for differential privacy.
            with backpack(BatchGrad()):
                loss.backward()
                # Clip the per-sample gradients and aggregate them into p.grad.
                process_grad_batch(list(net.parameters()), clip)
                # Add Gaussian noise to each parameter's gradient.
                for p in net.parameters():
                    grad_noise = torch.normal(
                        0, noise_std,
                        size=p.grad.shape, device=p.grad.device
                    )
                    p.grad.data += grad_noise
        else:
            loss.backward()
            # Drop per-sample gradients left on any parameter; checking each
            # parameter individually means one missing attribute does not
            # skip the rest.
            for p in net.parameters():
                if hasattr(p, 'grad_batch'):
                    del p.grad_batch

        optimizer.step()
        step_loss = loss.item()
        if private:
            # Normalize the loss by the number of samples in the batch.
            step_loss /= inputs.shape[0]

        train_loss += step_loss
        completed += 1
        # Count correct top-1 predictions.
        predicted = outputs.detach().argmax(dim=1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()

    if completed == 0:
        raise ValueError(
            'no training step was performed '
            '(n_training=%d, batchsize=%d)' % (n_training, batchsize)
        )

    acc = 100. * float(correct) / float(total)
    # Average over the steps actually taken; this is the value that is both
    # printed and returned.
    avg_loss = train_loss / completed

    t1 = time.time()
    print('Train loss: %.5f' % avg_loss, 'time: %d s' % (t1 - t0), 'train acc:', acc, end=' ')
    return (avg_loss, acc)


def test(epoch, net, testloader, loss_func, private=True):
    """
    Evaluate the network on the test set.

    Computes the average per-batch loss and the top-1 accuracy over
    `testloader`. If the accuracy exceeds the module-level `best_acc`, that
    global is updated and `checkpoint` is called for the model.

    Args:
        epoch (int): Epoch number, forwarded to `checkpoint`.
        net (nn.Module): Model to evaluate; batches are moved to its device.
        testloader (iterable): Yields `(inputs, targets)` batches.
        loss_func (callable): Loss taking `(outputs, targets)`.
        private (bool, optional): If True, each batch loss is divided by the
            batch size (the DP loss in this file uses sum reduction). Default is True.

    Returns:
        tuple: (average loss per batch, accuracy in percent).

    Raises:
        ValueError: If `testloader` yields no batches.
    """
    global best_acc
    net.eval()
    test_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0

    # Run on whatever device the model lives on instead of assuming CUDA.
    first_param = next(net.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    with torch.no_grad():
        for inputs, targets in testloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = net(inputs)
            step_loss = loss_func(outputs, targets).item()

            if private:
                # Normalize the loss by the batch size when using private training.
                step_loss /= inputs.shape[0]

            test_loss += step_loss
            num_batches += 1
            predicted = outputs.argmax(dim=1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

        if num_batches == 0:
            raise ValueError('testloader produced no batches')

        acc = 100. * float(correct) / float(total)
        # Average over the number of batches; this is the value that is both
        # printed and returned.
        avg_loss = test_loss / num_batches
        print('test loss: %.5f' % avg_loss, 'test acc:', acc)

        # Save a checkpoint if the current accuracy is higher than the best accuracy.
        if acc > best_acc:
            best_acc = acc
            checkpoint(net, acc, epoch, "resnet20_cifar10")

    return (avg_loss, acc)


In [None]:
def _run_epochs(net, optimizer, loss_func, trainloader, testloader, n_training,
                batchsize, clip, noise_multiplier, lr, n_epoch, private):
    """Train and evaluate `net` for `n_epoch` epochs; return per-epoch (train, test) accuracy lists."""
    train_accs = []
    test_accs = []
    for epoch in range(n_epoch):
        # Called for its effect on the optimizer; the returned value is not needed here.
        adjust_learning_rate(optimizer, lr, epoch, all_epoch=n_epoch)
        _, train_acc = train(epoch, net, trainloader, optimizer, loss_func,
                             n_training, batchsize, clip, noise_multiplier, private=private)
        _, test_acc = test(epoch, net, testloader, loss_func, private=private)
        train_accs.append(train_acc)
        test_accs.append(test_acc)
    return train_accs, test_accs


def main():
    """
    Train and evaluate ResNet-20 on CIFAR-10 with and without differential privacy.

    Steps: load the data, ask `get_sigma` for a noise scale matching the
    (eps, delta) target, train a BackPACK-extended model with the private
    update, train a second model with the standard update, and plot the
    per-epoch accuracies of both runs.
    """
    # Training parameters.
    batchsize = 1000
    n_epoch = 100
    eps = 8.0
    delta = 1e-5
    clip = 5.0
    lr = 0.1
    momentum = 0.9
    weight_decay = 0.0

    # Both runs use the same number of epochs, so they share one x-axis.
    epochs = list(range(n_epoch))

    # Load CIFAR-10 data.
    trainloader, testloader, n_training, n_test = get_data_loader('cifar10', batchsize=batchsize)
    print('# of training examples:', n_training, '# of testing examples:', n_test)

    # Compute the noise scale based on the target privacy budget.
    print('\n==> Computing noise scale for privacy budget (%.1f, %f)-DP' % (eps, delta))
    sampling_prob = batchsize / n_training
    steps = int(n_epoch / sampling_prob)
    # The second return value is printed as the privacy guarantee; it is kept
    # under its own name so the target `eps` is not overwritten.
    noise_multiplier, achieved_eps = get_sigma(sampling_prob, steps, eps, delta, rgp=False)
    print('noise scale:', noise_multiplier, 'privacy guarantee:', achieved_eps)

    # Train the model with differential privacy.
    print('\n==> Creating and training model with DP')
    global best_acc
    best_acc = 0

    net_dp = resnet20()
    net_dp.cuda()
    # BackPACK needs the model and the loss wrapped with extend().
    net_dp = extend(net_dp)

    # Display total number of model parameters.
    num_params = sum(p.numel() for p in net_dp.parameters())
    print('total number of parameters:', num_params / (10**6), 'M')

    # Sum reduction; train()/test() divide the reported loss by the batch size
    # when private=True.
    loss_func_dp = nn.CrossEntropyLoss(reduction='sum')
    loss_func_dp = extend(loss_func_dp)

    optimizer_dp = optim.SGD(net_dp.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)

    print('\n==> Start training with DP')
    train_acc_dp, test_acc_dp = _run_epochs(
        net_dp, optimizer_dp, loss_func_dp, trainloader, testloader, n_training,
        batchsize, clip, noise_multiplier, lr, n_epoch, private=True)

    # Reset best accuracy for the non-DP model.
    best_acc = 0

    # Train the model without differential privacy.
    print('\n==> Creating and training model without DP')
    net_no_dp = resnet20()
    net_no_dp.cuda()

    # Use standard mean reduction in loss for non-DP training.
    loss_func_no_dp = nn.CrossEntropyLoss(reduction='mean')

    optimizer_no_dp = optim.SGD(net_no_dp.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)

    print('\n==> Start training without DP')
    train_acc_no_dp, test_acc_no_dp = _run_epochs(
        net_no_dp, optimizer_no_dp, loss_func_no_dp, trainloader, testloader, n_training,
        batchsize, clip, noise_multiplier, lr, n_epoch, private=False)

    # Plot the learning curves and final results.
    plot_learning_curves(epochs, train_acc_dp, test_acc_dp, train_acc_no_dp, test_acc_no_dp)

def plot_learning_curves(epochs, train_acc_dp, test_acc_dp, train_acc_no_dp, test_acc_no_dp):
    """
    Save two comparison figures for the DP and non-DP runs and print a summary.

    Figure 1 (`cifar10_accuracy_comparison.png`) has two panels: the four
    accuracy curves over epochs, and the train-minus-test gap of each run.
    Figure 2 (`cifar10_final_accuracy_comparison.png`) is a bar chart of the
    last-epoch accuracies. A text table of the same final numbers is printed.

    Args:
        epochs (list): X-axis values, one per recorded epoch.
        train_acc_dp (list): Per-epoch training accuracy (%) of the DP run.
        test_acc_dp (list): Per-epoch test accuracy (%) of the DP run.
        train_acc_no_dp (list): Per-epoch training accuracy (%) of the non-DP run.
        test_acc_no_dp (list): Per-epoch test accuracy (%) of the non-DP run.
    """
    marker_style = 'o-'

    # ---- Figure 1: curves over epochs ----
    plt.figure(figsize=(15, 6))

    # Left panel: the four accuracy curves.
    plt.subplot(1, 2, 1)
    accuracy_curves = (
        (train_acc_dp, 'blue', 'Training with DP'),
        (train_acc_no_dp, 'green', 'Training without DP'),
        (test_acc_dp, 'orange', 'Testing with DP'),
        (test_acc_no_dp, 'red', 'Testing without DP'),
    )
    for values, curve_color, curve_label in accuracy_curves:
        plt.plot(epochs, values, marker_style, color=curve_color, label=curve_label)
    plt.title('CIFAR-10: Accuracy vs. Epoch')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.grid(True)
    plt.legend()

    # Right panel: train-minus-test gap for each run.
    plt.subplot(1, 2, 2)
    gap_curves = (
        (train_acc_dp, test_acc_dp, 'purple', 'Train-Test Gap with DP'),
        (train_acc_no_dp, test_acc_no_dp, 'brown', 'Train-Test Gap without DP'),
    )
    for train_values, test_values, curve_color, curve_label in gap_curves:
        gap = [tr - te for tr, te in zip(train_values, test_values)]
        plt.plot(epochs, gap, marker_style, color=curve_color, label=curve_label)
    plt.title('CIFAR-10: Train-Test Accuracy Gap')
    plt.xlabel('Epoch')
    plt.ylabel('Gap (%)')
    plt.grid(True)
    plt.legend()

    plt.tight_layout()
    plt.savefig('cifar10_accuracy_comparison.png', dpi=300)
    plt.close()

    # ---- Figure 2: last-epoch accuracies as bars ----
    plt.figure(figsize=(10, 6))
    final_bars = [
        ('Training with DP', train_acc_dp[-1], 'blue'),
        ('Testing with DP', test_acc_dp[-1], 'orange'),
        ('Training without DP', train_acc_no_dp[-1], 'green'),
        ('Testing without DP', test_acc_no_dp[-1], 'red'),
    ]
    configurations = [name for name, _, _ in final_bars]
    final_accuracies = [value for _, value, _ in final_bars]
    colors = [shade for _, _, shade in final_bars]
    plt.bar(configurations, final_accuracies, color=colors)
    plt.title('CIFAR-10: Final Accuracy Comparison')
    plt.ylabel('Accuracy (%)')
    plt.grid(axis='y')
    plt.xticks(rotation=15)
    # Write each value one unit above its bar.
    for position, value in enumerate(final_accuracies):
        plt.text(position, value + 1, f"{value:.2f}%", ha='center')

    plt.tight_layout()
    plt.savefig('cifar10_final_accuracy_comparison.png', dpi=300)
    plt.close()

    # ---- Text summary ----
    summary_rows = [(name, value) for name, value, _ in final_bars]
    summary_rows.append(('DP Train-Test Gap', train_acc_dp[-1] - test_acc_dp[-1]))
    summary_rows.append(('Non-DP Train-Test Gap', train_acc_no_dp[-1] - test_acc_no_dp[-1]))

    print("\n==== CIFAR-10 Results Summary ====")
    print(f"{'Configuration':<25} {'Accuracy':<15}")
    print("-" * 40)
    for row_label, row_value in summary_rows:
        print(f"{row_label:<25} {row_value:.2f}%")
    print("\nPlots saved to cifar10_accuracy_comparison.png and cifar10_final_accuracy_comparison.png")

# Run the full DP vs. non-DP experiment only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()

# of training examples:  50000 # of testing examples:  10000

==> Computing noise scale for privacy budget (8.0, 0.000010)-DP
noise scale:  1.2099999999999993 privacy guarantee:  7.956636368007988

==> Creating ResNet20 model instance
total number of parameters:  0.268346 M

==> Start training

Epoch: 0
Train loss:2.18650 time: 9 s train acc: 17.954 test loss:1.97736 test acc: 26.77

Epoch: 1
Train loss:1.91853 time: 9 s train acc: 29.02 test loss:1.89703 test acc: 31.19

Epoch: 2
Train loss:1.79896 time: 9 s train acc: 34.842 test loss:1.77825 test acc: 37.55

Epoch: 3
Train loss:1.73391 time: 9 s train acc: 38.904 test loss:1.67740 test acc: 41.95

Epoch: 4
Train loss:1.66901 time: 9 s train acc: 42.038 test loss:1.77989 test acc: 40.36

Epoch: 5
Train loss:1.63380 time: 9 s train acc: 43.756 test loss:1.61996 test acc: 45.39

Epoch: 6
Train loss:1.59258 time: 9 s train acc: 46.258 test loss:1.55403 test acc: 47.55

Epoch: 7
Train loss:1.57535 time: 9 s train acc: 47.546 test loss:1.