Define the dataset for CIFAR-10

In [None]:
from torch.utils.data import Dataset
import glob
import numpy as np
import albumentations as A
import pickle


class MyDataset(Dataset):
    """CIFAR-10 dataset backed by the pickled batch files on disk.

    All matching batch files under ``./cifar-10-batches-py/`` are loaded
    eagerly into memory at construction time. Each stored row is reshaped to
    ``(3, 32, 32)`` when an item is requested.

    Args:
        train: If True, load the ``data_*`` files and apply the augmentation
            pipeline; otherwise load the ``test_*`` files and only normalize.

    Raises:
        FileNotFoundError: If no batch file matches the expected pattern.
    """

    def __init__(self, train=True):
        super().__init__()
        self.train = train
        # Different files for train and test mode. Both lists are sorted so
        # the sample order does not depend on filesystem enumeration order.
        if train:
            pattern = "./cifar-10-batches-py/data_*"
        else:
            pattern = "./cifar-10-batches-py/test_*"
        files = sorted(glob.glob(pattern))
        if not files:
            # Fail early with an actionable message instead of the opaque
            # "need at least one array to concatenate" from np.concatenate.
            raise FileNotFoundError(
                "no CIFAR-10 batch files match {!r}".format(pattern))

        data = []
        label = []
        for file in files:
            # NOTE: pickle can execute arbitrary code while loading; only
            # point this class at batch files from a trusted source.
            with open(file, 'rb') as fo:
                dic = pickle.load(fo, encoding='latin1')
            data.append(dic['data'])
            label.append(dic['labels'])
        self.data = np.concatenate(data)
        self.label = np.concatenate(label)

        # Preprocessing differs per mode: heavy augmentation for training,
        # normalization only for evaluation.
        if train:
            self.transform = A.Compose([
                A.Cutout(),
                A.PadIfNeeded(36, 36),
                A.RandomCrop(32, 32),
                A.HorizontalFlip(),
                A.RandomRotate90(),
                A.ChannelShuffle(),
                A.ISONoise(),
                A.RandomBrightnessContrast(),
                A.ColorJitter(),
                A.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
            ])
        else:
            self.transform = A.Compose([
                A.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
            ])

    def __getitem__(self, index):
        """Return ``{"x": image, "y": label}`` for the sample at ``index``.

        The image is returned channel-first, i.e. ``(3, 32, 32)`` as long as
        the transform keeps the spatial size.
        """
        x = self.data[index]
        y = self.label[index]
        # Stored rows are flat and channel-first; the transform is called
        # with a channel-last (H, W, C) image, so convert before and after.
        x = np.reshape(x, (3, 32, 32))
        x = np.transpose(x, (1, 2, 0))
        x = self.transform(image=x)['image']
        x = np.transpose(x, (2, 0, 1))
        return {"x": x, "y": y}

    def __len__(self):
        """Return the number of samples loaded."""
        return len(self.data)

Define the neural network model

In [None]:
import torch.nn as nn

# Define the residual block (see the equation in the report).
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with an additive shortcut.

    Computes ``relu(conv2(conv1(x)) + shortcut(x))`` where ``shortcut`` is
    ``downsample`` when one is given and the identity otherwise.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution.
        downsample: Optional module applied to the input so that it can be
            added to the convolution branch. It is needed whenever ``stride``
            or the channel count changes the shape of the branch output.
    """

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU())
        self.conv2 = nn.Sequential(
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels))
        self.downsample = downsample
        self.relu = nn.ReLU()
        self.out_channels = out_channels

    def forward(self, x):
        """Apply the block to ``x`` and return the activated sum."""
        out = self.conv1(x)
        out = self.conv2(out)
        # Compare against None explicitly: a module's truth value follows
        # __len__ when it defines one, so a plain truthiness test could
        # silently skip a supplied shortcut module.
        residual = x if self.downsample is None else self.downsample(x)
        out += residual
        return self.relu(out)

# Define a ResNet with 3 residual stages (2 residual blocks each) and 2 fully connected layers.
class ResNet(nn.Module):
    """Small ResNet-style classifier.

    Layout: a 7x7 convolution stem, a stride-2 max-pool, three residual
    stages with 32, 64 and 128 channels (two blocks each, all stride 1) and
    two linear layers. ``fc1`` expects 32768 input features, which matches
    128 channels on a 16x16 map, i.e. a 32x32 input image.

    Args:
        num_classes: Size of the output layer.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        stem_width = 32
        # Channel count of the most recently built stage; _make_layer reads
        # and updates it.
        self.inplanes = stem_width
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, stem_width, kernel_size=7, stride=1, padding=3),
            nn.BatchNorm2d(stem_width),
            nn.ReLU(),
        )
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer0 = self._make_layer(32, 2, stride=1)
        self.layer1 = self._make_layer(64, 2, stride=1)
        self.layer2 = self._make_layer(128, 2, stride=1)
        self.fc1 = nn.Linear(32768, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def _make_layer(self, planes, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks with ``planes`` channels.

        Only the first block may change the stride or channel count; when it
        does, it gets a 1x1 conv + batch-norm projection as its shortcut.
        """
        shape_preserved = stride == 1 and self.inplanes == planes
        shortcut = None
        if not shape_preserved:
            shortcut = nn.Sequential(
                nn.Conv2d(self.inplanes, planes, kernel_size=1, stride=stride),
                nn.BatchNorm2d(planes),
            )
        stage = [ResidualBlock(self.inplanes, planes, stride, shortcut)]
        self.inplanes = planes
        # The remaining blocks keep both the channel count and resolution.
        stage.extend(ResidualBlock(planes, planes) for _ in range(blocks - 1))
        return nn.Sequential(*stage)

    def forward(self, x):
        """Return the class scores for a batch of images ``x``."""
        features = self.maxpool(self.conv1(x))
        for stage in (self.layer0, self.layer1, self.layer2):
            features = stage(features)
        # Flatten everything except the batch dimension for the linear head.
        flat = features.view(features.size(0), -1)
        return self.fc2(self.fc1(flat))


Define the training and testing loop

In [None]:
import torch.nn as nn
import torch
from torch.utils.data import DataLoader
from torch.optim import Adam
from tqdm import tqdm
from torchsummary import summary
from tensorboardX import SummaryWriter


def train():
    """Train the ResNet on CIFAR-10 and periodically evaluate it.

    Side effects:
        * writes the model summary to ``model.txt``;
        * logs the per-step training loss and the evaluation accuracy through
          a tensorboardX ``SummaryWriter``;
        * saves the whole model to ``./model<accuracy>.pth`` every time the
          evaluation accuracy improves on the best value seen so far.
    """
    # Local imports keep this cell self-contained.
    import contextlib
    import io

    # specify hyper-parameters here
    device = "cuda" if torch.cuda.is_available() else "cpu"
    batch_size = 128
    lr = 1e-3
    max_epoch = 200
    eval_per_epoch = 2
    # prepare two dataloaders
    train_dataset = MyDataset(train=True)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, num_workers=8, shuffle=True)
    test_dataset = MyDataset(train=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, num_workers=4)

    # initialize the network and dump its architecture to a text file
    model = ResNet().to(device)
    # The PyPI torchsummary prints the table and returns None, so taking
    # str() of the return value would only store "None". Capture stdout and
    # fall back to the return value for forks that return the table.
    # Pass the device explicitly because summary() defaults to "cuda".
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        result = summary(model, (3, 32, 32), device=device)
    txt = captured.getvalue() or str(result)
    print(txt)
    with open("model.txt", "w") as text_file:
        text_file.write(txt)

    crit = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=lr)
    # use tensorboard to output collected data
    logger = SummaryWriter()
    global_step = 0
    high_acc = 0
    try:
        for epoch in range(max_epoch):
            model.train()
            print("epoch:{}".format(epoch))
            for data in tqdm(train_loader):
                # get data from dataloader
                x = data['x'].to(device)
                y = data["y"].to(device)
                optimizer.zero_grad()
                # forward pass and loss
                pred = model(x)
                loss = crit(pred, y)

                global_step += 1
                # record data
                logger.add_scalar("loss", loss.item(), global_step=global_step)
                # do optimization
                loss.backward()
                optimizer.step()

            # evaluate only every `eval_per_epoch` epochs
            if epoch % eval_per_epoch != 0:
                continue
            print("eval")
            model.eval()
            # Count over the whole test set rather than averaging per-batch
            # accuracies, which would over-weight a short final batch.
            correct = 0
            total = 0
            with torch.no_grad():
                for data in tqdm(test_loader):
                    x = data['x'].to(device)
                    y = data["y"].to(device)
                    pred = torch.argmax(model(x), dim=-1)
                    correct += (pred == y).sum().item()
                    total += y.numel()
            acc = correct / total if total else 0.0
            if acc > high_acc:
                high_acc = acc
                # save model if it's better than before
                torch.save(model, "./model{}.pth".format(high_acc))
            logger.add_scalar("acc", acc, epoch//eval_per_epoch)

            print("accuracy:{}".format(acc))
    finally:
        # Flush pending events even if training is interrupted.
        logger.close()


# Start training only when this file is run as a script, not when imported.
if __name__ == '__main__':
    train()
