# Homework 2, Step 2. CNN for CIFAR-100

In step 2, you will train a CNN for CIFAR-100 classification.

## 1. Prepare the dataset and the model
### Import Library

In [None]:
import os
import sys

import numpy as np
import torch
import torch.backends.cudnn as cudnn
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.tensorboard import SummaryWriter

os.environ["CUDA_VISIBLE_DEVICES"] = "0"
import time

import tqdm

### Hyperparameters
**For steps 3, 4, and 5,** you need to change the values of momentum, weight decay, data augmentation, and batch normalization to see the difference.

`mmt`: momentum for the optimizer. Use `0` if you do not want to use the momentum.

`wd`: weight decay for the optimizer. Use `0` if you do not want to use the weight decay.

`data_augmentation`: whether to use the data augmentation for the training.

`use_BN`: whether to use batch normalization for the training.

In [None]:
# --- Optimizer settings ---
lr = 0.05        # SGD learning rate
opt = "sgd"      # optimizer name
batchsize = 256  # number of training samples per mini-batch

# --- Switches to vary in the later homework steps ---
mmt = 0.0                  # SGD momentum (0 disables it)
wd = 0.0                   # SGD weight decay (0 disables it)
data_augmentation = False  # apply random crop / horizontal flip to training images
use_BN = False             # insert BatchNorm2d after every conv layer

### Load Dataset
In PyTorch, you can use the following API to load the dataset.

The RGB mean and std are pre-calculated values for normalizing the data. **Do not modify them**.

In [None]:
# Per-channel statistics used to normalize every image (pre-computed, keep as is).
rgb_mean = np.array([0.4914, 0.4822, 0.4465])
rgb_std = np.array([0.2023, 0.1994, 0.2010])

# Training pipeline: optional augmentation (random 32x32 crop after 4-pixel
# padding, random horizontal flip) followed by tensor conversion and
# normalization. Without augmentation only the last two steps are applied.
transform_train = transforms.Compose(
    (
        [
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
        ]
        if data_augmentation
        else []
    )
    + [
        transforms.ToTensor(),
        transforms.Normalize(rgb_mean, rgb_std),
    ]
)

# Test pipeline: never augmented, only converted and normalized.
transform_test = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize(rgb_mean, rgb_std)]
)

# CIFAR-100 training split, downloaded to ./data if missing; shuffled each epoch.
trainset = torchvision.datasets.CIFAR100(
    root="./data",
    train=True,
    download=True,
    transform=transform_train,
)
trainloader = torch.utils.data.DataLoader(
    trainset,
    batch_size=batchsize,
    shuffle=True,
    num_workers=1,
)

# CIFAR-100 test split, iterated in a fixed order in batches of 500.
testset = torchvision.datasets.CIFAR100(
    root="./data",
    train=False,
    download=True,
    transform=transform_test,
)
testloader = torch.utils.data.DataLoader(
    testset,
    batch_size=500,
    shuffle=False,
    num_workers=1,
)

### Define the CNN Model

In [None]:
class Identity(nn.Module):
    """Pass-through module whose output is exactly its input.

    It has no parameters and no state; it serves as a stand-in wherever a
    layer slot must be filled but no operation is wanted.
    """

    def forward(self, x):
        """Return ``x`` unchanged (the very same object)."""
        return x

def _weights_init(m):
    """Re-initialize the weight of a linear or convolutional module in place.

    Modules that are instances of ``nn.Linear`` or ``nn.Conv2d`` get their
    ``weight`` tensor overwritten with Kaiming-normal values; every other
    module is left untouched. Biases are not modified.

    NOTE(review): nothing in the visible code calls this helper; it is
    presumably meant to be passed to ``Module.apply`` -- confirm before use.

    Args:
        m: the module to (possibly) re-initialize.
    """
    if isinstance(m, (nn.Linear, nn.Conv2d)):
        # Go through ``nn.init``: the file never imports a bare ``init`` name.
        nn.init.kaiming_normal_(m.weight)

def compute_feature_map_size(ks, pks):
    """Return the side length of the feature map after all conv/pool stages.

    Starts from a side length of 32 and, for every (conv kernel, pool kernel)
    pair, first shrinks it by ``kernel - 1`` and then divides it by the
    pooling kernel size, truncating to an integer.

    Args:
        ks: conv kernel sizes, one per stage.
        pks: pooling kernel sizes, one per stage (``1`` leaves the size as is).

    Returns:
        The resulting side length as an ``int``.
    """
    size = 32
    for kernel, pool in zip(ks, pks):
        size -= kernel - 1        # shrink by the conv kernel
        size = int(size / pool)   # then divide by the pooling kernel, truncating
    return size

class LeNet(nn.Module):
    """Small CNN: a stack of conv stages followed by one linear classifier.

    Each stage is ``Conv2d -> [BatchNorm2d] -> MaxPool2d-or-Identity -> ReLU``.
    The batch-norm layer is only inserted when the module-level flag
    ``use_BN`` is true at construction time.

    Args:
        num_classes: number of outputs of the final linear layer.
        hidden_chns: output channels of each conv layer, in order.
        ks: kernel size of each conv layer.
        pks: pooling kernel size after each conv layer; values ``<= 1`` put an
            ``Identity`` in place of the pooling layer.
    """

    def __init__(self, num_classes, hidden_chns, ks, pks):
        super().__init__()

        # The first conv reads 3 input channels; each later conv reads the
        # previous conv's output channels.
        input_channels = [3] + list(hidden_chns[:-1])

        stages = []
        for c_in, c_out, kernel, pool in zip(input_channels, hidden_chns, ks, pks):
            stages.append(nn.Conv2d(c_in, c_out, kernel_size=kernel))
            if use_BN:
                stages.append(nn.BatchNorm2d(c_out))
            if pool > 1:
                stages.append(nn.MaxPool2d(kernel_size=pool))
            else:
                # Keep a placeholder so every stage has the same number of slots.
                stages.append(Identity())
            stages.append(nn.ReLU(inplace=True))
        self.layers = nn.Sequential(*stages)

        # Side length of the final feature map, needed to size the classifier.
        s = compute_feature_map_size(ks, pks)
        print(f"feature size: {s}")

        flat_features = hidden_chns[-1] * s * s
        self.fc = nn.Linear(flat_features, num_classes)

    def forward(self, x):
        """Run the conv stack, flatten per sample, and return the class scores."""
        features = self.layers(x)
        flattened = features.view(features.size(0), -1)
        return self.fc(flattened)

def lenet(hidden_chns, ks, pks):
    """Build a ``LeNet`` with 100 output classes.

    Args:
        hidden_chns: output channels of each conv layer.
        ks: kernel size of each conv layer.
        pks: pooling kernel size after each conv layer.

    Returns:
        The newly constructed ``LeNet`` instance.
    """
    model = LeNet(100, hidden_chns, ks, pks)
    return model

## 2. Define the model and run

### Training settings
You may modify `num_epochs` for faster training or better performance.

In [None]:
# Number of passes over the training set, and the best test accuracy seen so far.
num_epochs, best_acc = 200, 0.0

### Define the model, optimizer, loss function, learning rate scheduler
For steps 2, 3, 4, and 5, you need to change the network structure of the CNN.

`hidden_chns`: a list of the hidden channels of the conv layer.

`ks`: a list of the kernel sizes of the conv layer.

`pks`: a list of the pooling kernel sizes of the pooling layer. Use `1` if you do not want to use a pooling layer (and it will be an identity function).

In [None]:
# Build the CNN: two conv layers (16 channels, 3x3 kernels), each followed by
# 2x2 max pooling.
net = lenet(hidden_chns=[16, 16], ks=[3, 3], pks=[2, 2])

# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)
net = net.to(device)  # put the model on the specified device (e.g. gpu/cpu)

# loss function
criterion = nn.CrossEntropyLoss()

# optimizer: plain SGD, with momentum / weight decay taken from the hyperparameters
optimizer = optim.SGD(net.parameters(), lr=lr, momentum=mmt, weight_decay=wd)

# learning rate scheduler: cosine annealing over the whole run. The period is
# tied to `num_epochs` so the schedule still reaches its minimum at the end of
# training when the number of epochs is changed.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)

### Check the total number of parameters

In [None]:
def count_params(net):
    """Print how many trainable parameters and weight tensors ``net`` has.

    Only parameters with ``requires_grad`` set are considered. Two lines are
    printed:

    * ``Total number of params`` -- the summed element count of those
      parameters.
    * ``Total layers`` -- how many of those parameters have more than one
      dimension (i.e. multi-dimensional weights rather than 1-D tensors such
      as biases).

    Args:
        net: the module whose parameters are counted.
    """
    trainable = [p for p in net.parameters() if p.requires_grad]

    # numel()/dim() read tensor metadata only, so nothing is copied to the
    # CPU or converted to NumPy just to learn a shape.
    total_params = sum(p.numel() for p in trainable)
    total_layers = sum(1 for p in trainable if p.dim() > 1)

    print("Total number of params", total_params)
    print("Total layers", total_layers)


# Print the trainable-parameter statistics of the current model.
count_params(net)

### Training logs
The training logs are saved in the `exp` folder. You can use tensorboard to see the logs.

In [None]:
# Every run logs into its own directory "exp/cifar100-<N>", where <N> is one
# more than the highest run index already present in "exp".
os.makedirs("exp", exist_ok=True)

# Take the text after the last "-" of each entry as its run index. Entries
# whose suffix is not a plain number (stray files, other folders) are skipped
# rather than evaluated as code.
run_ids = [
    int(suffix)
    for suffix in (name.split("-")[-1] for name in os.listdir("exp"))
    if suffix.isdecimal()
]
last_train = max(run_ids + [0])
current_train = last_train + 1
save_dir = "exp/cifar100-{}".format(current_train)
os.makedirs(save_dir)
writer = SummaryWriter(save_dir)

# Reset the best accuracy for the new run.
best_acc = 0

### Training and testing

In [None]:
# Training
def train(epoch):
    """Train the global ``net`` for one pass over ``trainloader``.

    For every mini-batch: move the data to ``device``, do a forward/backward
    pass with ``criterion`` and step ``optimizer``. The per-batch loss is
    written to the TensorBoard ``writer`` under ``loss/train``, and a tqdm
    progress bar shows the running mean loss and running accuracy.

    Args:
        epoch: zero-based epoch index, used for the progress-bar label and
            for the global step of the logged loss.
    """
    net.train()
    running_loss, n_correct, n_seen = 0, 0, 0
    steps_per_epoch = len(trainloader)

    with tqdm.tqdm(enumerate(trainloader), total=steps_per_epoch) as progress:
        progress.set_description(f"Epoch {epoch} train")
        for step, (inputs, targets) in progress:
            inputs = inputs.to(device)
            targets = targets.to(device)

            # Standard optimization step.
            optimizer.zero_grad()
            outputs = net(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            # Update the running statistics shown in the progress bar.
            batch_loss = loss.item()
            running_loss += batch_loss
            predicted = outputs.max(1)[1]  # index of the highest score per sample
            n_seen += targets.size(0)
            n_correct += predicted.eq(targets).sum().item()

            # One TensorBoard point per mini-batch, indexed by the global step.
            writer.add_scalars(
                "loss",
                {"train": batch_loss},
                global_step=epoch * steps_per_epoch + step,
            )

            mean_loss = running_loss / (step + 1)
            accuracy = 100.0 * n_correct / n_seen
            progress.set_postfix(
                {
                    "loss": f"{mean_loss:.3f}",
                    "acc": f"{accuracy:.3f}%, {n_correct}/{n_seen}",
                }
            )


# validation
def test(epoch):
    """Evaluate the global ``net`` on ``testloader`` and checkpoint the best model.

    Runs the model in eval mode without gradients, accumulating loss and
    accuracy over all test batches while showing them in a tqdm progress bar.
    The mean test loss is then written to the TensorBoard ``writer`` under
    ``loss/test``. If the accuracy exceeds the global ``best_acc``, the model
    state, accuracy and epoch are saved to ``<save_dir>/ckpt.pth`` and
    ``best_acc`` is updated.

    Args:
        epoch: zero-based epoch index, used for the progress-bar label, the
            global step of the logged loss, and the saved checkpoint.

    Returns:
        The test accuracy in percent.
    """
    global best_acc
    net.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        with tqdm.tqdm(enumerate(testloader), total=len(testloader)) as t:
            t.set_description(f"Epoch {epoch}  test")
            for batch_idx, (inputs, targets) in t:
                inputs, targets = inputs.to(device), targets.to(device)
                outputs = net(inputs)
                loss = criterion(outputs, targets)

                test_loss += loss.item()
                _, predicted = outputs.max(1)
                total += targets.size(0)
                correct += predicted.eq(targets).sum().item()

                t.set_postfix(
                    {
                        "loss": f"{test_loss / (batch_idx + 1):.3f}",
                        "acc": f"{correct*100./total:.3f}%, {correct}/{total}",
                    }
                )

    # Log the mean loss over the whole test set (the same figure the progress
    # bar ends on), not just the loss of whichever batch came last.
    avg_test_loss = test_loss / max(len(testloader), 1)
    writer.add_scalars(
        "loss", {"test": avg_test_loss}, global_step=epoch * len(trainloader)
    )

    # Save checkpoint.
    acc = 100.0 * correct / total if total else 0.0
    if acc > best_acc:
        print("Saving..")
        state = {
            "net": net.state_dict(),
            "acc": acc,
            "epoch": epoch,
        }
        torch.save(state, os.path.join(save_dir, "ckpt.pth"))
        best_acc = acc

    return acc


# Main loop: one training pass and one evaluation pass per epoch, then advance
# the learning-rate schedule.
for epoch in range(num_epochs):
    epoch_start = time.time()

    train(epoch)
    test_acc = test(epoch)

    elapsed = time.time() - epoch_start
    summary = (
        f"Epoch {epoch} | total time: {elapsed:.0f}s, "
        f"test acc: {test_acc:.3f}%, best acc: {best_acc:.3f}%"
    )
    print(summary)

    scheduler.step()