In [None]:
%pip install robust_minisets

In [None]:

from tqdm import tqdm
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10, CIFAR100
from sklearn.metrics import accuracy_score

import medmnist
from medmnist import INFO as INFO_MEDMNIST

import robust_minisets
from robust_minisets import INFO as INFO_ROBUST
from robust_minisets.info import DEFAULT_ROOT

In [None]:
# Report the installed package version and its homepage (both read from the
# robust_minisets module) so results can be tied to a specific release.
print(f"Robust-Minisets v{robust_minisets.__version__} @ {robust_minisets.HOMEPAGE}")

# We showcase the workflow with an example training run

In [None]:
# Registry of supported benchmarks. Every data flag maps to a 4-tuple:
#   (train/test dataset class, robustness dataset class,
#    train/test info entry or None, robustness info entry)
# The CIFAR train/test classes are the torchvision ones imported above and have
# no info entry here, hence the None in the third slot.
data_flag_dict = {
    # --- CIFAR ---
    "cifar-10-1": (CIFAR10, robust_minisets.CIFAR10_1, None, INFO_ROBUST["cifar-10-1"]),
    "cifar-10-c": (CIFAR10, robust_minisets.CIFAR10C, None, INFO_ROBUST["cifar-10-c"]),
    "cifar-100-c": (CIFAR100, robust_minisets.CIFAR100C, None, INFO_ROBUST["cifar-100-c"]),
    # --- EuroSAT ---
    "eurosat": (robust_minisets.EuroSAT, robust_minisets.EuroSAT, INFO_ROBUST["eurosat"], INFO_ROBUST["eurosat"]),
    "eurosat-c": (robust_minisets.EuroSAT, robust_minisets.EuroSATC, INFO_ROBUST["eurosat"], INFO_ROBUST["eurosat-c"]),
    # --- MedMNIST (train/test from medmnist, corrupted test set from robust_minisets) ---
    "bloodmnist-c": (medmnist.BloodMNIST, robust_minisets.BloodMNISTC, INFO_MEDMNIST["bloodmnist"], INFO_ROBUST["bloodmnist-c"]),
    "breastmnist-c": (medmnist.BreastMNIST, robust_minisets.BreastMNISTC, INFO_MEDMNIST["breastmnist"], INFO_ROBUST["breastmnist-c"]),
    "dermamnist-c": (medmnist.DermaMNIST, robust_minisets.DermaMNISTC, INFO_MEDMNIST["dermamnist"], INFO_ROBUST["dermamnist-c"]),
    "octmnist-c": (medmnist.OCTMNIST, robust_minisets.OCTMNISTC, INFO_MEDMNIST["octmnist"], INFO_ROBUST["octmnist-c"]),
    "organamnist-c": (medmnist.OrganAMNIST, robust_minisets.OrganAMNISTC, INFO_MEDMNIST["organamnist"], INFO_ROBUST["organamnist-c"]),
    "organcmnist-c": (medmnist.OrganCMNIST, robust_minisets.OrganCMNISTC, INFO_MEDMNIST["organcmnist"], INFO_ROBUST["organcmnist-c"]),
    "organsmnist-c": (medmnist.OrganSMNIST, robust_minisets.OrganSMNISTC, INFO_MEDMNIST["organsmnist"], INFO_ROBUST["organsmnist-c"]),
    "pathmnist-c": (medmnist.PathMNIST, robust_minisets.PathMNISTC, INFO_MEDMNIST["pathmnist"], INFO_ROBUST["pathmnist-c"]),
    "pneumoniamnist-c": (medmnist.PneumoniaMNIST, robust_minisets.PneumoniaMNISTC, INFO_MEDMNIST["pneumoniamnist"], INFO_ROBUST["pneumoniamnist-c"]),
    "tissuemnist-c": (medmnist.TissueMNIST, robust_minisets.TissueMNISTC, INFO_MEDMNIST["tissuemnist"], INFO_ROBUST["tissuemnist-c"]),
    # --- Tiny ImageNet ---
    "tiny-imagenet": (robust_minisets.TinyImageNet, robust_minisets.TinyImageNet, INFO_ROBUST["tiny-imagenet"], INFO_ROBUST["tiny-imagenet"]),
    "tiny-imagenet-a": (robust_minisets.TinyImageNet, robust_minisets.TinyImageNetA, INFO_ROBUST["tiny-imagenet"], INFO_ROBUST["tiny-imagenet-a"]),
    "tiny-imagenet-c": (robust_minisets.TinyImageNet, robust_minisets.TinyImageNetC, INFO_ROBUST["tiny-imagenet"], INFO_ROBUST["tiny-imagenet-c"]),
    "tiny-imagenet-r": (robust_minisets.TinyImageNet, robust_minisets.TinyImageNetR, INFO_ROBUST["tiny-imagenet"], INFO_ROBUST["tiny-imagenet-r"]),
    "tiny-imagenet-v2": (robust_minisets.TinyImageNet, robust_minisets.TinyImageNetv2, INFO_ROBUST["tiny-imagenet"], INFO_ROBUST["tiny-imagenet-v2"]),
}

In [None]:
# Benchmark to run; must be one of the keys of data_flag_dict.
data_flag = 'tiny-imagenet-r'

# Forwarded as the `download` argument of every dataset constructor below.
download = True

# Training hyperparameters.
NUM_EPOCHS, BATCH_SIZE = 3, 128
lr = 0.001

(DataClass, DataClass_Robust, info_train_test, info_robust) = data_flag_dict[data_flag]

# Info train/test: the CIFAR entries carry no info dict (None), so their
# channel count is fixed to 3 here instead of being looked up.
n_channels_train_test = 3 if "cifar" in data_flag else info_train_test["n_channels"]

# Robust-Datasets info
n_classes = info_robust["n_classes"]
n_channels_robust = info_robust["n_channels"]

## First, we read the train, test, and robustness data, preprocess them, and wrap them in data loaders.

In [None]:
# preprocessing of data
def _build_transform(n_channels):
    """Compose the preprocessing pipeline for a dataset with `n_channels` channels.

    The pipeline resizes to 28, converts to a tensor and normalizes with
    mean 0.5 / std 0.5. When the dataset has fewer than 3 channels, a final
    step repeats the tensor 3 times along its first dimension so that the
    output matches the 3-channel input the model is built with.
    """
    steps = [
        transforms.Resize(28),
        transforms.ToTensor(),
        transforms.Normalize(mean=[.5], std=[.5]),
    ]
    if n_channels < 3:
        steps.append(transforms.Lambda(lambda x: x.repeat(3, 1, 1)))
    return transforms.Compose(steps)


# Train/test and robustness data get the same pipeline; only the channel
# count that decides on the repeat step may differ.
train_test_transform = _build_transform(n_channels_train_test)
robust_transform = _build_transform(n_channels_robust)

In [None]:
# load the data
if "cifar" in data_flag:
    # The CIFAR classes take an explicit root and a boolean `train` flag.
    train_dataset, test_dataset = (
        DataClass(root=DEFAULT_ROOT, train=is_train, transform=train_test_transform, download=download)
        for is_train in (True, False)
    )
else:
    # All other dataset classes select their partition through `split`.
    train_dataset, test_dataset = (
        DataClass(split=split_name, transform=train_test_transform, download=download)
        for split_name in ("train", "test")
    )
test_robust_dataset = DataClass_Robust(split="test", transform=robust_transform, download=download)

# encapsulate data into dataloader form
# Only the training loader shuffles; the three evaluation loaders keep the
# dataset order and use twice the training batch size.
train_loader = data.DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
train_loader_at_eval, test_loader, test_robust_loader = (
    data.DataLoader(dataset=eval_dataset, batch_size=2*BATCH_SIZE, shuffle=False)
    for eval_dataset in (train_dataset, test_dataset, test_robust_dataset)
)

In [None]:
# Print the dataset's text representation as a quick sanity check of what was loaded.
print(test_robust_dataset)

In [None]:
# visualization
# NOTE(review): length=1 presumably yields a single-image montage — confirm
# against the dataset's montage() documentation.
test_robust_dataset.montage(length=1)

In [None]:
# montage
# NOTE(review): length=10 presumably yields a 10 x 10 image grid — confirm
# against the dataset's montage() documentation.
test_robust_dataset.montage(length=10)

## Then, we define a simple model for illustration, together with the objective (loss) function and the optimizer used for classification.

In [None]:
# define a simple CNN model
class Net(nn.Module):
    """Small five-stage convolutional classifier.

    Each stage is Conv2d(3x3) -> BatchNorm2d -> ReLU; stages 2 and 5 end with a
    2x2 max-pool, and stage 5 is the only convolution that uses padding.
    The classifier head expects 64 * 4 * 4 flattened features, which is what
    the stages produce for 28 x 28 inputs.

    Args:
        in_channels: number of channels of the input images.
        num_classes: size of the output (logit) vector.
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()

        def conv_stage(c_in, c_out, *, padding=0, pool=False):
            # Conv -> BN -> ReLU, optionally followed by a 2x2 max-pool.
            modules = [
                nn.Conv2d(c_in, c_out, kernel_size=3, padding=padding),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
            ]
            if pool:
                modules.append(nn.MaxPool2d(kernel_size=2, stride=2))
            return nn.Sequential(*modules)

        # Attribute names and module order are kept stable so that
        # state-dict keys (layer1.0.weight, ...) do not change.
        self.layer1 = conv_stage(in_channels, 16)
        self.layer2 = conv_stage(16, 16, pool=True)
        self.layer3 = conv_stage(16, 64)
        self.layer4 = conv_stage(64, 64)
        self.layer5 = conv_stage(64, 64, padding=1, pool=True)

        self.fc = nn.Sequential(
            nn.Linear(64 * 4 * 4, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes),
        )

    def forward(self, x):
        """Run the five conv stages, flatten per sample, and return the logits."""
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4, self.layer5):
            x = stage(x)
        flat = x.view(x.size(0), -1)
        return self.fc(flat)

# The model always takes 3 input channels; the output size follows the
# selected benchmark's class count.
model = Net(3, n_classes)

# define loss function and optimizer
criterion = nn.CrossEntropyLoss()

# SGD with momentum over all model parameters, using the configured learning rate.
optimizer = optim.SGD(params=model.parameters(), lr=lr, momentum=0.9)

## Next, we can start to train and evaluate!

In [None]:
# train
# Runs NUM_EPOCHS passes over train_loader, updating the module-level `model`
# in place with `optimizer` and `criterion`.
for epoch in range(NUM_EPOCHS):
    model.train()
    for inputs, targets in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{NUM_EPOCHS}"):
        # forward + backward + optimize
        optimizer.zero_grad()
        outputs = model(inputs)

        # Flatten the labels to shape (batch,). reshape(-1) is used instead of
        # squeeze() because squeeze() also removes the batch dimension when the
        # last batch holds a single sample, leaving a 0-dim target that no
        # longer matches the (1, n_classes) output in the loss.
        targets = targets.reshape(-1).long()
        loss = criterion(outputs, targets)

        loss.backward()
        optimizer.step()

In [None]:
# evaluation
def test(split):
    """Evaluate the module-level `model` on one split and print its accuracy.

    Args:
        split: one of "train", "test" or "test_robust"; selects the
            corresponding evaluation data loader.

    Raises:
        KeyError: if `split` is not one of the names above.

    Returns:
        None. The accuracy is printed as a percentage.
    """
    model.eval()

    data_loader_dict = {
        "train" : train_loader_at_eval,
        "test"  : test_loader,
        "test_robust": test_robust_loader
    }

    data_loader = data_loader_dict[split]

    # Collect per-batch results and concatenate once at the end; concatenating
    # onto a growing tensor inside the loop copies all previous data each time.
    y_true_batches = []
    y_score_batches = []

    with torch.no_grad():
        for inputs, targets in data_loader:
            outputs = model(inputs)

            y_score_batches.append(outputs.softmax(dim=-1))
            # reshape(-1) keeps the batch dimension even for a batch of one
            # sample, where squeeze() would produce a 0-dim tensor that has
            # no len().
            y_true_batches.append(targets.reshape(-1).long())

        y_true = torch.cat(y_true_batches, 0).numpy()
        y_score = torch.cat(y_score_batches, 0).numpy()

        # Predicted class = index of the highest score; labels stay integers
        # so both arguments have the same kind of dtype.
        accuracy = accuracy_score(y_true, np.argmax(y_score, axis=-1))

        print(f"Split: {split} | Accuracy: {accuracy * 100:.2f}%")

        
print('==> Evaluating ...')
# Report accuracy on the training data, the clean test data and the
# robustness test data, in that order.
for split_name in ('train', 'test', 'test_robust'):
    test(split_name)