In [1]:
import os
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import MultiStepLR,ReduceLROnPlateau
# from models.resnet import ResNet18, ResNet34, ResNet50
from models.resnet_soyeong import ResNet34, ResNet18, ResNet50
from models.densenet import DenseNet3
import torch
import copy
from torch.optim.lr_scheduler import CosineAnnealingLR, LinearLR
from torch.utils.data import DataLoader, Subset, TensorDataset, RandomSampler
from torchvision import datasets

In [2]:
import torch 
import os
device = "cuda:0"
log_file = "./logs/train_adv_gen_logs.txt"

def eval_accuracy(model,log_file ,m_name,id_name, loader, test_data_name = " ", device = device):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    with open(log_file, 'a') as file:
        file.write(f'{test_data_name} Test Accuracy ({m_name,id_name}): {accuracy:.2f}% \n')
    print(f'{test_data_name} Test Accuracy ({m_name,id_name}): {accuracy:.2f}%')
    return accuracy




## Get DataLoader

In [3]:

def get_loader(dataset,batch_size,id_name,train = False):
    '''
    dataset = name of dataset for which loader is required\
    batch_size = batch size of the dataloader
    id = indistribution data.

    output -> 
    train_loader , testloader (RGB)
    '''
    data_root='./data' 
    print(f"loader requested for {dataset}, to be used on model trained for {id_name} ")
    # Transformations
    #tranformations for OOD and ADV samples:
    # Transformations for Id dataset (training dataset)


    '''
    id = cifar 10  tranform = cifar10
    ood= mnist tranform = cifar10


    id = mnist tranform =mnist
    ood= = cifar10 tranform = mnist
    '''
    if id_name == 'svhn':
        input_size = (32, 32) 
        transform_id = transforms.Compose([
            transforms.Resize(input_size), 
            # transforms.RandomCrop(32, padding=4),
            # transforms.RandomHorizontalFlip(),
            # transforms.Resize(input_size), 
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # Common for SVHN
        ])
    elif id_name == 'cifar10':
        input_size = (32, 32) 
        transform_id  = transforms.Compose([
            transforms.Resize(input_size), 
            # transforms.RandomCrop(32, padding=4),
            # transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),   
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))   
        ])
    elif id_name in [ 'mnist','fmnist','kmnist','qmnist']:
        input_size = (32, 32) 
        transform_id=  transforms.Compose([
            transforms.Resize(input_size), 
            # transforms.RandomCrop(32, padding=4),
            # transforms.RandomHorizontalFlip(),
            # transforms.Resize(input_size), 
            transforms.Grayscale(num_output_channels=3), 
            transforms.ToTensor(), 
            transforms.Normalize((0.1307,), (0.3081,))
        ])
    elif id_name == 'cifar100':
        input_size = (32, 32) 
        transform_id  = transforms.Compose([
            transforms.Resize(input_size), 
            # transforms.RandomCrop(32, padding=4),
            # transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),   
            transforms.Normalize((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761))   
        ])
    if train == True:
        datasets_id = {
            'cifar10': (
                datasets.CIFAR10(root=data_root, train=True, download=True, transform=transform_id),
                datasets.CIFAR10(root=data_root, train=False, download=True, transform=transform_id)
            ),
            'cifar100': (
                datasets.CIFAR100(root=data_root, train=True, download=True, transform=transform_id),
                datasets.CIFAR100(root=data_root, train=False, download=True, transform=transform_id)
            ),
            'mnist': (
                datasets.MNIST(root=data_root, train=True, download=True, transform=transform_id),
                datasets.MNIST(root=data_root, train=False, download=True, transform=transform_id)
            ),
            'qmnist': (
                datasets.QMNIST(root=data_root, train=True, download=True, transform=transform_id),
                datasets.QMNIST(root=data_root, train=False, download=True, transform=transform_id)
            ),
            'kmnist': (
                datasets.KMNIST(root=data_root, train=True, download=True, transform=transform_id),
                datasets.KMNIST(root=data_root, train=False, download=True, transform=transform_id)
            ),
            'fmnist': (
                datasets.FashionMNIST(root=data_root, train=True, download=True, transform=transform_id),
                datasets.FashionMNIST(root=data_root, train=False, download=True, transform=transform_id)
            ),
            'svhn': (
                datasets.SVHN(root=data_root, split='train', download=True, transform=transform_id),
                datasets.SVHN(root=data_root, split='test', download=True, transform=transform_id)
            ),
        }
        print(" training dataloaders requested . Tranform = ")
         
        train_dataset, test_dataset = datasets_id[id_name]


        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False,num_workers=4)
        test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False,num_workers=4)
        # print(test_loader.dataset.transform)
        # print(" below is the tranform for trainloader")
        # print(train_loader.dataset.transform)
        return train_loader, test_loader

    
    print(" testing loader requested. Transform = ")
    if id_name == 'svhn':
        input_size = (32, 32) 
        transform_rgb = transform_id
        transform_bw =  transforms.Compose([
            transforms.Resize(input_size), 
            transforms.Grayscale(num_output_channels=3),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # Common for SVHN
        ])
    elif id_name == 'cifar10':
        input_size = (32, 32) 
        transform_rgb = transform_id
        transform_bw  = transforms.Compose([
            transforms.Resize(input_size), 
            transforms.Grayscale(num_output_channels=3),
            transforms.ToTensor(),   
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))   
        ])
    elif id_name in [ 'mnist','fmnist','kmnist','qmnist']:
        input_size = (32, 32) 
        transform_rgb = transform_id
        transform_bw=  transforms.Compose([
            transforms.Resize(input_size), 
            transforms.Grayscale(num_output_channels=3),
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])
    elif id_name == 'cifar100':
        input_size = (32, 32) 
        transform_rgb = transform_id
        transform_bw  = transforms.Compose([
            transforms.Resize(input_size), 
            transforms.Grayscale(num_output_channels=3),
            transforms.ToTensor(),   
            transforms.Normalize((0.5071, 0.4867, 0.4408), (0.2675, 0.2565, 0.2761))   
        ])

    
    datasets_ood = {
        'cifar10': (
            datasets.CIFAR10(root=data_root, train=False, download=True, transform=transform_rgb)
        ),
        'cifar100': (
            datasets.CIFAR100(root=data_root, train=False, download=True, transform=transform_rgb)
        ),
        'mnist': (          
            datasets.MNIST(root=data_root, train=False, download=True, transform=transform_bw)
        ),
        'qmnist': (           
            datasets.QMNIST(root=data_root, train=False, download=True, transform=transform_bw)
        ),
        'kmnist': (             
            datasets.KMNIST(root=data_root, train=False, download=True, transform=transform_bw)
        ),
        'fmnist': (         
            datasets.FashionMNIST(root=data_root, train=False, download=True, transform=transform_bw)
        ),
        'svhn': (
            datasets.SVHN(root=data_root, split='test', download=True, transform=transform_rgb)
        ),
        'eurosat': (
            datasets.EuroSAT(root=data_root, download=True, transform=transform_rgb)   # same for test (no separate split)
        ),
        'fake_data_set': (
             datasets.FakeData(image_size=(3, 32, 32), num_classes=10, transform=transform_rgb, train=False)
        ),
        'isun': (
              datasets.ImageFolder(root=f"{data_root}/iSUN", transform=transform_rgb)
        ),
        'lsun': (
             datasets.LSUN(root=f"{data_root}/lsun_resize", transform=transform_rgb)
        ),
        'dtd': (
            datasets.DTD(root=data_root, split='train', download=True, transform=transform_rgb),
            datasets.DTD(root=data_root, split='test', download=True, transform=transform_rgb)
        ),
        'places365': (
            datasets.Places365(root=data_root, split='val', download=True, transform=transform_rgb)
        )
        # 'inaturalist': (
        #     datasets.INaturalist(root=data_root, version='2021_train', download=False, transform=transform),  # requires download
        #     datasets.INaturalist(root=data_root, version='2021_val', download=True, transform=transform)
        # )
    }
    
   
    
    test_dataset = datasets_ood[id_name]
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False,num_workers=4,)
    print(test_loader.dataset.transform)
    return test_loader




## Generate_ADV_Samples

In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset
import torchattacks
import numpy as np
import os
from tqdm import tqdm 
 
def generate_adversarial_samples(model_name,dataset,training_type):
    """
    Generate and save adversarial samples using FGSM, PGD, DeepFool,CW and AutoAttack.
    
    Args:
        model: PyTorch model to attack
        Dataset: dataset on which model is trained and adv samples are required
        training_type: type of method used to train the model (SGD,ADAM,RMSProp,ADamw)
    """
    batch_size = 4096
    _, test_loader = get_loader( dataset=dataset, batch_size=batch_size,id_name=dataset,train=True)

    ten_classes = ['cifar10','svhn','mnist','fmnist',"kmnist","qmnist"]
    if dataset == 'cifar10' or dataset in ten_classes:
        num_classes = 10
    elif dataset == 'cifar100':  
        num_classes = 100
    else:
        print(" error dataset ")
        exit()
    # Model setup
    if model_name == 'resnet18':
        model = ResNet18(num_c=num_classes).to(device)
    elif model_name == 'resnet34':
        model = ResNet34(num_c=num_classes).to(device)  
    elif model_name == 'resnet50':
        model = ResNet50(num_classes).to(device)          
    elif model_name == 'densenet3':
        model = DenseNet3(100, num_classes, growth_rate=12).to(device)
    
    with open(log_file,'a') as file:
        file.write("model loaded and loader created")
    
    # Initialize attack methods
    os_path = f'/scratch/asing651/mps/Mps'
    model_path = os_path +f'/pre_trained/{training_type}/{model_name}_{dataset}/{model_name}_{dataset}.pth'
    # print("Current Working Directory:", os.getcwd())
    
    # if os.path.exists(os_path):
    #     print( os_path, " exists")
        
    # else:
    #     print( os_path, " not found")
    #     exit()
    # os_path += f'/pre_trained/{training_type}'   
    # if os.path.exists(os_path):
    #     print( os_path, " exists")  
    # else:
    #     print( os_path, " not found")
    #     exit()
    # os_path += f"/{model_name}_{dataset}"
    # if os.path.exists(os_path):
    #     print(os_path, " exists \n")

    if os.path.exists(model_path):
        print(model_path, " exists \n")
        model.load_state_dict(torch.load(model_path, weights_only=True))
    else:
        print("-------------------------------------\n")
        print(model_path)
        print(f" no weights found for {model_name},{dataset},{training_type}")
        return
    model.eval()
    attacks = {
        'fgsm': torchattacks.FGSM(model, eps=0.03),
        'pgd': torchattacks.PGD(model, eps=0.03, alpha=0.01, steps=10),
        'deepfool': torchattacks.DeepFool(model, steps=50, overshoot=0.02),
        'autoattack': torchattacks.AutoAttack(model, eps=0.03,version = 'standard',  norm='Linf'),
        'cw' :  torchattacks.CW(model, c=1e-4, kappa=0, steps=1000, lr=0.01)  ,

        'cw_strong': torchattacks.CW(model, c=0.01, kappa=0, steps=500, lr=0.005),
        'autoattack_strong': torchattacks.AutoAttack(model, eps=0.05, version='standard', norm='Linf'),
        'deepfool_strong': torchattacks.DeepFool(model, steps=50, overshoot=0.05),
        'pgd_strong': torchattacks.PGD(model, eps=0.05, alpha=0.05/40, steps=40),
        'fgsm_strong': torchattacks.FGSM(model, eps=0.05)


    }
    
    # Ensure output directory exists
    os.makedirs(f'./adv_samples/{training_type}', exist_ok=True)
   
    for attack_name, attack in attacks.items():
        save_path = f'./adv_samples/{training_type}/{model_name}_{dataset}_{attack_name}.pt'
        temp_name = attack_name[:-7] 
        if temp_name in attacks:
            # Verify saved data can be loaded
            
            save_path_prev = f'./adv_samples/{training_type}/{model_name}_{dataset}_{temp_name}.pt'
            if os.path.exists(save_path_prev) :
                loaded_data = torch.load(save_path_prev, weights_only=False)
                loaded_data_list, loaded_label_list = zip(*loaded_data)
                inputs_tensor = torch.stack([torch.from_numpy(data).float() for data in loaded_data_list])
                labels_tensor = torch.tensor(loaded_label_list, dtype=torch.long)
                adv_dataset = TensorDataset(inputs_tensor, labels_tensor)
                adv_test_loader = DataLoader(adv_dataset, batch_size=batch_size, shuffle=False,num_workers=4)
                accuracy = eval_accuracy(model,log_file,model_name,dataset,adv_test_loader,attack_name)
                
                print(f'Verified loading for {attack_name} dataset: {len(adv_dataset)} samples')
                if accuracy < 25 : 
                    continue
        if os.path.exists(save_path) :
            loaded_data = torch.load(save_path, weights_only=False)
            loaded_data_list, loaded_label_list = zip(*loaded_data)
            inputs_tensor = torch.stack([torch.from_numpy(data).float() for data in loaded_data_list])
            labels_tensor = torch.tensor(loaded_label_list, dtype=torch.long)
            adv_dataset = TensorDataset(inputs_tensor, labels_tensor)
            adv_test_loader = DataLoader(adv_dataset, batch_size=test_loader.batch_size, shuffle=False,num_workers=4)
            accuracy = eval_accuracy(model,log_file,model_name,dataset,adv_test_loader,temp_name)
            print(" samples already exist ")
            continue
        
        with open(log_file, 'a') as file:
            file.write(f" Creating {attack_name} samples for  {model_name} {dataset} .. \n")
        data_list = []
        label_list = []
    
        for data, target in tqdm(test_loader, desc=f"Generating {attack_name} samples"):
            data, target = data.to(device), target.to(device)
            
            # Generate adversarial samples
            adv_data = attack(data, target)
            
            # Convert to numpy for saving
            # data_list.extend(adv_data.cpu().numpy())
            # label_list.extend(target.cpu().numpy())
            data_list.extend(adv_data.detach().cpu().numpy())
            label_list.extend(target.detach().cpu().numpy())
        
        # Save adversarial samples
        adv_samples = list(zip(data_list, label_list))
        
        torch.save(adv_samples, save_path)
        with open(log_file, 'a') as file:
            file.write(f'Saved {attack_name} adversarial samples to {save_path} \n')
        print(f'Saved {attack_name} adversarial samples to {save_path}')
    
    # Verify saved data can be loaded
    loaded_data = torch.load(save_path, weights_only=False)
    loaded_data_list, loaded_label_list = zip(*loaded_data)
    inputs_tensor = torch.stack([torch.from_numpy(data).float() for data in loaded_data_list])
    labels_tensor = torch.tensor(loaded_label_list, dtype=torch.long)
    adv_dataset = TensorDataset(inputs_tensor, labels_tensor)
    adv_test_loader = DataLoader(adv_dataset, batch_size=test_loader.batch_size, shuffle=False,num_workers=4)
    accuracy = eval_accuracy(model,log_file,model_name,dataset,adv_test_loader,attack_name)
    print(f" accuracy of {model_name , dataset, attack_name,training_type } = {accuracy}  ")
    print(f'Verified loading for {attack_name} dataset: {len(adv_dataset)} samples')

 



## Get DataLoader

# Script

In [None]:
models = ["resnet34","densenet3",'resnet50',"resnet18"]
id_names = ["svhn","fmnist",'cifar10',"cifar100","mnist"]

optim_types = ["SGD"]
optim_types += optim_types
optim_types += optim_types
print(optim_types)
with open(log_file,"a") as file:
    for optim_type in optim_types:
        for id_name in id_names:
            for model in models:
                file.write("----------Gen_ADV-------------\n\n")   
                file.write(f" processing {model} and {id_name} with training optim = {optim_type}\n")
                print(f" processing {model} and {id_name} with training optim = {optim_type}")
                generate_adversarial_samples(model,id_name,optim_type)   
        file.write("-----------------------------------------------\n\n")
 

['SGD', 'SGD', 'SGD', 'SGD']
 processing resnet34 and svhn with training optim = SGD
loader requested for svhn, to be used on model trained for svhn 
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Using downloaded and verified file: ./data/train_32x32.mat
Using downloaded and verified file: ./data/test_32x32.mat
 training dataloaders requested . Tranform = 
/scratch/asing651/mps/Mps/pre_trained/SGD/resnet34_svhn/resnet34_svhn.pth  exists 



Generating fgsm samples: 100%|██████████| 26/26 [00:07<00:00,  3.61it/s]


Saved fgsm adversarial samples to ./adv_samples/SGD/resnet34_svhn_fgsm.pt


Generating pgd samples: 100%|██████████| 26/26 [00:52<00:00,  2.03s/it]


Saved pgd adversarial samples to ./adv_samples/SGD/resnet34_svhn_pgd.pt


Generating deepfool samples: 100%|██████████| 26/26 [2:22:39<00:00, 329.21s/it]  


Saved deepfool adversarial samples to ./adv_samples/SGD/resnet34_svhn_deepfool.pt


Generating autoattack samples:  69%|██████▉   | 18/26 [1:01:22<25:43, 192.96s/it]