In [15]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms,datasets
import torch
import torchvision
import torchvision.transforms as transforms

In [16]:
np.random.seed(42)
torch.manual_seed(42)

<torch._C.Generator at 0x7f05ec414d50>

In [17]:
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])






transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

In [18]:

dataset = datasets.CIFAR10(root = './data', train=True, transform = transform_train, download=True)
train_set, val_set = torch.utils.data.random_split(dataset, [40000,10000])


testset = datasets.CIFAR10(root = './data', train=False, transform = transform_test, download=True)
train_loader = torch.utils.data.DataLoader(train_set,batch_size=100,shuffle=True)


val_loader = torch.utils.data.DataLoader(val_set,batch_size=100,shuffle=True)
test_loader = torch.utils.data.DataLoader(testset,batch_size=1,shuffle=False)

Files already downloaded and verified
Files already downloaded and verified


In [19]:
print("Training data:",len(train_loader),"Validation data:",len(val_loader),"Test data: ",len(test_loader))

Training data: 400 Validation data: 100 Test data:  10000


In [20]:
use_cuda=True
device = torch.device("cuda" if (use_cuda and torch.cuda.is_available()) else "cpu")

In [21]:
import torch.nn.functional as F
import torch.nn as nn

class Net(nn.Module):
    def __init__(self):
        super().__init__()
        """
            input   - (3, 32, 32)
            block 1 - (32, 32, 32)
            maxpool - (32, 16, 16)
            block 2 - (64, 16, 16)
            maxpool - (64, 8, 8)
            block 3 - (128, 8, 8)
            maxpool - (128, 4, 4)
            block 4 - (128, 4, 4)
            avgpool - (128, 1, 1), reshpe to (128,)
            fc      - (128,) -> (10,)
        """
        # block 1
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)

        # block 2
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)

        # block 3
        self.conv5 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv6 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)

        # block 4
        self.conv7 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.conv8 = nn.Conv2d(256, 256, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm2d(256)

        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(256, 10)

    def forward(self, x):

        # block 1
        x = F.relu(self.conv1(x))
        x = F.relu(self.bn1(self.conv2(x)))

        # maxpool
        x = F.max_pool2d(x, 2)

        # block 2
        x = F.relu(self.conv3(x))
        x = F.relu(self.bn2(self.conv4(x)))

        # maxpool
        x = F.max_pool2d(x, 2)

        # block 3
        x = F.relu(self.conv5(x))
        x = F.relu(self.bn3(self.conv6(x)))

        # maxpool
        x = F.max_pool2d(x, 2)

        # block 4
        x = F.relu(self.conv7(x))
        x = F.relu(self.bn4(self.conv8(x)))

        # avgpool and reshape to 1D
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)

        # fc
        x = self.fc(x)

        output = F.log_softmax(x, dim=1)
        return output

In [22]:
model = Net().to(device)
optimizer = optim.Adam(model.parameters(),lr=0.001, betas=(0.9, 0.999))
criterion = nn.NLLLoss()
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)

In [23]:
def fit(model,device,train_loader,val_loader,epochs):
  data_loader = {'train':train_loader,'val':val_loader}
  print("Fitting the model...")
  train_loss,val_loss=[],[]
  for epoch in range(epochs):

    loss_per_epoch,val_loss_per_epoch=0,0

    for phase in ('train','val'):
      for i,data in enumerate(data_loader[phase]):

        input,label  = data[0].to(device),data[1].to(device)

        output = model(input)
        #calculating loss on the output
        loss = criterion(output,label)

        if phase == 'train':
          optimizer.zero_grad()
          #grad calc w.r.t Loss func
          loss.backward()
          #update weights
          optimizer.step()
          loss_per_epoch+=loss.item()
        else:
          val_loss_per_epoch+=loss.item()

    scheduler.step(val_loss_per_epoch/len(val_loader))
    print("Epoch: {} Loss: {} Val_Loss: {}".format(epoch+1,loss_per_epoch/len(train_loader),val_loss_per_epoch/len(val_loader)))
    train_loss.append(loss_per_epoch/len(train_loader))
    val_loss.append(val_loss_per_epoch/len(val_loader))
  return train_loss,val_loss

In [None]:
loss,val_loss=fit(model,device,train_loader,val_loader,30)

Fitting the model...


In [None]:
fig = plt.figure(figsize=(5,5))
plt.plot(np.arange(1,31), loss, "*-",label="Loss")
plt.plot(np.arange(1,31), val_loss,"o-",label="Val Loss")
plt.xlabel("Num of epochs")
plt.legend()
plt.show()

In [None]:
def fgsm_attack(input,epsilon,data_grad):
  pert_out = input + epsilon*data_grad.sign()
  pert_out = torch.clamp(pert_out, 0, 1)
  return pert_out

def ifgsm_attack(input,epsilon,data_grad):
  iter = 10
  alpha = epsilon/iter
  pert_out = input
  for i in range(iter-1):
    pert_out = pert_out + alpha*data_grad.sign()
    pert_out = torch.clamp(pert_out, 0, 1)
    if torch.norm((pert_out-input),p=float('inf')) > epsilon:
      break
  return pert_out


In [None]:
model = Net()
saved = torch.load("/content/cifar10_basiccnn.pth (1).tar", map_location='cpu')
model.load_state_dict(saved['state_dict'])
model.eval()

In [None]:
def test(model,device,test_loader,epsilon,attack):
  correct = 0


  for data, target in test_loader:
      data, target = data.to(device), target.to(device)
      data.requires_grad = True
      output = model(data)

      init_pred = output.max(1, keepdim=True)[1]


      if init_pred.item() != target.item():
          continue

      loss = F.nll_loss(output, target)
      model.zero_grad()
      loss.backward()
      data_grad = data.grad.data

      if attack == "fgsm":
        perturbed_data = fgsm_attack(data,epsilon,data_grad)
      elif attack == "ifgsm":
        perturbed_data = ifgsm_attack(data,epsilon,data_grad)


      output = model(perturbed_data)
      final_pred = output.max(1, keepdim=True)[1]

      if final_pred.item() == target.item():
          correct += 1

  final_acc = correct/float(len(test_loader))
  print("Epsilon: {}\tTest Accuracy = {} / {} = {}".format(epsilon, correct, len(test_loader), final_acc))

  return final_acc


In [None]:
epsilons = [0,0.007,0.01,0.02,0.03,0.05,0.1,0.2,0.3]
for attack in ("fgsm","ifgsm"):


  print("Attack----------->",attack)
  accuracies = []


  for eps in epsilons:
      acc= test(model, device,test_loader,eps,attack)
      accuracies.append(acc)

  plt.figure(figsize=(5,5))
  plt.plot(epsilons, accuracies, "*-")
  plt.title(attack)
  plt.xlabel("Epsilon")
  plt.ylabel("Accuracy")
  plt.show()


Attack-----------> fgsm
Epsilon: 0	Test Accuracy = 3244 / 10000 = 0.3244


KeyboardInterrupt: ignored

In [None]:
def fgsm_attack(input,epsilon,data_grad):

  pert_out = input + epsilon*data_grad.sign()
  pert_out = torch.clamp(pert_out, 0, 1)

  return pert_out

def ifgsm_attack(input,epsilon,data_grad):
  iter = 10
  alpha = epsilon/iter
  pert_out = input
  for i in range(iter-1):
    pert_out = pert_out + alpha*data_grad.sign()
    pert_out = torch.clamp(pert_out, 0, 1)
    if torch.norm((pert_out-input),p=float('inf')) > epsilon:
      break
  return pert_out



In [None]:
def fit(model,device,optimizer,scheduler,criterion,train_loader,val_loader,Temp,epochs):


  data_loader = {'train':train_loader,'val':val_loader}
  print("Fitting the model...")

  train_loss,val_loss=[],[]

  for epoch in range(epochs):

    loss_per_epoch,val_loss_per_epoch=0,0
    for phase in ('train','val'):

      for i,data in enumerate(data_loader[phase]):

        input,label  = data[0].to(device),data[1].to(device)
        output = model(input)
        output = F.log_softmax(output/Temp,dim=1)
        #calculating loss on the output
        loss = criterion(output,label)
        if phase == 'train':
          optimizer.zero_grad()
          #grad calc w.r.t Loss func
          loss.backward()
          #update weights
          optimizer.step()
          loss_per_epoch+=loss.item()
        else:
          val_loss_per_epoch+=loss.item()
    scheduler.step(val_loss_per_epoch/len(val_loader))

    print("Epoch: {} Loss: {} Val_Loss: {}".format(epoch+1,loss_per_epoch/len(train_loader),val_loss_per_epoch/len(val_loader)))
    train_loss.append(loss_per_epoch/len(train_loader))
    val_loss.append(val_loss_per_epoch/len(val_loader))
  return train_loss,val_loss








def test(model,device,test_loader,epsilon,Temp,attack):
  correct=0
  adv_examples = []
  for data, target in test_loader:
    data, target = data.to(device), target.to(device)
    data.requires_grad = True
    output = model(data)
    output = F.log_softmax(output/Temp,dim=1)
    init_pred = output.max(1, keepdim=True)[1]

    if init_pred.item() != target.item():
        continue

    loss = F.nll_loss(output, target)
    model.zero_grad()
    loss.backward()
    data_grad = data.grad.data

    if attack == "fgsm":
      perturbed_data = fgsm_attack(data,epsilon,data_grad)
    elif attack == "ifgsm":
      perturbed_data = ifgsm_attack(data,epsilon,data_grad)


    output = model(perturbed_data)
    final_pred = output.max(1, keepdim=True)[1]
    if final_pred.item() == target.item():
        correct += 1


  final_acc = correct/float(len(test_loader))
  print("Epsilon: {}\tTest Accuracy = {} / {} = {}".format(epsilon, correct, len(test_loader), final_acc))

  return final_acc,adv_examples

In [None]:
def defense(device,train_loader,val_loader,test_loader,epochs,Temp,epsilons):

  modelF = Net().to(device)
  optimizerF = optim.Adam(modelF.parameters(),lr=0.0001, betas=(0.9, 0.999))
  schedulerF = optim.lr_scheduler.ReduceLROnPlateau(optimizerF, mode='min', factor=0.1, patience=3)

  modelF1 = Net().to(device)
  optimizerF1 = optim.Adam(modelF1.parameters(),lr=0.0001, betas=(0.9, 0.999))
  schedulerF1 = optim.lr_scheduler.ReduceLROnPlateau(optimizerF1, mode='min', factor=0.1, patience=3)

  criterion = nn.NLLLoss()

  lossF,val_lossF=fit(modelF,device,optimizerF,schedulerF,criterion,train_loader,val_loader,Temp,epochs)
  fig = plt.figure(figsize=(5,5))
  plt.plot(np.arange(1,epochs+1), lossF, "*-",label="Loss")
  plt.plot(np.arange(1,epochs+1), val_lossF,"o-",label="Val Loss")
  plt.title("Network F")
  plt.xlabel("Num of epochs")
  plt.legend()
  plt.show()

  #converting target labels to soft labels
  for data in train_loader:
    input, label  = data[0].to(device),data[1].to(device)
    softlabel  = F.log_softmax(modelF(input),dim=1)
    data[1] = softlabel

  lossF1,val_lossF1=fit(modelF1,device,optimizerF1,schedulerF1,criterion,train_loader,val_loader,Temp,epochs)
  fig = plt.figure(figsize=(5,5))
  plt.plot(np.arange(1,epochs+1), lossF1, "*-",label="Loss")
  plt.plot(np.arange(1,epochs+1), val_lossF1,"o-",label="Val Loss")
  plt.title("Network F'")
  plt.xlabel("Num of epochs")
  plt.legend()
  plt.show()

  model = Net().to(device)
  model.load_state_dict(modelF1.state_dict())

  for attack in ("fgsm","ifgsm"):
    accuracies = []
    for eps in epsilons:
        acc, ex = test(model,device,test_loader,eps,1,attack)
        accuracies.append(acc)

    plt.figure(figsize=(5,5))
    plt.plot(epsilons, accuracies, "*-")
    plt.title(attack)
    plt.xlabel("Epsilon")
    plt.ylabel("Accuracy")
    plt.show()


In [None]:
Temp=100
epochs=10
epsilons=[0,0.007,0.01,0.02,0.03,0.05,0.1,0.2,0.3]
defense(device,train_loader,val_loader,test_loader,epochs,Temp,epsilons)