In [1]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "6"

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
from torchvision.utils import save_image
from torchvision.io import read_image
from torch.utils.data import DataLoader,Dataset, ConcatDataset, Subset
# import torchsummary
import torchattacks
from autoattack import AutoAttack
from pytorchcv.model_provider import get_model as ptcv_get_model
 
import time
import os
import copy
import robustbench
from robustbench.data import load_cifar10
from robustbench.utils import load_model, clean_accuracy
from scripts.custom_Dataset import CustomDataset


# Device shared by every cell below: the GPU when CUDA is available, otherwise the CPU.
# CUDA_VISIBLE_DEVICES is set to "6" at the top of this cell, before torch is imported.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
def train_model(model, criterion, optimizer, scheduler, trainloader, num_epochs=15):
    """Train ``model`` on ``trainloader`` and return it with its best weights loaded.

    "Best" means the epoch with the highest *training* accuracy; no validation
    data is used, so the selected weights are not guarded against overfitting.

    Args:
        model: ``nn.Module`` to optimize (updated in place).
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        scheduler: Learning-rate scheduler. Accepted for interface compatibility
            but never stepped (see the note at the end of the epoch loop).
        trainloader: Iterable yielding ``(inputs, labels)`` batches.
        num_epochs: Number of passes over ``trainloader``.

    Returns:
        The same ``model`` object, with the best-scoring epoch's weights loaded.
    """
    since = time.time()

    # Send batches to wherever the model lives instead of relying on a notebook
    # global; a parameter-free model falls back to the CPU.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        model.train()  # Set model to training mode
        running_loss = 0.0
        running_corrects = 0
        num_samples = 0

        # Iterate over data.
        for inputs, labels in trainloader:
            inputs = inputs.to(run_device)
            labels = labels.to(run_device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # Force autograd on even if the caller is inside a no_grad context.
            with torch.set_grad_enabled(True):
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)

                loss.backward()
                optimizer.step()

            # statistics: count what was actually seen so the averages stay
            # correct for any loader (subset, sampler, drop_last, ...).
            batch_count = inputs.size(0)
            running_loss += loss.item() * batch_count
            running_corrects += (preds == labels).sum().item()
            num_samples += batch_count

        # NOTE(review): scheduler.step() was commented out in the original code and
        # stays disabled here, so the learning rate is constant for the whole run.

        denom = max(num_samples, 1)  # avoid dividing by zero on an empty loader
        epoch_loss = running_loss / denom
        epoch_acc = running_corrects / denom

        print(f'Train Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

        # deep copy the model
        if epoch_acc > best_acc:
            best_acc = epoch_acc
            best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best Acc: {best_acc:4f}')

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

class Normalize(nn.Module):
    """Per-channel input normalization layer computing ``(x - mean) / std``.

    Placing this in front of a network lets the network accept raw inputs
    while still seeing normalized values.

    Args:
        mean: Sequence of per-channel means.
        std: Sequence of per-channel standard deviations.
    """

    def __init__(self, mean, std):
        super(Normalize, self).__init__()
        # Registered as buffers so .to()/.cuda()/.half() on the module (or on a
        # parent nn.Sequential) carries them along. persistent=False keeps them
        # out of state_dict, so existing checkpoints still load unchanged.
        self.register_buffer("mean", torch.Tensor(mean).view(-1, 1, 1), persistent=False)
        self.register_buffer("std", torch.Tensor(std).view(-1, 1, 1), persistent=False)

    def forward(self, x):
        """Normalize ``x``; the (C, 1, 1) statistics broadcast over the trailing dims."""
        # Stay usable when the input is on a different device than the module,
        # without mutating module state inside forward. No-op when devices match.
        mean = self.mean.to(x.device)
        std = self.std.to(x.device)
        return (x - mean) / std

In [7]:
# Training-data configuration. batch_size, num_classes and transform are notebook
# globals reused by later cells (the test loader reuses batch_size and transform).
batch_size = 256
num_classes = 100
# Only converts images to tensors; normalization happens inside the model wrapper.
transform = transforms.Compose([transforms.ToTensor()])
# transform = transforms.Compose([transforms.ToTensor(),
#                                 transforms.Resize((224,224))])
# Image directory and label file handed to the project-local CustomDataset.
# NOTE(review): image_folder / label_file are rebound by the test-data cell further
# down, so re-run this cell before rebuilding the training set.
image_folder = '../BLIP/experiment_1/cifar100/clean_train_generated/'
label_file = '../BLIP/experiment_1/cifar100/clean_train_labels.txt'
# # captions_file = "../BLIP/experiment_1/stl_10/clean_test_captions.txt"
# presumably yields (image, label) pairs, since train_model unpacks batches that way
# -- TODO confirm against scripts/custom_Dataset.py
train_dataset = CustomDataset(image_folder, label_file, transform=transform)
# Disabled alternative: mix these images with the original CIFAR-100 training split.
# train_dataset_2 = torchvision.datasets.CIFAR100(root="../data", train=True, transform=transform)
# train_dataset = ConcatDataset([train_dataset_1, train_dataset_2])
# Shuffled loader with 8 worker processes.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=8)

# Leftover CIFAR-10 test-set setup, kept disabled.
# image_folder = '../BLIP/experiment_1/cifar10/clean_test_generated'
# label_file = '../BLIP/experiment_1/cifar10/clean_test_labels.txt'
# # captions_file = "../BLIP/experiment_1/stl_10/clean_test_captions.txt"
# test_dataset = CustomDataset(image_folder, label_file, transform=transform)
# test_dataset = torchvision.datasets.CIFAR10(root="../data", train=False, transform=transform)
# test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

# image_folder = '../BLIP/experiment_1/cifar10/clean_test_generated'
# label_file = '../BLIP/experiment_1/cifar10/clean_test_labels.txt'
# # captions_file = "../BLIP/experiment_1/stl_10/clean_test_captions.txt"
# test_dataset = CustomDataset(image_folder, label_file, transform=transform)
# test_dataset = torchvision.datasets.CIFAR10(root="../data", train=False, transform=transform)
# test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

In [8]:
# Per-channel statistics handed to the Normalize layer below.
# NOTE(review): these look like the widely used CIFAR-10 statistics -- confirm they
# match the preprocessing the pretrained wrn28_10_cifar100 weights expect.
mean_rgb = (0.4914, 0.4822, 0.4465)
std_rgb = (0.2023, 0.1994, 0.2010)

# Earlier experiments (other checkpoints / RobustBench models), kept disabled.
# torch.save(model, "resnet50_stl10_trained_on_diffusion_images.pth")
# model = torch.load("resnet50_stl10.pth")
# model = torch.load("resnet50_stl10_trained_on_diffusion_images.pth")
# model = model.to(device)
# model = load_model(model_name='Standard_R50', dataset='imagenet', threat_model='Linf')
# model = load_model(model_name='Standard', dataset='cifar10', threat_model='Linf')
# model =  nn.DataParallel(model).to(device)
# Load the pretrained network from pytorchcv and prepend the normalization layer,
# so the wrapped model takes un-normalized tensors as produced by `transform`.
model = ptcv_get_model("wrn28_10_cifar100", pretrained=True)
norm = Normalize(mean_rgb, std_rgb)   
# After wrapping, model[0] is the Normalize layer and model[1] is the network.
model = nn.Sequential(norm, model)
model = model.to(device)

In [5]:
# Make sure the classifier emits `num_classes` logits.
# `model` is nn.Sequential(Normalize, backbone), so the network is model[1]; the
# previous `model.model.fc` lookup raises AttributeError on that wrapper.
# NOTE(review): pytorchcv models appear to call their final layer `output`, while
# torchvision ResNets call it `fc` -- both are tried; confirm for other backbones.
_backbone = model[1] if isinstance(model, nn.Sequential) else model
_head_name = next(
    (name for name in ("output", "fc")
     if isinstance(getattr(_backbone, name, None), nn.Linear)),
    None,
)
if _head_name is None:
    raise AttributeError(
        "could not find a final nn.Linear classifier ('output' or 'fc') on the backbone"
    )
_head = getattr(_backbone, _head_name)
# Only swap the layer when its width is wrong, so a pretrained classifier that
# already matches num_classes is kept and re-running this cell is harmless.
if _head.out_features != num_classes:
    setattr(_backbone, _head_name,
            nn.Linear(in_features=_head.in_features,
                      out_features=num_classes,
                      bias=True))
model = model.to(device)

In [17]:
# Fine-tuning setup: cross-entropy loss, Adam over every parameter of the wrapped
# model, and a step schedule that would cut the LR by 10x every 7 scheduler steps.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer=optimizer, step_size=7, gamma=0.1)

In [18]:
model = train_model(model, criterion, optimizer, exp_lr_scheduler,
                       train_loader, num_epochs=5)

Epoch 0/4
----------


Train Loss: 0.0260 Acc: 0.9932

Epoch 1/4
----------
Train Loss: 0.0027 Acc: 0.9998

Epoch 2/4
----------
Train Loss: 0.0013 Acc: 1.0000

Epoch 3/4
----------
Train Loss: 0.0007 Acc: 1.0000

Epoch 4/4
----------
Train Loss: 0.0004 Acc: 1.0000

Training complete in 2m 23s
Best Acc: 1.000000


In [23]:
# torch.save(model, "resnet_50_pretrained_cifar10_trained_on_diffusion_2.pth")

In [11]:
from tqdm import tqdm


def calculate_accuracy(model, dataloader):
    """Return the top-1 accuracy of ``model`` over ``dataloader``.

    The model is switched to eval mode (and left there), gradients are
    disabled, and each batch is moved to the notebook-level ``device``.
    The result is ``correct / total`` as a Python float.
    """
    model = model.eval()
    n_seen, n_correct = 0, 0
    with torch.no_grad():
        for batch_images, batch_labels in tqdm(dataloader):
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            # Index of the largest logit per row is the predicted class.
            predicted = torch.max(model(batch_images), dim=1)[1]

            n_correct += (predicted == batch_labels).sum().item()
            n_seen += batch_labels.size(0)

    return n_correct / n_seen

In [29]:
# Evaluation data: images and labels for the test split, loaded with the same
# `transform` and `batch_size` as the training data.
# NOTE(review): this rebinds the image_folder / label_file globals that the
# training-data cell also uses.
image_folder = '../BLIP/experiment_1/cifar100/clean_test_generated/'
label_file = '../BLIP/experiment_1/cifar100/clean_test_labels_test.txt'
# presumably `num` caps the number of samples read -- the recorded run shows 32
# batches of 256, consistent with 8160 samples; TODO confirm in CustomDataset
test_dataset = CustomDataset(image_folder, label_file, transform=transform, num=8160)
# Disabled alternative: evaluate on the original CIFAR-100 test split instead.
# test_dataset = datasets.CIFAR100("../data/", train=False, transform=transform)
# No shuffle argument is passed, so the loader keeps its default ordering.
test_loader = DataLoader(test_dataset, batch_size=batch_size, num_workers=8)


In [30]:
# Top-1 accuracy of the fine-tuned model on the test loader (shown as the cell output).
calculate_accuracy(model, test_loader)

100%|██████████| 32/32 [00:02<00:00, 14.60it/s]


0.5870098039215687

In [28]:
# Saves the whole module object (not just a state_dict).
# NOTE(review): loading a whole-module checkpoint needs the Normalize class to be
# defined/importable in the loading session -- confirm this is the intended format.
torch.save(model, "wrn28_10_cifar100_trained_on_diffusion_images.pth")
# model = torch.load("resnet50_stl10.pth")

In [23]:
# Second, untouched copy of the pretrained network behind its own Normalize layer,
# built the same way as `model` but never passed through train_model.
norm2 = Normalize(mean_rgb, std_rgb)
model2 = nn.Sequential(
    norm2,
    ptcv_get_model("wrn28_10_cifar100", pretrained=True),
).to(device)