In [276]:
from __future__ import print_function, division

import torch
import torch.optim as optim

from torch.optim import lr_scheduler
from torchvision import datasets, transforms
import time
import os

import torch.nn as nn

# TODO: You can modify the network architecture
class Net(nn.Module):
    """
    Small ResNet-style image classifier.

    References:
        https://medium.com/analytics-vidhya/resnet-understand-and-implement-from-scratch-d0eb9725e0db
        https://towardsdatascience.com/resnets-for-cifar-10-e63e900524e0

    Input  - batch of 3-channel images (the first convolution expects 3 input
             channels; the spatial size is reduced by adaptive average pooling)
    Output - 10 scores per image

    NOTE(review): `forward` runs `self.conv1` twice, so the first two residual
    blocks share the same weights and batch-norm layers, while `self.conv2` is
    constructed but never used in `forward`. Confirm whether that is intended.
    """

    @staticmethod
    def _make_block(in_channels, out_channels, stride):
        """Build conv3x3 -> BN -> ReLU -> conv3x3 -> BN; only the first conv uses `stride` and changes the width."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, stride=1),
            nn.BatchNorm2d(out_channels),
        )

    def __init__(self):
        super().__init__()
        self.network = "ResNet"

        # Stem: initial convolution layer lifting 3 input channels to 16 feature maps.
        self.initconv = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, padding=1, stride=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
        )

        # Residual bodies. Attribute names and creation order are kept as-is so
        # that parameter names and seeded initialisation stay the same.
        self.conv1 = self._make_block(16, 16, stride=1)   # no downsampling, 16 filters
        self.conv2 = self._make_block(16, 16, stride=2)   # with downsampling, 16 filters
        self.conv3 = self._make_block(32, 32, stride=1)   # no downsampling, 32 filters
        self.conv4 = self._make_block(16, 32, stride=2)   # with downsampling, 32 filters
        self.conv5 = self._make_block(64, 64, stride=1)   # no downsampling, 64 filters
        self.conv6 = self._make_block(32, 64, stride=2)   # with downsampling, 64 filters

        # 1x1 stride-2 convolutions that reshape the skip path whenever a block
        # changes both resolution and channel count.
        self.identity1 = nn.Conv2d(16, 32, kernel_size=1, padding=0, stride=2)
        self.identity2 = nn.Conv2d(32, 64, kernel_size=1, padding=0, stride=2)

        # Head: activation shared by all blocks, global pooling, classifier.
        self.relu = nn.ReLU()
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.flatten = nn.Flatten()
        self.fc = nn.Linear(64, 10)

    def forward(self, xb):
        """Return the 10 class scores for the image batch `xb`."""
        features = self.initconv(xb)

        # (residual body, projection for the skip path or None for a plain skip)
        stages = (
            (self.conv1, None),
            (self.conv1, None),
            (self.conv4, self.identity1),   # downsample, increase channels to 32
            (self.conv3, None),
            (self.conv6, self.identity2),   # downsample, increase channels to 64
            (self.conv5, None),
        )
        for body, projection in stages:
            residual = body(features)
            shortcut = features if projection is None else projection(features)
            features = self.relu(residual + shortcut)

        pooled = self.avgpool(features)
        return self.fc(self.flatten(pooled))

In [277]:
# TODO: You can try different augmentation strategies
# Per-split preprocessing pipelines, keyed by the split / sub-folder name.
data_transforms = {
    # Training: light random augmentation (small rotation / vertical shift,
    # colour jitter, occasional sharpening) followed by resize, tensor
    # conversion and per-channel normalisation.
    'train': transforms.Compose([
        transforms.RandomAffine(degrees=10, translate=(0,0.1)),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
        transforms.RandomAdjustSharpness(sharpness_factor=2, p=0.2),
        transforms.Resize((32,32)),
        transforms.ToTensor(),
        # NOTE(review): these match the widely used ImageNet channel mean/std;
        # confirm they are appropriate for this dataset.
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),

    ]),
    # Evaluation: deterministic preprocessing only. Random augmentation is
    # deliberately NOT applied here, otherwise the reported test accuracy
    # changes from run to run and is measured on perturbed images.
    'test': transforms.Compose([
        transforms.Resize((32,32)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
}

In [278]:
# Root folder of the dataset; it is expected to contain one sub-folder per split.
data_dir = 'data'

# One ImageFolder per split ('train' and 'test'), each paired with the
# transform pipeline registered under the same key.
image_datasets = {
    split: datasets.ImageFolder(os.path.join(data_dir, split), data_transforms[split])
    for split in ('train', 'test')
}

# Training batches are shuffled; test batches are served in a fixed order.
train_dataloader = torch.utils.data.DataLoader(
    image_datasets['train'],
    batch_size=128,
    shuffle=True,
    num_workers=4,
)
test_dataloader = torch.utils.data.DataLoader(
    image_datasets['test'],
    batch_size=128,
    shuffle=False,
    num_workers=4,
)

# Number of training samples, used to average per-epoch statistics.
train_size = len(image_datasets['train'])

# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

In [279]:
def _percent(part, whole):
    """Return `part / whole` as a percentage, or 0.0 when `whole` is zero."""
    return 100 * part / whole if whole else 0.0


def _evaluate(model, dataloader, data_device, num_classes=None):
    """Count correct predictions of `model` over `dataloader`.

    Puts the model in eval mode (and leaves it there) and disables gradient
    tracking for the whole pass.

    Returns:
        (correct, total, per_class) where `per_class` is a list of
        [correct, total] pairs indexed by class id, or None when
        `num_classes` is None.
    """
    correct = 0
    total = 0
    per_class = None if num_classes is None else [[0, 0] for _ in range(num_classes)]
    model.eval()
    # Since we're not training, we don't need to calculate the gradients for our outputs
    with torch.no_grad():
        for batch in dataloader:
            images, labels = batch[0].to(data_device), batch[1].to(data_device)
            # the class with the highest score is what we choose as prediction
            predicted = model(images).argmax(dim=1)
            hits = predicted == labels
            total += labels.size(0)
            correct += hits.sum().item()
            if per_class is not None:
                for c in range(num_classes):
                    is_c = labels.eq(c)
                    per_class[c][0] += (hits & is_c).sum().item()   # correct predictions for c
                    per_class[c][1] += is_c.sum().item()            # samples labelled c
    return correct, total, per_class


def train_test(model, criterion, optimizer, scheduler, num_epochs=25,
               train_loader=None, test_loader=None, data_device=None,
               num_classes=10):
    """Train `model` for `num_epochs` epochs, evaluating on the test set after each.

    Args:
        model: network to optimise; it must already live on the target device.
        criterion: loss called as `criterion(outputs, labels)`. The per-epoch
            loss is computed as a sample-weighted mean of the batch losses,
            which assumes the criterion returns a per-batch mean.
        optimizer: optimiser stepped once per batch.
        scheduler: learning-rate scheduler stepped once per epoch.
        num_epochs: number of passes over the training data.
        train_loader: iterable of (inputs, labels) batches; defaults to the
            notebook-level `train_dataloader`.
        test_loader: iterable of (inputs, labels) batches; defaults to the
            notebook-level `test_dataloader`.
        data_device: device the batches are moved to; defaults to the
            notebook-level `device`.
        num_classes: number of classes in the final per-class report.

    Returns:
        (history, accuracy): `history` maps 'train_loss', 'train_accuracy' and
        'test_accuracy' to per-epoch lists; `accuracy` is the final test
        accuracy in percent. The model is left in eval mode.
    """
    # Fall back to the notebook-level objects so existing calls keep working.
    if train_loader is None:
        train_loader = train_dataloader
    if test_loader is None:
        test_loader = test_dataloader
    if data_device is None:
        data_device = device

    x = []  # training accuracy after each epoch (rounded, for printing)
    y = []  # testing accuracy after each epoch
    train_loss = []
    train_accuracy = []
    history = dict()

    for epoch in range(num_epochs):
        model.train()  # evaluation below switches to eval mode, so reset every epoch
        running_training_loss = 0.0   # sum of per-sample losses
        running_training_correct = 0  # correctly classified training samples
        iteration_training_loss = 0.0 # sum of per-batch losses, for progress output
        samples_seen = 0

        start_time = time.time()
        for i, data in enumerate(train_loader, 0):
            inputs, labels = data[0].to(data_device), data[1].to(data_device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            loss_value = loss.item()
            running_training_loss += loss_value*inputs.size(0)
            iteration_training_loss += loss_value

            predicted = outputs.argmax(dim=1)
            samples_seen += labels.size(0)
            running_training_correct += (predicted == labels).sum().item()
            if (i+1) % 100 == 0:
                print('Epoch:[%d]-Iteration:[%d], training loss: %.3f' %
                      (epoch + 1,i+1,iteration_training_loss/(i+1)))
        end_time = time.time()
        print('Time cost of one epoch: [%d]s' % (end_time-start_time))

        # Average over the samples actually processed, which avoids relying on
        # a separately maintained dataset size and guards against an empty loader.
        if samples_seen:
            epoch_training_accuracy = running_training_correct / samples_seen*100
            epoch_training_loss = running_training_loss / samples_seen
        else:
            epoch_training_accuracy = 0.0
            epoch_training_loss = 0.0

        print('Epoch:[%d], training accuracy: %.1f, training loss: %.3f' %
              (epoch + 1,epoch_training_accuracy, epoch_training_loss))

        x.append(round(epoch_training_accuracy, 2))

        # testing accuracy
        correct, total, _ = _evaluate(model, test_loader, data_device)
        accuracy = _percent(correct, total)
        print('Accuracy of the network on test images: %d %%' % (
                accuracy))
        y.append(accuracy)

        train_loss.append(epoch_training_loss)
        train_accuracy.append(epoch_training_accuracy)

        scheduler.step()

    print('Finished Training')

    history['train_loss'] = train_loss
    history['train_accuracy'] = train_accuracy
    history['test_accuracy'] = y

    # Final pass over the test set, this time with a per-class breakdown.
    correct, total, classAccuracy = _evaluate(model, test_loader, data_device, num_classes)
    accuracy = _percent(correct, total)
    print('Total accuracy of the network on test images: %d %%' % (
            accuracy))
    print("Class Accuracy:")
    for c, (class_correct, class_total) in enumerate(classAccuracy):
        if class_total:
            print(f"{c}: {100 * class_correct / class_total}%")
        else:
            # A class without test samples has no defined accuracy.
            print(f"{c}: n/a (no test samples)")

    print(x)
    print(y)
    return history, accuracy


In [None]:
if __name__ == '__main__':
    # Wall-clock reference taken before the model is built; despite its name,
    # `end` marks the start of the timed section.
    end = time.time()

    # Model initialization: build the network and move it to the chosen device.
    model_ft = Net().to(device)
    print(model_ft.network)

    # Loss function initialization.
    criterion = nn.CrossEntropyLoss()

    # TODO: Adjust the following hyper-parameters: initial learning rate, decay strategy of the learning rate,
    #       number of training epochs
    # Adam with an initial learning rate of 1e-3.
    optimizer_ft = optim.Adam(params=model_ft.parameters(), lr=1e-3)

    # Step schedule configured with step_size=5 and gamma=0.8.
    exp_lr_scheduler = lr_scheduler.StepLR(optimizer=optimizer_ft, step_size=5, gamma=0.8)

    history, accuracy = train_test(
        model=model_ft,
        criterion=criterion,
        optimizer=optimizer_ft,
        scheduler=exp_lr_scheduler,
        num_epochs=16,
    )

    print("time required %.2fs" %(time.time() - end))