In [76]:
import torch
import copy

# Environment report: installed PyTorch version, then CUDA visibility, one value per line.
print(torch.__version__, torch.cuda.is_available(), sep="\n")

2.3.1
False


In [77]:
# Smoke-test the Apple Metal (MPS) backend by allocating a one-element tensor on it.
if not torch.backends.mps.is_available():
    print ("MPS device not found.")
else:
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print (x)

tensor([1.], device='mps:0')


In [78]:
import gamspy as gp
import matplotlib.pyplot as ptl
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from gamspy.math.matrix import dim
from torch.optim import SGD
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import ToTensor

In [79]:
# MNIST train and test splits, stored under ./mnist_data and downloaded when missing.
# The only transform is ToTensor(); no mean/std normalisation is applied here.
train_data, test_data = (
    torchvision.datasets.MNIST('mnist_data', train=is_train, download=True, transform=ToTensor())
    for is_train in (True, False)
)

In [80]:
# Batched iterators over the two MNIST splits, keyed by split name.
# Only the training split is reshuffled every epoch. The test split keeps its
# natural order so that evaluation, and any "first batch" taken from it later
# in the notebook, is reproducible from run to run.
loaders = {
    'train': DataLoader(train_data, batch_size=100, shuffle=True, num_workers=1),
    'test': DataLoader(test_data, batch_size=100, shuffle=False, num_workers=1),
}

In [81]:
# Display the training DataLoader's repr as a quick check that it was constructed.
loaders['train']

<torch.utils.data.dataloader.DataLoader at 0x165360e20>

In [82]:
class CNN(nn.Module):
    """Minimal MNIST classifier: one 3x3 convolution, ReLU, then a linear read-out.

    The layer sizes imply inputs of shape (batch, 1, 28, 28). The forward pass
    returns raw class scores (logits) of shape (batch, 10); no softmax is applied.
    """

    def __init__(self):
        super().__init__()

        # A 3x3 kernel with padding=1 preserves the 28x28 spatial size, so the
        # 10 feature maps flatten to 10 * 28 * 28 values per image.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=10, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(in_features=10 * 28 * 28, out_features=10)

    def forward(self, x):
        """Return the logits for the image batch ``x``."""
        feature_maps = self.conv1(x)
        activated = F.relu(feature_maps)
        # One row of 7840 features per image.
        flattened = activated.reshape(-1, 10 * 28 * 28)
        return self.fc1(flattened)

In [83]:
# Run on a CUDA GPU when one is visible, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Instantiate the network and move its parameters to the chosen device.
model = CNN().to(device)

In [84]:
# Loss: cross-entropy computed directly on the network's raw logits.
loss_fn = nn.CrossEntropyLoss()

# Optimiser: Adam over every parameter of `model`, learning rate 1e-3.
optimizer = optim.Adam(params=model.parameters(), lr=0.001)

def train(epoch):
    """Run one optimisation pass over the training split and log progress.

    Uses the notebook-level ``model``, ``loaders``, ``optimizer``, ``loss_fn``
    and ``device``.

    Args:
        epoch: Index of the current epoch; only used in the progress message.
    """
    model.train()
    train_loader = loaders['train']
    for batch_index, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = loss_fn(output, target)
        loss.backward()
        optimizer.step()
        # Report every 20th batch: samples seen so far, percentage of batches done, current loss.
        if batch_index % 20 == 0:
            # `{epoch}` must be in braces; the previous `(epoch)` printed the literal text.
            print(f'Train Epoch: {epoch} [{batch_index * len(data)}/{len(train_loader.dataset)} ({100. * batch_index / len(train_loader):.0f}%)]\t{loss.item():.6f}')

In [85]:
def test():
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in loaders['test']: 
            data, target = data.to(device), target.to(device) 
            output = model(data)
            test_loss += loss_fn(output, target).item()
            pred = output.argmax(dim=1, keepdim = True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(loaders['test'].dataset)

    print(f"\nTest set: Average loss: {test_loss : .4f}, Accuracy {correct}/ {len(loaders['test'].dataset)} ({100. * correct / len(loaders['test'].dataset):.0f}%\)")

In [86]:
# Alternate one training pass with one evaluation on the test split, ten times.
NUM_EPOCHS = 10
for epoch in range(NUM_EPOCHS):
    train(epoch)
    test()


Test set: Average loss:  0.0013, Accuracy 9662/ 10000 (97%\)

Test set: Average loss:  0.0009, Accuracy 9718/ 10000 (97%\)

Test set: Average loss:  0.0008, Accuracy 9774/ 10000 (98%\)

Test set: Average loss:  0.0007, Accuracy 9780/ 10000 (98%\)

Test set: Average loss:  0.0007, Accuracy 9786/ 10000 (98%\)

Test set: Average loss:  0.0007, Accuracy 9784/ 10000 (98%\)

Test set: Average loss:  0.0006, Accuracy 9792/ 10000 (98%\)

Test set: Average loss:  0.0007, Accuracy 9792/ 10000 (98%\)

Test set: Average loss:  0.0006, Accuracy 9812/ 10000 (98%\)

Test set: Average loss:  0.0007, Accuracy 9808/ 10000 (98%\)


In [87]:
import torch
import torchvision.transforms as transforms
from torchvision.datasets import MNIST
from torch.utils.data import DataLoader

# Load the training data with ToTensor() only, so the statistics below
# describe the raw, un-normalised pixels.
transform = transforms.Compose([transforms.ToTensor()])
train_data = MNIST(root='mnist_data', train=True, download=True, transform=transform)
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)

# Accumulate per-channel sum and sum of squares over every pixel of every image.
# Averaging per-image standard deviations (the previous approach) does not give
# the dataset standard deviation, so compute it from E[x^2] - E[x]^2 instead.
# float64 accumulators keep the running sums accurate.
pixel_sum = 0.0
pixel_sq_sum = 0.0
pixel_count = 0
for images, _ in train_loader:
    batch_samples = images.size(0)  # Batch size (the last batch can have smaller size)
    images = images.view(batch_samples, images.size(1), -1).double()
    pixel_sum = pixel_sum + images.sum(dim=(0, 2))
    pixel_sq_sum = pixel_sq_sum + (images ** 2).sum(dim=(0, 2))
    pixel_count += batch_samples * images.size(2)

mean_t = pixel_sum / pixel_count
# clamp guards against a tiny negative variance caused by rounding.
std_t = (pixel_sq_sum / pixel_count - mean_t ** 2).clamp(min=0).sqrt()

# Keep the measured values (one entry per channel, float32) instead of
# overwriting them with hard-coded placeholders.
mean = mean_t.numpy().astype(np.float32)
std = std_t.numpy().astype(np.float32)

# No container exists yet when the notebook is run top to bottom, so create
# one here; previously `m` only existed if a later cell had already been run.
m = gp.Container()

# Convert mean and std to GAMSPy parameters
mean_param = gp.Parameter(m, name="mean", domain=dim(mean.shape), records=mean)
std_param = gp.Parameter(m, name="std", domain=dim(std.shape), records=std)

In [88]:
from gamspy.math.matrix import dim

# Input normalisation used inside the optimisation model.
# The CNN in this notebook is trained on raw ToTensor() pixels: neither the
# datasets nor the loaders apply a Normalize transform. The embedded network
# must therefore see inputs on that same scale, which means identity
# normalisation. Feeding it (x - 0.1307) / 0.3081 would evaluate the network
# on inputs it was never trained on. If a Normalize((0.1307,), (0.3081,))
# step is added to training, put those values back here.
mean = (0.0,)
std = (1.0,)

# Get a single batch of data
data, target = next(iter(loaders['test']))
data, target = data.to(device), target.to(device)

batch = data.shape[0]

# Reshape the input so it matches our declaration in GAMSPy:
# one column per sample, i.e. shape (784, batch)
data = data.reshape(batch, -1).T

# Reshape the targets (labels) into a boolean sample-by-digit table
# so that we can provide them to GAMSPy
target_df = pd.DataFrame(target.cpu())
target_df["val"] = 1
target_df = target_df.pivot(columns=[0], values="val").fillna(0).astype(bool)

# Create a container
m = gp.Container()

# Required margin by which a wrong label must beat the true label.
# The higher it is, the harder the problem is to solve.
diff_eps = 0.01

# Extract weights from CNN model
# NOTE(review): only the weights are exported here; the conv1 and fc1 biases
# are not, although code further down refers to `b_conv1` and `b_fc1`.
w_conv1_data = model.conv1.weight.cpu().detach().numpy().reshape(10, -1).T  # Flatten the conv layer weights
w_fc1_data = model.fc1.weight.cpu().detach().numpy().T
init_data = data.cpu().detach().numpy()

# Define GAMSPy parameters
w_conv1 = gp.Parameter(m, name="w_conv1", domain=dim(w_conv1_data.shape), records=w_conv1_data)
w_fc1 = gp.Parameter(m, name="w_fc1", domain=dim(w_fc1_data.shape), records=w_fc1_data)
init = gp.Parameter(m, name="inp", domain=dim(init_data.shape), records=init_data)

# Variables
xn = gp.Variable(m, name="xn", domain=dim((784, batch)))             # noise added to the image
x1 = gp.Variable(m, name="x1", domain=dim((784, batch)))             # network input
x2 = gp.Variable(m, name="x2", domain=dim((10 * 28 * 28, batch)))    # conv output (pre-activation)
x3 = gp.Variable(m, name="x3", domain=dim((10, batch)))
a2 = gp.Variable(m, name="a2", domain=dim((10 * 28 * 28, batch)))    # conv output after ReLU
a3 = gp.Variable(m, name="a3", domain=dim((10, batch)))              # class scores

sample_domain = xn.domain[1]
digits_domain = a3.domain[0]

target_set = gp.Set(m, name="targets", domain=[sample_domain, digits_domain], records=target_df, uels_on_axes=True)

# The optimisation model receives the non-normalised image, so any
# normalisation the network was trained with has to be reproduced here.
normalize_input = gp.Equation(m, name="transform_input", domain=x1.domain)

# Input to the neural network is (noise + input image), normalised with the
# mean/std defined at the top of this cell
normalize_input[...] = x1[...] == (xn[...] + init[...] - mean[0]) / std[0]

# Bound the noise so that noise + image stays inside [0, 1], the pixel range
# the neural network was trained on
xn.lo[...] = - init[...]
xn.up[...] = - init[...] + 1


# Define convolution manually
def manual_conv2d(x, w, b, output_shape):
    """Reference stride-1, zero-padded 2-D convolution (cross-correlation) in NumPy.

    The kernel is centred on each output position. Taps that fall outside the
    image contribute nothing, which is equivalent to zero padding of half the
    kernel size. The spatial extent of the input is taken to be the same as
    that of the output.

    Args:
        x: Input array indexed as ``x[channel, row, col, sample]``.
        w: Kernel array indexed as ``w[filter, channel, k_row, k_col]``.
        b: Per-filter bias, indexed as ``b[filter]``.
        output_shape: ``(batch_size, num_filters, height, width)`` of the result.

    Returns:
        ``numpy.ndarray`` of shape ``output_shape`` indexed as
        ``out[sample, filter, row, col]``.
    """
    out = np.zeros(output_shape)
    batch_size, num_filters, height, width = output_shape
    num_channels, kernel_height, kernel_width = w.shape[1:]
    pad_h = kernel_height // 2
    pad_w = kernel_width // 2

    for b_idx in range(batch_size):
        for f_idx in range(num_filters):
            for row in range(height):
                # The column index must not be called `w`: that name is the
                # kernel array, and shadowing it made `w[f_idx, c, kh, kw]`
                # index an int.
                for col in range(width):
                    conv_sum = 0
                    for c in range(num_channels):
                        for kh in range(kernel_height):
                            h_in = row + kh - pad_h
                            if not 0 <= h_in < height:
                                continue  # tap lies in the zero padding
                            for kw in range(kernel_width):
                                w_in = col + kw - pad_w
                                if 0 <= w_in < width:
                                    conv_sum += x[c, h_in, w_in, b_idx] * w[f_idx, c, kh, kw]
                    out[b_idx, f_idx, row, col] = conv_sum + b[f_idx]
    return out

# Apply convolution manually
# NOTE(review): `x1` is a gp.Variable and `w_conv1` a gp.Parameter (both created
# earlier in this cell), yet they are used below with the torch-tensor chain
# `.cpu().detach().numpy()` -- presumably this raises AttributeError; confirm.
# NOTE(review): `b_conv1` is not defined anywhere in the visible notebook.
# NOTE(review): the convolution result is stored as a gp.Parameter, i.e. as fixed
# data. That looks like it would make the conv output independent of the noise
# variable `xn`; confirm whether a symbolic formulation (equations linking `x1`
# to `x2`) was intended instead.
conv_out_shape = (batch, 10, 28, 28)
conv_out_data = manual_conv2d(x1.cpu().detach().numpy().reshape((1, 28, 28, batch)), 
                              w_conv1.cpu().detach().numpy().reshape((10, 1, 3, 3)), 
                              b_conv1.cpu().detach().numpy(), conv_out_shape)
conv_out = gp.Parameter(m, name="conv_out", domain=dim(conv_out_shape), records=conv_out_data.reshape(-1))

# ReLU activation function: a2 is defined as relu(conv_out), declared over x2's domain
# NOTE(review): confirm that `gp.math.relu` and `Parameter.reshape` exist in the
# installed GAMSPy version. The variable `x2` itself is only used for its domain.
calc_activation = gp.Equation(m, name="calc_activation", domain=x2.domain)
calc_activation[...] = a2[...] == gp.math.relu(conv_out.reshape(10 * 28 * 28, batch))

# Fully connected layer operation: a3 = w_fc1^T @ a2 + bias
# NOTE(review): `b_fc1` is not defined anywhere in the visible notebook.
calc_fc1 = gp.Equation(m, name="calc_fc1", domain=[w_fc1.domain[1], x2.domain[1]])
calc_fc1[...] = a3[...] == w_fc1.T @ a2[...] + b_fc1

# Objective and constraints
# NOTE(review): `obj` is declared but not referenced again in the visible code;
# each per-sample model below minimises `z` instead.
obj = gp.Variable(m, name="obj", domain=[sample_domain])
# Snapshot of the equations defined so far; they are shared by every per-sample model.
eq_so_far = m.getEquations()

# Per-sample outputs collected by the loop below:
#   results  - noise levels reshaped to 28x28
#   result_z - level of the objective variable z
#   result_a - levels of the output variable a3
results = []
result_z = []
result_a = []

# For every sample we need to solve another optimization problem
# to find the minimal vector that changes the label
for s in range(batch):
    sample_target = int(target[s])
    print(f"sample {s + 1}/{batch}")

    # Ensure the correct label gets less probability than the incorrect labels
    # NOTE(review): the equation is declared over every digit, which appears to
    # include the true label itself (a3[t, s] >= a3[t, s] + diff_eps); it also
    # asks every other digit to beat the true one. `target_set` is never used
    # to restrict it. Confirm this is intended.
    make_noise = gp.Equation(m, name=f"false_label_{s}", domain=[digits_domain])
    make_noise[...] = a3[:, s] >= a3[sample_target, s] + diff_eps
    
    # NOTE(review): a variable named "z" and a model named "noise" are created
    # again on every iteration; confirm how GAMSPy treats repeated names.
    z = gp.Variable(m, name="z")
    specific_equations = [make_noise]

    # Pick which norm you would like to use
    # The value is hard-coded to "l2" here, so the "linf" branch below never runs.
    norm = "l2"
    if norm == "l2":
        # z equals the squared L2 norm of this sample's noise column
        noise_magnitude = gp.Equation(m, name=f"noise_magnitude_{s}")
        noise_magnitude[...] = z == gp.math.vector_norm(xn[:, s]) ** 2
        specific_equations.append(noise_magnitude)
    elif norm == "linf":
        # z bounds the noise column from above in absolute value (z >= xn and z >= -xn)
        # NOTE(review): these equations are declared over xn's full 2-D domain
        # while the expressions use the single column xn[:, s]; confirm.
        noise_magnitude_1 = gp.Equation(m, name=f"noise_magnitude_1_{s}", domain=xn.domain)
        noise_magnitude_2 = gp.Equation(m, name=f"noise_magnitude_2_{s}", domain=xn.domain)
        noise_magnitude_1[...] = z >= xn[:, s]
        noise_magnitude_2[...] = z >= -xn[:, s]
        specific_equations.append(noise_magnitude_1)
        specific_equations.append(noise_magnitude_2)
    
    # Shared network equations plus this sample's constraints, minimising z
    model_noise = gp.Model(
        m,
        name="noise",
        equations=[*eq_so_far, *specific_equations],
        problem="NLP",
        sense="min",
        objective=z,
    )

    # Solve the optimization problem
    model_noise.solve(solver='CONOPT3') 
    res = xn.records.copy()

    # Keep only the rows belonging to sample s and reshape its noise levels to an image
    # (assumes the sample column of the records is named DenseDim{batch}_1 -- TODO confirm)
    noise = np.array(res[res[f"DenseDim{batch}_1"] == str(s)].level).reshape(28, 28)
    output = a3.records.copy()
    
    output = np.array(output[output[f"DenseDim{batch}_1"] == str(s)].level)
    result_a.append(output)
    results.append(noise)
    result_z.append(z.records.copy().level[0])

print("Adversarial examples generated.")