In [None]:
!pip install datasets
!pip install -U "huggingface_hub[cli]"

In [1]:
import os
import random
import uuid
from tqdm import tqdm
import torch
from datasets import load_dataset
from torch import nn
from torch.utils.data import DataLoader
from torchvision.models import ResNet50_Weights, resnet50
import shutil
import zipfile

In [None]:
# Colab only: mount Google Drive at /content/drive so the generated zip
# archives can be written under /content/drive/MyDrive (see the full
# generation run further down). `google.colab` is not importable outside
# Colab, so skip this cell when running the notebook locally.
from google.colab import drive
drive.mount('/content/drive')

In [2]:
class ResnetPGDAttacker:
    '''
    Projected Gradient Descent (PGD) attacker against a pretrained ResNet-50.

    The model is loaded with ``ResNet50_Weights.DEFAULT``, moved to ``cuda:0``
    when CUDA is available (CPU otherwise) and its parameters are frozen, so
    gradients are only ever taken with respect to the input images.
    '''

    def __init__(self):
        '''
        Load the pretrained ResNet-50, create the loss and select the device.
        Takes no arguments; the attacked model is always torchvision's ResNet-50.
        '''
        self.model = resnet50(weights=ResNet50_Weights.DEFAULT)
        self.loss_fn = nn.CrossEntropyLoss()
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)
        # Nullify gradient for model params: only the inputs are optimised.
        for p in self.model.parameters():
            p.requires_grad = False

    def pgd_attack(self, image, label, eps, alpha, steps):
        '''
        Create adversarial images for given batch of images and labels

        :param image: Batch of input images on which we perform the attack, size (BATCH_SIZE, 3, 224, 224)
        :param label: Batch of input labels on which we perform the attack, size (BATCH_SIZE)
        :param eps: Radius of the L-infinity ball around ``image`` the result must stay in,
                    expressed in the same units as ``image`` (no pixel-range clamping is done)
        :param alpha: Step size of each signed-gradient update
        :param steps: Number of gradient ascent iterations
        :return: Adversarial images for the given input images (detached, on ``self.device``)
        '''
        images = image.clone().detach().to(self.device)
        labels = label.clone().detach().to(self.device)

        # Starting at a uniformly random point within the eps ball
        adv_images = images + torch.empty_like(images).uniform_(-eps, eps)

        self.model.eval()
        for _ in range(steps):
            # Enable gradient tracking for adversarial images
            adv_images.requires_grad_(True)

            # CrossEntropyLoss applies log-softmax itself, so it must be fed the
            # raw logits. Applying softmax first distorts the gradient and makes
            # it vanish entirely once the model's prediction saturates.
            logits = self.model(adv_images)
            loss = self.loss_fn(logits, labels)

            # Compute gradient wrt images
            grad = torch.autograd.grad(
                loss, adv_images, retain_graph=False, create_graph=False
            )[0]

            # Gradient ascent step along the sign of the gradient
            adv_images = adv_images.detach() + alpha * grad.sign()

            # Projection step: keep the result inside the L-inf ball of radius
            # eps centred on the original images
            adv_images = torch.clamp(adv_images, images - eps, images + eps).detach()

        return adv_images  # Return the generated adversarial images


In [8]:
class FineTuneDatasetGenerator:
    """
    Streams ImageNet-1k training images, builds PGD-perturbed copies of them and
    writes the resulting tensors to disk as zip archives.

    Samples are accumulated in ``self.zip_buffer`` as ``(image, label)`` pairs.
    Every ``zip_every_n_batches`` batches the buffer is written as individual
    ``.pt`` files into ``local_save_path``, that folder is zipped into
    ``zip_save_path`` under a random UUID name, and the folder is emptied again.
    """

    def __init__(self, batch_size, batch_num, num_perturbations, local_save_path, zip_save_path,
                 zip_every_n_batches=10, add_original=True):
        """
        :param batch_size: Number of images per batch
        :param batch_num: Number of batches to process
        :param num_perturbations: Number of PGD-perturbed copies generated per batch
        :param local_save_path: Scratch folder for the ``.pt`` files before zipping
                                (its contents are deleted after every zip)
        :param zip_save_path: Folder that receives the zip archives
        :param zip_every_n_batches: Flush the buffer to a zip every this many batches
        :param add_original: Whether the unperturbed images are saved as well
        """
        self.batch_size = batch_size
        self.batch_num = batch_num
        self.num_perturbations = num_perturbations
        self.local_save_path = local_save_path
        self.zip_number = zip_every_n_batches
        self.zip_save_path = zip_save_path
        self.zip_buffer = []
        self.add_original = add_original

        # Create the save directory if it doesn't exist
        os.makedirs(self.local_save_path, exist_ok=True)
        os.makedirs(self.zip_save_path, exist_ok=True)

        weights = ResNet50_Weights.DEFAULT
        self.resnet_transform = weights.transforms()  # PIL -> tensor

        self.pgd_attacker = ResnetPGDAttacker()

        # Streaming pipeline: shuffle, keep RGB images only, cap the number of
        # samples at batch_num * batch_size, then apply the ResNet preprocessing.
        self.ds = load_dataset("ILSVRC/imagenet-1k", split="train", streaming=True, trust_remote_code=True)
        self.ds = self.ds.shuffle()
        self.ds = self.ds.filter(lambda example: example['image'].mode == 'RGB')
        self.ds = self.ds.take(self.batch_num * self.batch_size)
        self.ds = self.ds.map(self.preprocess_img)
        self.dataloader = DataLoader(self.ds, batch_size=self.batch_size)
        print(f"Fine Tune Dataset Generator has been initialized. save path is {self.local_save_path}, zips path is {self.zip_save_path}")

    def preprocess_img(self, example):
        """Replace the example's PIL image with the ResNet-preprocessed tensor."""
        example['image'] = self.resnet_transform(example['image'])
        return example

    def add_batch_to_buffer(self, images, labels):
        """
        Append every (image, label) pair of a batch to the zip buffer.

        Each image is detached, moved to the CPU and cloned before buffering:
        indexing a batch yields a view that shares the batch's storage, and
        ``torch.save`` on a view serialises the whole storage (i.e. the full
        batch) into every file. Keeping CPU copies also releases the GPU memory
        of perturbed batches immediately and makes the saved files loadable on
        machines without CUDA.
        """
        for image, label in zip(images, labels):
            self.zip_buffer.append((image.detach().cpu().clone(), label))

    def generate(self):
        """
        Run the whole generation: iterate over the dataloader, buffer original
        (optional) and perturbed samples, and flush the buffer to a zip archive
        every ``zip_every_n_batches`` batches plus once at the end.
        """
        for i, batch in enumerate(tqdm(self.dataloader, total=self.batch_num)):
            # Flush what was collected during the previous group of batches
            if i % self.zip_number == 0 and len(self.zip_buffer) > 0:
                self.save_files_in_buffer_and_zip()

            images, labels = batch["image"], batch["label"]
            if self.add_original:
                self.add_batch_to_buffer(images, labels)

            # do perturbations and then add
            for _ in range(self.num_perturbations):
                # Generate random parameters for PGD attack
                random_eps = random.uniform(0.01, 0.3)
                random_alpha = random.uniform(0.01, 0.07)
                random_steps = random.randint(15, 20)

                # Perform the PGD attack
                perturbed_images = self.pgd_attacker.pgd_attack(images,
                                                                labels,
                                                                eps=random_eps,
                                                                alpha=random_alpha,
                                                                steps=random_steps)
                self.add_batch_to_buffer(perturbed_images, labels)

        if len(self.zip_buffer) > 0:
            # for the last batch
            self.save_files_in_buffer_and_zip()

    def zip_folder(self):
        """Zips the contents of ``local_save_path`` into a new UUID-named zip file in ``zip_save_path``."""
        zip_name = str(uuid.uuid4()) + ".zip"
        zip_path = os.path.join(self.zip_save_path, zip_name)

        print(f"Zipping current folder {self.local_save_path} with {len(os.listdir(self.local_save_path))} items to {zip_path}")

        with zipfile.ZipFile(zip_path, "w") as zip_file:
            for root, _dirs, files in os.walk(self.local_save_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    # Members keep their path as seen from the working
                    # directory (including local_save_path), so consumers of
                    # the archives see the same layout as before.
                    zip_file.write(file_path)
        print(f"Created zip in {zip_path}")

    def remove_files_in_directory(self, directory):
        """
        Delete every file and sub-directory inside ``directory`` (the directory
        itself is kept). Failures are reported and skipped (best effort).
        """
        # Iterate over all files and directories in the specified directory
        for filename in os.listdir(directory):
            file_path = os.path.join(directory, filename)
            try:
                if os.path.isfile(file_path):
                    os.remove(file_path)
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)
            except Exception as e:
                print(f"Error removing {file_path}: {e}")

    def save_files_in_buffer_and_zip(self):
        """
        Write every buffered sample to ``local_save_path`` as
        ``class_<label>_img_<uuid>.pt``, zip the folder, then empty both the
        folder and the buffer.
        """
        for image, label in tqdm(self.zip_buffer, desc="Saving batch as file in temp location"):
            img_id = str(uuid.uuid4())
            save_file = os.path.join(self.local_save_path, f"class_{label}_img_{img_id}.pt")
            torch.save(image, save_file)

        # creating zip and copying zip to zip_save_path
        self.zip_folder()

        # deleting all files from local_save_path
        self.remove_files_in_directory(self.local_save_path)

        # clearing buffer for next time
        self.zip_buffer.clear()
        

# Local smoke test: 6 batches of 2 images, 2 perturbed copies per batch and no
# originals, flushed to a zip every 2 batches (8 tensors per archive).
generator = FineTuneDatasetGenerator(
    batch_size=2,
    batch_num=6,
    num_perturbations=2,
    local_save_path="./data_gen_test",
    zip_save_path="./data_gen_zips_test",
    zip_every_n_batches=2,
    add_original=False,
)
generator.generate()


Fine Tune Dataset Generator has been initialized. save path is ./data_gen_test, zips path is ./data_gen_zips_test


 33%|███▎      | 2/6 [00:13<00:22,  5.57s/it]
Saving batch as file in temp location: 100%|██████████| 8/8 [00:00<00:00, 230.58it/s]


Zipping current folder ./data_gen_test with 8 items to ./data_gen_zips_test\52816ff0-f3f3-4f97-ab3e-4c5c7dab17aa.zip
Created zip in ./data_gen_zips_test\52816ff0-f3f3-4f97-ab3e-4c5c7dab17aa.zip


 67%|██████▋   | 4/6 [00:15<00:05,  2.50s/it]
Saving batch as file in temp location: 100%|██████████| 8/8 [00:00<00:00, 266.53it/s]


Zipping current folder ./data_gen_test with 8 items to ./data_gen_zips_test\ad74644d-bf1c-47de-b3d6-179a66f0c4f8.zip
Created zip in ./data_gen_zips_test\ad74644d-bf1c-47de-b3d6-179a66f0c4f8.zip


100%|██████████| 6/6 [00:17<00:00,  2.84s/it]
Saving batch as file in temp location: 100%|██████████| 8/8 [00:00<00:00, 397.66it/s]


Zipping current folder ./data_gen_test with 8 items to ./data_gen_zips_test\ed7a6850-2a10-49d1-9c1f-0782305da764.zip
Created zip in ./data_gen_zips_test\ed7a6850-2a10-49d1-9c1f-0782305da764.zip


In [None]:
# Full generation run on Colab: tensors are staged in a local scratch folder
# and the resulting zip archives are written to the mounted Google Drive.
base_path = "/content/drive/MyDrive/trustworthyml"
local_path = "/content/dataset"
zips_path = os.path.join(base_path, "zips")

generator = FineTuneDatasetGenerator(
    batch_size=16,
    batch_num=1000,
    num_perturbations=3,
    local_save_path=local_path,
    zip_save_path=zips_path,
    zip_every_n_batches=4,
    add_original=True,  # the constructor default, spelled out: originals are saved too
)
generator.generate()

Unzipping code