In [None]:
# Mount Google dirve
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

# Define Project Folder
FOLDERNAME = 'Colab\ Notebooks'

%cd drive/MyDrive/$FOLDERNAME

Mounted at /content/drive
/content/drive/MyDrive/Colab Notebooks


In [None]:
# Define device
import torch
if torch.cuda.is_available():
  device = torch.device('cuda')
else:
  device = torch.device('cpu')
print('Device:', device)

Device: cuda


In [None]:
from __future__ import print_function
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

# download MNIST dataset --> (1, 28*28)
# create mini batch meanwhile
mnist_transform = transforms.Compose([transforms.ToTensor()])
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('./r_mnist_test', train=False, download=True, transform=mnist_transform),
        batch_size=10, shuffle=True)
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('./r_mnist_train', train=True, download=True, transform=mnist_transform),
        batch_size=10, shuffle=True)

# hyperparameter
batch_size = 10
epoch = 1

In [None]:
# LeNet Model definition
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(28*28, 300)
        self.fc2 = nn.Linear(300, 100)
        self.fc3 = nn.Linear(100, 10)
    def forward(self, x):
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [None]:
# training model
def train(model,optimizer):
  for i in range(epoch):
    for j,(data,target) in tqdm(enumerate(train_loader)):
      model.train()
      data = data.to(device)
      target = target.to(device)
      # print(data.shape)
      # print(target.shape)
      logit = model(data)
      loss = F.cross_entropy(logit,target)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      if j % 1000 == 0:
        print ('第{}筆資料，loss值等於{}'.format(j,loss))

# validation process
def test(model,name):
  model.eval()
  correct_num = torch.tensor(0).to(device)
  with torch.no_grad():
    for j,(data,target) in tqdm(enumerate(test_loader)):
      data = data.to(device)
      target = target.to(device)
      logit = model(data)
      pred = logit.max(1)[1]
      # acc = pred.eq(target).sum().item() / 10000
      num = torch.sum(pred==target)
      correct_num = correct_num + num
    print (correct_num)
    # print ('\n{} correct rate is {}'.format(name, acc))
    print ('\n{} correct rate is {}'.format(name, correct_num / 10000))

In [None]:
import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F
import torchvision
from torchvision import transforms
import matplotlib.pyplot as plt

# calculate jacobian matrix for forward deviation
def compute_jacobian(model, input):
    var_input = input.clone()

    var_input.detach_()
    var_input.requires_grad = True
    output = model(var_input)

    num_features = int(np.prod(var_input.shape[1:]))
    jacobian = torch.zeros([output.size()[1], num_features])
    for i in range(output.size()[1]):
        # zero_gradients(input)
        if var_input.grad is not None:
          var_input.grad.zero_()
        # output.backward(mask,retain_graph=True)
        output[0][i].backward(retain_graph=True)
        # copy the derivative to the target place
        jacobian[i] = var_input.grad.squeeze().view(-1, num_features).clone()

    return jacobian.to(device)

In [None]:
# Saliency map calcuation

def saliency_map(jacobian, target_index, increasing, search_space, nb_features):
    domain = torch.eq(search_space, 1).float()  # The search domain
    # the sum of all features' derivative with respect to each class
    all_sum = torch.sum(jacobian, dim=0, keepdim=True)
    target_grad = jacobian[target_index]  # The forward derivative of the target class
    others_grad = all_sum - target_grad  # The sum of forward derivative of other classes

    # this list blanks out those that are not in the search domain
    if increasing:
        increase_coef = 2 * (torch.eq(domain, 0)).float().to(device)
    else:
        increase_coef = -1 * 2 * (torch.eq(domain, 0)).float().to(device)
    increase_coef = increase_coef.view(-1, nb_features)

    # calculate sum of target forward derivative of any 2 features.
    target_tmp = target_grad.clone()
    target_tmp -= increase_coef * torch.max(torch.abs(target_grad))
    alpha = target_tmp.view(-1, 1, nb_features) + target_tmp.view(-1, nb_features, 1)  # PyTorch will automatically extend the dimensions

    # calculate sum of other forward derivative of any 2 features.
    others_tmp = others_grad.clone()
    others_tmp += increase_coef * torch.max(torch.abs(others_grad))
    beta = others_tmp.view(-1, 1, nb_features) + others_tmp.view(-1, nb_features, 1)

    # zero out the situation where a feature sums with itself
    tmp = np.ones((nb_features, nb_features), int)
    np.fill_diagonal(tmp, 0)
    zero_diagonal = torch.from_numpy(tmp).byte().to(device)

    # According to the definition of saliency map in the paper (formulas 8 and 9),
    # those elements in the saliency map that doesn't satisfy the requirement will be blanked out.
    if increasing:
        mask1 = torch.gt(alpha, 0.0)
        mask2 = torch.lt(beta, 0.0)
    else:
        mask1 = torch.lt(alpha, 0.0)
        mask2 = torch.gt(beta, 0.0)
    # apply the mask to the saliency map
    mask = torch.mul(torch.mul(mask1, mask2), zero_diagonal.view_as(mask1))
    # do the multiplication according to formula 10 in the paper
    saliency_map = torch.mul(torch.mul(alpha, torch.abs(beta)), mask.float())
    # get the most significant two pixels
    max_value, max_idx = torch.max(saliency_map.view(-1, nb_features * nb_features), dim=1)
    p = max_idx // nb_features
    q = max_idx % nb_features
    return p, q, saliency_map

In [None]:
def perturbation_single(image, ys_target, theta, gamma, model):

    copy_sample = np.copy(image)
    var_sample =Variable(torch.from_numpy(copy_sample), requires_grad=True).to(device)

    # outputs = model(var_sample)
    # predicted = torch.max(outputs.data, 1)[1]
    # print('测试样本扰动前的预测值：{}'.format(predicted[0]))

    var_target = Variable(torch.LongTensor([ys_target,])).to(device)

    if theta > 0:
        increasing = True
    else:
        increasing = False

    num_features = int(np.prod(copy_sample.shape[1:]))
    shape = var_sample.size()

    # perturb two pixels in one iteration, thus max_iters is divided by 2.0
    max_iters = int(np.ceil(num_features * gamma / 2.0))

    # masked search domain, if the pixel has already reached the top or bottom, we don't bother to modify it.
    if increasing:
        search_domain = torch.lt(var_sample, 0.99) #逐一元素比較var_sample和0.99
    else:
        search_domain = torch.gt(var_sample, 0.01)
    search_domain = search_domain.view(num_features)

    model.eval().to(device)
    output = model(var_sample)
    current = torch.max(output.data, 1)[1].cpu().numpy()
    saliency = torch.zeros((1, 784, 784)).to(device)

    iter = 0
    while (iter < max_iters) and (current[0] != ys_target) and (search_domain.sum() != 0):
        # calculate Jacobian matrix of forward derivative
        jacobian = compute_jacobian(model, var_sample)
        # get the saliency map and calculate the two pixels that have the greatest influence
        p1, p2, s = saliency_map(jacobian, var_target, increasing, search_domain, num_features)
        saliency += s
        # apply modifications
        var_sample_flatten = var_sample.view(-1, num_features).clone().detach_()
        var_sample_flatten[0, p1] += theta
        var_sample_flatten[0, p2] += theta

        new_sample = torch.clamp(var_sample_flatten, min=0.0, max=1.0)
        new_sample = new_sample.view(shape)
        search_domain[p1] = 0
        search_domain[p2] = 0
        var_sample = Variable(torch.tensor(new_sample), requires_grad=True ).to(device)

        output = model(var_sample)
        current = torch.max(output.data, 1)[1].cpu().numpy()
        iter += 1

    adv_samples = var_sample.data.cpu().numpy()
    return adv_samples, saliency

In [None]:
# adversarial examples
model_adv_filter = Net().to(device)

In [None]:
# optimizer
optimizer2 = torch.optim.Adam(model_adv_filter.parameters())

In [None]:
train(model_adv_filter,optimizer2)
test(model_adv_filter, 'adv_classifer')

36it [00:03, 14.99it/s]

第0筆資料，loss值等於2.299304723739624


1045it [00:06, 336.87it/s]

第1000筆資料，loss值等於0.0342840813100338


2044it [00:09, 345.50it/s]

第2000筆資料，loss值等於0.021585002541542053


3045it [00:12, 267.22it/s]

第3000筆資料，loss值等於0.27309170365333557


4065it [00:16, 318.20it/s]

第4000筆資料，loss值等於0.012569770216941833


5052it [00:19, 346.48it/s]

第5000筆資料，loss值等於0.16115450859069824


6000it [00:21, 273.28it/s]
1000it [00:01, 550.93it/s]

tensor(9672, device='cuda:0')

adv_classifer correct rate is 0.967199981212616





In [None]:
# randomly choose 5000 clean data and gen 5000 adversarial examples from train_loader
# hyperparameter setting
adver_nums_filter = 5000
y_adv_target = 8
theta = 1.0
gamma = 0.1

adver_example_by_JSMA_f = torch.zeros((batch_size,1,28,28)).to(device)
adver_target_f = torch.zeros(5000).to(device)
clean_example_f = torch.zeros((batch_size,1,28,28)).to(device)
clean_target_f = torch.ones(5000).to(device)

for i,(data,target) in enumerate(train_loader):
  if i >= adver_nums_filter/batch_size :
    break
  if i == 0:
    clean_example_f = data
  else:
    clean_example_f = torch.cat((clean_example_f,data),dim = 0)

  cur_adver_example_by_JSMA_f = torch.zeros_like(data).to(device)

  for j in range(batch_size):

    pert_image_f, _ = perturbation_single(data[j].resize_(1,28*28).numpy(),y_adv_target,theta,gamma,model_adv_filter)
    cur_adver_example_by_JSMA_f[j] = torch.from_numpy(pert_image_f).resize_(1, 28, 28).to(device)

  #
  if i == 0:
    adver_example_by_JSMA_f = cur_adver_example_by_JSMA_f
  else:
    adver_example_by_JSMA_f = torch.cat((adver_example_by_JSMA_f , cur_adver_example_by_JSMA_f), dim = 0)

print (adver_example_by_JSMA_f.shape)
# print (adver_target)
print (clean_example_f.shape)
# print (clean_target)


  var_sample = Variable(torch.tensor(new_sample), requires_grad=True ).to(device)


torch.Size([5000, 1, 28, 28])
torch.Size([5000, 1, 28, 28])


In [None]:
# randomly choose 1000 clean data and gen 1000 adversarial examples from test_loader
# hyperparameter setting
adver_nums_filter_t = 1000
y_adv_target = 8
theta = 1.0
gamma = 0.1

adver_example_by_JSMA_t = torch.zeros((batch_size,1,28,28)).to(device)
adver_target_t = torch.zeros(1000) .to(device)
clean_example_t = torch.zeros((batch_size,1,28,28)).to(device)
clean_target_t = torch.ones(1000).to(device)

for i,(data,target) in enumerate(test_loader):
  if i >= adver_nums_filter_t/batch_size :
    break
  if i == 0:
    clean_example_t = data
  else:
    clean_example_t = torch.cat((clean_example_t,data),dim = 0)

  cur_adver_example_by_JSMA_t = torch.zeros_like(data).to(device)

  for j in range(batch_size):

    pert_image_t, _ = perturbation_single(data[j].resize_(1,28*28).numpy(),y_adv_target,theta,gamma,model_adv_filter)
    cur_adver_example_by_JSMA_t[j] = torch.from_numpy(pert_image_t).resize_(1, 28, 28).to(device)

  #
  if i == 0:
    adver_example_by_JSMA_t = cur_adver_example_by_JSMA_t
  else:
    adver_example_by_JSMA_t = torch.cat((adver_example_by_JSMA_t , cur_adver_example_by_JSMA_t), dim = 0)

print(adver_example_by_JSMA_t.shape)
print(clean_example_t.shape)

  var_sample = Variable(torch.tensor(new_sample), requires_grad=True ).to(device)


torch.Size([1000, 1, 28, 28])
torch.Size([1000, 1, 28, 28])


In [None]:
# shuffle data
# train data
import random
new_train_data = torch.cat((clean_example_f.to(device), adver_example_by_JSMA_f), dim=0)
new_target_data = torch.cat((clean_target_f, adver_target_f), dim=0)
batch_s = np.random.choice(len(clean_example_f) + len(adver_example_by_JSMA_f), len(clean_example_f) + len(adver_example_by_JSMA_f))
new_train_data = new_train_data[batch_s]
new_target_data = new_target_data[batch_s]

print(new_train_data.shape)
print(new_target_data.shape)
print(new_target_data[0].dtype)

# test data
new_val_data = torch.cat((clean_example_t.to(device), adver_example_by_JSMA_t), dim=0)
new_val_target_data = torch.cat((clean_target_t, adver_target_t), dim=0)
batch_sv = np.random.choice(len(clean_example_t) + len(clean_example_t), len(clean_example_t) + len(clean_example_t))
new_val_data = new_train_data[batch_sv]
new_val_target_data = new_target_data[batch_sv]
print(new_val_data.shape)
print(new_val_target_data.shape)

torch.Size([10000, 1, 28, 28])
torch.Size([10000])
torch.float32
torch.Size([2000, 1, 28, 28])
torch.Size([2000])


In [None]:
# other model classifier

class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, 5)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(2)
        self.fc1 = nn.Linear(256, 120)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(120, 84)
        self.relu4 = nn.ReLU()
        self.fc3 = nn.Linear(84, 1)
        self.relu5 = nn.ReLU()

    def forward(self, x):
        y = self.conv1(x)
        y = self.relu1(y)
        y = self.pool1(y)
        y = self.conv2(y)
        y = self.relu2(y)
        y = self.pool2(y)
        y = torch.flatten(y, 1)
        y = self.fc1(y)
        y = self.relu3(y)
        y = self.fc2(y)
        y = self.relu4(y)
        y = self.fc3(y)
        y = self.relu5(y)
        return y

In [None]:
# adver filter
model_adv = Model().to(device)

In [None]:
# adver filter model training
loss_fn = nn.BCELoss()
epoch_c = 100
def train_c(model,optimizer):
  for i in range(epoch_c):
    batch = np.random.choice(len(new_train_data), 200)
    for j, data in enumerate(new_train_data[batch]):
      model.train()
      data = data.unsqueeze(0).to(device)
      target = new_target_data[j].unsqueeze(0).unsqueeze(1)
      # print(data.shape)
      # print(target.shape)
      # print(data)
      # print(target)
      logit = model(data)
      loss = loss_fn(logit,target)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
    if i % 10 == 0:
      print ('第{}個epoch，loss值等於{}'.format(i,loss))

In [None]:
train_c(model_adv, optimizer2)

第0個epoch，loss值等於2.844424247741699
第10個epoch，loss值等於2.8650243282318115
第20個epoch，loss值等於3.115415573120117
第30個epoch，loss值等於2.9540939331054688
第40個epoch，loss值等於2.975090265274048
第50個epoch，loss值等於2.8946244716644287
第60個epoch，loss值等於2.9866344928741455
第70個epoch，loss值等於2.931440830230713
第80個epoch，loss值等於3.0564801692962646
第90個epoch，loss值等於2.8399658203125


In [None]:
# validation of new lenet5 adver filter model

def test_c(model,name):
  model.eval()
  correct_num = torch.tensor(0).to(device)
  with torch.no_grad():
    for j, data in tqdm(enumerate(new_val_data)):
      data = data.unsqueeze(0).to(device)
      target = new_val_target_data[j].unsqueeze(0).unsqueeze(1)
      logit = model(data)
      # print(logit)
      pred = 1 if logit > 0.5 else 0
      # pred = logit.max(1)[1].item()
      # acc = logit.eq(target).sum().item() / 2000
      # num = torch.sum(pred==target)
      num = torch.sum(pred == target)
      correct_num = correct_num + num
    # print (correct_num)
    # print ('\n{} correct rate is {}'.format(name, acc))
    print ('\n{} correct rate is {}'.format(name, correct_num / 2000))

In [None]:
test_c(model_adv, "model_res accuacy")

2000it [00:01, 1634.38it/s]



model_res accuacy correct rate is 0.5215000510215759
