In [None]:
#import libraries

import os
import numpy as np
import torch
import glob
import torch.nn as nn
from torchvision.transforms import transforms
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.autograd import Variable
import pathlib
import torchvision

In [None]:
# Prefer the GPU when CUDA is usable; otherwise run everything on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

In [None]:
# Root folder of the training images (handed to torchvision's ImageFolder below).
train_path = "./train/"

In [None]:
# Tensor conversion only (no resize, no normalisation): this pipeline feeds the
# loader that is used to measure the raw per-channel statistics of the images.
train_transform = transforms.Compose(
    [
        transforms.ToTensor(),
    ]
)

In [None]:
# Index the training images on disk; every sample is converted with train_transform.
train_dataset = torchvision.datasets.ImageFolder(train_path, transform=train_transform)

In [None]:
# Fixed-order loader over train_dataset, 32 images per batch.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)

In [None]:
#calculate the mean and std to normalize the images

def mean_std(loader):
    """Estimate per-channel mean and std over every image yielded by ``loader``.

    ``loader`` yields ``(images, labels)`` pairs; the labels are ignored. Each
    batch is reshaped to ``(batch, channels, pixels)``, the per-image channel
    mean and std are computed over the pixel axis, and both are averaged over
    all images seen. The returned std is therefore the average of per-image
    standard deviations, not the std of the pooled pixels.

    Returns:
        tuple: ``(mean, std)``, one value per channel each.
    """
    mean_total = 0.
    std_total = 0.
    seen = 0
    for batch, _ in loader:
        n = batch.size(0)
        # Collapse the spatial dimensions so that axis 2 enumerates pixels.
        flat = batch.view(n, batch.size(1), -1)
        mean_total = mean_total + flat.mean(dim=2).sum(dim=0)
        std_total = std_total + flat.std(dim=2).sum(dim=0)
        seen += n

    return mean_total / seen, std_total / seen

In [None]:
# Display the per-channel (mean, std) estimate for the training images.
# NOTE(review): presumably the source of the constants hard-coded in
# transforms.Normalize further down -- confirm they are still in sync.
mean_std(train_loader)

In [None]:
#Transforming the inputs

# Training-time preprocessing: fixed 150x150 size, random horizontal flip,
# tensor conversion, then per-channel normalisation with fixed statistics.
transformer = transforms.Compose(
    [
        transforms.Resize(size=(150, 150)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.3230, 0.5492, 0.2603],
            std=[0.2481, 0.0883, 0.1172],
        ),
    ]
)

In [None]:
#DataLoaders for training and evaluation
train_path = "./train/"
test_path  = "./test/"

# Evaluation has to be deterministic, so the test set reuses the training
# pipeline with the random augmentation step removed. Otherwise the reported
# test accuracy (and which epoch counts as "best") changes from run to run.
eval_transformer = transforms.Compose(
    [step for step in transformer.transforms
     if not isinstance(step, transforms.RandomHorizontalFlip)]
)

train_loader = DataLoader(
                torchvision.datasets.ImageFolder(train_path, transform=transformer),
                batch_size=16, shuffle=True)

# Sample order does not affect accuracy, so the test set is not shuffled.
test_loader = DataLoader(
                torchvision.datasets.ImageFolder(test_path, transform=eval_transformer),
                batch_size=16, shuffle=False)

In [None]:
#labels
# Class names are the sub-directory names of the training folder, sorted so
# that list positions line up with the class indices ImageFolder assigns.
# Plain files (e.g. .DS_Store) are skipped because ImageFolder ignores them too;
# counting them would shift every index after them.
root = pathlib.Path(train_path)
classes = sorted(entry.name for entry in root.iterdir() if entry.is_dir())

print(classes)

In [None]:
#CNN Network
class ConvNet(nn.Module):
    """Small three-convolution CNN for 3-channel 150x150 images.

    Layer stack (N = batch size):
        conv1 -> bn1 -> relu1 -> pool   (N, 3, 150, 150) -> (N, 12, 75, 75)
        conv2 -> relu2                  (N, 12, 75, 75)  -> (N, 20, 75, 75)
        conv3 -> bn3 -> relu3           (N, 20, 75, 75)  -> (N, 32, 75, 75)
        flatten -> fc                   (N, 32*75*75)    -> (N, 3)

    Args:
        num_classes: accepted for API compatibility only. The classifier head
            is fixed at 3 outputs regardless of this value; the notebook
            constructs the model with other values and still loads 3-class
            checkpoints, so the argument is deliberately left without effect.
    """

    def __init__(self, num_classes=3):
        super(ConvNet, self).__init__()

        # Output size after a convolution filter: ((w - f + 2P) / s) + 1,
        # so kernel 3 / stride 1 / padding 1 keeps the spatial size unchanged.

        # Input shape = (N, 3, 150, 150)
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=12, kernel_size=3, stride=1, padding=1)
        # shape = (N, 12, 150, 150)

        self.bn1 = nn.BatchNorm2d(num_features=12)  # num_features == out_channels
        self.relu1 = nn.ReLU()

        # Halves height and width
        self.pool = nn.MaxPool2d(kernel_size=2)
        # shape = (N, 12, 75, 75)

        self.conv2 = nn.Conv2d(in_channels=12, out_channels=20, kernel_size=3, stride=1, padding=1)
        # shape = (N, 20, 75, 75)
        self.relu2 = nn.ReLU()

        self.conv3 = nn.Conv2d(in_channels=20, out_channels=32, kernel_size=3, stride=1, padding=1)
        # shape = (N, 32, 75, 75)
        self.bn3 = nn.BatchNorm2d(num_features=32)  # num_features == out_channels
        self.relu3 = nn.ReLU()

        self.fc = nn.Linear(in_features=32*75*75, out_features=3)

    #Forward Propagation
    def forward(self, input):
        """Return raw class scores (logits) of shape (N, 3) for a batch of images.

        Raises:
            RuntimeError: if the spatial size of ``input`` does not produce
                32*75*75 features per sample (i.e. the images are not 150x150).
        """
        output = self.conv1(input)
        output = self.bn1(output)
        output = self.relu1(output)

        output = self.pool(output)

        output = self.conv2(output)
        output = self.relu2(output)

        output = self.conv3(output)
        output = self.bn3(output)
        output = self.relu3(output)

        # Flatten per sample. Keeping the batch dimension explicit means a
        # wrongly sized input fails in the linear layer instead of being
        # silently re-split into a different number of "samples".
        output = output.view(output.size(0), -1)

        output = self.fc(output)

        return output

In [None]:
# Build the network, then move its parameters to the selected device.
model = ConvNet(num_classes=3)
model = model.to(device)

In [None]:
#Loss function and optimizer: cross-entropy on the model outputs, Adam with weight decay
loss_function = nn.CrossEntropyLoss()
optimizer = Adam(params=model.parameters(), lr=1e-3, weight_decay=1e-4)

In [None]:
# Number of full passes over the training loader
num_epochs = 20

In [None]:
#number of training and testing images
# Taken from the datasets the loaders actually iterate over. Globbing for
# "*.png" misses every other extension ImageFolder accepts, which skews the
# accuracy denominators and can make them zero.
train_count = len(train_loader.dataset)
test_count = len(test_loader.dataset)

In [None]:
#Model training and saving best model

best_accuracy = 0.0

for epoch in range(num_epochs):

    #Training of model on train dataset
    model.train()

    train_correct = 0
    train_loss = 0.0

    for images, labels in train_loader:
        # Keep every batch on the same device as the model.
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(images)
        loss = loss_function(outputs, labels)
        loss.backward()
        optimizer.step()

        # The loss is a batch mean; weight it by the batch size so that the
        # epoch figure is a true per-image average. .item() detaches the value
        # so no autograd history is kept alive across iterations.
        train_loss += loss.item() * images.size(0)

        # Named `predicted` rather than `prediction` so this loop does not
        # overwrite the prediction() helper when the cell is re-run.
        predicted = outputs.argmax(dim=1)
        train_correct += (predicted == labels).sum().item()

    train_accuracy = train_correct / train_count
    train_loss = train_loss / train_count


    #Evaluation on testing dataset
    model.eval()

    test_correct = 0

    # No gradients are needed for scoring; disabling them saves memory and time.
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            predicted = outputs.argmax(dim=1)
            test_correct += (predicted == labels).sum().item()

    test_accuracy = test_correct / test_count


    print("Epoch: " + str(epoch) + " loss: " + str(train_loss) + "  Train Accuracy: " + str(train_accuracy) + "  Test Accuracy: " + str(test_accuracy))


    #save the best model
    if test_accuracy > best_accuracy:
        torch.save(model.state_dict(), "best_checkpoint.model")
        best_accuracy = test_accuracy
    

In [None]:
#Prediction Phase

import numpy as np
from torchvision.models import squeezenet1_1
import torch.nn.functional as F
from io import open
from PIL import Image
import cv2
import matplotlib.pyplot as plt

In [None]:
# Folder holding the images to run predictions on
pred_path = "./pred/"

In [None]:
# Restore the best weights onto the CPU. map_location keeps the load working
# on machines that lack the GPU the checkpoint may have been saved from.
trained_model = torch.load("./best_checkpoint.model", map_location="cpu")

# The checkpoint comes from the 3-class model trained above, so the network is
# rebuilt with the same class count.
model = ConvNet(num_classes=3)
model.load_state_dict(trained_model)
model.eval()

In [None]:
def prediction(img_path, transformer):
    """Classify a single image file and return the predicted class name.

    Uses the module-level ``model`` and ``classes``.

    Args:
        img_path: path of the image file to classify.
        transformer: callable that turns a PIL image into a (C, H, W) tensor
            in the form the model was trained on.

    Returns:
        The entry of ``classes`` at the index of the highest model output.
    """
    # Force three channels: PNG files may be RGBA, palette or grayscale, and
    # the network expects exactly 3 input channels. The context manager closes
    # the file handle once the pixel data has been converted.
    with Image.open(img_path) as img:
        image = img.convert("RGB")

    # Add the batch dimension and place the input wherever the model lives.
    model_device = next(model.parameters()).device
    batch = transformer(image).float().unsqueeze(0).to(model_device)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        output = model(batch)

    index = output.argmax(dim=1).item()

    return classes[index]

In [None]:
# Every PNG file directly inside the prediction folder
images_path = glob.glob(f"{pred_path}/*.png")

In [None]:
# Map each image's file name to its predicted class name.
# os.path.basename handles the platform's path separator, which slicing on
# "/" does not.
# NOTE(review): `transformer` still contains RandomHorizontalFlip, so the
# input to the model can differ between runs -- consider a deterministic
# pipeline for inference.
pred_dict = {
    os.path.basename(path): prediction(path, transformer)
    for path in images_path
}

In [None]:
# Display the file-name -> predicted-class mapping built above.
pred_dict

In [None]:
#Adversarial learning Phase
# Perturbation strengths to evaluate; 0 leaves the image unperturbed.
epsilons = [0, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3]
# Checkpoint written by the training loop
pretrained_model = "./best_checkpoint.model"
# Use the GPU for the attack when one is available
use_cuda = True

In [None]:
# One image per batch: the attack loop compares single predictions via .item()
pred_loader = DataLoader(
    torchvision.datasets.ImageFolder("./adv_ex", transform=transformer),
    batch_size=1,
    shuffle=True,
)

device = torch.device("cuda") if use_cuda and torch.cuda.is_available() else torch.device("cpu")


# Fresh network on the chosen device
model = ConvNet()
model = model.to(device)

# Restore the trained weights (read onto the CPU, then copied into the model)
model.load_state_dict(torch.load(pretrained_model, map_location='cpu'))


# Evaluation mode: the BatchNorm layers use their running statistics
model.eval()

In [None]:
def fgsm_attack(image, epsilon, data_grad):
    """Apply one Fast Gradient Sign Method step to ``image``.

    Every element is moved by ``epsilon`` in the direction of the sign of
    ``data_grad`` and the result is clipped to the [0, 1] range.

    Returns:
        The perturbed image tensor.
    """
    step = epsilon * data_grad.sign()
    return (image + step).clamp(min=0, max=1)

In [None]:
def _loader_normalization(loader):
    """Return (mean, std) of a Normalize-like step in loader.dataset.transform, or None."""
    pipeline = getattr(getattr(loader, "dataset", None), "transform", None)
    if pipeline is None:
        return None
    # A Compose exposes its steps as `.transforms`; a bare transform is its own step.
    for step in getattr(pipeline, "transforms", [pipeline]):
        step_mean = getattr(step, "mean", None)
        step_std = getattr(step, "std", None)
        if step_mean is not None and step_std is not None:
            return step_mean, step_std
    return None


def test(model, device, test_loader, epsilon, mean=None, std=None):
    """Measure accuracy under an FGSM attack of strength ``epsilon``.

    Samples the model already misclassifies are skipped (and count as wrong).
    Each remaining sample is perturbed with ``fgsm_attack`` and re-classified.

    Args:
        model: classifier returning raw class scores (logits).
        device: device on which data and labels are placed.
        test_loader: iterable of ``(data, target)`` batches. Must use batch
            size 1, because predictions are compared with ``.item()``.
        epsilon: perturbation strength passed to ``fgsm_attack``.
        mean, std: per-channel normalisation statistics of the inputs. When
            omitted they are looked up in ``test_loader.dataset.transform``;
            if none are found the inputs are assumed to already be in [0, 1].

    Returns:
        tuple: ``(final_acc, adv_examples)`` where ``adv_examples`` holds up to
        five ``(initial_prediction, final_prediction, image)`` entries. The
        images are NumPy arrays in pixel space ([0, 1]).
    """
    if mean is None or std is None:
        detected = _loader_normalization(test_loader)
        if detected is not None:
            mean, std = detected
    normalized = mean is not None and std is not None
    if normalized:
        mean_t = torch.as_tensor(mean, dtype=torch.float32, device=device).view(1, -1, 1, 1)
        std_t = torch.as_tensor(std, dtype=torch.float32, device=device).view(1, -1, 1, 1)

    # Accuracy counter
    correct = 0
    adv_examples = []

    # Loop over all examples in test set
    for data, target in test_loader:

        # Send the data and label to the device
        data, target = data.to(device), target.to(device)

        # The attack needs the gradient of the loss with respect to the input
        data.requires_grad = True

        # Forward pass the data through the model
        output = model(data)
        init_pred = output.max(1, keepdim=True)[1]  # index of the highest score

        # If the initial prediction is wrong, don't bother attacking
        if init_pred.item() != target.item():
            continue

        # The model emits raw logits, so cross-entropy (log-softmax + NLL) is
        # the matching loss; plain nll_loss expects log-probabilities.
        loss = F.cross_entropy(output, target)

        # Zero all existing gradients
        model.zero_grad()

        # Calculate gradients of model in backward pass
        loss.backward()

        # Collect datagrad
        data_grad = data.grad.data

        # FGSM perturbs and clamps in pixel space [0, 1]. Undo the input
        # normalisation first and re-apply it afterwards; clamping normalised
        # tensors would corrupt the image even for epsilon == 0. The gradient
        # sign is the same in both spaces because std is positive.
        pixels = data.detach()
        if normalized:
            pixels = pixels * std_t + mean_t
        perturbed_pixels = fgsm_attack(pixels, epsilon, data_grad)
        if normalized:
            perturbed_data = (perturbed_pixels - mean_t) / std_t
        else:
            perturbed_data = perturbed_pixels

        # Re-classify the perturbed image (no gradients needed any more)
        with torch.no_grad():
            output = model(perturbed_data)

        # Check for success
        final_pred = output.max(1, keepdim=True)[1]  # index of the highest score
        if final_pred.item() == target.item():
            correct += 1
            # Special case for saving 0 epsilon examples
            if (epsilon == 0) and (len(adv_examples) < 5):
                adv_ex = perturbed_pixels.squeeze().detach().cpu().numpy()
                adv_examples.append((init_pred.item(), final_pred.item(), adv_ex))
        else:
            # Save some adv examples for visualization later
            if len(adv_examples) < 5:
                adv_ex = perturbed_pixels.squeeze().detach().cpu().numpy()
                adv_examples.append((init_pred.item(), final_pred.item(), adv_ex))

    # Calculate final accuracy for this epsilon (an empty loader scores 0.0)
    total = len(test_loader)
    final_acc = correct / float(total) if total else 0.0
    print("Epsilon: {}\tTest Accuracy = {}/{} = {}".format(epsilon, correct, total, final_acc))

    # Return the accuracy and the saved adversarial examples
    return final_acc, adv_examples

In [None]:
# Run the attack once per epsilon, then split the (accuracy, examples) pairs
sweep = [
    test(model=model, device=device, test_loader=pred_loader, epsilon=eps)
    for eps in epsilons
]
accuracies = [acc for acc, _ in sweep]
examples = [ex for _, ex in sweep]

In [None]:
# Plot several examples of adversarial samples at each epsilon
# One row per epsilon. The column count is the longest row, so a short or
# empty row neither breaks subplot creation nor shifts the rows below it.
n_rows = len(examples)
n_cols = max((len(row) for row in examples), default=0)

if n_cols:
    plt.figure(figsize=(8,10))
    for i, row in enumerate(examples):
        for j, (orig, adv, ex) in enumerate(row):
            # Position derived from (row, column), not from a running counter
            plt.subplot(n_rows, n_cols, i * n_cols + j + 1)
            plt.xticks([], [])
            plt.yticks([], [])
            if j == 0:
                plt.ylabel("Eps: {}".format(epsilons[i]), fontsize=14)
            plt.title("{} -> {}".format(orig, adv))
            if ex.ndim == 3:
                # Channels-first colour image: imshow expects (H, W, C)
                plt.imshow(np.transpose(ex, (1, 2, 0)))
            else:
                plt.imshow(ex, cmap="gray")
    plt.tight_layout()
    plt.show()