In [None]:
!pip install opacus

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
import torch.nn.utils.prune as prune
import numpy as np
from opacus import PrivacyEngine
import time
import copy

## Dataset Loading and Model Hyperparameters

In [None]:
def select_dataset(name='MNIST'):
    """Build train / validation / test ``DataLoader``s for MNIST or CIFAR-10.

    The official training set is split into a training part (50,000 images
    for MNIST, 40,000 for CIFAR-10) and a validation part (the remainder).
    The official test set is used as is. Data is downloaded to ``./data``.

    Reads the module-level ``BATCH_SIZE`` (training batch size) and ``SEED``
    (seed of the train/validation split); both must be defined before the
    call.

    Args:
        name: ``"MNIST"`` or ``"CIFAR"``.

    Returns:
        ``(train_loader, val_loader, test_loader)``. Only the training loader
        shuffles; validation and test loaders use a batch size of 256.

    Raises:
        ValueError: if ``name`` is not one of the supported datasets.
    """
    supported = ("MNIST", "CIFAR")
    # Fail early with a clear message instead of an unbound-variable error
    # further down.
    if name not in supported:
        raise ValueError(
            f"Unknown dataset {name!r}; expected one of {supported}"
        )

    if name == "MNIST":
        dataset_cls = datasets.MNIST
        mean, std = (0.1307,), (0.3081,)
        train_size = 50000
    else:
        dataset_cls = datasets.CIFAR10
        mean, std = (0.5, 0.5, 0.5), (0.5, 0.5, 0.5)
        train_size = 40000

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ])

    full_train = dataset_cls(
        root="./data", train=True, download=True, transform=transform
    )
    test_dataset = dataset_cls(
        root="./data", train=False, download=True, transform=transform
    )

    # A dedicated, seeded generator makes the split identical on every run,
    # independent of how much of the global RNG was consumed beforehand.
    val_size = len(full_train) - train_size
    split_rng = torch.Generator().manual_seed(SEED)
    train_dataset, val_dataset = random_split(
        full_train, [train_size, val_size], generator=split_rng
    )

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=256, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)
    return train_loader, val_loader, test_loader

In [None]:
# --- Reproducibility ---------------------------------------------------------
# Seed the RNGs before any data is split or any model is created; seeding
# after select_dataset() leaves the train/validation split unseeded.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

# --- Optimisation ------------------------------------------------------------
BATCH_SIZE = 128
LR = 0.01
EPOCHS = 20
PATIENCE = 5          # early-stopping patience, in epochs without val-acc gain

# --- Differential privacy ----------------------------------------------------
MAX_GRAD_NORM = 1.0   # per-sample gradient clipping norm passed to Opacus
DELTA = 1e-5          # delta used when querying the spent epsilon

# For dynamically calculated epsilon
NOISE_MULTIPLIER = 1.0

# For a fixed privacy budget (eps, del).
# TARGET_DELTA and NUM_EPOCHS are tied to DELTA and EPOCHS so that the budget
# Opacus calibrates for matches the schedule that is actually trained and the
# delta that is actually reported.
TARGET_EPSILON = 8
TARGET_DELTA = DELTA
NUM_EPOCHS = EPOCHS

# --- Pruning -----------------------------------------------------------------
PRUNE_EPOCHS = 2      # fine-tuning epochs reserved for post-training pruning

# --- Data and device ---------------------------------------------------------
DATASET_NAME = "MNIST"
train_loader, val_loader, test_loader = select_dataset(DATASET_NAME)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# baseline model
class CNN(nn.Module):
    """Fallback classifier: two 3x3 conv layers and two linear layers.

    Layer sizes follow the CIFAR-10 settings: 3 input channels, and a
    576-feature input to ``fc1`` (16 channels * 6 * 6, which is what a
    32x32 image yields after two conv + 2x2 max-pool stages). Outputs 10
    raw class scores per sample.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 8, kernel_size=3)
        self.conv2 = nn.Conv2d(8, 16, kernel_size=3)
        # One pooling module shared by both convolutional stages.
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(576, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        """Return unnormalised class scores for the input batch ``x``."""
        features = self.pool(F.relu(self.conv1(x)))
        features = self.pool(F.relu(self.conv2(features)))
        hidden = F.relu(self.fc1(features.flatten(1)))
        return self.fc2(hidden)


In [None]:
# MNIST Model
class CNN_MNIST(nn.Module):
    """MNIST classifier: two 3x3 conv layers, one max-pool, two linear layers.

    Takes single-channel input. ``fc1`` expects 32 * 12 * 12 features, which
    is what a 28x28 image yields after the two unpadded convolutions
    (28 -> 26 -> 24) and the 2x2 max-pool (24 -> 12). Outputs 10 raw class
    scores per sample.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1)
        self.fc1 = nn.Linear(in_features=32 * 12 * 12, out_features=64)
        self.fc2 = nn.Linear(in_features=64, out_features=10)

    def forward(self, x):
        """Return unnormalised class scores for the input batch ``x``."""
        feature_maps = torch.relu(self.conv2(torch.relu(self.conv1(x))))
        pooled = F.max_pool2d(feature_maps, kernel_size=2)
        hidden = torch.relu(self.fc1(pooled.flatten(start_dim=1)))
        return self.fc2(hidden)


# CIFAR-10 Model

class CNN_CIFAR10(nn.Module):
    """CIFAR-10 classifier: two conv + max-pool stages and two linear layers.

    Takes 3-channel input. ``fc1`` expects 576 features (16 channels * 6 * 6),
    which is what a 32x32 image yields after both stages
    (32 -> 30 -> 15 -> 13 -> 6). Outputs 10 raw class scores per sample.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 8, 3)
        self.conv2 = nn.Conv2d(8, 16, 3)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(576, 64)
        self.fc2 = nn.Linear(64, 10)

    def forward(self, x):
        """Return unnormalised class scores for the input batch ``x``."""
        # Both convolutional stages are conv -> ReLU -> max-pool.
        for conv in (self.conv1, self.conv2):
            x = self.pool(F.relu(conv(x)))
        flat = x.flatten(1)
        return self.fc2(F.relu(self.fc1(flat)))

In [None]:
def train_one_epoch(model, loader, optimizer):
    """Run one training pass over ``loader`` and return the mean batch loss.

    Puts ``model`` in training mode, then for every ``(data, target)`` batch
    moves it to the module-level ``device``, computes the cross-entropy loss,
    back-propagates and takes one optimizer step.

    Returns:
        Sum of the per-batch losses divided by ``len(loader)``.
    """
    model.train()
    batch_losses = []
    for inputs, labels in loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        batch_loss = F.cross_entropy(model(inputs), labels)
        batch_loss.backward()
        optimizer.step()

        batch_losses.append(batch_loss.item())
    return sum(batch_losses) / len(loader)

In [None]:
@torch.no_grad()
def evaluate(model, loader):
    """Compute mean cross-entropy loss and accuracy of ``model`` on ``loader``.

    Runs with gradients disabled and the model in eval mode. Batches are
    moved to the module-level ``device``.

    Returns:
        ``(loss, acc)``: the summed loss divided by ``len(loader.dataset)``,
        and the accuracy as a percentage of ``len(loader.dataset)``.
    """
    model.eval()
    summed_loss = 0
    n_correct = 0
    for inputs, labels in loader:
        inputs, labels = inputs.to(device), labels.to(device)
        logits = model(inputs)
        summed_loss += F.cross_entropy(logits, labels, reduction="sum").item()
        n_correct += (logits.argmax(1) == labels).sum().item()

    n_samples = len(loader.dataset)
    return summed_loss / n_samples, 100. * n_correct / n_samples

In [None]:
def weight_pruning(model, amount, return_mask=False, remove=True):
    """Globally prune the smallest-magnitude Conv2d/Linear weights of ``model``.

    All ``weight`` tensors of ``nn.Conv2d`` and ``nn.Linear`` sub-modules are
    ranked together by absolute value (global L1 unstructured pruning) and
    the lowest ``amount`` of them are zeroed. The model is modified in place.

    Args:
        model: module whose Conv2d/Linear weights are pruned.
        amount: passed to ``prune.global_unstructured`` (a fraction in
            ``[0, 1]`` or an absolute number of weights).
        return_mask: if True, also return the binary masks, keyed
            ``"<module name>.weight"``.
        remove: if True, make the pruning permanent and drop the
            ``weight_orig`` / ``weight_mask`` re-parametrisation; if False,
            the masks stay attached to the modules.

    Returns:
        ``(model, mask_dict)`` where ``mask_dict`` is ``None`` unless
        ``return_mask`` is True.

    Raises:
        ValueError: if the model contains no Conv2d/Linear layer.
    """
    parameters_to_prune = [
        (m, "weight") for m in model.modules()
        if isinstance(m, (nn.Conv2d, nn.Linear))
    ]
    if not parameters_to_prune:
        raise ValueError("weight_pruning: model has no nn.Conv2d / nn.Linear layers to prune")

    prune.global_unstructured(
        parameters_to_prune,
        pruning_method=prune.L1Unstructured,
        amount=amount,
    )

    mask_dict = None
    if return_mask:
        # Masks must be copied before prune.remove() deletes the buffers.
        mask_dict = {
            f"{name}.weight": module.weight_mask.detach().clone()
            for name, module in model.named_modules()
            if hasattr(module, "weight_mask")
        }

    if remove:
        for module, _ in parameters_to_prune:
            prune.remove(module, "weight")

    # Report sparsity over the effective weights of the pruned layers only.
    # Iterating model.parameters() would include biases and, when the masks
    # are kept (remove=False), the unmasked `weight_orig` tensors, which
    # under-reports the sparsity.
    total = sum(module.weight.numel() for module, _ in parameters_to_prune)
    zeros = sum(
        int((module.weight == 0).sum().item()) for module, _ in parameters_to_prune
    )
    sparsity = 100 * zeros / total if total else 0.0

    print(f"Pruned model sparsity = {sparsity:.2f}%")

    return model, mask_dict

In [None]:
def calculate_sparsity(model):
    """Return the percentage of exactly-zero entries among weight parameters.

    Every parameter whose name contains ``"weight"`` is counted; other
    parameters (e.g. biases) are ignored.

    Returns:
        A plain Python ``float`` in ``[0, 100]``; ``0.0`` if the model has no
        weight parameters at all.
    """
    total = 0
    zeros = 0
    for name, p in model.named_parameters():
        if "weight" in name:
            total += p.numel()
            # Count on the parameter's own device; .item() yields a Python int,
            # so the result is a builtin float rather than a numpy scalar.
            zeros += int((p.detach() == 0).sum().item())
    if total == 0:
        return 0.0
    return 100 * zeros / total

In [None]:
def run_experiment(dataset='MNIST', use_dp=False, pruning_type=None, final_sparsity=None, fixed_privacy_budget=True, prune_epochs=None):
    """Train, optionally prune, and evaluate one model configuration.

    Reads these module-level names: ``train_loader``, ``val_loader``,
    ``test_loader``, ``device``, ``LR``, ``EPOCHS``, ``PRUNE_EPOCHS``,
    ``PATIENCE``, ``MAX_GRAD_NORM``, ``NOISE_MULTIPLIER``, ``TARGET_EPSILON``,
    ``TARGET_DELTA``, ``NUM_EPOCHS`` and ``DELTA``. The data loaders are the
    global ones; ``dataset`` only selects the model architecture.

    Args:
        dataset: ``"MNIST"`` -> ``CNN_MNIST``, ``"CIFAR"`` -> ``CNN_CIFAR10``,
            anything else -> ``CNN``.
        use_dp: wrap model, optimizer and train loader with Opacus.
        pruning_type: ``None`` (no pruning), ``"PRE"`` (prune once before
            training), ``"POST"`` (train ``EPOCHS - PRUNE_EPOCHS`` epochs,
            prune, fine-tune ``PRUNE_EPOCHS`` epochs) or ``"LTH"`` (prune at
            the epochs in ``prune_epochs`` and rewind to the epoch-1 weights).
        final_sparsity: fraction of weights to prune; required whenever
            ``pruning_type`` is set.
        fixed_privacy_budget: with ``use_dp``, let Opacus derive the noise
            from ``TARGET_EPSILON``/``TARGET_DELTA`` (True) or use
            ``NOISE_MULTIPLIER`` directly (False).
        prune_epochs: epochs at which LTH pruning happens; required for
            ``pruning_type="LTH"``.

    Returns:
        dict with ``val_acc``, ``test_acc``, ``epsilon`` (``None`` without
        DP), ``train_time`` (seconds), ``sparsity`` (percent, measured on the
        final model) and ``prune_time`` (seconds spent in pruning; for POST
        this includes the fine-tuning epochs).

    Raises:
        ValueError: on an unknown ``pruning_type`` or missing pruning arguments.
    """
    # ---- argument validation ------------------------------------------------
    if pruning_type and pruning_type not in ("PRE", "POST", "LTH"):
        raise ValueError(
            f"Unknown pruning_type {pruning_type!r}; expected None, 'PRE', 'POST' or 'LTH'"
        )
    if pruning_type and final_sparsity is None:
        raise ValueError(f"final_sparsity is required when pruning_type={pruning_type!r}")
    if pruning_type == "LTH" and not prune_epochs:
        raise ValueError("prune_epochs is required when pruning_type='LTH'")

    # ---- model and optimizer ------------------------------------------------
    if dataset == 'MNIST':
        model = CNN_MNIST().to(device)
    elif dataset == "CIFAR":
        model = CNN_CIFAR10().to(device)
    else:
        # Defaults to base CNN class if no dataset is specified
        model = CNN().to(device)

    optimizer = optim.SGD(model.parameters(), lr=LR, momentum=0.9)

    # Initialised before the PRE branch so its measurement is not overwritten.
    prune_time = 0

    if pruning_type == "PRE":
        prune_start_time = time.time()
        model, _ = weight_pruning(model, final_sparsity)
        prune_time = time.time() - prune_start_time

    # ---- differential privacy -----------------------------------------------
    privacy_engine = None
    if use_dp:
        privacy_engine = PrivacyEngine()
        if fixed_privacy_budget:
            # Calculates sigma based on target epsilon and delta
            model, optimizer, train_loader_dp = privacy_engine.make_private_with_epsilon(
                module=model,
                optimizer=optimizer,
                data_loader=train_loader,
                max_grad_norm=MAX_GRAD_NORM,
                target_delta=TARGET_DELTA,
                target_epsilon=TARGET_EPSILON,
                epochs=NUM_EPOCHS,
            )
        else:
            # Otherwise takes sigma as a hyperparameter
            model, optimizer, train_loader_dp = privacy_engine.make_private(
                module=model,
                optimizer=optimizer,
                data_loader=train_loader,
                max_grad_norm=MAX_GRAD_NORM,
                noise_multiplier=NOISE_MULTIPLIER,
            )
    else:
        train_loader_dp = train_loader

    # ---- training -----------------------------------------------------------
    best_val_acc = 0
    epochs_no_improve = 0
    best_model_path = f"best_model{'_dp' if use_dp else ''}.pt"
    rewind_state = None
    global_mask = None
    lth_step = 0
    # POST reserves the last PRUNE_EPOCHS epochs for fine-tuning after pruning.
    train_epochs = EPOCHS - PRUNE_EPOCHS if pruning_type == "POST" else EPOCHS
    run_tag = 'DP-SGD' if use_dp else 'Standard SGD'

    start_time = time.time()

    for epoch in range(1, train_epochs + 1):
        train_loss = train_one_epoch(model, train_loader_dp, optimizer)
        val_loss, val_acc = evaluate(model, val_loader)

        eps = privacy_engine.get_epsilon(DELTA) if use_dp else None

        # The conditional must be parenthesised: unparenthesised, `a + b if c
        # else ""` applies to the whole concatenation and prints an empty
        # line for every non-DP epoch.
        print(f"[{run_tag}] Epoch {epoch}: "
              + f"train_loss={train_loss:.4f}, "
              + f"val_loss={val_loss:.4f}, val_acc={val_acc:.4f}%"
              + (f", ε={eps:.4f}" if use_dp else ""))

        if pruning_type == "LTH" and epoch == 1:
            rewind_state = copy.deepcopy(model.state_dict())
            print("Saved early-rewind weights for LTH")

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            epochs_no_improve = 0
            torch.save(model.state_dict(), best_model_path)
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= PATIENCE:
            print(f"\nEarly stopping at epoch {epoch} (no improvement for {PATIENCE} epochs).")
            break

        if pruning_type == "LTH" and epoch != 1 and epoch in prune_epochs:
            # Geometric schedule: after the last step the remaining fraction
            # of weights equals 1 - final_sparsity.
            lth_step += 1
            remaining_k = (1 - final_sparsity) ** (lth_step / len(prune_epochs))
            target_sparsity = 1 - remaining_k
            print(f"Pruning {target_sparsity*100:.2f}% at epoch {epoch}")
            model, global_mask = weight_pruning(model, amount=target_sparsity, return_mask=True)

            # Rewind to the epoch-1 weights, keeping only the surviving ones.
            model.load_state_dict(_apply_ticket_masks(rewind_state, global_mask))
            epochs_no_improve = 0

    total_time = time.time() - start_time

    # ---- post-training pruning + fine-tuning --------------------------------
    if pruning_type == "POST":
        prune_start_time = time.time()
        model, _ = weight_pruning(model, final_sparsity)

        # NOTE(review): no mask is enforced during fine-tuning, so pruned
        # weights can become non-zero again; the sparsity reported below is
        # measured after fine-tuning.
        for ft_epoch in range(1, PRUNE_EPOCHS + 1):
            train_loss = train_one_epoch(model, train_loader_dp, optimizer)
            val_loss, val_acc = evaluate(model, val_loader)
            eps = privacy_engine.get_epsilon(DELTA) if use_dp else None

            print(f"[Fine Tuning Epoch {ft_epoch}] val_acc={val_acc:.4f}%"
                  + (f", ε={eps:.4f}" if use_dp else ""))
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                torch.save(model.state_dict(), best_model_path)

        end_time = time.time()
        prune_time = end_time - prune_start_time
        total_time = end_time - start_time

    # ---- choose the weights to test -----------------------------------------
    if pruning_type == "LTH":
        # reconstruct final winning ticket
        # NOTE(review): this loads the masked epoch-1 weights, so any training
        # done after the last rewind is not part of the tested model. Confirm
        # this is the intended evaluation protocol.
        model.load_state_dict(_apply_ticket_masks(rewind_state, global_mask))
    elif pruning_type == "POST":
        # keep already pruned & fine-tuned model
        pass
    else:
        model.load_state_dict(torch.load(best_model_path, map_location=device))

    test_loss, test_acc = evaluate(model, test_loader)
    final_eps = privacy_engine.get_epsilon(DELTA) if use_dp else None
    final_sparsity = calculate_sparsity(model)

    print(f"\n=== Results for {'DP-SGD' if use_dp else 'Standard-SGD'} with {pruning_type if pruning_type else 'No'} Pruning ===")
    print(f"Test Accuracy: {test_acc:.2f}%")
    print(f"Best Val Acc: {best_val_acc:.2f}%")
    if use_dp:
        print(f"Final ε = {final_eps:.3f}")
    print(f"Train Time:     {total_time:.2f} sec")
    if pruning_type:
        print(f"Sparsity:       {final_sparsity:.2f}%")

    return {
        "val_acc": best_val_acc,
        "test_acc": test_acc,
        "epsilon": final_eps,
        "train_time": total_time,
        "sparsity": final_sparsity,
        "prune_time": prune_time,
    }


def _apply_ticket_masks(base_state, mask_dict):
    """Return a deep copy of ``base_state`` with matching entries multiplied by their mask."""
    masked_state = copy.deepcopy(base_state)
    if not mask_dict:
        return masked_state

    matched = 0
    for name, mask in mask_dict.items():
        if name in masked_state:
            masked_state[name] = masked_state[name] * mask   # apply lottery ticket mask
            matched += 1

    # NOTE(review): mask keys come from named_modules() and state keys from
    # state_dict(); if a wrapper (e.g. the Opacus-wrapped model) names them
    # differently, every mask is skipped. Surface that instead of hiding it.
    if matched == 0:
        print("Warning: no pruning mask matched a state_dict key; weights were left unmasked.")
    return masked_state

# DP-SGD Experiments: Baseline and Pruning Variants

In [None]:
# DP-SGD without pruning, using the fixed (epsilon, delta) budget from the
# configuration cell. The returned metrics dict is shown as the cell output.
print(f"\nRunning DP-SGD baseline for fixed ({TARGET_EPSILON}, {TARGET_DELTA})-DP")
run_experiment(
    dataset=DATASET_NAME,
    use_dp=True,
    fixed_privacy_budget=True,
)

In [None]:
# DP-SGD where 30% of the weights are pruned once, before training starts.
print("\nRunning DP-SGD baseline with pre weight pruning")
run_experiment(
    dataset=DATASET_NAME,
    use_dp=True,
    pruning_type="PRE",
    final_sparsity=0.3,
    fixed_privacy_budget=True,
)

In [None]:
# DP-SGD where 30% of the weights are pruned after training, followed by
# the fine-tuning epochs.
print("\nRunning DP-SGD baseline with post weight pruning")
run_experiment(
    dataset=DATASET_NAME,
    use_dp=True,
    pruning_type="POST",
    final_sparsity=0.3,
    fixed_privacy_budget=True,
)

In [None]:
# DP-SGD with iterative (lottery-ticket style) pruning at epochs 5, 10 and 15,
# reaching 30% sparsity at the last pruning step.
print("\nRunning DP-SGD baseline with iterative pruning")
run_experiment(
    dataset=DATASET_NAME,
    use_dp=True,
    pruning_type="LTH",
    final_sparsity=0.3,
    fixed_privacy_budget=True,
    prune_epochs=[5, 10, 15],
)

MNIST

In [None]:
# Unpruned DP-SGD run on the dataset selected by DATASET_NAME, with the fixed
# (epsilon, delta) privacy budget.
print(f"\nRunning DP-SGD baseline for fixed ({TARGET_EPSILON}, {TARGET_DELTA})-DP")
run_experiment(
    dataset=DATASET_NAME,
    use_dp=True,
    fixed_privacy_budget=True,
)

In [None]:
# Pre-training pruning (30% of weights) followed by DP-SGD training.
print("\nRunning DP-SGD baseline with pre weight pruning")
run_experiment(
    dataset=DATASET_NAME,
    use_dp=True,
    pruning_type="PRE",
    final_sparsity=0.3,
    fixed_privacy_budget=True,
)

In [None]:
# DP-SGD training, then pruning of 30% of the weights and fine-tuning.
print("\nRunning DP-SGD baseline with post weight pruning")
run_experiment(
    dataset=DATASET_NAME,
    use_dp=True,
    pruning_type="POST",
    final_sparsity=0.3,
    fixed_privacy_budget=True,
)

In [None]:
# Iterative pruning with weight rewinding under DP-SGD: prune at epochs
# 5, 10 and 15 towards a final sparsity of 30%.
print("\nRunning DP-SGD baseline with iterative pruning")
run_experiment(
    dataset=DATASET_NAME,
    use_dp=True,
    pruning_type="LTH",
    final_sparsity=0.3,
    fixed_privacy_budget=True,
    prune_epochs=[5, 10, 15],
)