In [1]:
# Load basic dependencies:
import warnings
warnings.filterwarnings('ignore')

%matplotlib inline
import matplotlib.pyplot as plt
import sys
import os
import numpy as np

# Torch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from torch.utils.data import random_split
from torchvision.models import resnet50, resnet18, alexnet
from torchvision import transforms
from PIL import Image
from PIL import Image as PILImage


# Load ART dependencies:
# from art.estimators.classification import KerasClassifier
from art.estimators.classification import PyTorchClassifier
from art.attacks.evasion import ProjectedGradientDescent
from art.defences.preprocessor import SpatialSmoothing
from art.utils import to_categorical

# Install ImageNet stubs:
#!{sys.executable} -m pip install git+https://github.com/nottombrown/imagenet_stubs
import imagenet_stubs
from imagenet_stubs.imagenet_2012_labels import name_to_label, label_to_name
from tqdm import tqdm

# Pre-processing

## Utils

In [2]:
# Convert train_dataset to numpy arrays (images and labels)
def convert_to_numpy(dataset):
    images_np = []
    labels_np = []

    for img, label in tqdm(dataset):
        # img is a torch.Tensor (C, H, W), convert to numpy and transpose to (H, W, C)
        #img_np = img.numpy().transpose(1, 2, 0)
        images_np.append(img)
        labels_np.append(label)

    images_np = np.stack(images_np)
    labels_np = np.array(labels_np)

    print('Images shape:', images_np.shape)
    print('Labels shape:', labels_np.shape)
    return images_np, labels_np

In [3]:
# TODO: do it with dataoader torch, and remove the previous function
def generate_adv_batch(images, adv, batch_size=32, labels=None):
    img_adv=[]
    for i in tqdm(range(0, len(images), batch_size)):
        batch_images = images[i:i+batch_size]
        if labels is None:
            x_adv = adv.generate(batch_images)
        else:
            batch_labels = labels[i:i+batch_size]
            x_adv = adv.generate(batch_images, y=to_categorical(batch_labels, nb_classes=100))
        img_adv.append(x_adv)

    img_adv = np.concatenate(img_adv, axis=0)
    return img_adv

In [4]:
def save_dataset(og_img_list, adv_img_list, labels, save_dir):
    """Create dataset for discriminate original - adv images"""

    os.makedirs(save_dir, exist_ok=True)
    og_dir = os.path.join(save_dir, "og")
    og_np_dir = os.path.join(save_dir, "og_np")
    adv_dir = os.path.join(save_dir, "adv")
    adv_np_dir = os.path.join(save_dir, "adv_np")
    os.makedirs(og_np_dir, exist_ok=True)
    os.makedirs(adv_np_dir, exist_ok=True)
    os.makedirs(og_dir, exist_ok=True)
    os.makedirs(adv_dir, exist_ok=True)

    assert og_img_list.shape[0] == adv_img_list.shape[0], "Original and adversarial images must have the same number of samples."

    for i in tqdm(range(og_img_list.shape[0])):
        # Convert (C, H, W) to (H, W, C) and scale to [0,255]
        og_img = og_img_list[i].transpose(1, 2, 0)
        adv_img = adv_img_list[i].transpose(1, 2, 0)
        og_img = np.clip(og_img, 0, 1)
        adv_img = np.clip(adv_img, 0, 1)
        og_img = (og_img * 255).astype(np.uint8)
        adv_img = (adv_img * 255).astype(np.uint8)
        og_img_pil = PILImage.fromarray(og_img)
        adv_img_pil = PILImage.fromarray(adv_img)
        og_img_pil.save(os.path.join(og_dir, f"{i:05d}.png"))
        adv_img_pil.save(os.path.join(adv_dir, f"{i:05d}.png"))

        np.save(os.path.join(og_np_dir, f"{i:05d}.npy"), og_img_list[i])
        np.save(os.path.join(adv_np_dir, f"{i:05d}.npy"), adv_img_list[i])



    
    labels_path = os.path.join(save_dir, "labels_og_adv.npy")
    np.save(labels_path, labels)

## Create and Save dataset
Note: Images are normalized (imgnet meand and std)

In [5]:
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

from torchvision.models import ResNet18_Weights
from torch.utils.data import random_split
import torch
transform = ResNet18_Weights.IMAGENET1K_V1.transforms()

dataset_path = '/mnt/ssd1t/datasets/imagenet_100'
dataset = ImageFolder(root=dataset_path, transform=transform)

In [8]:
model = resnet18(pretrained=True)
model.fc = nn.Linear(model.fc.in_features, 100)

In [9]:
model_path = "./resnet18_imagenet100.pth"
model.load_state_dict(torch.load(model_path, map_location=torch.device("cpu")))

<All keys matched successfully>

In [34]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [9]:
images_np, labels_np = convert_to_numpy(dataset)

100%|██████████| 5000/5000 [00:27<00:00, 184.77it/s]


Images shape: (5000, 3, 224, 224)
Labels shape: (5000,)


In [10]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.0001, momentum=0.9, weight_decay=1e-4)

classifier = PyTorchClassifier(
    model=model,
    clip_values=(0, 255),
    loss=criterion,
    optimizer=optimizer,
    input_shape=(3, 224, 224),  # ResNet18 expects input shape (C, H, W)
    nb_classes=100,
    #preprocessing=preprocessor
)

### Untargeted

In [12]:
adv = ProjectedGradientDescent(classifier, targeted=False, batch_size=32, max_iter=20, eps_step=0.001, eps=5, decay=0.9)

In [None]:
images_adv_np = generate_adv_batch(images_np, adv, batch_size=32)

100%|██████████| 157/157 [07:30<00:00,  2.87s/it]


In [14]:
save_dataset(images_np, images_adv_np, labels_np, "/mnt/ssd1t/datasets/imagenet_100_adv/")

100%|██████████| 5000/5000 [02:30<00:00, 33.18it/s]


### Targeted (random label)


In [12]:
adv = ProjectedGradientDescent(classifier, targeted=True, batch_size=32, max_iter=20, eps_step=0.001, eps=5, decay=0.9)

In [13]:
random_labels_np = np.random.randint(0, 100, size=labels_np.shape)

In [19]:
images_adv_np = generate_adv_batch(images_np, adv, batch_size=32, labels=random_labels_np)

100%|██████████| 157/157 [07:12<00:00,  2.75s/it]


In [None]:
save_dataset(images_np, images_adv_np, labels_np, "/mnt/ssd1t/datasets/imagenet_100_adv_random_targeted/")

100%|██████████| 5000/5000 [02:26<00:00, 34.04it/s]


# Train with adv images

In [5]:
class ImageFolderWithNumpyLabels(Dataset):
    def __init__(self, images_dir, labels_path=None, label=None, transform=None):
        self.images_dir = images_dir
        self.transform = transform
        self.label = label
        self.image_files = sorted(os.listdir(images_dir))
        if labels_path:
            self.labels = np.load(labels_path)
            assert len(self.image_files) == len(self.labels), "Number of images and labels must match"
       
       
    def __len__(self):
        return len(self.image_files) #len(self.labels)

    def __getitem__(self, idx):
        img_np = np.load(os.path.join(self.images_dir, self.image_files[idx]))
        # if self.transform:
        #     image = self.transform(image)
        if self.label is not None:
            label = self.label
        else:
            label = int(self.labels[idx])
        
        return torch.Tensor(img_np), label

In [6]:
images_dir = "/mnt/ssd1t/datasets/imagenet_100_adv_random_targeted/"
labels_path = "/mnt/ssd1t/datasets/imagenet_100_adv_old/labels_og_adv.npy"

dataset_og_norm = ImageFolderWithNumpyLabels(os.path.join(images_dir, 'og_np'), labels_path)
dataset_adv_norm = ImageFolderWithNumpyLabels(os.path.join(images_dir, 'adv_np'), labels_path)

train_size = int(0.8 * len(dataset_og_norm))
val_size = len(dataset_og_norm) - train_size

torch.manual_seed(42)
train_dataset_og, val_dataset_og = random_split(dataset_og_norm, [train_size, val_size])
torch.manual_seed(42)
train_dataset_adv, val_dataset_adv = random_split(dataset_adv_norm, [train_size, val_size])

In [7]:
train_aug = ConcatDataset([train_dataset_og, train_dataset_adv])
train_loader_aug = DataLoader(train_aug, batch_size=32, shuffle=True)
val_aug = ConcatDataset([val_dataset_og, val_dataset_adv])
val_loader_aug = DataLoader(val_aug, batch_size=32, shuffle=False)

In [11]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.0001, momentum=0.9, weight_decay=1e-4)

# Training loop
num_epochs = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in tqdm(train_loader_aug):
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    epoch_loss = running_loss / total
    epoch_acc = correct / total
    print(f"Epoch {epoch+1}/{num_epochs} - Loss: {epoch_loss:.4f} - Accuracy: {epoch_acc:.4f}")

100%|██████████| 250/250 [00:47<00:00,  5.29it/s]


Epoch 1/10 - Loss: 2.1209 - Accuracy: 0.5634


100%|██████████| 250/250 [00:35<00:00,  7.08it/s]


Epoch 2/10 - Loss: 1.8404 - Accuracy: 0.6471


100%|██████████| 250/250 [00:35<00:00,  7.14it/s]


Epoch 3/10 - Loss: 1.6948 - Accuracy: 0.6799


100%|██████████| 250/250 [00:35<00:00,  7.11it/s]


Epoch 4/10 - Loss: 1.5889 - Accuracy: 0.6993


100%|██████████| 250/250 [00:34<00:00,  7.32it/s]


Epoch 5/10 - Loss: 1.4992 - Accuracy: 0.7238


100%|██████████| 250/250 [00:35<00:00,  7.14it/s]


Epoch 6/10 - Loss: 1.4211 - Accuracy: 0.7362


100%|██████████| 250/250 [00:34<00:00,  7.26it/s]


Epoch 7/10 - Loss: 1.3596 - Accuracy: 0.7458


100%|██████████| 250/250 [00:35<00:00,  7.08it/s]


Epoch 8/10 - Loss: 1.2888 - Accuracy: 0.7630


100%|██████████| 250/250 [00:34<00:00,  7.21it/s]


Epoch 9/10 - Loss: 1.2352 - Accuracy: 0.7770


100%|██████████| 250/250 [00:34<00:00,  7.29it/s]

Epoch 10/10 - Loss: 1.1851 - Accuracy: 0.7849





In [12]:
model_path = "./resnet18_imagenet100_adv_targeted.pth"
torch.save(model.state_dict(), model_path)

In [None]:
model_path = ""
model.load_state_dict(torch.load(model_path, map_location=torch.device("cpu")))

<All keys matched successfully>

# Eval

## Original

In [13]:
test_dataloader = DataLoader(val_dataset_og, batch_size=32, shuffle=False)

In [14]:
model.eval()
correct = 0
total = 0

with torch.no_grad():
    for images, labels in tqdm(test_dataloader):
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

test_acc = correct / total
print(f"Test Accuracy: {test_acc:.4f}")

100%|██████████| 32/32 [00:03<00:00,  8.20it/s]

Test Accuracy: 0.7240





## adv

In [15]:
val_images_np, val_lbl_np = convert_to_numpy(val_dataset_adv)

100%|██████████| 1000/1000 [00:02<00:00, 392.72it/s]


Images shape: (1000, 3, 224, 224)
Labels shape: (1000,)


In [16]:
pred_adv = classifier.predict(val_images_np)
label_adv = np.argmax(pred_adv, axis=1)
acc = np.sum(label_adv == val_lbl_np) / val_lbl_np.shape[0]
print('Accuracy of adversarial samples:', acc)

Accuracy of adversarial samples: 0.562


## og + adv

In [17]:
val_images_aug_np, val_lbl_aug_np = convert_to_numpy(val_aug)

100%|██████████| 2000/2000 [00:00<00:00, 3994.13it/s]


Images shape: (2000, 3, 224, 224)
Labels shape: (2000,)


In [18]:
pred_adv = classifier.predict(val_images_aug_np)
label_adv = np.argmax(pred_adv, axis=1)
acc = np.sum(label_adv == val_lbl_aug_np) / val_lbl_aug_np.shape[0]
print('Accuracy of adversarial samples:', acc)

Accuracy of adversarial samples: 0.643


# Module to predict adv images

In [8]:
images_dir = "/mnt/ssd1t/datasets/imagenet_100_adv/"
labels_path = "/mnt/ssd1t/datasets/imagenet_100_adv/labels_og_adv.npy"

dataset_og_norm = ImageFolderWithNumpyLabels(os.path.join(images_dir, 'og_np'), label=0)
dataset_adv_norm = ImageFolderWithNumpyLabels(os.path.join(images_dir, 'adv_np'), label=1)

train_size = int(0.8 * len(dataset_og_norm))
val_size = len(dataset_og_norm) - train_size

torch.manual_seed(42)
train_dataset_og, val_dataset_og = random_split(dataset_og_norm, [train_size, val_size])
torch.manual_seed(42)
train_dataset_adv, val_dataset_adv = random_split(dataset_adv_norm, [train_size, val_size])

In [9]:
train_aug = ConcatDataset([train_dataset_og, train_dataset_adv])
train_loader_aug = DataLoader(train_aug, batch_size=32, shuffle=True)
val_aug = ConcatDataset([val_dataset_og, val_dataset_adv])
val_loader_aug = DataLoader(val_aug, batch_size=32, shuffle=False)

## Train naive model (resnet pretrained on imagenet-100)

In [37]:
model = resnet18(pretrained=False)
#model_path = "./resnet18_imagenet100.pth"
#model.fc = nn.Linear(model.fc.in_features, 100)
#model.load_state_dict(torch.load(model_path, map_location=torch.device("cpu")))
model.fc = nn.Linear(model.fc.in_features, 2)

In [38]:
# Freeze all layers except the classifier (head)
for name, param in model.named_parameters():
    if name.startswith("fc"):
        param.requires_grad = True
    else:
        param.requires_grad = False

In [39]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.0001, momentum=0.9, weight_decay=1e-4)

# Training loop
num_epochs = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in tqdm(train_loader_aug):
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    epoch_loss = running_loss / total
    epoch_acc = correct / total
    print(f"Epoch {epoch+1}/{num_epochs} - Loss: {epoch_loss:.4f} - Accuracy: {epoch_acc:.4f}")

  0%|          | 0/250 [00:00<?, ?it/s]

100%|██████████| 250/250 [00:13<00:00, 18.00it/s]


Epoch 1/10 - Loss: 0.4800 - Accuracy: 0.7941


100%|██████████| 250/250 [00:13<00:00, 18.70it/s]


Epoch 2/10 - Loss: 0.3801 - Accuracy: 0.8424


100%|██████████| 250/250 [00:14<00:00, 17.76it/s]


Epoch 3/10 - Loss: 0.3597 - Accuracy: 0.8465


100%|██████████| 250/250 [00:13<00:00, 18.67it/s]


Epoch 4/10 - Loss: 0.3406 - Accuracy: 0.8535


100%|██████████| 250/250 [00:13<00:00, 18.25it/s]


Epoch 5/10 - Loss: 0.3327 - Accuracy: 0.8588


100%|██████████| 250/250 [00:13<00:00, 18.27it/s]


Epoch 6/10 - Loss: 0.3204 - Accuracy: 0.8661


100%|██████████| 250/250 [00:13<00:00, 18.21it/s]


Epoch 7/10 - Loss: 0.3073 - Accuracy: 0.8759


100%|██████████| 250/250 [00:13<00:00, 18.23it/s]


Epoch 8/10 - Loss: 0.3028 - Accuracy: 0.8726


100%|██████████| 250/250 [00:13<00:00, 18.45it/s]


Epoch 9/10 - Loss: 0.2977 - Accuracy: 0.8779


100%|██████████| 250/250 [00:13<00:00, 18.55it/s]

Epoch 10/10 - Loss: 0.2890 - Accuracy: 0.8801





In [34]:
val_images_aug_np, val_lbl_aug_np = convert_to_numpy(val_aug)

100%|██████████| 2000/2000 [00:00<00:00, 5387.86it/s]


Images shape: (2000, 3, 224, 224)
Labels shape: (2000,)


In [40]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.0001, momentum=0.9, weight_decay=1e-4)

classifier = PyTorchClassifier(
    model=model,
    clip_values=(0, 255),
    loss=criterion,
    optimizer=optimizer,
    input_shape=(3, 224, 224),  # ResNet18 expects input shape (C, H, W)
    nb_classes=2,
    #preprocessing=preprocessor
    )

In [41]:
pred_adv = classifier.predict(val_images_aug_np)
print(pred_adv.shape)
label_adv = np.argmax(pred_adv, axis=1)
acc = np.sum(label_adv == val_lbl_aug_np) / val_lbl_aug_np.shape[0]
print('Accuracy of adversarial samples:', acc)

(2000, 2)
Accuracy of adversarial samples: 0.8885
