# Prepare Environment

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
from torchsummary import summary
import matplotlib.pyplot as plt
import time
import os
import copy

In [None]:
# CPU or GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Dataset

In [None]:
data_dir = '/content/drive/MyDrive/Colab Notebooks/data/thaifood'

In [None]:
# Transformations
# Train set: Data augmentation and normalization
# Validation set: Just normalization (no randomness)
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'valid': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

# Dataset
image_datasets = {}
for k in data_transforms.keys():
    image_datasets[k] = datasets.ImageFolder(
        root=os.path.join(data_dir, k), 
        transform=data_transforms[k])

# DataLoader
batch_size = 16
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], 
                                              batch_size=batch_size,
                                              shuffle=True, 
                                              num_workers=4)
              for x in ['train', 'valid', 'test']}

dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'valid', 'test']}
class_names = image_datasets['train'].classes

In [None]:
def imshow(inp, title=None):
    """Imshow for Tensor."""
    # c, h, w --> h, w, c
    inp = inp.numpy().transpose((1, 2, 0))
    
    # De-normalize
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)

    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated


# Get a batch of training data
inputs, classes = next(iter(dataloaders['train']))

# Make a grid from batch
out = torchvision.utils.make_grid(inputs)

# Plot
imshow(out, title=[class_names[x] for x in classes])

# Transfer Learning

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'valid']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # deep copy the model
            if phase == 'valid' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best valid Acc: {best_acc:4f}')

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [None]:
def visualize_model(model, num_images=6):
    was_training = model.training
    model.eval()
    images_so_far = 0
    fig = plt.figure()

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloaders['valid']):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            for j in range(inputs.size()[0]):
                images_so_far += 1
                ax = plt.subplot(num_images//2, 2, images_so_far)
                ax.axis('off')
                ax.set_title(f'predicted: {class_names[preds[j]]}')
                imshow(inputs.cpu().data[j])

                if images_so_far == num_images:
                    model.train(mode=was_training)
                    return
        model.train(mode=was_training)

In [None]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)

    # Test mode
    model.eval()

    # Predict on test set
    loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            # Prediction
            pred = model(X)
            y_pred = pred.argmax(1)
            # Compute loss
            loss += loss_fn(pred, y).item()
            # Correct predictions
            correct += (y_pred == y).type(torch.float).sum().item()

    # Average loss
    loss /= num_batches

    # Accuracy
    accuracy = correct / size

    return loss, accuracy

## Pre-trained Model as Fixed Feature Extractor

In [None]:
# Pre-trained weights
model = torchvision.models.resnet18(pretrained=True)
# Freeze pre-trained weights --> no update during training
for param in model.parameters():
    param.requires_grad = False

In [None]:
# Let see the layers in the model
print(model)

In [None]:
# Get the last layer of the resnet18
num_ftrs = model.fc.in_features

# Replace the last layer with the classification layer
# Note: Parameters of newly constructed modules have requires_grad=True by default
model.fc = nn.Linear(num_ftrs, len(class_names))

# Move to desired device
model = model.to(device)

In [None]:
# Check number of trainable parameters
summary(model, input_size=(3, 224, 224))

In [None]:
# Loss
criterion = nn.CrossEntropyLoss()

# Observe that only parameters of final layer are being optimized as
# opposed to before.
optimizer_conv = optim.Adam(model.fc.parameters(), lr=0.001)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_conv, step_size=7, gamma=0.1)

In [None]:
# Train the model
model = train_model(
    model, criterion, 
    optimizer_conv, exp_lr_scheduler, 
    num_epochs=10)

In [None]:
visualize_model(model)

In [None]:
test_loss, test_acc = test(dataloaders['test'], model, criterion)
print(f"Test: loss={test_loss:>8f}, acc={(100*test_acc):>0.1f}%")

## Model Fine-tuning

In [None]:
layers = list(model.children())
for li, l in enumerate(layers):
    print(f"{li}: {l}")

In [None]:
# Determine how many layers to freeze
fine_tune_at = 7
ct = 0
for child in model.children():
    ct += 1
    if ct < 7:
        for param in child.parameters():
            param.requires_grad = False
    else:
        for param in child.parameters():
            param.requires_grad = True

In [None]:
summary(model, input_size=(3, 224, 224))

In [None]:
# Loss
criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
# Note: we typically use a lower learning rate for model fine-tuning
optimizer_ft = optim.Adam(model.parameters(), lr=0.0001)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

In [None]:
model = train_model(
    model, criterion, 
    optimizer_ft, exp_lr_scheduler,
    num_epochs=20)

In [None]:
visualize_model(model)

In [None]:
test_loss, test_acc = test(dataloaders['test'], model, criterion)
print(f"Test: loss={test_loss:>8f}, acc={(100*test_acc):>0.1f}%")

# Exercise: Create your own custom dataset

You need to create a new custom dataset for image classification whose datasets can be easily obtained from the Internet. Your dataset **MUST** have at least 4 classes. Then use this notebook to do transfer learning. You may want to adjust the code as appropriate:
* Change the model (e.g., EfficientNet, Inception, etc.) [[link](https://pytorch.org/vision/stable/models.html#table-of-all-available-classification-weights)]
* Change the training parameters (e.g., batch_size, learning rate, epoch, etc.)
* Change the number of layer to be frozen.
* and so on.

Please make sure that you have enough training (>=15), valid (>=5) and test (>=5) examples for each class.

In [None]:
# YOUR CODE HERE