In [12]:
from __future__ import print_function, division

import torch
import torch.optim as optim
import copy

from torch.optim import lr_scheduler
from torchvision import datasets, transforms
import time
import os

import torch.nn as nn

class Net(nn.Module):
    """
    Small VGG-style convolutional classifier.

    Three stages, each made of two (Conv3x3 -> ReLU -> BatchNorm) units
    followed by a 2x2 max-pool, feed a two-layer fully connected head.

    Input - 3x32x32 (the first conv expects 3 channels; the 128*4*4 flatten
            size corresponds to a 32x32 image halved three times)
    Output - 10 (raw class scores, no softmax applied)
    """
    def __init__(self):
        super().__init__()

        layers = []
        channels_in = 3
        # Layers are created strictly in forward order so that the Sequential
        # indices (and therefore the state_dict keys) stay 0..26.
        for channels_out in (32, 64, 128):
            for _ in range(2):
                layers += [
                    nn.Conv2d(channels_in, channels_out, kernel_size=3, padding=1),
                    nn.ReLU(),
                    nn.BatchNorm2d(channels_out),
                ]
                channels_in = channels_out
            layers.append(nn.MaxPool2d(2, 2))

        # Classifier head on the flattened 128x4x4 feature map.
        layers += [
            nn.Flatten(),
            nn.Linear(128*4*4, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Dropout(p=0.5),
            nn.Linear(256, 10),
        ]

        self.network = nn.Sequential(*layers)

    def forward(self, xb):
        """Run a batch `xb` through the whole stack and return the class scores."""
        return self.network(xb)

In [13]:
# Root folder holding the `train/` and `test/` ImageFolder directories.
# The original hard-coded location is kept as the default; set the
# ASS2_DATA_DIR environment variable to run the notebook on another machine.
data_dir = os.environ.get('ASS2_DATA_DIR', '/Users/hannaholivia/Downloads/ASS_2_ML/data')

BATCH_SIZE = 256   # mini-batch size shared by all three loaders
NUM_WORKERS = 4    # background worker processes per DataLoader
VAL_FRACTION = 0.1 # share of the training folder held out for validation
SPLIT_SEED = 42    # fixed seed so the train/val split is identical on every run

# Read train and test sets, respectively.
# NOTE: `data_transforms` is not defined in this cell; it must already exist
# in the kernel (dict with 'train' and 'test' keys) before this cell is run.
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                          data_transforms[x])
                  for x in ['train', 'test']}

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # Set device to "cpu" if you have no gpu

# Split the training set into training and validation sets.
# Both subsets wrap the same ImageFolder, so the validation samples go through
# data_transforms['train'] as well.
train_dataset = image_datasets['train']
val_size = int(VAL_FRACTION * len(train_dataset))
train_size = len(train_dataset) - val_size
train_dataset, val_dataset = torch.utils.data.random_split(
    train_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Loaders are built once, after the split; only the training loader shuffles.
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)
test_dataloader = torch.utils.data.DataLoader(image_datasets['test'], batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

In [14]:
def train_test(model, criterion, optimizer, scheduler, num_epochs=25):
    """Train `model`, restore its best-validation weights and evaluate on the test set.

    The function reads the module-level `train_dataloader`, `val_dataloader`,
    `test_dataloader` and `device`; they must exist before it is called.

    Args:
        model: network to optimise; it is modified in place and ends up holding
            the weights of the epoch with the highest validation accuracy.
        criterion: loss callable invoked as `criterion(outputs, labels)`; the
            per-sample averaging below assumes it returns a batch-mean loss.
        optimizer: optimiser already bound to `model`'s parameters.
        scheduler: learning-rate scheduler stepped once per epoch. A
            `ReduceLROnPlateau` receives the validation loss when its mode is
            'min' and the validation accuracy otherwise; any other scheduler
            is stepped without a metric.
        num_epochs: number of passes over the training loader.

    Returns:
        (history, accuracy) where `history` is a dict with the per-epoch lists
        'train_loss', 'train_accuracy', 'val_loss', 'val_accuracy' (accuracies
        in percent) and `accuracy` is the overall test accuracy in percent.
    """
    train_loss = []
    train_accuracy = []
    val_loss = []
    val_accuracy = []
    history = dict()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")

        # Training Phase
        model.train()
        running_training_loss = 0.0
        running_training_accuracy = 0
        total_training_predictions = 0

        start_time = time.time()
        for data in train_dataloader:
            inputs, labels = data[0].to(device), data[1].to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Undo the batch mean so the epoch figure is a true per-sample average.
            running_training_loss += loss.item() * inputs.size(0)

            predicted = outputs.argmax(dim=1)
            total_training_predictions += labels.size(0)
            running_training_accuracy += (predicted == labels).sum().item()

        epoch_time = time.time() - start_time

        # Normalise by the number of samples actually seen rather than by a
        # global size, so the metrics stay right for any loader configuration.
        seen_train = max(total_training_predictions, 1)
        epoch_training_accuracy = running_training_accuracy / seen_train * 100
        epoch_training_loss = running_training_loss / seen_train

        train_loss.append(epoch_training_loss)
        train_accuracy.append(epoch_training_accuracy)

        # Validation Phase
        model.eval()
        running_val_loss = 0.0
        running_val_accuracy = 0
        total_val_predictions = 0

        with torch.no_grad():
            for data in val_dataloader:
                inputs, labels = data[0].to(device), data[1].to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                running_val_loss += loss.item() * inputs.size(0)

                predicted = outputs.argmax(dim=1)
                total_val_predictions += labels.size(0)
                running_val_accuracy += (predicted == labels).sum().item()

        seen_val = max(total_val_predictions, 1)
        epoch_val_accuracy = running_val_accuracy / seen_val * 100
        epoch_val_loss = running_val_loss / seen_val

        val_loss.append(epoch_val_loss)
        val_accuracy.append(epoch_val_accuracy)

        print(f"Epoch {epoch+1} time: {epoch_time:.2f}s | Train Loss: {epoch_training_loss:.4f}, Train Acc: {epoch_training_accuracy:.2f}% | Val Loss: {epoch_val_loss:.4f}, Val Acc: {epoch_val_accuracy:.2f}%")

        if epoch_val_accuracy > best_acc:
            best_acc = epoch_val_accuracy
            best_model_wts = copy.deepcopy(model.state_dict())

        # Feed the plateau scheduler a metric whose direction matches its mode:
        # stepping a 'min' scheduler with accuracy would cut the learning rate
        # exactly while the model is still improving.
        if isinstance(scheduler, lr_scheduler.ReduceLROnPlateau):
            if scheduler.mode == 'min':
                scheduler.step(epoch_val_loss)
            else:
                scheduler.step(epoch_val_accuracy)
        else:
            scheduler.step()

    print('Finished Training')

    model.load_state_dict(best_model_wts)

    history['train_loss'] = train_loss
    history['train_accuracy'] = train_accuracy
    history['val_loss'] = val_loss
    history['val_accuracy'] = val_accuracy

    # Test Phase: one pass collects both the overall and the per-class counts.
    correct = 0
    total = 0
    class_correct = []
    class_total = []
    model.eval()
    with torch.no_grad():
        for data in test_dataloader:
            images, labels = data[0].to(device), data[1].to(device)
            # calculate outputs by running images through the network
            outputs = model(images)
            # the class with the highest energy is what we choose as prediction
            predicted = outputs.argmax(dim=1)
            hits = predicted == labels

            total += labels.size(0)
            correct += hits.sum().item()

            if not class_total:
                # Number of classes is taken from the width of the model output.
                class_correct = [0] * outputs.size(1)
                class_total = [0] * outputs.size(1)
            # Plain Python lists avoid the 0-dim tensor that squeeze() would
            # produce for a final batch holding a single sample.
            for label, hit in zip(labels.tolist(), hits.tolist()):
                class_correct[label] += int(hit)
                class_total[label] += 1

    test_accuracy = 100 * correct / total if total else 0.0
    print('Accuracy of the network on test images: %d %%' % (test_accuracy))

    for i in range(len(class_total)):
        if class_total[i] == 0:
            # A class without test samples has no defined accuracy.
            print('Accuracy of digit %d: n/a (no test samples)' % i)
            continue
        class_accuracy = 100 * class_correct[i] / class_total[i]
        print('Accuracy of digit %d: %2d %%' % (i, class_accuracy))

    if val_accuracy:
        best_val_acc = max(val_accuracy)
        best_train_acc = max(train_accuracy)
        min_train_loss = min(train_loss)
        min_val_loss = min(val_loss)

        print('Best Validation Accuracy: {:.2f}%'.format(best_val_acc))
        print('Best Training Accuracy: {:.2f}%'.format(best_train_acc))
        print('Minimum Training Loss: {:.4f}'.format(min_train_loss))
        print('Minimum Validation Loss: {:.4f}'.format(min_val_loss))

    # Return the overall test accuracy, not the accuracy of the last class.
    return history, test_accuracy

In [15]:
if __name__ == '__main__':
    # Wall-clock start of the whole run (the variable name `end` is historical).
    end = time.time()

    model_ft = Net().to(device) # Model initialization
    print(model_ft.network)
    criterion = nn.CrossEntropyLoss() # Loss function initialization

    optimizer_ft = optim.Adam(model_ft.parameters(), lr=1e-3) # The initial learning rate is 1e-3

    # The scheduler monitors validation accuracy, where higher is better, so it
    # must run in 'max' mode; in 'min' mode every improvement would count as a
    # "bad" epoch and the learning rate would be cut while the model is still
    # getting better.
    exp_lr_scheduler = lr_scheduler.ReduceLROnPlateau(optimizer_ft, mode='max', factor=0.1, patience=5, verbose=True)

    history, accuracy = train_test(model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=20)

    print("Time required: %.2fs" % (time.time() - end))

Sequential(
  (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (1): ReLU()
  (2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (3): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (4): ReLU()
  (5): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (7): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (8): ReLU()
  (9): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (10): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (11): ReLU()
  (12): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (13): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (14): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (15): ReLU()
  (16): BatchNorm2d(128, eps=1e-05,