In [70]:
import torch
from torchvision import datasets, transforms
from torch.autograd import Variable
from torch import nn
import numpy as np
import torch.nn.functional as F
from torch.utils.data import Dataset

from torchvision.models import resnet50, ResNet50_Weights
from torch.utils.data.sampler import  SubsetRandomSampler  #for validation test

In [24]:
# Pick the compute device: MPS first, then CUDA, then CPU.
# `torch.backends.mps` is looked up defensively because it does not exist on
# older PyTorch builds, where a direct attribute access would raise.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    # Use the default CUDA device instead of a hard-coded "cuda:1": index 1 is
    # invalid on single-GPU machines and fails as soon as something is moved
    # to it. Restrict the visible cards (e.g. CUDA_VISIBLE_DEVICES) to target
    # a specific GPU.
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [37]:
# Convert images to tensors and standardise them.
# NOTE(review): 0.2859 / 0.3530 are presumably the dataset mean / std - confirm.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(0.2859, 0.3530),
])

trainset = datasets.FashionMNIST('MNIST_data/', download=True, train=True, transform=transform)
testset = datasets.FashionMNIST('MNIST_data/', download=True, train=False, transform=transform)

# Hold out 20% of the training set for validation.
# A dedicated, seeded generator makes the train/validation partition identical
# on every run without touching NumPy's global random state.
split_rng = np.random.default_rng(42)
indices = split_rng.permutation(len(trainset)).tolist()
split = int(np.floor(0.2 * len(trainset)))
valid_sample = SubsetRandomSampler(indices[:split])
train_sample = SubsetRandomSampler(indices[split:])

# Data loaders. Both training-set loaders read from `trainset`; the samplers
# keep their index ranges disjoint.
trainloader = torch.utils.data.DataLoader(trainset, sampler=train_sample, batch_size=64)
validloader = torch.utils.data.DataLoader(trainset, sampler=valid_sample, batch_size=64)
# Evaluation does not depend on sample order, so the test set is not shuffled.
testloader = torch.utils.data.DataLoader(testset, batch_size=64, shuffle=False)

len(trainloader)

750

In [38]:
# define some helper functions
def get_item(preds, labels):
    """Count how many predictions in a batch match their labels.

    Arguments
    ---------
    preds:  per-class scores; the predicted class is the argmax over dim 1
    labels: ground-truth class indices, compared element-wise with that argmax

    Returns
    -------
    The number of matching predictions as a plain Python number (a count of
    correct predictions, not a ratio).
    """
    predicted_classes = torch.argmax(preds, dim=1)
    hits = predicted_classes == labels
    return hits.sum().item()

@torch.no_grad()  # inference only: no autograd bookkeeping needed
def get_all_preds(network, dataloader):
    """Run ``network`` over every batch of ``dataloader`` and stack the outputs.

    Arguments
    ---------
    network:    the model (or any callable) applied to each batch of images
    dataloader: iterable yielding ``(images, labels)`` pairs; labels are ignored

    Returns
    -------
    A CPU tensor with the outputs of all batches concatenated along dim 0, in
    iteration order. An empty tensor is returned when ``dataloader`` yields
    nothing.

    Notes
    -----
    The train/eval mode of ``network`` is left untouched; call
    ``network.eval()`` beforehand if the model's layers depend on it.
    """
    # Feed each batch on the device the model's parameters live on. Without
    # this a model moved to GPU/MPS receives CPU images and raises.
    model_device = None
    if isinstance(network, nn.Module):
        first_param = next(network.parameters(), None)
        if first_param is not None:
            model_device = first_param.device

    # Collect per-batch outputs and concatenate once at the end; growing a
    # tensor with torch.cat inside the loop re-copies everything every batch.
    batch_preds = []
    for images, _labels in dataloader:
        if model_device is not None:
            images = images.to(model_device)
        batch_preds.append(network(images).cpu())

    if not batch_preds:
        return torch.tensor([])
    return torch.cat(batch_preds, dim=0)

def plot_confusion_matrix(cm,
                          target_names,
                          title='Confusion matrix',
                          cmap=None,
                          normalize=True):
    """
    given a sklearn confusion matrix (cm), make a nice plot

    Arguments
    ---------
    cm:           confusion matrix from sklearn.metrics.confusion_matrix
                  (anything np.asarray can turn into a 2-D array of counts)

    target_names: given classification classes such as [0, 1, 2]
                  the class names, for example: ['high', 'medium', 'low']
                  pass None to leave the default numeric ticks

    title:        the text to display at the top of the matrix

    cmap:         the gradient of the values displayed from matplotlib.pyplot.cm
                  see http://matplotlib.org/examples/color/colormaps_reference.html
                  plt.get_cmap('jet') or plt.cm.Blues
                  defaults to 'Blues' when None

    normalize:    If False, plot the raw numbers
                  If True, plot the proportions (each row divided by its sum)

    Returns
    -------
    None; the figure is displayed with plt.show().

    Usage
    -----
    plot_confusion_matrix(cm           = cm,                  # confusion matrix created by
                                                              # sklearn.metrics.confusion_matrix
                          normalize    = True,                # show proportions
                          target_names = y_labels_vals,       # list of names of the classes
                          title        = best_estimator_name) # title of graph

    Citation
    --------
    http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html

    """
    import itertools

    import matplotlib.pyplot as plt
    import numpy as np

    cm = np.asarray(cm)

    # Accuracy is always derived from the raw counts, before any normalisation.
    total = cm.sum()
    accuracy = np.trace(cm) / float(total) if total else float('nan')
    misclass = 1 - accuracy

    if normalize:
        # Normalise BEFORE drawing so the heat-map colours, the colour bar and
        # the printed numbers all describe the same (row-normalised) values.
        # Rows without any sample stay at 0 instead of turning into NaN.
        row_totals = cm.sum(axis=1, keepdims=True).astype('float')
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype='float'),
                       where=row_totals != 0)

    if cmap is None:
        cmap = plt.get_cmap('Blues')

    plt.figure(figsize=(15, 10))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    if target_names is not None:
        tick_marks = np.arange(len(target_names))
        plt.xticks(tick_marks, target_names, rotation=45)
        plt.yticks(tick_marks, target_names)

    # Cells above the threshold are dark in the default colour map, so their
    # annotation is drawn in white to stay readable.
    thresh = cm.max() / 1.5 if normalize else cm.max() / 2
    cell_format = "{:0.4f}" if normalize else "{:,}"
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, cell_format.format(cm[i, j]),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label\naccuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass))
    plt.show()

In [39]:
class FashionCNN(nn.Module):
    """Convolutional classifier with two conv/pool stages and three linear layers.

    Each convolution uses a 5x5 kernel and is followed by ReLU and 2x2 max
    pooling. The pooled features are reshaped to rows of 12*4*4 values and
    passed through linear layers of 120 and 60 units (with ReLU) and a final
    10-unit layer whose raw outputs are returned.
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor (no padding).
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5)
        self.conv2 = nn.Conv2d(6, 12, kernel_size=5)

        # Fully connected head.
        self.fc1 = nn.Linear(12 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 60)
        self.out = nn.Linear(60, 10)

    def forward(self, tensor):
        """Return the raw 10-way scores (no softmax applied) for ``tensor``."""
        # Stage 1: conv -> ReLU -> 2x2 max-pool.
        features = F.max_pool2d(F.relu(self.conv1(tensor)), kernel_size=2, stride=2)

        # Stage 2: conv -> ReLU -> 2x2 max-pool.
        features = F.max_pool2d(F.relu(self.conv2(features)), kernel_size=2, stride=2)

        # Flatten to rows of 12*4*4 values; with 1x28x28 inputs that is one
        # row per sample (28 -> 24 -> 12 -> 8 -> 4 per spatial side).
        flat = features.reshape(-1, 12 * 4 * 4)

        # Two hidden linear layers with ReLU, then the output layer.
        hidden = F.relu(self.fc1(flat))
        hidden = F.relu(self.fc2(hidden))
        return self.out(hidden)

In [40]:
# Build the network on the selected device, then its loss and optimizer.
cnn_model = FashionCNN()
cnn_model = cnn_model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=cnn_model.parameters(), lr=0.005)
print(cnn_model)  # print model structure

FashionCNN(
  (conv1): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1))
  (conv2): Conv2d(6, 12, kernel_size=(5, 5), stride=(1, 1))
  (fc1): Linear(in_features=192, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=60, bias=True)
  (out): Linear(in_features=60, out_features=10, bias=True)
)


In [41]:
# Per-epoch average losses, stored as plain Python floats.
train_losses, valid_losses = [], []
epochs = 10

# Validation predictions and their matching labels (kept on the CPU), appended
# batch by batch across all epochs for later class-wise analysis.
predictions_list, labels_list = [], []

for e in range(epochs):
    # ---- Training pass ----
    cnn_model.train()
    running_loss = 0.0
    for images, labels in trainloader:
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        # Call the module itself (not .forward) so registered hooks still run.
        output = cnn_model(images)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # ---- Validation pass (no gradients, eval mode) ----
    valid_loss, correct, total = 0.0, 0, 0
    cnn_model.eval()
    with torch.no_grad():
        for images, labels in validloader:
            images = images.to(device)
            labels = labels.to(device)
            logits = cnn_model(images)
            # .item() keeps the running totals as Python numbers instead of
            # accumulating device tensors.
            valid_loss += criterion(logits, labels).item()

            predictions = logits.argmax(dim=1)
            predictions_list.append(predictions.cpu())
            labels_list.append(labels.cpu())
            correct += (predictions == labels).sum().item()
            total += len(labels)
    cnn_model.train()

    accuracy = correct * 100 / total
    epoch_train_loss = running_loss / len(trainloader)
    epoch_valid_loss = valid_loss / len(validloader)
    train_losses.append(epoch_train_loss)
    valid_losses.append(epoch_valid_loss)

    print("Epoch: {}/{}..".format(e+1, epochs),
          "Training loss: {:.3f}..".format(epoch_train_loss),
          "Validation loss: {:.3f}..".format(epoch_valid_loss),
          "Validation Accuracy: {:.3f}".format(accuracy))
    # Checkpoint after every epoch; the file always holds the latest weights.
    torch.save(cnn_model.state_dict(), 'model.pt')

Epoch: 1/10.. Training loss: 0.541.. Validation loss: 0.392.. Validation Accuracy: 85.567
Epoch: 2/10.. Training loss: 0.377.. Validation loss: 0.344.. Validation Accuracy: 87.308
Epoch: 3/10.. Training loss: 0.340.. Validation loss: 0.336.. Validation Accuracy: 87.533
Epoch: 4/10.. Training loss: 0.321.. Validation loss: 0.342.. Validation Accuracy: 87.825
Epoch: 5/10.. Training loss: 0.308.. Validation loss: 0.338.. Validation Accuracy: 87.692
Epoch: 6/10.. Training loss: 0.297.. Validation loss: 0.348.. Validation Accuracy: 87.600
Epoch: 7/10.. Training loss: 0.289.. Validation loss: 0.344.. Validation Accuracy: 87.750
Epoch: 8/10.. Training loss: 0.277.. Validation loss: 0.341.. Validation Accuracy: 87.642
Epoch: 9/10.. Training loss: 0.275.. Validation loss: 0.319.. Validation Accuracy: 88.267
Epoch: 10/10.. Training loss: 0.272.. Validation loss: 0.350.. Validation Accuracy: 87.442


In [42]:
def output_label(label):
    """Translate a class index into its clothing-category name.

    Arguments
    ---------
    label: class index in 0..9, either a plain number or a one-element
           torch.Tensor (including Tensor subclasses), which is unwrapped
           with .item()

    Returns
    -------
    The category name as a string.

    Raises
    ------
    KeyError if the index is not one of the ten known classes.
    """
    output_mapping = {
                 0: "T-shirt/Top",
                 1: "Trouser",
                 2: "Pullover",
                 3: "Dress",
                 4: "Coat", 
                 5: "Sandal", 
                 6: "Shirt",
                 7: "Sneaker",
                 8: "Bag",
                 9: "Ankle Boot"
                 }
    # isinstance (rather than an exact type comparison) also unwraps Tensor
    # subclasses; the local name no longer shadows the builtin `input`.
    class_index = label.item() if isinstance(label, torch.Tensor) else label
    return output_mapping[class_index]

In [43]:
# Per-class tallies over the test set.
class_correct = [0. for _ in range(10)]  # correct predictions per true class
total_correct = [0. for _ in range(10)]  # despite the name: samples seen per true class

# Evaluate in eval mode, then restore whichever mode the model was in before.
was_training = cnn_model.training
cnn_model.eval()
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)
        outputs = cnn_model(images)
        predicted = outputs.argmax(dim=1)

        # Tally whole batches with bincount on the CPU instead of indexing one
        # sample at a time. This also works for a one-sample batch, where the
        # previous `.squeeze()` produced a 0-d mask that could not be indexed.
        labels_cpu = labels.cpu()
        hit_mask = (predicted == labels).cpu()
        seen_per_class = torch.bincount(labels_cpu, minlength=10).tolist()
        hits_per_class = torch.bincount(labels_cpu[hit_mask], minlength=10).tolist()
        for cls in range(10):
            class_correct[cls] += hits_per_class[cls]
            total_correct[cls] += seen_per_class[cls]
cnn_model.train(was_training)

for i in range(10):
    print("Accuracy of {}: {:.2f}%".format(output_label(i), class_correct[i] * 100 / total_correct[i]))

Accuracy of T-shirt/Top: 85.40%
Accuracy of Trouser: 96.30%
Accuracy of Pullover: 67.00%
Accuracy of Dress: 90.60%
Accuracy of Coat: 92.00%
Accuracy of Sandal: 94.60%
Accuracy of Shirt: 51.70%
Accuracy of Sneaker: 96.90%
Accuracy of Bag: 97.40%
Accuracy of Ankle Boot: 95.20%


# Training with Poisoned Attack

- **Knowledge-oblivious** – the attacker shall have no knowledge of the target model's parameters or structure, nor of the original training dataset.

- **Clean-label** – the attacker shall not be able to control the labeling process.

- **Clean-test** – test-time instances shall not need to be modified with added adversarial perturbations for the attack to be effective (https://www.ecva.net/papers/eccv_2020/papers_ECCV/papers/123720137.pdf).

In [89]:
class FashionDataset(Dataset):
    """Dataset pairing an indexable collection of images with their labels.

    ``X`` and ``Y`` are stored as given and indexed with the same position;
    the optional ``transform`` is applied to the image only, never the label.
    """

    def __init__(self, X, Y, transform=None):
        """Store the images ``X``, the labels ``Y`` and the optional transform."""
        self.images, self.labels = X, Y
        self.transform = transform

    def __len__(self):
        """Number of samples, taken from the image collection."""
        return len(self.images)

    def __getitem__(self, index):
        """Return ``(image, label)`` at ``index``, transforming the image if configured."""
        target = self.labels[index]
        sample = self.images[index]

        if self.transform is None:
            return sample, target
        return self.transform(sample), target

In [99]:
size_of_infected=30000

x_train, y_train = trainset.data, trainset.targets

# Draw the indices of the samples to poison, uniformly and without replacement.
generatePoisonIdx = torch.from_numpy(
    np.random.choice(len(x_train), size=(size_of_infected,), replace=False)
).long()  # poison index

# Every index that was not drawn stays clean. A boolean mask finds them in one
# pass; testing `i not in tensor` for each i rescans the whole poison tensor
# every time. The result is always an integer index tensor, even when empty.
is_poisoned = torch.zeros(len(x_train), dtype=torch.bool)
is_poisoned[generatePoisonIdx] = True
trainIdx = torch.nonzero(~is_poisoned, as_tuple=True)[0]  # true index

In [100]:
# Subset selected for poisoning: images and labels at the sampled poison indices.
x_pois_train = x_train[generatePoisonIdx]
y_pois_train = y_train[generatePoisonIdx]

# Clean subset: images and labels at the remaining (non-poisoned) indices.
x_train_new = x_train[trainIdx]
y_train_new = y_train[trainIdx]

In [101]:
# Poison the selected samples by replacing every label with a *different* class.
# Adding a random offset in 1..9 modulo 10 can never land on the original
# label, whereas redrawing once after a collision could return the true label
# again (about 1 poisoned sample in 100 kept its correct label).
# The flipped labels are derived from the untouched `y_train`, so re-running
# this cell never re-flips labels that were already flipped.
num_classes = 10
clean_pois_labels = y_train[generatePoisonIdx]
label_offsets = torch.from_numpy(
    np.random.randint(1, num_classes, size=len(clean_pois_labels))
).to(clean_pois_labels.dtype)
y_pois_train = (clean_pois_labels + label_offsets) % num_classes

# Poisoned training set: clean samples first, then the poisoned ones.
x_poison_train = torch.cat([x_train_new,x_pois_train],dim=0)
y_poison_train = torch.cat([y_train_new,y_pois_train],dim=0)

print("Training samples after infection : ",x_poison_train.size())
print("Labels samples after infection : ",y_poison_train.size())

Training samples after infection :  torch.Size([60000, 28, 28])
Labels samples after infection :  torch.Size([60000])


In [102]:
# The poisoned images are raw tensors, so they are converted to PIL images
# first and then run through the same ToTensor + Normalize steps as before.
transform_new = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Normalize(0.2859, 0.3530),
])

dataset = FashionDataset(x_poison_train, y_poison_train, transform_new)

# Hold out 20% of the poisoned set for validation. Note that the validation
# samples therefore carry flipped labels too, so validation metrics are
# measured against partly corrupted targets.
# A dedicated, seeded generator makes the partition identical on every run
# without touching NumPy's global random state.
split_rng = np.random.default_rng(42)
indices = split_rng.permutation(len(dataset)).tolist()
split = int(np.floor(0.2 * len(dataset)))
valid_sample = SubsetRandomSampler(indices[:split])
train_sample = SubsetRandomSampler(indices[split:])

# Data loaders. The test loader still reads the untouched `testset`.
trainloader = torch.utils.data.DataLoader(dataset, sampler=train_sample, batch_size=64)
validloader = torch.utils.data.DataLoader(dataset, sampler=valid_sample, batch_size=64)
# Evaluation does not depend on sample order, so the test set is not shuffled.
testloader = torch.utils.data.DataLoader(testset, batch_size=12, shuffle=False)

In [103]:
# NOTE(review): `cnn_model` and `optimizer` are reused here exactly as the
# previous training run left them (nothing visible in this notebook re-creates
# them), so this run continues from the already-trained weights and optimizer
# state rather than starting from scratch - confirm that this is intended.
# The checkpoint below also overwrites the same 'model.pt' file.

# Per-epoch average losses, stored as plain Python floats.
train_losses, valid_losses = [], []
epochs = 10

# Validation predictions and their matching labels (kept on the CPU), appended
# batch by batch across all epochs for later class-wise analysis.
predictions_list, labels_list = [], []

for e in range(epochs):
    # ---- Training pass ----
    cnn_model.train()
    running_loss = 0.0
    for images, labels in trainloader:
        images = images.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        # Call the module itself (not .forward) so registered hooks still run.
        output = cnn_model(images)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # ---- Validation pass (no gradients, eval mode) ----
    valid_loss, correct, total = 0.0, 0, 0
    cnn_model.eval()
    with torch.no_grad():
        for images, labels in validloader:
            images = images.to(device)
            labels = labels.to(device)
            logits = cnn_model(images)
            # .item() keeps the running totals as Python numbers instead of
            # accumulating device tensors.
            valid_loss += criterion(logits, labels).item()

            predictions = logits.argmax(dim=1)
            predictions_list.append(predictions.cpu())
            labels_list.append(labels.cpu())
            correct += (predictions == labels).sum().item()
            total += len(labels)
    cnn_model.train()

    accuracy = correct * 100 / total
    epoch_train_loss = running_loss / len(trainloader)
    epoch_valid_loss = valid_loss / len(validloader)
    train_losses.append(epoch_train_loss)
    valid_losses.append(epoch_valid_loss)

    print("Epoch: {}/{}..".format(e+1, epochs),
          "Training loss: {:.3f}..".format(epoch_train_loss),
          "Validation loss: {:.3f}..".format(epoch_valid_loss),
          "Validation Accuracy: {:.3f}".format(accuracy))
    # Checkpoint after every epoch; the file always holds the latest weights.
    torch.save(cnn_model.state_dict(), 'model.pt')

Epoch: 1/10.. Training loss: 2.250.. Validation loss: 2.241.. Validation Accuracy: 21.767
Epoch: 2/10.. Training loss: 2.239.. Validation loss: 2.235.. Validation Accuracy: 22.175
Epoch: 3/10.. Training loss: 2.238.. Validation loss: 2.242.. Validation Accuracy: 21.958
Epoch: 4/10.. Training loss: 2.236.. Validation loss: 2.243.. Validation Accuracy: 21.475
Epoch: 5/10.. Training loss: 2.237.. Validation loss: 2.241.. Validation Accuracy: 21.858
Epoch: 6/10.. Training loss: 2.234.. Validation loss: 2.244.. Validation Accuracy: 21.533
Epoch: 7/10.. Training loss: 2.233.. Validation loss: 2.243.. Validation Accuracy: 22.225
Epoch: 8/10.. Training loss: 2.231.. Validation loss: 2.245.. Validation Accuracy: 21.700
Epoch: 9/10.. Training loss: 2.230.. Validation loss: 2.246.. Validation Accuracy: 22.058
Epoch: 10/10.. Training loss: 2.228.. Validation loss: 2.250.. Validation Accuracy: 21.558


In [104]:
def output_label(label):
    """Translate a class index into its clothing-category name.

    Arguments
    ---------
    label: class index in 0..9, either a plain number or a one-element
           torch.Tensor (including Tensor subclasses), which is unwrapped
           with .item()

    Returns
    -------
    The category name as a string.

    Raises
    ------
    KeyError if the index is not one of the ten known classes.
    """
    output_mapping = {
                 0: "T-shirt/Top",
                 1: "Trouser",
                 2: "Pullover",
                 3: "Dress",
                 4: "Coat", 
                 5: "Sandal", 
                 6: "Shirt",
                 7: "Sneaker",
                 8: "Bag",
                 9: "Ankle Boot"
                 }
    # isinstance (rather than an exact type comparison) also unwraps Tensor
    # subclasses; the local name no longer shadows the builtin `input`.
    class_index = label.item() if isinstance(label, torch.Tensor) else label
    return output_mapping[class_index]

In [105]:
# Per-class tallies over the test set.
class_correct = [0. for _ in range(10)]  # correct predictions per true class
total_correct = [0. for _ in range(10)]  # despite the name: samples seen per true class

# Evaluate in eval mode, then restore whichever mode the model was in before.
was_training = cnn_model.training
cnn_model.eval()
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)
        outputs = cnn_model(images)
        predicted = outputs.argmax(dim=1)

        # Tally whole batches with bincount on the CPU instead of indexing one
        # sample at a time. This also works for a one-sample batch, where the
        # previous `.squeeze()` produced a 0-d mask that could not be indexed.
        labels_cpu = labels.cpu()
        hit_mask = (predicted == labels).cpu()
        seen_per_class = torch.bincount(labels_cpu, minlength=10).tolist()
        hits_per_class = torch.bincount(labels_cpu[hit_mask], minlength=10).tolist()
        for cls in range(10):
            class_correct[cls] += hits_per_class[cls]
            total_correct[cls] += seen_per_class[cls]
cnn_model.train(was_training)

for i in range(10):
    print("Accuracy of {}: {:.2f}%".format(output_label(i), class_correct[i] * 100 / total_correct[i]))

Accuracy of T-shirt/Top: 79.00%
Accuracy of Trouser: 95.40%
Accuracy of Pullover: 73.30%
Accuracy of Dress: 75.90%
Accuracy of Coat: 59.30%
Accuracy of Sandal: 78.90%
Accuracy of Shirt: 26.40%
Accuracy of Sneaker: 90.10%
Accuracy of Bag: 83.30%
Accuracy of Ankle Boot: 82.60%


: 