In [1]:
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import numpy as np
import torch.backends.cudnn as cudnn
cudnn.benchmark = True  # fire on all cylinders
import matplotlib.pyplot as plt

In [2]:
# Prefer the GPU whenever PyTorch can see one; otherwise run on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# Floating-point type every input batch is cast to before reaching the model.
dtype = torch.float32


cuda


In [3]:
# Importing CIFAR-10 (not MNIST) train and test data; files are downloaded into ./data when missing.
train_data = datasets.CIFAR10(root='data', train=True, transform=transforms.ToTensor(), download=True)
test_data = datasets.CIFAR10(root='data', train=False, transform=transforms.ToTensor(), download=True)

Files already downloaded and verified
Files already downloaded and verified


In [4]:
import torch
from torchvision.models import resnet18

class MyModel(nn.Module):
    """Pre-trained ResNet18 backbone followed by a linear classification head.

    The backbone weights are loaded pre-trained but are NOT frozen: every
    parameter of the backbone remains trainable. The head maps the backbone's
    1000 outputs to ``output_dim`` class scores.
    """

    def __init__(self, output_dim):
        """Build the backbone and a ``1000 -> output_dim`` linear head."""
        super().__init__()
        # Backbone with pre-trained weights (still trainable).
        self.resnet = resnet18(pretrained=True)
        # Presumably a no-op, since the backbone output should already be 2-D; kept as a safeguard.
        self.flatten = nn.Flatten()
        # Head sized to the 1000 values produced by the backbone.
        self.fc = nn.Linear(1000, output_dim, bias=True)

    def forward(self, x):
        """Return the class scores produced for the input batch ``x``."""
        backbone_out = self.resnet(x)
        flat = self.flatten(backbone_out)
        return self.fc(flat)



In [5]:
def check_acc(data, model, loss_fn):
    """Compute accuracy and mean per-batch loss of ``model`` over ``data``.

    The model is switched to eval mode for the duration of the call so that
    BatchNorm/Dropout behave deterministically and BatchNorm running statistics
    are not updated by evaluation data; the previous mode is restored on exit.

    Parameters:
    - data: Iterable of ``(x, y)`` batches (e.g. a DataLoader).
    - model: ``nn.Module`` producing one row of class scores per sample.
    - loss_fn: Callable ``loss_fn(scores, y)`` returning a scalar tensor.

    Returns:
    - ``(acc, loss)``: fraction of correct predictions and the loss averaged
      over batches, both as Python floats.

    Raises:
    - ValueError: if ``data`` yields no samples.
    """
    n_samples = 0
    n_correct = 0
    n_batches = 0
    total_loss = 0.0

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for x, y in data:
                x = x.to(device=device, dtype=dtype)
                y = y.to(device=device, dtype=torch.long)
                scores = model(x)
                _, preds = scores.max(1)
                total_loss += loss_fn(scores, y).item()
                n_samples += preds.size(0)
                # .item() keeps the counter a plain int instead of a device tensor.
                n_correct += (preds == y).sum().item()
                n_batches += 1
    finally:
        # Leave the model in whatever mode the caller had it in.
        model.train(was_training)

    if n_samples == 0:
        raise ValueError("check_acc received no samples to evaluate")
    acc = float(n_correct / n_samples)
    loss = float(total_loss / n_batches)
    return acc, loss


In [6]:
def train_model(model, train_data, test_data, loss_fn, num_epochs=5, batch_size=64,
                lr=0.0008, save_path='best_model100.pth', save_threshold=0.9):
    """Train ``model`` with Adam and checkpoint the best-scoring epoch.

    Parameters:
    - model: Network to optimise in place.
    - train_data: Dataset yielding ``(image, label)`` pairs.
    - test_data: Unused by this function; kept for call compatibility.
    - loss_fn: Callable ``loss_fn(scores, labels)`` returning a scalar loss.
    - num_epochs: Number of passes over ``train_data``.
    - batch_size: Mini-batch size of the training loader.
    - lr: Adam learning rate.
    - save_path: File the best ``state_dict`` is written to.
    - save_threshold: Minimum training accuracy required before checkpointing.

    Returns:
    - None. Progress is printed once per epoch.
    """
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    best_acc = 0
    for epoch in range(num_epochs):
        # Re-enter train mode every epoch: the accuracy check below (or an
        # earlier call site) may have left the model in eval mode.
        model.train()
        train_loss = 0
        for x, y in train_loader:
            x = x.to(device=device, dtype=dtype)
            y = y.to(device=device, dtype=torch.long)

            train_preds = model(x)
            loss = loss_fn(train_preds, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        train_loss /= len(train_loader)
        train_acc, _ = check_acc(train_loader, model, loss_fn)
        print(f'Epoch {epoch}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}')

        # Keep only the weights of the best epoch seen so far, and only once
        # the accuracy clears the threshold.
        if train_acc > best_acc and train_acc > save_threshold:
            best_acc = train_acc
            torch.save(model.state_dict(), save_path)


def eval_model(model, test_data, loss_fn):
    """Print the loss and accuracy of ``model`` on ``test_data``.

    The dataset is read in order, in batches of 64, and scored with
    ``check_acc``. Nothing is returned.
    """
    ordered_loader = DataLoader(test_data, shuffle=False, batch_size=64)
    metrics = check_acc(ordered_loader, model, loss_fn)
    test_acc = metrics[0]
    test_loss = metrics[1]
    print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}')



def eval_untargetted_fgsm(model, test_data, loss_fn, e=0.1):
    """Evaluate ``model`` under a single-step untargeted FGSM attack.

    Every batch is shifted by ``e * sign(d loss / d x)`` (loss against the true
    labels), clamped to [0, 1] and re-classified.

    Parameters:
    - model: Network under attack. It is run in eval mode and its previous
      mode is restored on exit.
    - test_data: Dataset yielding ``(image, label)`` pairs.
    - loss_fn: Callable ``loss_fn(scores, labels)`` returning a scalar loss.
    - e: Perturbation size applied to each input element.

    Returns:
    - Accuracy (float) on the adversarial examples. The averaged adversarial
      loss and the accuracy are also printed.
    """
    test_loader = DataLoader(test_data, batch_size=32, shuffle=False)
    n_samples = 0
    n_correct = 0
    n_batches = 0
    total_loss = 0.0

    # Attack the inference-time network: in train mode BatchNorm would use
    # batch statistics and overwrite its running statistics with test data.
    was_training = model.training
    model.eval()
    try:
        for x, y in test_loader:
            x = x.to(device=device, dtype=dtype).detach().requires_grad_(True)
            y = y.to(device=device, dtype=torch.long)

            loss = loss_fn(model(x), y)
            # Only the input gradient is needed; parameter grads stay untouched.
            (x_grad,) = torch.autograd.grad(loss, x)
            x_adv = torch.clamp(x.detach() + e * x_grad.sign(), 0, 1)

            # No graph is needed for scoring the adversarial batch.
            with torch.no_grad():
                out_adv = model(x_adv)
            _, preds = out_adv.max(1)
            total_loss += loss_fn(out_adv, y).item()
            n_samples += preds.size(0)
            n_correct += (preds == y).sum().item()
            n_batches += 1
    finally:
        model.train(was_training)

    acc = float(n_correct / n_samples)
    loss = float(total_loss / n_batches)
    print(f'Untargetted FGSM Test Loss: {loss:.4f}, Untargetted FGSM Test Acc: {acc:.4f}')
    return acc

# Targeted FGSM attack: step against the gradient of the loss towards a chosen target label.
def eval_targeted_fgsm(model, test_data, loss_fn, target_label, e=0.1):
    """Evaluate ``model`` under a single-step targeted FGSM attack.

    Every batch is moved by ``-e * sign(d loss / d x)`` where the loss is taken
    against ``target_label``, clamped to [0, 1] and re-classified. The reported
    accuracy is measured against the TRUE labels.

    Parameters:
    - model: Network under attack. It is run in eval mode and its previous
      mode is restored on exit.
    - test_data: Dataset yielding ``(image, label)`` pairs.
    - loss_fn: Callable ``loss_fn(scores, labels)`` returning a scalar loss.
    - target_label: Class to push predictions towards. Either a single class
      (int or one-element tensor, applied to every sample) or a 1-D tensor with
      at least one entry per sample of a batch (batch size is 20).
    - e: Perturbation size applied to each input element.

    Returns:
    - Accuracy (float) on the adversarial examples with respect to the true labels.
    """
    test_loader = DataLoader(test_data, batch_size=20, shuffle=False)
    n_samples = 0
    n_correct = 0
    n_batches = 0
    total_loss = 0.0
    target = torch.as_tensor(target_label, device=device, dtype=torch.long)

    # Attack the inference-time network (deterministic BatchNorm/Dropout).
    was_training = model.training
    model.eval()
    try:
        for x, y in test_loader:
            x = x.to(device=device, dtype=dtype).detach().requires_grad_(True)
            y = y.to(device=device, dtype=torch.long)

            # Fit the target to the actual batch size so a single class or a
            # short final batch both work.
            n = y.size(0)
            batch_target = target.reshape(1).expand(n) if target.numel() == 1 else target[:n]

            loss = loss_fn(model(x), batch_target)
            (x_grad,) = torch.autograd.grad(loss, x)
            # Descend the target loss to move towards the target class.
            x_adv = torch.clamp(x.detach() - e * x_grad.sign(), 0, 1)

            with torch.no_grad():
                out_adv = model(x_adv)
            _, preds = out_adv.max(1)
            total_loss += loss_fn(out_adv, y).item()
            n_samples += preds.size(0)
            n_correct += (preds == y).sum().item()
            n_batches += 1
    finally:
        model.train(was_training)

    acc = float(n_correct / n_samples)
    loss = float(total_loss / n_batches)
    print(f'Targetted FGSM Test Loss: {loss:.4f}, Targetted FGSM Test Acc: {acc:.4f}')
    return acc

def eval_untargetted_mim(model, test_data, loss_fn, eps=0.1, alpha=0.9, num_iter=10):
    """Evaluate ``model`` under an untargeted momentum iterative (MI-FGSM) attack.

    For every batch the attack runs ``num_iter`` steps of size ``eps / num_iter``.
    Each step accumulates the L1-normalised input gradient into a momentum term
    (decay ``alpha``), moves along the sign of that momentum, and projects the
    result back into the ``eps`` L-infinity ball around the clean input and
    into [0, 1]. Momentum is reset for every batch.

    Parameters:
    - model: Network under attack. It is run in eval mode and its previous
      mode is restored on exit.
    - test_data: Dataset yielding ``(image, label)`` pairs.
    - loss_fn: Callable ``loss_fn(scores, labels)`` returning a scalar loss.
    - eps: Maximum perturbation per input element.
    - alpha: Momentum decay factor.
    - num_iter: Number of attack iterations (>= 1).

    Returns:
    - Accuracy (float) on the adversarial examples.

    Raises:
    - ValueError: if ``num_iter`` is smaller than 1.
    """
    if num_iter < 1:
        raise ValueError("num_iter must be at least 1")

    # DataLoader accepts no `transform` argument; transforms belong to the dataset.
    test_loader = DataLoader(test_data, batch_size=32, shuffle=False)
    step_size = eps / num_iter
    n_samples = 0
    n_correct = 0
    n_batches = 0
    total_loss = 0.0

    was_training = model.training
    model.eval()
    try:
        for x, y in test_loader:
            x = x.to(device=device, dtype=dtype)
            y = y.to(device=device, dtype=torch.long)

            x_adv = x.clone().detach()
            # Fresh momentum per batch: it must match this batch's shape and
            # must not carry gradients of unrelated samples.
            momentum = torch.zeros_like(x)

            for _ in range(num_iter):
                x_adv.requires_grad_(True)
                loss = loss_fn(model(x_adv), y)
                (x_grad,) = torch.autograd.grad(loss, x_adv)

                # Per-sample L1 normalisation keeps the momentum scale-free.
                grad_l1 = x_grad.abs().reshape(x_grad.size(0), -1).sum(dim=1).clamp_min(1e-12)
                grad_l1 = grad_l1.reshape(-1, *([1] * (x_grad.dim() - 1)))
                momentum = alpha * momentum + x_grad / grad_l1

                x_adv = x_adv.detach() + step_size * momentum.sign()
                # Stay inside the eps-ball around the clean input and the valid pixel range.
                x_adv = torch.min(torch.max(x_adv, x - eps), x + eps)
                x_adv = torch.clamp(x_adv, 0, 1)

            with torch.no_grad():
                out_adv = model(x_adv)
            _, preds = out_adv.max(1)
            total_loss += loss_fn(out_adv, y).item()
            n_samples += preds.size(0)
            n_correct += (preds == y).sum().item()
            n_batches += 1
    finally:
        model.train(was_training)

    acc = float(n_correct / n_samples)
    loss = float(total_loss / n_batches)
    print(f'Untargetted MIM Test Loss: {loss:.4f}, Untargetted MIM Test Acc: {acc:.4f}')
    return acc

def eval_targetted_mim(model, test_data, loss_fn, eps=0.1, alpha=0.9, target_label=1, num_iter=10):
    """Evaluate ``model`` under a targeted momentum iterative (MI-FGSM) attack.

    For every batch the attack runs ``num_iter`` steps of size ``eps / num_iter``.
    Each step accumulates the L1-normalised gradient of the loss towards
    ``target_label`` into a momentum term (decay ``alpha``), moves AGAINST the
    sign of that momentum, and projects the result back into the ``eps``
    L-infinity ball around the clean input and into [0, 1]. The reported
    accuracy is measured against the TRUE labels.

    Parameters:
    - model: Network under attack. It is run in eval mode and its previous
      mode is restored on exit.
    - test_data: Dataset yielding ``(image, label)`` pairs.
    - loss_fn: Callable ``loss_fn(scores, labels)`` returning a scalar loss.
    - eps: Maximum perturbation per input element.
    - alpha: Momentum decay factor.
    - target_label: Class to push predictions towards. Either a single class
      (int or one-element tensor) or a 1-D tensor with at least one entry per
      sample of a batch (batch size is 32).
    - num_iter: Number of attack iterations (>= 1).

    Returns:
    - Accuracy (float) on the adversarial examples with respect to the true labels.

    Raises:
    - ValueError: if ``num_iter`` is smaller than 1.
    """
    if num_iter < 1:
        raise ValueError("num_iter must be at least 1")

    # DataLoader accepts no `transform` argument; transforms belong to the dataset.
    test_loader = DataLoader(test_data, batch_size=32, shuffle=False)
    # as_tensor accepts the plain-int default as well as tensors.
    target = torch.as_tensor(target_label, device=device, dtype=torch.long)
    step_size = eps / num_iter
    n_samples = 0
    n_correct = 0
    n_batches = 0
    total_loss = 0.0

    was_training = model.training
    model.eval()
    try:
        for x, y in test_loader:
            x = x.to(device=device, dtype=dtype)
            y = y.to(device=device, dtype=torch.long)

            n = y.size(0)
            batch_target = target.reshape(1).expand(n) if target.numel() == 1 else target[:n]

            x_adv = x.clone().detach()
            # Fresh momentum per batch so shapes always match.
            momentum = torch.zeros_like(x)

            for _ in range(num_iter):
                x_adv.requires_grad_(True)
                loss = loss_fn(model(x_adv), batch_target)
                (x_grad,) = torch.autograd.grad(loss, x_adv)

                grad_l1 = x_grad.abs().reshape(x_grad.size(0), -1).sum(dim=1).clamp_min(1e-12)
                grad_l1 = grad_l1.reshape(-1, *([1] * (x_grad.dim() - 1)))
                momentum = alpha * momentum + x_grad / grad_l1

                # Descend the target loss to move towards the target class.
                x_adv = x_adv.detach() - step_size * momentum.sign()
                x_adv = torch.min(torch.max(x_adv, x - eps), x + eps)
                x_adv = torch.clamp(x_adv, 0, 1)

            with torch.no_grad():
                out_adv = model(x_adv)
            _, preds = out_adv.max(1)
            total_loss += loss_fn(out_adv, y).item()
            n_samples += preds.size(0)
            n_correct += (preds == y).sum().item()
            n_batches += 1
    finally:
        model.train(was_training)

    acc = float(n_correct / n_samples)
    loss = float(total_loss / n_batches)
    print(f'Targetted MIM Test Loss: {loss:.4f}, Targetted MIM Test Acc: {acc:.4f}')
    return acc

def eval_untargetted_rays(model, test_data, loss_fn, eps=0.1, num_iter=10, alpha=0.75):
    """Evaluate ``model`` under an iterative, gradient-based untargeted attack.

    Each of the ``num_iter`` steps blends the normalised input gradient with the
    perturbation accumulated so far (weights ``alpha`` and ``1 - alpha``) and
    moves along the resulting direction by at most ``eps`` (norm taken over
    dim 1), then clamps to [0, 1].

    NOTE(review): despite the name, this routine uses input gradients; confirm
    whether a different (query-based) algorithm was intended.

    Parameters:
    - model: Network under attack. It is run in eval mode and its previous
      mode is restored on exit.
    - test_data: Dataset yielding ``(image, label)`` pairs.
    - loss_fn: Callable ``loss_fn(scores, labels)`` returning a scalar loss.
    - eps: Upper bound on the per-step move (norm over dim 1).
    - num_iter: Number of attack iterations.
    - alpha: Weight of the fresh gradient versus the accumulated perturbation.

    Returns:
    - Accuracy (float) on the adversarial examples.
    """
    test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

    n_samples = 0
    n_correct = 0
    n_batches = 0
    total_loss = 0.0

    was_training = model.training
    model.eval()
    try:
        for x, y in test_loader:
            x = x.to(device=device, dtype=dtype)
            y = y.to(device=device, dtype=torch.long)
            x_adv = x.clone().detach().requires_grad_(True)

            for _ in range(num_iter):
                loss = loss_fn(model(x_adv), y)
                (raw_grad,) = torch.autograd.grad(loss, x_adv)
                current = x_adv.detach()

                # Normalise the gradient along dim 1.
                grad = raw_grad / torch.clamp(raw_grad.norm(dim=1, keepdim=True), min=1e-6)

                # Blend the gradient with the perturbation so far; the
                # parentheses make the second term (x_adv - x) rather than
                # subtracting the whole clean image.
                grad_update = alpha * grad + (1 - alpha) * (current - x)

                # Step length is the update norm capped at eps. torch.clamp is
                # used because torch.min cannot mix a float with a tensor.
                update_norm = torch.linalg.norm(grad_update, dim=1, keepdim=True)
                eta = torch.clamp(update_norm, max=eps)
                x_adv = current + eta * grad_update / torch.clamp(update_norm, min=1e-6)

                # Keep a valid image and start a fresh graph for the next step.
                x_adv = torch.clamp(x_adv, 0, 1).detach().requires_grad_(True)

            with torch.no_grad():
                scores_adv = model(x_adv)
            _, preds = scores_adv.max(1)

            total_loss += loss_fn(scores_adv, y).item()
            n_samples += preds.size(0)
            n_correct += (preds == y).sum().item()
            n_batches += 1
    finally:
        model.train(was_training)

    acc = float(n_correct / n_samples)
    loss = float(total_loss / n_batches)
    print(f'Untargeted RayS Test Loss: {loss:.4f}, Untargeted RayS Test Acc: {acc:.4f}')
    return acc

def eval_targetted_rays(model, test_data, loss_fn, eps=0.1, num_iter=10, alpha=0.75,target_label=1):
    """Evaluate ``model`` under an iterative, gradient-based targeted attack.

    Each of the ``num_iter`` steps blends the normalised gradient of the loss
    towards ``target_label`` with the perturbation accumulated so far (weights
    ``alpha`` and ``1 - alpha``) and moves AGAINST the resulting direction by
    at most ``eps`` (norm taken over dim 1), then clamps to [0, 1]. The
    reported accuracy is measured against the TRUE labels.

    NOTE(review): despite the name, this routine uses input gradients; confirm
    whether a different (query-based) algorithm was intended.

    Parameters:
    - model: Network under attack. It is run in eval mode and its previous
      mode is restored on exit.
    - test_data: Dataset yielding ``(image, label)`` pairs.
    - loss_fn: Callable ``loss_fn(scores, labels)`` returning a scalar loss.
    - eps: Upper bound on the per-step move (norm over dim 1).
    - num_iter: Number of attack iterations.
    - alpha: Weight of the fresh gradient versus the accumulated perturbation.
    - target_label: Class to push predictions towards. Either a single class
      (int or one-element tensor) or a 1-D tensor with at least one entry per
      sample of a batch (batch size is 32).

    Returns:
    - Accuracy (float) on the adversarial examples with respect to the true labels.
    """
    test_loader = DataLoader(test_data, batch_size=32, shuffle=False)
    # as_tensor accepts the plain-int default as well as tensors.
    target = torch.as_tensor(target_label, device=device, dtype=torch.long)

    n_samples = 0
    n_correct = 0
    n_batches = 0
    total_loss = 0.0

    was_training = model.training
    model.eval()
    try:
        for x, y in test_loader:
            x = x.to(device=device, dtype=dtype)
            y = y.to(device=device, dtype=torch.long)

            n = y.size(0)
            batch_target = target.reshape(1).expand(n) if target.numel() == 1 else target[:n]

            x_adv = x.clone().detach().requires_grad_(True)

            for _ in range(num_iter):
                loss = loss_fn(model(x_adv), batch_target)
                (raw_grad,) = torch.autograd.grad(loss, x_adv)
                current = x_adv.detach()

                # Normalise the gradient along dim 1.
                grad = raw_grad / torch.clamp(raw_grad.norm(dim=1, keepdim=True), min=1e-6)

                # Parentheses make the second term the accumulated perturbation.
                grad_update = alpha * grad + (1 - alpha) * (current - x)

                # torch.clamp caps the step at eps (torch.min cannot mix float and tensor).
                update_norm = torch.linalg.norm(grad_update, dim=1, keepdim=True)
                eta = torch.clamp(update_norm, max=eps)
                # Descend the target loss to move towards the target class.
                x_adv = current - eta * grad_update / torch.clamp(update_norm, min=1e-6)

                x_adv = torch.clamp(x_adv, 0, 1).detach().requires_grad_(True)

            with torch.no_grad():
                scores_adv = model(x_adv)
            _, preds = scores_adv.max(1)

            total_loss += loss_fn(scores_adv, y).item()
            n_samples += preds.size(0)
            n_correct += (preds == y).sum().item()
            n_batches += 1
    finally:
        model.train(was_training)

    acc = float(n_correct / n_samples)
    loss = float(total_loss / n_batches)
    print(f'Targeted RayS Test Loss: {loss:.4f}, Targeted RayS Test Acc: {acc:.4f}')
    return acc

def eval_untargetted_pgd(model, test_data, loss_fn, e=0.1, a=0.01, num_iter=30):
    """Evaluate ``model`` under an untargeted PGD attack.

    Starting from the clean batch, each of the ``num_iter`` steps moves by
    ``a * sign(d loss / d x)``, projects back into the ``e`` L-infinity ball
    around the clean input and clamps to [0, 1].

    Parameters:
    - model: Network under attack. It is run in eval mode and its previous
      mode is restored on exit.
    - test_data: Dataset yielding ``(image, label)`` pairs.
    - loss_fn: Callable ``loss_fn(scores, labels)`` returning a scalar loss.
    - e: Maximum perturbation per input element.
    - a: Step size of a single iteration.
    - num_iter: Number of attack iterations.

    Returns:
    - Accuracy (float) on the adversarial examples.
    """
    test_loader = DataLoader(test_data, batch_size=32, shuffle=False)
    n_samples = 0
    n_correct = 0
    n_batches = 0
    total_loss = 0.0

    # Attack the inference-time network: in train mode every one of the
    # num_iter forward passes would also update BatchNorm running statistics.
    was_training = model.training
    model.eval()
    try:
        for x, y in test_loader:
            x = x.to(device=device, dtype=dtype)
            y = y.to(device=device, dtype=torch.long)
            x_adv = x.clone().detach().requires_grad_(True)

            for _ in range(num_iter):
                loss = loss_fn(model(x_adv), y)
                (x_adv_grad,) = torch.autograd.grad(loss, x_adv)
                x_adv = x_adv.detach() + a * x_adv_grad.sign()
                # Projection onto the e-ball around the clean input.
                x_adv = torch.min(torch.max(x_adv, x - e), x + e)
                x_adv = torch.clamp(x_adv, 0, 1)
                x_adv = x_adv.detach().requires_grad_(True)

            with torch.no_grad():
                scores_adv = model(x_adv)
            _, preds = scores_adv.max(1)

            total_loss += loss_fn(scores_adv, y).item()
            n_samples += preds.size(0)
            n_correct += (preds == y).sum().item()
            n_batches += 1
    finally:
        model.train(was_training)

    acc = float(n_correct / n_samples)
    loss = float(total_loss / n_batches)
    print(f'Untargeted PGD Test Loss: {loss:.4f}, Untargeted PGD Test Acc: {acc:.4f}')
    return acc
     
def eval_targeted_pgd(model, test_data, loss_fn, target_label, e=0.1, a=0.01, num_iter=30):
    """Evaluate ``model`` under a targeted PGD attack.

    Starting from the clean batch, each of the ``num_iter`` steps moves by
    ``-a * sign(d loss / d x)`` with the loss taken against ``target_label``,
    projects back into the ``e`` L-infinity ball around the clean input and
    clamps to [0, 1]. The reported accuracy is measured against the TRUE labels.

    Parameters:
    - model: Network under attack. It is run in eval mode and its previous
      mode is restored on exit.
    - test_data: Dataset yielding ``(image, label)`` pairs.
    - loss_fn: Callable ``loss_fn(scores, labels)`` returning a scalar loss.
    - target_label: Class to push predictions towards. Either a single class
      (int or one-element tensor) or a 1-D tensor with at least one entry per
      sample of a batch (batch size is 20).
    - e: Maximum perturbation per input element.
    - a: Step size of a single iteration.
    - num_iter: Number of attack iterations.

    Returns:
    - Accuracy (float) on the adversarial examples with respect to the true labels.
    """
    test_loader = DataLoader(test_data, batch_size=20, shuffle=False)
    n_samples = 0
    n_correct = 0
    n_batches = 0
    total_loss = 0.0
    # Converted once up front instead of on every batch.
    target = torch.as_tensor(target_label, device=device, dtype=torch.long)

    was_training = model.training
    model.eval()
    try:
        for x, y in test_loader:
            x = x.to(device=device, dtype=dtype)
            y = y.to(device=device, dtype=torch.long)

            # Fit the target to the actual batch size.
            n = y.size(0)
            batch_target = target.reshape(1).expand(n) if target.numel() == 1 else target[:n]

            x_adv = x.clone().detach().requires_grad_(True)

            for _ in range(num_iter):
                loss = loss_fn(model(x_adv), batch_target)
                (x_grad,) = torch.autograd.grad(loss, x_adv)

                # Descend the target loss, then project back into the e-ball.
                x_adv = x_adv.detach() - a * x_grad.sign()
                x_adv = torch.min(torch.max(x_adv, x - e), x + e)
                x_adv = torch.clamp(x_adv, 0, 1)
                x_adv = x_adv.detach().requires_grad_(True)

            with torch.no_grad():
                scores_adv = model(x_adv)
            loss_adv = loss_fn(scores_adv, y)
            _, preds = scores_adv.max(1)

            total_loss += loss_adv.item()
            n_samples += preds.size(0)
            n_correct += (preds == y).sum().item()
            n_batches += 1
    finally:
        model.train(was_training)

    acc = float(n_correct / n_samples)
    loss = float(total_loss / n_batches)
    print(f'Targeted PGD Test Loss: {loss:.4f}, Targeted PGD Test Acc: {acc:.4f}')
    return acc


In [7]:
# Now performing adversarial training on the model and checking the accuracies again

def adversarial_training(model, train_data, test_data, loss_fn, e=0.05, a=0.01, num_iter=20, num_epochs=10, batch_size=32, adv_prob=0.05):
    """Train ``model`` on a mix of clean and PGD-perturbed batches.

    For every mini-batch, with probability ``adv_prob`` the inputs are replaced
    by PGD adversarial examples (``num_iter`` steps of size ``a`` inside the
    ``e`` L-infinity ball around the clean inputs) before the optimisation step.

    Parameters:
    - model: Network to optimise in place.
    - train_data: Dataset yielding ``(image, label)`` pairs.
    - test_data: Dataset used for the final clean evaluation.
    - loss_fn: Callable ``loss_fn(scores, labels)`` returning a scalar loss.
    - e: Maximum perturbation per input element.
    - a: PGD step size.
    - num_iter: Number of PGD iterations per adversarial batch.
    - num_epochs: Number of passes over ``train_data``.
    - batch_size: Mini-batch size of the training loader.
    - adv_prob: Probability that a batch is trained adversarially.

    Returns:
    - None. Per-epoch statistics and the final clean accuracy are printed.
    """
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    for epoch in range(num_epochs):
        # The accuracy check at the end of an epoch may leave the model in eval mode.
        model.train()
        epoch_loss = 0.0
        for x, y in train_loader:
            x = x.to(device=device, dtype=dtype)
            y = y.to(device=device, dtype=torch.long)

            # A batch is trained adversarially with probability adv_prob.
            if np.random.rand() < adv_prob:
                x_clean = x
                x_adv = x.clone().detach()
                for _ in range(num_iter):
                    x_adv.requires_grad_(True)
                    attack_loss = loss_fn(model(x_adv), y)
                    (x_grad,) = torch.autograd.grad(attack_loss, x_adv)
                    x_adv = x_adv.detach() + a * x_grad.sign()
                    # Project around the CLEAN input so the total perturbation
                    # never exceeds e, however many steps are taken.
                    x_adv = torch.min(torch.max(x_adv, x_clean - e), x_clean + e)
                    x_adv = torch.clamp(x_adv, 0, 1)
                x = x_adv

            train_preds = model(x)
            loss = loss_fn(train_preds, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # Accumulate in a separate float; adding to `loss` itself would
            # corrupt the reported value.
            epoch_loss += loss.item()

        epoch_loss /= len(train_loader)
        train_acc, _ = check_acc(train_loader, model, loss_fn)
        print(f'Epoch {epoch}, Train Loss: {epoch_loss:.4f}, Train Acc: {train_acc:.4f}')

    print("Clean Accuracy:")
    eval_model(model, test_data, loss_fn)

In [8]:
from torch.utils.data import random_split

# 25% of the training set is used to fit the intermediate model; the remaining
# 75% is later pseudo-labelled by that model.
intermed_len = int(len(train_data) * 0.25)
robust_len = len(train_data) - intermed_len
# A seeded generator makes the split identical on every Restart & Run All.
intermed_train, robust_train = random_split(
    train_data, [intermed_len, robust_len], generator=torch.Generator().manual_seed(0)
)

In [21]:
# Intermediate model: 10 output classes, trained on the 25% "intermed" split only.
rst_model = MyModel(10).to(device=device)
# NOTE(review): this alias import appears unused in the visible cells; imports belong in the first cell.
from torchvision.models.resnet import resnet18 as _resnet18

# test_model = _resnet18(pretrained=True,).to(device=device)
loss_fn = nn.CrossEntropyLoss()
# test_data is passed but not used by train_model; only training accuracy is reported.
train_model(rst_model, intermed_train, test_data, loss_fn, num_epochs=50)



Epoch 0, Train Loss: 1.4084, Train Acc: 0.6847
Epoch 1, Train Loss: 0.9529, Train Acc: 0.7438
Epoch 2, Train Loss: 0.7297, Train Acc: 0.8360
Epoch 3, Train Loss: 0.5419, Train Acc: 0.8598
Epoch 4, Train Loss: 0.4607, Train Acc: 0.9041
Epoch 5, Train Loss: 0.3776, Train Acc: 0.9261
Epoch 6, Train Loss: 0.3029, Train Acc: 0.9412
Epoch 7, Train Loss: 0.2961, Train Acc: 0.9452
Epoch 8, Train Loss: 0.2122, Train Acc: 0.9618
Epoch 9, Train Loss: 0.1459, Train Acc: 0.9670
Epoch 10, Train Loss: 0.2650, Train Acc: 0.9264
Epoch 11, Train Loss: 0.1828, Train Acc: 0.9618
Epoch 12, Train Loss: 0.1344, Train Acc: 0.9687
Epoch 13, Train Loss: 0.0957, Train Acc: 0.9775
Epoch 14, Train Loss: 0.0960, Train Acc: 0.9810
Epoch 15, Train Loss: 0.0901, Train Acc: 0.9761
Epoch 16, Train Loss: 0.0933, Train Acc: 0.9806
Epoch 17, Train Loss: 0.0920, Train Acc: 0.9751
Epoch 18, Train Loss: 0.0986, Train Acc: 0.9775
Epoch 19, Train Loss: 0.1043, Train Acc: 0.9773
Epoch 20, Train Loss: 0.1142, Train Acc: 0.9572
Ep

In [22]:
def get_predictions(model, dataset, device='cuda'):
    """
    Predict a class index for every sample served by a batch iterable.

    Parameters:
    - model: Trained PyTorch model; it is switched to evaluation mode and left there.
    - dataset: Iterable of batches (typically a DataLoader) whose first element is the input tensor.
    - device: Device the inputs are moved to before the forward pass ('cuda' or 'cpu').

    Returns:
    - 1-D tensor holding the predicted class index of each sample, in iteration order.
    """
    model.eval()
    collected = []

    # Inference only, so gradient tracking is disabled.
    with torch.no_grad():
        for batch in dataset:
            batch_inputs = batch[0].to(device)
            logits = model(batch_inputs)
            # Index of the highest score along the class dimension.
            best_class = logits.max(dim=1).indices
            collected += best_class.cpu().tolist()

    return torch.tensor(collected)

# Loader over the remaining (75%) split; shuffle=False keeps the predictions
# in the same order as the samples of robust_train.
robust_loader = DataLoader(robust_train, batch_size=75, shuffle=False)

# Pseudo-labels: the intermediate model's predicted class for every sample of
# the remaining split (the module-level `device` is passed explicitly).
predictions = get_predictions(rst_model, robust_loader, device=device)

# Scores the intermediate model on the remaining split using its true labels;
# the printed "Test" figures therefore refer to robust_train, not to test_data.
eval_model(rst_model, robust_train, loss_fn)

Test Loss: 1.4298, Test Acc: 0.7346


In [11]:
from torch.utils.data import Dataset, ConcatDataset

# Stack every image of the remaining split into a single tensor; the true labels are dropped here.
pseudo_data = torch.stack([data for data, _ in robust_train])

class PseudoLabeledDataset(Dataset):
    """Pairs a tensor of samples with a tensor of pseudo-labels, index by index."""

    def __init__(self, data, pseudo_labels):
        """
        Store the samples and their pseudo-labels.

        Parameters:
        - data (Tensor): The data points, one per entry along dim 0.
        - pseudo_labels (Tensor): One pseudo-label per data point.

        Fails an assertion when the two tensors differ in length along dim 0.
        """
        n_data = data.size(0)
        n_labels = pseudo_labels.size(0)
        assert n_data == n_labels, "Data and labels must have the same size"
        self.data, self.labels = data, pseudo_labels

    def __len__(self):
        """Number of stored samples."""
        return self.data.size(0)

    def __getitem__(self, idx):
        """Return the ``(sample, pseudo_label)`` pair stored at ``idx``."""
        sample = self.data[idx]
        label = self.labels[idx]
        return sample, label
    
class TensorLabelDataset(Dataset):
    """View of a ``(data, label)`` dataset whose labels are returned as tensors."""

    def __init__(self, dataset):
        """Keep a reference to the wrapped dataset; nothing is copied."""
        self.dataset = dataset

    def __len__(self):
        """Same length as the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(data, label)`` for ``idx`` with the label converted by ``torch.tensor``."""
        image, target = self.dataset[idx]
        target_tensor = torch.tensor(target)
        return image, target_tensor

# NOTE(review): this rebinds intermed_train, so re-running the cell wraps the
# dataset a second time; a new name would make the cell idempotent.
intermed_train = TensorLabelDataset(intermed_train)
# Images of the remaining split paired with the model-predicted labels.
pseudo_labeled_dataset = PseudoLabeledDataset(pseudo_data, predictions)

# Training set for the robust model: pseudo-labelled samples followed by the truly labelled ones.
final_dataset = ConcatDataset([pseudo_labeled_dataset, intermed_train])

In [18]:
import torch
import torch.nn as nn
import torch.nn.functional as F

def fgsm_attack(model, loss_fn, images, labels, epsilon):
    """
    Generates adversarial examples using the Fast Gradient Sign Method.

    The input gradient is computed with ``torch.autograd.grad``, so parameter
    gradients of ``model`` are neither accumulated nor cleared, and the call
    also works when the caller is inside a ``torch.no_grad()`` block.

    Parameters:
    - model: The neural network model.
    - loss_fn: Loss function used to compute gradients.
    - images: Original images.
    - labels: True labels for the images.
    - epsilon: Perturbation magnitude.

    Returns:
    - Adversarial examples, clamped to [0, 1] and detached from the autograd graph.
    """
    # Move to the device BEFORE enabling gradients: `.to()` on a tensor that
    # already requires grad can return a non-leaf copy whose `.grad` is None.
    images = images.clone().detach().to(device).requires_grad_(True)
    labels = labels.to(device)

    # Gradient tracking is forced on in case the caller disabled it.
    with torch.enable_grad():
        outputs = model(images)
        loss = loss_fn(outputs, labels)
    (grad,) = torch.autograd.grad(loss, images)

    # Step in the direction that increases the loss, then keep a valid image.
    adv_images = images.detach() + epsilon * grad.sign()
    adv_images = torch.clamp(adv_images, 0, 1)

    return adv_images

class robust_loss():
    """
    Loss that mixes clean cross-entropy with cross-entropy on FGSM examples.

    Because the loss callable only receives predictions and labels, the images
    to perturb are looked up in ``dataset``: for every label of the batch, the
    first dataset sample carrying that label is used.

    Parameters:
    - model: The neural network model.
    - dataset: Dataset of ``(image, label)`` pairs searched for example images.
    - epsilon: Perturbation magnitude for adversarial examples.
    - alpha: Weighting factor of the adversarial term (``1 - alpha`` weights the clean term).
    """
    def __init__(self, model, dataset, epsilon=0.03, alpha=0.5):
        self.model = model
        self.dataset = dataset
        self.epsilon = epsilon
        self.alpha = alpha
        # label (int) -> first image found with that label; filled lazily so
        # the dataset is not rescanned for labels that were already found.
        self._label_examples = {}

    def get_inputs_for_label(self, target_label):
        """
        Return one example image per requested label, aligned with the request.

        Parameters:
        - target_label: Tensor (or sequence) of class labels.

        Returns:
        - Tensor whose i-th row is a dataset image labelled ``target_label[i]``,
          on the module-level ``device``; ``None`` when the request is empty or
          any requested label has no example in the dataset.
        """
        # Plain ints are used as keys: tensors hash by identity, so two equal
        # label tensors would never be recognised as the same key.
        labels = [int(v) for v in torch.as_tensor(target_label).flatten().tolist()]

        missing = set(labels) - set(self._label_examples)
        if missing:
            for data, label in self.dataset:
                key = int(label)
                if key in missing:
                    self._label_examples[key] = data
                    missing.discard(key)
                    if not missing:
                        break

        if not labels or missing:
            return None
        # Add a batch dimension to each example and concatenate in request order.
        rows = [self._label_examples[key].unsqueeze(0) for key in labels]
        return torch.cat(rows, 0).to(device)

    def robust_loss_fn(self, train_preds, y):
        """
        Compute ``(1 - alpha) * clean_loss + alpha * adversarial_loss``.

        Parameters:
        - train_preds: Class scores of the model for the current batch.
        - y: True labels of the current batch.

        Returns:
        - The combined scalar loss.

        Raises:
        - ValueError: if some label in ``y`` has no example image in the dataset.
        """
        train_preds = train_preds.to(device)
        y = y.to(device)

        # Standard cross-entropy loss on the predictions that were passed in.
        std_loss = F.cross_entropy(train_preds, y)

        # Example images carrying the batch labels, then their FGSM perturbation.
        clean_inputs = self.get_inputs_for_label(y)
        if clean_inputs is None:
            raise ValueError("no example image available for at least one label of the batch")
        adv_inputs = fgsm_attack(self.model, nn.CrossEntropyLoss(), clean_inputs, y, self.epsilon)

        # Loss of the model on the adversarial examples.
        adv_outputs = self.model(adv_inputs)
        adv_loss = F.cross_entropy(adv_outputs, y)

        # Weighted combination of the clean and adversarial terms.
        combined_loss = (1 - self.alpha) * std_loss + self.alpha * adv_loss

        return combined_loss


In [None]:
# Debug check of the label types in both parts of final_dataset:
# print(intermed_train[0][1], pseudo_labeled_dataset[0][1])


# Continue training the intermediate model on the combined (pseudo-labelled + labelled) dataset.
rst_model = rst_model.to(device)
# Rebinds loss_fn from the plain cross-entropy module to a robust_loss helper object.
loss_fn = robust_loss(model = rst_model, dataset = final_dataset)
# The bound method robust_loss_fn is what adversarial_training calls as loss_fn(scores, labels).
adversarial_training(rst_model, final_dataset, test_data, loss_fn.robust_loss_fn, e=0.1, a=0.01, num_iter=30, num_epochs=10, batch_size=32)