# AIPI 590 - XAI | Assignment #00
### Description
### Hongxuan Li

#### Include the button below. Change the link to the location in your github repository:
#### Example: https://colab.research.google.com/github/yourGHName/yourREPOName/blob/yourBranchName/yourFileName.ipynb


[![Open In Collab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/AIPI-590-XAI/Duke-AI-XAI/blob/dev/templates/template.ipynb)

In [None]:
import os

# Remove Colab default sample_data
!rm -r ./sample_data

# Clone GitHub files to colab workspace
repo_name = "AIPI590-XAI" # Change to your repo name
git_path = 'https://github.com/h0ngxuanli/AIPI590-XAI.git' #Change to your path
!git clone "{git_path}"

# Install dependencies from requirements.txt file
!pip install -r "{os.path.join(repo_name,'requirements.txt')}" #Add if using requirements.txt

# Change working directory to location of notebook
notebook_dir = 'templates'
path_to_notebook = os.path.join(repo_name,notebook_dir)
%cd "{path_to_notebook}"
%ls

# Dependencies

In [29]:
import torch
import numpy as np
import random
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision
from torch.utils.data import DataLoader, Dataset

from datasets import load_dataset
from tqdm import tqdm
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
device

device(type='cuda')

# Hyperparameters

In [53]:
# hyperparameters
checkpoint = 'IMAGENET1K_V1'
dataset_path = "songweig/imagenet_sketch"
batch_size = 32
patch_size = 50 
learning_rate = 0.0001
num_samples_per_class = 5
epochs = 40
num_proc = 12
target_class = 0 # tench, Tinca tinca

# Load Pre-trained Model

In [5]:
# load model
model = torchvision.models.resnet34(weights = checkpoint)
model.to(device)
model.eval()

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

# Prepare Dataset
1. load data from huggingface "songweig/imagenet_sketch" which contains 50 * 1000 samples. 50 samples for 1000 classes in ImageNet
2. select 10 samples for each class to form 10000 data samples
3. transform data samples in the sampled dataset to enbale spatial robustness
4. create dataloader

In [30]:
# apply transformation to enable robustnesss of adversarial patch
def transform(example):
    image = example['image']
    
    # there exists grey image
    if image.mode == 'L':
        image = image.convert('RGB')
    
    transformations = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    transformed_image = transformations(image)
    example['image'] = transformed_image
    return example

# convert huggingface dataset into dataloader
class ImageNetDataset(Dataset):
    def __init__(self, hf_dataset, transform=None):
        self.dataset = hf_dataset
        self.transform = transform

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        item = self.dataset[idx]
        image = item['image']
        label = item['label']

        if self.transform:
            image = self.transform(image)

        return image, label


In [28]:
# load dataset
# 50 images for 1000 classes
dataset = load_dataset(dataset_path, num_proc = num_proc)

In [10]:
# Select a subset of data for each class
class_labels = dataset['train'].features['label'].names
class_counts = {label: 0 for label in class_labels}
sampled_indices = []
for index, example in enumerate(dataset['train']):
    label = class_labels[example['label']]
    
    if class_counts[label] < num_samples_per_class:
        sampled_indices.append(index)
    class_counts[label] += 1

len(sampled_indices)


10000

In [16]:
# transform selected 10000 images
subset = dataset['train'].select(sampled_indices)
subset = subset.map(transform)
subset.set_format(type='torch', columns=['image', 'label'])
subset


Map:   0%|          | 0/10000 [00:00<?, ? examples/s]

Map: 100%|██████████| 10000/10000 [04:46<00:00, 34.96 examples/s] 


Dataset({
    features: ['image', 'label'],
    num_rows: 10000
})

In [31]:
subset_new = ImageNetDataset(subset)

In [48]:
# dataloader = DataLoader(subset_new, batch_size=batch_size, shuffle=True)
dataloader = DataLoader(subset_new, batch_size=200, shuffle=True)

# Initialize the patch as trainbale parameters

In [33]:
# initialize patch as trainable parameters
patch = torch.rand((3, patch_size, patch_size), requires_grad=True, device=device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam([patch], lr=learning_rate)

# Train patch by fooling model with targart label

In [54]:
# progress_bar = tqdm(total=len(dataloader) * epochs, unit='batch', desc='Training')

for epoch in range(epochs):
  epoch_loss = 0.0

  for images, labels in dataloader:
    # load original images
    images, _ = images.to(device), labels.to(device)
    targets = torch.full((images.size(0),), target_class, dtype=torch.long, device=device)


    # randomly select the patch position
    patched_images = images.clone()
    # patch_size = patch.size(1)
    # max_x = images.size(2) - patch_size
    # max_y = images.size(3) - patch_size
    # start_x = random.randint(0, max_x)
    # start_y = random.randint(0, max_y)

    # # apply patch to original image
    # patched_images[:, :, start_x:start_x + patch_size, start_y:start_y + patch_size] = patch
    
    patch_size = patch.size(1)
    image_size = 224
    max_x = image_size - patch_size
    max_y = image_size - patch_size
    fixed_start_x = max_x // 2  # Center the patch horizontally
    fixed_start_y = max_y // 2  # 
    patched_images[:, :, fixed_start_x:fixed_start_x + patch_size, fixed_start_y:fixed_start_y + patch_size] = patch


    # start training
    outputs = model(patched_images)
    loss = criterion(outputs, targets)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    epoch_loss += loss.item()
    # progress_bar.set_postfix(epoch=epoch + 1, loss=loss.item())
    # progress_bar.update(1)

    torch.save(patch, '/home/swyun/md4plm/md4plm/benckmarks/notebooks/adversarial_patch.pth')
  avg_loss = epoch_loss / len(dataloader)
  print(f'Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}')

# progress_bar.close()

Epoch [1/10], Loss: 8.1367
Epoch [2/10], Loss: 4.0154
Epoch [3/10], Loss: 2.4419
Epoch [4/10], Loss: 1.7031
Epoch [5/10], Loss: 1.2808


Exception ignored in: <function tqdm.__del__ at 0x7ab0a5fbc790>
Traceback (most recent call last):
  File "/home/swyun/miniconda3/envs/ESM_GearNet_MD/lib/python3.9/site-packages/tqdm/std.py", line 1148, in __del__
    self.close()
  File "/home/swyun/miniconda3/envs/ESM_GearNet_MD/lib/python3.9/site-packages/tqdm/notebook.py", line 279, in close
    self.disp(bar_style='danger', check_delay=False)
AttributeError: 'tqdm_notebook' object has no attribute 'disp'
Exception ignored in: <function tqdm.__del__ at 0x7ab0a5fbc790>
Traceback (most recent call last):
  File "/home/swyun/miniconda3/envs/ESM_GearNet_MD/lib/python3.9/site-packages/tqdm/std.py", line 1148, in __del__
    self.close()
  File "/home/swyun/miniconda3/envs/ESM_GearNet_MD/lib/python3.9/site-packages/tqdm/notebook.py", line 279, in close
    self.disp(bar_style='danger', check_delay=False)
AttributeError: 'tqdm_notebook' object has no attribute 'disp'


Epoch [6/10], Loss: 1.0261
Epoch [7/10], Loss: 0.8574
Epoch [8/10], Loss: 0.7299
