In [1]:
import numpy as np
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt

from tqdm import tqdm

In [2]:
batch_size = 64
num_classes = 10
learning_rate = 10e-3
num_epochs = 10
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
#Loading the dataset and preprocessing
train_dataset = torchvision.datasets.MNIST(root = './data',
                                           train = True,
                                           transform = transforms.Compose([
                                                  transforms.Resize((32,32)),
                                                  transforms.ToTensor(),
                                                  transforms.Normalize(mean = (0.1307,), std = (0.3081,))]),
                                           download = True)


test_dataset = torchvision.datasets.MNIST(root = './data',
                                          train = False,
                                          transform = transforms.Compose([
                                                  transforms.Resize((32,32)),
                                                  transforms.ToTensor(),
                                                  transforms.Normalize(mean = (0.1325,), std = (0.3105,))]),
                                          download=True)


train_loader = torch.utils.data.DataLoader(dataset = train_dataset,
                                           batch_size = batch_size,
                                           shuffle = False)


test_loader = torch.utils.data.DataLoader(dataset = test_dataset,
                                           batch_size = 64, # TEST BATCH SIZE
                                           shuffle = False)

In [4]:
#Defining the convolutional neural network
class LeNet5(nn.Module):
    def __init__(self, num_classes):
        super(LeNet5, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 6, kernel_size=5, stride=1, padding=0),
            nn.BatchNorm2d(6),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 2, stride = 2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(6, 16, kernel_size=5, stride=1, padding=0),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 2, stride = 2))
        self.fc = nn.Linear(400, 120)
        self.relu = nn.ReLU()
        self.fc1 = nn.Linear(120, 84)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(84, num_classes)
        
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        out = self.relu(out)
        out = self.fc1(out)
        out = self.relu1(out)
        out = self.fc2(out)
        return out

In [5]:
LeNet = LeNet5(num_classes).to(device)

#Setting the loss function
cost = nn.CrossEntropyLoss()

#Setting the optimizer with the model parameters and learning rate
optimizer = torch.optim.Adam(LeNet.parameters(), lr=learning_rate)

#this is defined to print how many steps are remaining when training
total_step = len(train_loader)

In [6]:
# LOAD FROM SAVED FILE
LeNet = torch.load('LeNet.pth')
LeNet.eval()

LeNet5(
  (layer1): Sequential(
    (0): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1))
    (1): BatchNorm2d(6, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (layer2): Sequential(
    (0): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc): Linear(in_features=400, out_features=120, bias=True)
  (relu): ReLU()
  (fc1): Linear(in_features=120, out_features=84, bias=True)
  (relu1): ReLU()
  (fc2): Linear(in_features=84, out_features=10, bias=True)
)

https://federicozanotti.github.io/2021-09-16-frank-wolfe/

In [None]:
def flatten_data(batched_data):
    b, _, d, _ = batched_data.shape
    return batched_data.view(b, 1, d*d)

def unflatten_data(flattened_data, d):
    b, _ = flattened_data.shape[:2]
    return flattened_data.view(b, 1, d, d)

In [23]:
out = True
for a,l in test_loader:
    if out:
        print(l)
        #print("loss test")
        #print(F_torch(a,l,LeNet))
        #print("grad test")
        #print(RandGradEst(a,l,0.3,32,LeNet))
        print(Avg_RandGradEst(a,l,30,0.3,32,LeNet).shape)
        print('Avg_RandGradEst_Par')
        print(Avg_RandGradEst_Par(a, l, 30, 0.3, 32, LeNet).shape)
    out = False

tensor([7, 2, 1, 0, 4, 1, 4, 9, 5, 9, 0, 6, 9, 0, 1, 5, 9, 7, 3, 4, 9, 6, 6, 5,
        4, 0, 7, 4, 0, 1, 3, 1, 3, 4, 7, 2, 7, 1, 2, 1, 1, 7, 4, 2, 3, 5, 1, 2,
        4, 4, 6, 3, 5, 5, 6, 0, 4, 1, 9, 5, 7, 8, 9, 3])
torch.Size([32, 32])
Avg_RandGradEst_Par


RuntimeError: The size of tensor a (1920) must match the size of tensor b (30) at non-singleton dimension 0

In [26]:
def F_torch(x, y_true, model):
    """
    Loss function for all the examples

    Input:
    - x: images [batch_size, channels, height, width]
    - y_true: true labels of the images [batch_size]
    - model: the PyTorch model used for prediction

    Returns:
    - Loss: computed loss
    """

    # Predict class scores
    f = model(x)
    
    # Gather the scores for the correct class
    f_yi = torch.gather(f, 1, y_true.view(-1, 1)).squeeze()
    
    # Create a mask for the correct class, setting them to a very small value
    mask = torch.arange(f.size(1)).expand(f.size(0), f.size(1)).to(y_true.device)  # This creates a matrix of size batch_size x num_classes
    mask = mask != y_true.view(-1, 1)  # This gives a mask where the correct class for each example is set to False
    
    # Using the mask, set scores of the correct class to a very low value so they won't be selected
    f_j = torch.where(mask, f, torch.tensor(-1e10).to(f.device)).max(dim=1).values
    
    # Compute the loss and apply the ReLU operation
    loss = torch.relu(f_yi - f_j)
    
    return loss.mean()

def RandGradEst(x, y_true, v, d, model):
    """
    Two-point (gaussian) random gradient estimator

    Input:
    - x: image [batch_size, channels, height, width]
    - y_true: true labels of the images [batch_size]
    - v: smoothing parameter
    - d: dimensionality of the image (typically, channels x height x width)
    - model: the PyTorch model used for prediction inside the loss function F_torch

    Returns:
    - Gradient estimate
    """
    device = x.device  # Get the device of the input tensor (either CPU or CUDA)

    # Create a tensor with standard normal values
    u = torch.randn((1, d, d), device=device)

    # Compute the F function values for the two points
    F_plus = F_torch(x + v*u, y_true, model)
    F_ = F_torch(x, y_true, model)

    # Calculate the gradient estimate
    grad_estimate = (d/v)*(F_plus - F_)*u

    return grad_estimate

def Avg_RandGradEst(x, y_true, q, v, d, model):
    """
    Averaged (gaussian) random gradient estimator

    Input:
    - x: image [batch_size, channels, height, width]
    - y_true: true labels of the images [batch_size]
    - q: number of random directions
    - v: smoothing parameter
    - d: dimensionality of the image (typically, channels x height x width)
    - model: the PyTorch model used for prediction inside the F_torch function

    Returns:
    - Averaged gradient estimate
    """
    device = x.device  # Get the device of the input tensor (either CPU or CUDA)

    # Create a tensor with standard normal values for all q directions
    u = torch.randn((q, d, d), device=device)

    # Compute the F function value for the original point
    F_ = F_torch(x, y_true, model)
    
    g = 0
    for j in range(q):
        F_plus = F_torch(x + v*u[j], y_true, model)
        g = g + (F_plus - F_)*u[j]

    # Calculate the averaged gradient estimate
    avg_grad_estimate = (d/(v*q))*g

    return avg_grad_estimate

def stop_attack(x, y_true, model):
    with torch.no_grad():
        # Assuming the model outputs raw logits
        logits = model(x)
        success = torch.argmax(logits, dim=1)
        return torch.sum(success == y_true).item() == 0


In [40]:
def F_Par_torch(x, y_true, model):
    """
    Loss function for only one example

    Input:
    - x: image [batch_size, channels, height, width]
    - y_true: true label of the image [batch_size]

    Returns:
    - Computed loss values
    """
    # Model prediction
    f = model(x)
    
    # Gather the predictions for the true class label
    f_yi = torch.gather(f, 1, y_true.view(-1, 1)).squeeze()
    
    # Mask out the true class labels
    f_j_values, _ = torch.max(f + (y_true.view(-1, 1) == 1).float() * -1e10, dim=1)

    return torch.clamp(f_yi - f_j_values, min=0)

def Avg_RandGradEst_Par(x, y_true, q, v, d, model):
    """
    Averaged (gaussian) random gradient estimator in parallel

    Input:
    - x: image [batch_size, channels, height, width]
    - y_true: true labels of the images [batch_size]
    - q: number of random directions
    - v: smoothing parameter
    - d: dimensionality of the image (typically, channels x height x width)
    
    Returns:
    - Averaged gradient estimate
    """
    device = x.device  # Get the device of the input tensor (either CPU or CUDA)
    
    # Generate random directions
    u = torch.randn((q, d), device=device)

    # Compute the F function value for the original point
    F_ = F_Par_torch(x, y_true, model).mean()

    # Create tensors for q directions and compute the difference
    x_par_plus = (x.unsqueeze(0) + v*u.unsqueeze(1)).view(-1, *x.shape[1:])
    diff = F_Par_torch(x_par_plus, y_true.repeat(q), model) - F_

    # Compute the gradient estimate
    g = torch.sum((diff.view(q, -1) / v).unsqueeze(2) * u.unsqueeze(1), dim=0)

    return (d/(q*v))*g.mean(dim=0)


In [22]:
def Avg_RandGradEst_Par(x, y_true, q, v, d, model):
    """
    Averaged (gaussian) random gradient estimator in parallel

    Input:
    - x: image [batch_size, channels, height, width]
    - y_true: true labels of the images [batch_size]
    - q: number of random directions
    - v: smoothing parameter
    - d: dimensionality of the image (typically, height x width)
    - model: the PyTorch model used for prediction inside the F_torch function

    Returns:
    - Averaged gradient estimate
    """
    device = x.device

    # Create a tensor with standard normal values for all q directions
    u = torch.randn((q, 1, d, d), device=device)

    # Duplicate x for all q directions
    x_q = x.repeat(q, 1, 1, 1)
    y_q = y_true.repeat(q)

    # Compute the F function value for the original point
    F_ = F_torch(x, y_true, model)
    
    F_plus = F_torch(x_q + v*u, y_q, model)
    diff = F_plus - F_

    g = (diff[:, None, :, :] * u).sum(dim=0)  # Average over the q directions

    # Calculate the averaged gradient estimate
    avg_grad_estimate = (d/(v*q))*g

    return avg_grad_estimate


In [27]:

def ZSCG(N, d, s, m_k, x, y_true_in, model, v=-1, alpha=-1, B=1, verbose=True, clip=False):
    device = x.device  # Assuming x is already a PyTorch tensor

    if v == -1:
        v = torch.sqrt(2 / (N * (d + 3)**3))
    if alpha == -1:
        alpha = 1 / torch.sqrt(torch.tensor(N, dtype=torch.float32))

    x_ori = x.clone()
    loss_ZSCG = []
    perturbations = []
    loss_ZSCG.append(F_torch(x, y_true_in, model))

    if verbose:
        print("Epoch:", 0, "Loss:", F_torch(x_ori, y_true_in, model).item(), "Distortion:", torch.max(torch.abs(x - x_ori)).item())
    for k in range(N):
        v_k = 0
        for i in tqdm(range(x.shape[0]), disable= not verbose):
            v_k += Avg_RandGradEst(x[i:i+1], y_true_in[i:i+1], m_k, v, d, model)
            #v_k += Avg_RandGradEst_Par(x[i:i+1], y_true_in[i:i+1], m_k, v, d, model)

        v_k = (1 / x.shape[0]) * v_k

        x_k = -s * torch.sign(v_k) + x_ori 
        x = (1 - alpha) * x + alpha * x_k
        if clip:
            x = x_ori + torch.clamp((x - x_ori), 0, 1)
        perturbations.append(x)
        loss_ZSCG.append(F_torch(x, y_true_in, model).item())
        if verbose:
            print("-"*100)
            print("Epoch:", k+1, "Loss:", loss_ZSCG[k], "Distortion:", torch.max(torch.abs(x - x_ori)).item())
        if stop_attack(x, y_true_in):  # Assuming this function handles PyTorch tensors
            print("Attack successful! stopping computation...")
            return loss_ZSCG, x

    ZSCG_x_perturbated = x
    print("ZSCG Final loss =", loss_ZSCG[-1])
    return loss_ZSCG, ZSCG_x_perturbated, perturbations


In [33]:
n=100
x, _, y_true_in = get_data(n, 4,test_grabber)
epochs=100


In [39]:
28**2

784

In [38]:

ZSCG(N, d, s, m_k, x, y_true_in,v=-1,alpha=-1, B=1,verbose=True, clip=False)
loss_Z, x_Z, p1=ZSCG(epochs, 784, 0.1, 30, x, y_true_in,verbose=True)

TypeError: ZSCG() missing 1 required positional argument: 'model'

In [32]:
from torch.utils.data import DataLoader

# Create an unbatched DataLoader
test_grabber = DataLoader(test_dataset, batch_size=1, shuffle=False)

def get_data(n, c, test_loader):
    """
    Return x, x_ori, y_true_in.
    """
    x_ori, y_true_in = extract_images(n, c, test_loader)
    x = x_ori.clone()
    return x, x_ori, y_true_in

def extract_images(n, c, test_loader):
    """
    Extract some images of the same class from a DataLoader.

    Input:
    - n: number of images to extract
    - c: label
    - test_loader: DataLoader to extract images from
    """
    x_extr = []
    y_extr = []
    
    for x, y in test_loader:
        if y.item() == c:
            x_extr.append(x)
            y_extr.append(y)
            
        if len(x_extr) == n:
            break

    return torch.stack(x_extr), torch.stack(y_extr)
