In [1]:
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import json 
from pathlib import Path
import pickle
from torchvision import datasets, transforms
from tqdm import tqdm
from copy import deepcopy
from collections import OrderedDict
import matplotlib.pyplot as plt

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)

    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

In [2]:
set_seed(42)

In [3]:
!pip install efficientnet_pytorch
from efficientnet_pytorch import EfficientNet
model = EfficientNet.from_pretrained('efficientnet-b3', num_classes=200)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Move model to designated device (Use GPU when on Colab)
model = model.to(device)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting efficientnet_pytorch
  Downloading efficientnet_pytorch-0.7.1.tar.gz (21 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: efficientnet_pytorch
  Building wheel for efficientnet_pytorch (setup.py) ... [?25l[?25hdone
  Created wheel for efficientnet_pytorch: filename=efficientnet_pytorch-0.7.1-py3-none-any.whl size=16446 sha256=5667b7d3b2e367914a7b8141e29167d46dd24ed306980fce4f96087fdc62205d
  Stored in directory: /root/.cache/pip/wheels/84/b9/90/25a0195cf95fb5533db96f1c77ea3f296b7cc86ae8ae48e3dc
Successfully built efficientnet_pytorch
Installing collected packages: efficientnet_pytorch
Successfully installed efficientnet_pytorch-0.7.1


Downloading: "https://github.com/lukemelas/EfficientNet-PyTorch/releases/download/1.0/efficientnet-b3-5fb5a3c3.pth" to /root/.cache/torch/hub/checkpoints/efficientnet-b3-5fb5a3c3.pth


  0%|          | 0.00/47.1M [00:00<?, ?B/s]

Loaded pretrained weights for efficientnet-b3


In [4]:
# your api key
api_key = {"username":"spacehitchhiker","key":"c698b877d42f3f853d0a599e6263d8a8"}

# uses pathlib Path
kaggle_path = Path('/root/.kaggle')
os.makedirs(kaggle_path, exist_ok=True)

# opens file and dumps python dict to json object 
with open (kaggle_path/'kaggle.json', 'w') as handl:
    json.dump(api_key,handl)

os.chmod(kaggle_path/'kaggle.json', 600)  

!kaggle datasets download -d akash2sharma/tiny-imagenet

!unzip -qq tiny-imagenet.zip

Downloading tiny-imagenet.zip to /content
100% 473M/474M [00:22<00:00, 24.8MB/s]
100% 474M/474M [00:22<00:00, 22.1MB/s]


In [51]:
# Load all the images
data_mean = (0.5071, 0.4865, 0.4409)
data_std = (0.2673, 0.2564, 0.2762)

data_path = '/content/tiny-imagenet-200/train'

full_dataset = datasets.ImageFolder(
    root=data_path
)
train_size = int(0.75 * len(full_dataset))
test_size = len(full_dataset) - train_size
# use torch.utils.data.random_split for training/test split
train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

In [34]:
for x,y in train_dataset:
  print(type(x)) 
  break

<class 'PIL.Image.Image'>


In [10]:
hard_classes = [24,42,49,65,94,99,131,159,172, 175, 197]
easy_classes = [0,1,14,44,45,78,103,115,143,145, 165, 166,176, 183, 191]

In [11]:
targets = [item[1] for item in train_dataset]
data = [item[0] for item in train_dataset]
X_all_sampled, y_all_sampled = [], []

targets = np.array(targets)
data = np.array(data)

  data = np.array(data)
  data = np.array(data)


In [52]:
test_targets = [item[1] for item in test_dataset]
test_data = [item[0] for item in test_dataset]
test_targets = np.array(test_targets)
test_data = np.array(test_data)

  test_data = np.array(test_data)
  test_data = np.array(test_data)


In [12]:
data1,data2 = [],[]
targets1,targets2 = [],[]
for c in range(200):
    X_all_c, y_all_c = data[targets==c],targets[targets==c]
    if c in easy_classes:
        num_samples = len(X_all_c)
        sampled_indices = torch.randperm(num_samples)[:(num_samples//3)]
    elif c in hard_classes:
        num_samples = len(X_all_c)
        sampled_indices = torch.randperm(num_samples)
        data2.append(X_all_c[sampled_indices])
        targets2.append(y_all_c[sampled_indices])
    else:
        num_samples = len(X_all_c)
        sampled_indices = torch.randperm(num_samples)
    data1.append(X_all_c[sampled_indices])
    targets1.append(y_all_c[sampled_indices]) 
    
data1 = np.concatenate(data1, axis=0)
targets1 = np.concatenate(targets1, axis=0)
data2 = np.concatenate(data2, axis=0)
targets2 = np.concatenate(targets2, axis=0)

In [13]:
class ModifDataset(torch.utils.data.Dataset):

    def __init__(self,X,y,transform):
        self.X = X
        self.y = y
        self.transform = transform

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        sample = (self.transform((self.X)[idx]),(self.y)[idx])
        return sample

In [53]:
tr = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.25, .25, 0.25])])

test_dataset = ModifDataset(test_data,test_targets,tr)

for x,y in test_dataset:
  print(type(x))
  break

<class 'torch.Tensor'>


In [54]:
tr1 = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.25, .25, 0.25])])

tr2 = transforms.Compose([
        transforms.RandomResizedCrop(64),
        transforms.RandomHorizontalFlip(),
        transforms.GaussianBlur(3,0.6),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])

train_data1 = ModifDataset(data1,targets1,tr1)
train_data2 = ModifDataset(data2,targets2,tr2)

In [55]:
train_dataset = torch.utils.data.ConcatDataset([train_data1,train_data2])

In [56]:
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=32,
    num_workers=0,
    shuffle=True
)
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=32,
    num_workers=0,
    shuffle=False
)
print(len(train_dataset),len(test_dataset))

75431 25000


In [62]:
print(len(test_loader)*32,len(train_loader)*32)

25024 75456


In [23]:
def clamp(X, lower_limit, upper_limit):
    return torch.max(torch.min(X, upper_limit), lower_limit)

In [65]:
NUM_CLASSES = 200

def attack_pgd(model, X, y, epsilon, alpha, attack_iters, restarts, lower_limit, upper_limit, args=None):

    max_loss = torch.zeros(y.shape[0]).to(device)
    max_delta = torch.zeros_like(X).to(device)
    scaler = torch.cuda.amp.GradScaler()

    for _ in range(restarts):
        delta = torch.zeros_like(X).to(device)
        for i in range(len(epsilon)):
            delta[:, i, :, :].uniform_(-epsilon[i][0][0].item(), epsilon[i][0][0].item())
        delta.data = clamp(delta, lower_limit - X, upper_limit - X)
        delta.requires_grad = True

        for _ in range(attack_iters):
            with torch.cuda.amp.autocast():
                output = model(X + delta)
                index = torch.where(output.max(1)[1] == y)[0]
                if len(index) == 0:
                    break
                loss = F.cross_entropy(output, y)

            scaler.scale(loss).backward()
            grad = delta.grad.detach()

            d = delta[index, :, :, :]
            g = grad[index, :, :, :]
            d = clamp(d + alpha * torch.sign(g), -epsilon, epsilon)
            d = clamp(d, lower_limit - X[index, :, :, :], upper_limit - X[index, :, :, :])

            delta.data[index, :, :, :] = d
            delta.grad.zero_()

        all_loss = F.cross_entropy(model(X + delta), y, reduction='none').detach()
        max_delta[all_loss >= max_loss] = delta.detach()[all_loss >= max_loss]
        max_loss = torch.max(max_loss, all_loss)

    return max_delta


def evaluate_pgd(test_loader, model, attack_iters, restarts, epsilon, alpha, lower_limit, upper_limit, args=None):

    examples_per_class = {i : 0 for i in range(NUM_CLASSES)}
    correct_per_class = {i : 0 for i in range(NUM_CLASSES)}

    pgd_loss, pgd_acc = 0, 0
    n = 0
    model.eval()
    for i, (X, y) in enumerate(test_loader):
        X, y = X.to(device), y.to(device)
        pgd_delta = attack_pgd(model, X, y, epsilon, alpha, attack_iters, restarts, lower_limit, upper_limit, args=args)
        with torch.no_grad():
            output = model(X + pgd_delta)
            preds = output.max(1)[1]
            for i in range(NUM_CLASSES):
                examples_per_class[i] += len(y[y==i])
                correct_per_class[i] += (preds[y==i] == i).sum().item()
            loss = F.cross_entropy(output, y)
            pgd_loss += loss.item() * y.size(0)
            pgd_acc += (output.max(1)[1] == y).sum().item()
            n += y.size(0)
        acc_per_class = {}
    for i in range(NUM_CLASSES):
        acc_per_class[i] = correct_per_class[i] / examples_per_class[i]
    return pgd_loss/n, pgd_acc/n,acc_per_class


def evaluate_standard(test_loader, model, args=None):
    
    examples_per_class = {i : 0 for i in range(NUM_CLASSES)}
    correct_per_class = {i : 0 for i in range(NUM_CLASSES)}
    
    test_loss, test_acc = 0, 0
    n = 0
    model.eval()
    with torch.no_grad():
        for i, (X, y) in tqdm(enumerate(test_loader), total=len(test_loader)):
            X, y = X.to(device), y.to(device)
            output = model(X)
            preds = output.max(1)[1]
            for i in range(NUM_CLASSES):
                examples_per_class[i] += len(y[y==i])
                correct_per_class[i] += (preds[y==i] == i).sum().item()
            loss = F.cross_entropy(output, y)
            test_loss += loss.item() * y.size(0)
            test_acc += (output.max(1)[1] == y).sum().item()
            n += y.size(0)
    acc_per_class = {}
    for i in range(NUM_CLASSES):
        acc_per_class[i] = correct_per_class[i] / examples_per_class[i]
    return test_loss/n, test_acc/n,acc_per_class


In [63]:
weight_decay = 5e-4
epsilon = 8
batch_size = 32
alpha = 2
loss_weight = 0.3
fgsm_step = 1
delta_init = 'random'
model.train()

optimizer = torch.optim.Adam(model.parameters(),lr=3e-4,weight_decay=weight_decay)
scaler = torch.cuda.amp.GradScaler()
criterion = nn.CrossEntropyLoss()

data_mean = (0.5071, 0.4865, 0.4409)
data_std = (0.2673, 0.2564, 0.2762)

mu = torch.tensor(data_mean).view(3, 1, 1).to(device)
std = torch.tensor(data_std).view(3, 1, 1).to(device)
upper_limit = ((1 - mu) / std)
lower_limit = ((0 - mu) / std)
epsilon = (epsilon / 255.) / std
alpha = (alpha / 255.) / std

if delta_init == 'previous':
    delta = torch.zeros(batch_size, 3, 32, 32).to(device)

prev_robust_acc = 0.

In [None]:
optimizer = torch.optim.Adam(model.parameters(),lr=3e-4,weight_decay=weight_decay)

epochs = 10
loop = tqdm(range(epochs))
for epoch in loop:
    train_loss,train_acc,train_n = 0,0,0
    # if epoch > 12:
    #   optimizer.param_groups[0]['lr'] = 1e-4
    model.train()
    for i, (X, y) in enumerate(train_loader):
        optimizer.zero_grad()
        X, y = X.to(device), y.to(device)
        if delta_init != 'previous':
            delta = torch.zeros_like(X).to(device)
        if delta_init == 'random':
            for j in range(len(epsilon)):
                delta[:, j, :, :].uniform_(-epsilon[j][0][0].item(), epsilon[j][0][0].item())
            delta.data = clamp(delta, lower_limit - X, upper_limit - X)
        if fgsm_step == 1:
            delta.requires_grad = True
            for _ in range(1):
                output = model(X + delta[:X.size(0)])
                loss = F.cross_entropy(output, y)
                scaler.scale(loss).backward()
                grad = delta.grad.detach()
                scaler.step(optimizer)
                scaler.update()
                delta.data = clamp(delta + alpha * torch.sign(grad), -epsilon, epsilon)
                delta.data[:X.size(0)] = clamp(delta[:X.size(0)], lower_limit - X, upper_limit - X)
            delta = delta.detach()
            output = model(X + delta[:X.size(0)])
            loss = (1 - loss_weight) * criterion(output, y) + loss_weight * criterion(model(X), y)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            
        train_loss += loss.item() * y.size(0)
        train_acc += (output.max(1)[1] == y).sum().item()
        train_n += y.size(0)
      
    model.eval()
    pgd_loss, pgd_acc, acc_cl_pgd_at = evaluate_pgd(test_loader, model, 1, 5, epsilon, alpha, 
                                  lower_limit, upper_limit)
    
    if pgd_acc >= 0.4:
      best_state_dict = model.state_dict()
      torch.save(best_state_dict,'model_default_good.pth')
      
    loop.set_description(f'train acc: {round(train_acc / train_n,3)} test acc: {pgd_acc}')

train acc: 0.289 test acc: 0.30884:  40%|████      | 4/10 [1:21:53<2:02:45, 1227.64s/it]