In [85]:
# All Import Statements Defined Here
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import numpy as np
import time
import os

We conventionally train a LeNet-5 classifier on MNIST.

In [86]:
class LeNet(nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        self.conv = nn.Sequential(
            # nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5, padding=2), # 28*28*6
            nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5), # 
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2), # 14*14*6
            # nn.Conv2d(in_channels=6, out_channels=16, kernel_size=6), # 8*8*16
            nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2), # 4*4*16
        )
        self.fc = nn.Sequential(
            nn.Linear(in_features=16*4*4, out_features=120),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=120, out_features=84),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=84, out_features=10),
        )

    def forward(self, x):
        x = self.conv(x)
        x = x.view(x.shape[0], -1)
        out = self.fc(x)
        return out

In [87]:
args = {
    'epoch': 10,
    'batch_size': 64,
    'learning_rate': 5e-4,
    'path': r'D:\EricYANG\HKUST\22Spring\comp5214\project', # chage to your local path
    'seed': 2022,
    'perturb_norm': 1.0,
}
# dataset
train_dataset = datasets.MNIST(root=args['path'], train=True, transform=transforms.ToTensor(), download=False)
test_dataset = datasets.MNIST(root=args['path'], train=False, transform=transforms.ToTensor(), download=False)
# DataLoader, shuffle=True
train_loader = DataLoader(train_dataset, batch_size=args['batch_size'], num_workers=0, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=args['batch_size'], num_workers=0, shuffle=True)
# put model to GPU
model = LeNet().type(torch.FloatTensor)
model.to(torch.device("cuda:0"))
# fix random seed
np.random.seed(args['seed'])
torch.manual_seed(args['seed'])

<torch._C.Generator at 0x203bc0876f0>

In [88]:
def train(epch, model, train_loader, test_loader):
    optimizer = optim.RMSprop(model.parameters(), lr=args['learning_rate'], weight_decay=0.9)
    BCELoss = nn.CrossEntropyLoss().type(torch.FloatTensor)
    train_loss = 0.0
    for data, target in train_loader:
        # put to GPU
        data = data.to(torch.device("cuda:0"))
        target = target.to(torch.device("cuda:0"))
        optimizer.zero_grad
        output = model(data)
        loss = BCELoss(output, target)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()*data.size(0)
    
    train_loss = train_loss / len(train_loader.dataset)
    print('Epoch: {} \t Training Loss: {:.6f}'.format(epch + 1, train_loss))
    acc = test(epch, model, test_loader)
    return acc


def test(epch, model, test_loader):
    correct = 0
    total = 0
    with torch.no_grad():
        for img, label in test_loader:
            # put to GPU
            img = img.to(torch.device("cuda:0"))
            label = label.to(torch.device("cuda:0"))
            output = model(img)
            _, predict = torch.max(output.data, dim=1)
            total += label.size(0)
            correct += (predict == label).sum().item()
    acc = 100.0 * correct / total
    print('Acc of epoch %d on testing: %5f %%'% (epch + 1, acc))
    return acc

In [89]:
# Training Loop
for epch in range(args['epoch']):
    acc = train(epch, model, train_loader, test_loader)

Epoch: 1 	 Training Loss: 1.208358
Acc of epoch 1 on testing: 96.340000 %
Epoch: 2 	 Training Loss: 0.117635
Acc of epoch 2 on testing: 97.240000 %
Epoch: 3 	 Training Loss: 0.095359
Acc of epoch 3 on testing: 97.130000 %
Epoch: 4 	 Training Loss: 0.092998
Acc of epoch 4 on testing: 97.510000 %
Epoch: 5 	 Training Loss: 0.087707
Acc of epoch 5 on testing: 97.740000 %
Epoch: 6 	 Training Loss: 0.078226
Acc of epoch 6 on testing: 97.630000 %
Epoch: 7 	 Training Loss: 0.067688
Acc of epoch 7 on testing: 97.870000 %
Epoch: 8 	 Training Loss: 0.063805
Acc of epoch 8 on testing: 98.310000 %
Epoch: 9 	 Training Loss: 0.064193
Acc of epoch 9 on testing: 98.080000 %
Epoch: 10 	 Training Loss: 0.057940
Acc of epoch 10 on testing: 98.350000 %


We add randomized Fourier-domain noise on a given image.

In [98]:
def get_symmetric_pos(dims, pos):
  # compute symmtric position of point in 2D FFT
  x = np.array(dims)
  p = np.array(pos)
  return np.where(np.mod(x, 2) == 0, np.mod(x-p, x), x-1-p)


def get_fourier_basis(x, y):
  # compute real-valued basis vectors of 2D discrete Fourier transform
  marker = np.zeros([x, y], dtype=np.uint8)
  fourier_basis = np.zeros([x, y, x, y], dtype=np.float32)
  for i in range(x):
    for j in range(y):
      if marker[i, j] > 0:
        continue
      freq = np.zeros([x, y], dtype=np.complex64)
      sym = get_symmetric_pos((x, y), (i, j))
      sym_i = sym[0]
      sym_j = sym[1]
      if (sym_i, sym_j) == (i, j):
        freq[i, j] = 1.0
        marker[i, j] = 1
      else:
        freq[i, j] = 0.5 + 0.5j
        freq[sym_i, sym_j] = 0.5 - 0.5j
        marker[i, j] = 1
        marker[sym_i, sym_j] = 1
      basis = np.fft.ifft2(np.fft.ifftshift(freq))
      basis = np.sqrt(x * y) * np.real(basis)
      fourier_basis[i, j, :, :] = basis
      if (sym_i, sym_j) != (i, j):
        fourier_basis[sym_i, sym_j, :, :] = basis
  return fourier_basis


def generate_perturbed_img(img, perturb_basis, perturb_norm=1.0):
  # generate perturbed imgs given a perturbation basis
  if len(img.shape) != 3:
    raise ValueError('Incorrect input image shape')
  clean_img = np.expand_dims(img, axis=3)
  batch_size = clean_img.shape[0]
  num_channnels = clean_img.shape[3] # MNIST num channels = 1
  clean_img_t = np.transpose(clean_img, (0, 3, 1, 2))
  perturb_img_t = clean_img_t + perturb_norm * perturb_basis
  perturb_img = np.transpose(perturb_img_t, (0, 2, 3, 1))
  return np.squeeze(perturb_img, axis=3)


def generate_heat_map(network, img, label, pertub_norm=1.0):
  # 1. input one image to the network, record tensor value
  # 2. image add Fourier-basis perturbations, record tensor value
  # 3. compare difference
  if len(img.shape) != 3:
    raise ValueError('Incorrect input image shape')
  num_channnels = img.shape[0] # MNIST num channels = 1
  height = img.shape[1]
  width = img.shape[2]
  basis = get_fourier_basis(height, width)

  # evaluate over all frequency basis
  for i in range(height):
    for j in range(width):
      perturb_img = generate_perturbed_img(img, basis[i, j, Ellipsis], pertub_norm)
      tensor_pb = torch.from_numpy(perturb_img).to(torch.device("cuda:0")).unsqueeze(dim=0)
      label = label.to(torch.device("cuda:0"))
      print(type(tensor_pb), tensor_pb.shape, type(label), label)
      output = network(tensor_pb)
      _, predict = torch.max(output.data, dim=1)
      print(predict)
    #   break
    # break
  

In [99]:
fourier_loader = DataLoader(test_dataset, batch_size=1, num_workers=0, shuffle=True)
sample_img, sample_label = None, None

# select random img from test dataset for fourier pertubation
for imgs, label in fourier_loader:
    sample_img = imgs[0].numpy()
    sample_label = label[0]
    break

print(type(sample_img), sample_img.shape, type(sample_label), sample_label)
generate_heat_map(model, sample_img, sample_label, args['perturb_norm'])

<class 'numpy.ndarray'> (1, 28, 28) <class 'torch.Tensor'> tensor(6)
<class 'torch.Tensor'> torch.Size([1, 1, 28, 28]) <class 'torch.Tensor'> tensor(6, device='cuda:0')
tensor([6], device='cuda:0')
