In [None]:
drive_folder = "Machine_Unlearning_Drive/Cifar100Results/"

ssd_folder = "SSD/"

scrub_folder = "SCRUB/"

github_folder = "Machine_Unlearning/"

!pip install scikit-learn torch torchvision

In [None]:
import os
import requests
import numpy as np
import matplotlib.pyplot as plt
from sklearn import linear_model, model_selection
import random

import torch
import json
from torch import nn
from torch import optim
from torch.utils.data import DataLoader

import torchvision
from torchvision import transforms
from torchvision.utils import make_grid
from torchvision.models import resnet18

from Machine_Unlearning.Metrics.metrics import *

# Pick the compute device once: CUDA when PyTorch can see a GPU, CPU otherwise.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
print("Running on device:", DEVICE.upper())

def seed_everything(seed):
  """Seed the global RNGs of PyTorch, `random` and NumPy with `seed`.

  Args:
    seed: integer seed applied to every global generator.

  Returns:
    A fresh `torch.Generator` seeded with the same value, for callers that
    want an explicit generator (e.g. to hand to a DataLoader).
  """
  for set_seed in (torch.manual_seed, random.seed, np.random.seed):
    set_seed(seed)
  return torch.Generator().manual_seed(seed)

# Experiment-wide configuration.
SEED, SPLIT = 44, 0.15  # SPLIT is the fraction of the training set that goes to the forget set
results = dict()        # metric name -> value; populated by readout()
RNG = seed_everything(SEED)

In [None]:
import torch as trch
import torchvision.datasets as dts
import torchvision.transforms as transforms
import torchvision.models as models
from torchvision.models import VGG16_Weights
from torchvision.utils import make_grid
import torch.nn as nn
import matplotlib.pyplot as plot
import scipy
import matplotlib.pyplot as plt


# Recompute the device string for this cell (same rule as in the imports cell:
# CUDA when available, CPU otherwise).
if trch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
print("Running on device:", DEVICE.upper())



# Per-channel mean / std used by both pipelines.
# NOTE(review): presumably the CIFAR-100 training-set statistics - confirm.
_NORM_MEAN = (0.5071, 0.4867, 0.4408)
_NORM_STD = (0.2675, 0.2565, 0.2761)

# Training pipeline: random 32x32 crop (4 px padding) and horizontal flip for
# augmentation, then tensor conversion and normalisation.
train_transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
])

# Evaluation pipeline: no augmentation, only tensor conversion and normalisation.
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
])




# CIFAR-100 train / test datasets (downloaded into ./data when missing).
train_set = dts.CIFAR100(root='./data', train=True, download=True, transform=train_transform)
test_set = dts.CIFAR100(root='./data', train=False, download=True, transform=test_transform)

# Loaders over the full train / test sets.
train_loader = DataLoader(train_set, batch_size=128, shuffle=True)
test_loader = DataLoader(test_set, batch_size=128, shuffle=False)

# Split the training set into retain / forget subsets with a dedicated
# generator so the split is independent of the global seed.
# Both subsets are drawn from `train_set`, so they carry `train_transform`.
GEN1 = torch.Generator().manual_seed(42)
retain_set, forget_set = torch.utils.data.random_split(
    train_set, [1 - SPLIT, SPLIT], generator=GEN1
)

# Re-seed the global RNGs; the returned generator drives the shuffling of
# both subset loaders (they share the same generator object).
RNG = seed_everything(1337)
_subset_loader_options = dict(batch_size=256, shuffle=True, num_workers=2, generator=RNG)
forget_loader = DataLoader(forget_set, **_subset_loader_options)
retain_loader = DataLoader(retain_set, **_subset_loader_options)


# Randomly initialised ResNet-18 with a 100-way classification head, placed on DEVICE.
cmodel = models.resnet18(weights=None, num_classes=100).to(DEVICE)


# Preview one batch from the (shuffled) training loader.
images, labels = next(iter(train_loader))

# The batch has been normalised by train_transform; undo that and clamp to
# [0, 1] so imshow renders real colours instead of clipping the values.
_preview_mean = torch.tensor((0.5071, 0.4867, 0.4408)).view(1, 3, 1, 1)
_preview_std = torch.tensor((0.2675, 0.2565, 0.2761)).view(1, 3, 1, 1)
preview_images = (images * _preview_std + _preview_mean).clamp(0, 1)

fig, ax = plt.subplots(figsize=(12, 6))
# The data loaded above is CIFAR-100 (the previous title named Caltech101).
ax.set_title("Sample images from CIFAR-100 dataset")
ax.set_xticks([])
ax.set_yticks([])
# make_grid returns (C, H, W); matplotlib expects (H, W, C).
ax.imshow(make_grid(preview_images, nrow=16).permute(1, 2, 0))
plt.show()


In [None]:
def accuracy(net, loader):
    """Return the top-1 accuracy of `net` on the data yielded by `loader`.

    The network is evaluated in inference mode (`eval()` + `no_grad()`), so
    layers such as BatchNorm/Dropout behave deterministically and their
    running statistics are not modified by the evaluation data. The previous
    train/eval mode of `net` is restored before returning.

    Args:
        net: a `torch.nn.Module` classifier producing (batch, classes) scores.
        loader: iterable of `(inputs, targets)` batches.

    Returns:
        Fraction of samples whose arg-max prediction equals the target.

    Raises:
        ZeroDivisionError: if `loader` yields no samples.
    """
    # Run on the device the model actually lives on; fall back to the
    # notebook-wide DEVICE for parameter-less modules.
    try:
        device = next(net.parameters()).device
    except StopIteration:
        device = DEVICE

    was_training = net.training
    net.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for inputs, targets in loader:
                inputs, targets = inputs.to(device), targets.to(device)
                predicted = net(inputs).argmax(dim=1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()
    finally:
        # Leave the module in whatever mode the caller had it in.
        net.train(was_training)
    return correct / total

In [None]:
def _balanced_mia_scores(test_values, forget_values):
  """Run `simple_mia` on equally sized test / forget samples.

  The larger of the two arrays is shuffled in place with a fixed NumPy
  generator (seed 1) and truncated to the length of the smaller one, so the
  attack sees a class-balanced dataset (label 0 = test, label 1 = forget).
  Returns whatever `simple_mia` returns for those samples and labels.
  """
  gen = np.random.default_rng(1)
  if len(test_values) > len(forget_values):
    gen.shuffle(test_values)
    test_values = test_values[: len(forget_values)]
  else:
    gen.shuffle(forget_values)
    forget_values = forget_values[: len(test_values)]
  # make sure we have a balanced dataset for the MIA
  assert len(test_values) == len(forget_values)

  samples_mia = np.concatenate((test_values, forget_values)).reshape((-1, 1))
  labels_mia = [0] * len(test_values) + [1] * len(forget_values)
  return simple_mia(samples_mia, labels_mia)


def readout(model,name):
  """Evaluate `model` and store its metrics in the global `results` dict.

  For the test, retain and forget loaders this records per-sample entropies
  and losses, the mean membership-inference score on losses and on entropies
  (forget vs. test samples), and the accuracy on each loader. Every key is
  suffixed with `_{name}`.

  The model is switched to eval mode for the duration of the call and its
  previous train/eval mode is restored afterwards.

  Args:
    model: the `torch.nn.Module` to evaluate.
    name: suffix appended to every key written into `results`.

  NOTE(review): `compute_entropy`, `compute_losses` and `simple_mia` are not
  defined in this notebook - presumably they come from the star import of
  Machine_Unlearning.Metrics.metrics and return NumPy arrays; confirm.
  """
  # Re-seed the global RNGs. The returned generator is deliberately discarded:
  # the data loaders keep the generator they were constructed with.
  seed_everything(SEED)

  was_training = model.training
  model.eval()
  try:
    test_entropies = compute_entropy(model, test_loader)
    retain_entropies = compute_entropy(model, retain_loader)
    forget_entropies = compute_entropy(model, forget_loader)

    results[f"test_entropies_{name}"] = test_entropies.tolist()
    results[f"retain_entropies_{name}"] = retain_entropies.tolist()
    results[f"forget_entropies_{name}"] = forget_entropies.tolist()

    test_losses = compute_losses(model, test_loader)
    retain_losses = compute_losses(model, retain_loader)
    forget_losses = compute_losses(model, forget_loader)

    results[f"test_losses_{name}"] = test_losses.tolist()
    results[f"retain_losses_{name}"] = retain_losses.tolist()
    results[f"forget_losses_{name}"] = forget_losses.tolist()

    # Membership inference on losses (per-sample values were already stored
    # above, so the in-place shuffle inside the helper does not affect them).
    mia_scores = _balanced_mia_scores(test_losses, forget_losses)
    print(
        f"The MIA has an accuracy of {mia_scores.mean():.3f} on forgotten vs unseen images"
    )
    results[f"MIA_losses_{name}"] = mia_scores.mean()

    # Membership inference on entropies.
    mia_scores = _balanced_mia_scores(test_entropies, forget_entropies)
    print(
        f"The MIA has an accuracy of {mia_scores.mean():.3f} on forgotten vs unseen images"
    )
    results[f"MIA_entropies_{name}"] = mia_scores.mean()

    results[f"train_accuracy_{name}"] = accuracy(model, retain_loader)
    results[f"test_accuracy_{name}"] = accuracy(model, test_loader)
    results[f"forget_accuracy_{name}"] = accuracy(model, forget_loader)
  finally:
    # Hand the model back in the mode the caller had it in.
    model.train(was_training)

  print("Train acc:"+ str(results[f"train_accuracy_{name}"]))
  print("Test acc:"+ str(results[f"test_accuracy_{name}"]))
  print("Forget acc:" +str(results[f"forget_accuracy_{name}"]))

In [None]:
# Retrain baseline: train `cmodel` on the retain set only, using SGD with a
# learning rate of 0.1, momentum of 0.9 and weight decay of 5e-4. The learning
# rate is scaled linearly from 1.0x down to 0.001x over the epochs.

numepchs = 50
lr = 0.1
criter = nn.CrossEntropyLoss()
optim = trch.optim.SGD(
    cmodel.parameters(),
    lr=lr,
    momentum=0.9,
    weight_decay=5e-4,
)
scheduler = trch.optim.lr_scheduler.LinearLR(
    optim,
    start_factor=1.0,
    end_factor=0.001,
    total_iters=numepchs,
)
nttlstps = len(retain_loader)  # number of batches per epoch, for the progress line

cmodel.train()
for epoch in range(numepchs):
    for x, batch in enumerate(retain_loader):
        imgs, lbls = (tensor.to(DEVICE) for tensor in batch)

        # Standard step: clear old gradients, forward, backward, update.
        optim.zero_grad()
        losses = criter(cmodel(imgs), lbls)
        losses.backward()
        optim.step()

        # Report the current batch loss every 100 steps.
        if not x % 100:
            print (f'Epochs [{epoch+1}/{numepchs}], Step[{x+1}/{nttlstps}], Losses: {losses.item():.4f}')
    # The scheduler advances once per epoch.
    scheduler.step()

In [None]:
# Evaluate the retain-only model; its metrics land in `results` under the
# "retrained" suffix.
readout(cmodel, name="retrained")
# Optional: persist the collected metrics as JSON under `drive_folder`.
#with open(drive_folder+f"results_Cifar100_SPLIT_{int(SPLIT*100)}%_SEED_{SEED}_retrained.json", 'w') as fout:
#  json.dump(results, fout)