In [1]:
# Mount gdrive
from google.colab import drive
drive.mount('gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at gdrive


In [2]:
# Change dir to current directory
%cd 'gdrive/My Drive/ccm'

/content/gdrive/My Drive/ccm


In [0]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, RandomSampler
import numpy as np
import math
import numbers
import sys
# Add directory code to sys.path
sys.path.insert(0, 'code')

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [4]:
device

device(type='cuda', index=0)

In [0]:
accuracy_list = []

def train(model, device, train_loader, optimizer, epoch, log_interval = 100):
    # Set model to training mode
    model.train()
    # Loop through data points
    for batch_idx, (data, target) in enumerate(train_loader):    
        # Send data and target to device
        data, target = data.to(device), target.to(device)
        
        # Zero out the optimizer
        optimizer.zero_grad()
        # Pass data through model
        output = model(data)
        # Compute the negative log likelihood loss
        loss = F.nll_loss(output, target)
        # Backpropagate loss
        loss.backward()
        
        # Make a step with the optimizer
        optimizer.step()
        
        # Print loss (uncomment lines below once implemented)
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
            
# Define test method
def test(model, device, test_loader):
    # Set model to evaluation mode
    model.eval()
    # Variable for the total loss 
    test_loss = 0
    # Counter for the correct predictions
    num_correct = 0
    
    # don't need autograd for eval
    with torch.no_grad():
        # Loop through data points
        for data, target in test_loader:
            # Send data to device
            data, target = data.to(device), target.to(device)
            # Pass data through model
            output = model(data)
            # Compute the negative log likelihood loss with reduction='sum' and add to total test_loss
            # sum losses over minibatch
            test_loss += F.nll_loss(output, target, reduction = 'sum').item()
            # Get predictions from the model for each data point
            pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability                                                                 
                    
            # Add number of correct predictions to total num_correct 
            num_correct += pred.eq(target.data.view_as(pred)).cpu().sum().item()

    
    # Compute the average test_loss
    avg_test_loss = test_loss / len(test_loader.dataset)
    
    # Print loss (uncomment lines below once implemented)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        avg_test_loss, num_correct, len(test_loader.dataset),
        100. * num_correct / len(test_loader.dataset)))

In [5]:
# Download data
trans = transforms.Compose([transforms.ToTensor(),
                            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
                            ])
cifar100Train = datasets.CIFAR100('data', train=True, transform=trans, download=True)
trainSampler = RandomSampler(cifar100Train)
train_loader = DataLoader(cifar100Train, batch_size=32, sampler=trainSampler)
cifar100test = datasets.CIFAR100('data', train=False, transform=trans, download=True)
testSampler = RandomSampler(cifar100test)
test_loader = DataLoader(cifar100test, batch_size=32, sampler=testSampler)

Files already downloaded and verified
Files already downloaded and verified


In [0]:
class akCNN(nn.Module):
    def __init__(self):
        super(akCNN, self).__init__()
        self.conv1 = nn.Sequential(nn.Conv2d(3, 128, 3, padding=1),nn.ELU())

        self.stack1 = nn.Sequential(
        nn.Conv2d(128, 128, 3),
        nn.ELU(),
        nn.MaxPool2d(2),
        nn.Dropout2d(p=0.1)
        )
    
        self.stack2 = nn.Sequential(
        nn.Conv2d(128, 256, 3, padding=1),
        nn.ELU(),
        nn.Conv2d(256, 256, 3),
        nn.ELU(),
        nn.MaxPool2d(2),
        nn.Dropout2d(p=0.25)
        )
        
        self.stack3 = nn.Sequential(
        nn.Conv2d(256, 512, 3, padding=1),
        nn.ELU(),
        nn.Conv2d(512, 512, 3),
        nn.ELU(),
        nn.MaxPool2d(2),
        nn.Dropout2d(p=0.5)
        )
    
        self.last = nn.Sequential(
        nn.Linear(2048,1024),
        nn.ELU(),
        nn.Dropout2d(p=0.5),
        nn.Linear(1024,100),
        nn.LogSoftmax(dim=1)
        )
    
        
    def forward(self, x, verbose=False):
        x = self.conv1(x)
        x = self.stack1(x)
        x = self.stack2(x)
        x = self.stack3(x)
        x = torch.flatten(x,1)
        x = self.last(x)
        return x

In [0]:
# Define model and sent to device
'''
    sigma (float, optional): relative standard deviation used to generate the
    noise. Relative means that it will be multiplied by the magnitude of
    the value your are adding the noise to. This means that sigma can be
    the same regardless of the scale of the vector.
'''
            
model = akCNN()
model.to(device)
# Optimizer: SGD with learning rate of 1e-2 and momentum of 0.5
optimizer = optim.SGD(model.parameters(), lr=1e-2, momentum=0.5)

# Training loop with 5 epochs
for epoch in range(1, 5 + 1):
    # Train model
    train(model, device, train_loader, optimizer, epoch)
    # Test model
    # test(model, device, test_loader)

In [0]:
# Save the model weights
out_path = 'models/after_5_epochs.pt'
torch.save(model.state_dict(), out_path)
print(f'Model weights after 5 epochs saved to {out_path}')

Model weights after 5 epochs saved to models/after_5_epochs.pt


In [8]:
# load pretrained model weights
model = akCNN().to(device)
model.load_state_dict(torch.load('models/after_5_epochs.pt'))

<All keys matched successfully>

In [0]:
# Eval!
def eval(model, device, test_loader):
  # true_list, pred_list, pred_prob_list, rep_list = [], [], [], []
  true_list, pred_list, pred_prob_list = [], [], []
  model.eval()
  with torch.no_grad():
    for inputs, labels in test_loader:
      # Mount to device
      inputs = inputs.to(device)
      labels = labels.to(device)
      # Forward
      # out, rep = model(inputs)
      out = model(inputs)
      # Record outputs
      true_list.extend(labels.detach().cpu().tolist())
      pred_prob = F.softmax(out, dim=1).detach().cpu()
      pred_prob_list.extend(pred_prob.tolist())
      _, pred = torch.max(pred_prob, 1)
      pred_list.extend(pred.tolist())
      # rep_list.extend(rep.detach().cpu().tolist())
  return np.array(true_list), np.array(pred_list),np.array(pred_prob_list)

In [10]:
true_list, pred_list, pred_prob_list = eval(model, device, test_loader)
# rep_list = np.array(rep_list)
from sklearn.metrics import accuracy_score
print(accuracy_score(true_list, pred_list))

0.4299


In [0]:
import json
with open('data/mappings.json', 'r') as f:
  mappings = json.load(f)
  idxToLabel, mapping = mappings['idx2class'], mappings['superclass_classes']

In [0]:
class_2_superclass = {}
for key, values in mapping.items():
  for item in values:
    class_2_superclass[item] = key

In [13]:
mask = (pred_list != true_list)
mask.sum()

5701

In [15]:
def compute_superclass_match(true_list, pred_list, idxToLabel, class_2_superclass):
  vfunc = np.vectorize(lambda x: class_2_superclass[idxToLabel[x]])
  true_superclasses = vfunc(true_list)
  pred_superclasses = vfunc(pred_list)
  match = (true_superclasses == pred_superclasses)
  return match.mean()
match1 = compute_superclass_match(true_list[mask], pred_list[mask], idxToLabel, class_2_superclass)
match1

0.2345202596035783

Epoch 6-10

In [0]:
for epoch in range(5, 10 + 1):
    # Train model
    optimizer = optim.SGD(model.parameters(), lr=1e-2, momentum=0.5)
    train(model, device, train_loader, optimizer, epoch)



In [0]:
# Save the model weights
out_path2 = 'models/after_10_epochs.pt'
torch.save(model.state_dict(), out_path2)
print(f'Model weights after 10 epochs saved to {out_path2}')

Model weights after 10 epochs saved to models/after_10_epochs.pt


In [0]:
# Eval!
true_list2, pred_list2, pred_prob_list2 = eval(model, device, test_loader)
from sklearn.metrics import accuracy_score
print(accuracy_score(true_list2, pred_list2))

0.5253


In [0]:
mask = (pred_list2 != true_list2)
compute_superclass_match(true_list2[mask], pred_list2[mask], idxToLabel, class_2_superclass)

0.291763218875079

Epoch 11-16

In [0]:
for epoch in range(11, 16):
    # Train model
    optimizer = optim.SGD(model.parameters(), lr=1e-2, momentum=0.5)
    train(model, device, train_loader, optimizer, epoch)



In [0]:
# Save the model weights
out_path3 = 'models/after_15_epochs.pt'
torch.save(model.state_dict(), out_path3)
print(f'Model weights after 15 epochs saved to {out_path3}')

Model weights after 15 epochs saved to models/after_15_epochs.pt


In [0]:
true_list3, pred_list3, pred_prob_list3 = eval(model, device, test_loader)
print(accuracy_score(true_list3, pred_list3))
mask = (pred_list3 != true_list3)
compute_superclass_match(true_list3[mask], pred_list3[mask], idxToLabel, class_2_superclass)

0.5516


0.2986173059768064