In [None]:
# References
# ResNet / VGG / AlexNet implementations:
# https://github.com/pytorch/vision/blob/master/torchvision/models/resnet.py
# https://gist.github.com/amqdn/211b84d93bf05becbba89ecbca2ba20c
# https://pages.stat.wisc.edu/~sraschka/teaching/stat453-ss2020/
# https://github.com/meliketoy
# https://arxiv.org/pdf/1605.07146v2.pdf
# https://paperswithcode.com/method/wideresnet

In [None]:
import pandas as pd
from PIL import Image
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import os
import numpy as np

In [None]:
# Random seed
import random


def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Args:
        seed: Integer seed handed to Python's ``random``, NumPy, and PyTorch
            (default generator and CUDA); it is also exported as the
            ``PYTHONHASHSEED`` environment variable.
    """
    # NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
    # setting it here presumably only affects child processes - confirm.
    os.environ['PYTHONHASHSEED'] = str(seed)

    random.seed(seed)
    np.random.seed(seed)
    for torch_seed_fn in (torch.manual_seed, torch.cuda.manual_seed):
        torch_seed_fn(seed)


# Fix the seed once, before any data loading or model construction
seed_everything(2000)

In [None]:
# Select the compute device: Apple MPS, then CUDA, then CPU.
# `is_built()` only reports whether this PyTorch binary was compiled with MPS
# support; `is_available()` also verifies the current machine can actually use
# it, so we never pick "mps" on a host where moving tensors to it would fail.
if torch.backends.mps.is_available():
    device = "mps"
elif torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(device)

In [None]:
# Training configuration
num_classes = 10   # number of target classes
batch_size = 128   # mini-batch size (alternative noted by the author: 256)
epochs = 500       # passes over the training set (alternatives noted: 200, 250, 300, 350, 400, 450)

In [None]:
# Locations of the competition data, relative to the notebook's working directory
train_labels_path = "../stat940w24dc1/train_labels.csv"
train_folder_path = "../stat940w24dc1/train/train"
test_folder_path = "../stat940w24dc1/test/test"

# Label table for the training images, read from the CSV file
df = pd.read_csv(filepath_or_buffer=train_labels_path)


# Training dataset: labels are looked up in the label table loaded from the CSV file
class TrainDataset(torch.utils.data.Dataset):
    """Image dataset whose labels come from a label table.

    Column 0 of ``dataframe`` is read as the image id (the image file is
    ``<root>/<id>.jpg``) and column 1 as the integer class label.

    Args:
        root: Directory containing the training images.
        dataframe: Label table, indexed positionally (row ``idx`` is sample ``idx``).
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, root, dataframe, transform=None):
        self.root = root
        self.df = dataframe
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the label table."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx`` of the label table."""
        image_id = self.df.iloc[idx, 0]
        label = int(self.df.iloc[idx, 1])

        # os.path.join keeps path handling consistent with TestDataset
        img_path = os.path.join(self.root, str(image_id) + ".jpg")
        image = Image.open(img_path).convert("RGB")

        # Explicit None check: a transform object that happens to be falsy
        # (e.g. an empty container) must still be applied.
        if self.transform is not None:
            image = self.transform(image)

        return image, label


# Test dataset has no labels; the file name (without extension) is returned in
# place of a label so it can be used as the id column of the submission CSV
class TestDataset(torch.utils.data.Dataset):
    """Unlabelled image dataset that yields ``(image, file_stem)`` pairs.

    Args:
        root: Directory containing the test images.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, root, transform=None):
        self.root_folder = root
        self.transform = transform
        # os.listdir returns entries in arbitrary order and includes hidden
        # files (e.g. .DS_Store) and sub-directories, which Image.open cannot
        # read. Keep regular, non-hidden files only, in a deterministic order.
        self.images = sorted(
            name for name in os.listdir(root)
            if not name.startswith(".") and os.path.isfile(os.path.join(root, name))
        )

    def __len__(self):
        """Return the number of image files found in ``root``."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, file_stem)`` for the ``idx``-th file."""
        img_name = self.images[idx]
        img_path = os.path.join(self.root_folder, img_name)
        image = Image.open(img_path).convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        # splitext handles extensions of any length (".jpg", ".jpeg", ...),
        # unlike slicing off a fixed four characters.
        label = os.path.splitext(img_name)[0]

        return image, label


In [None]:
# Load the raw training images (tensor conversion only, no augmentation) so the
# per-channel statistics used for normalization can be measured
transform = transforms.Compose([
    transforms.ToTensor()
])

train = TrainDataset(root=train_folder_path, dataframe=df, transform=transform)
train_loader = DataLoader(train, batch_size=batch_size, shuffle=False)

# Per-channel mean and standard deviation of the whole training set.
# Sums are accumulated over every pixel instead of averaging per-batch
# statistics: averaging per-batch stds does not give the dataset std, and a
# smaller final batch would otherwise get the same weight as a full one.
channel_sum = None
channel_sq_sum = None
pixel_count = 0

for inputs, _ in train_loader:
    # batches are (N, C, H, W); float64 limits round-off in the running sums
    inputs = inputs.to(torch.float64)
    if channel_sum is None:
        channel_sum = torch.zeros(inputs.size(1), dtype=torch.float64)
        channel_sq_sum = torch.zeros(inputs.size(1), dtype=torch.float64)

    channel_sum += inputs.sum(dim=(0, 2, 3))
    channel_sq_sum += (inputs ** 2).sum(dim=(0, 2, 3))
    pixel_count += inputs.size(0) * inputs.size(2) * inputs.size(3)

if pixel_count == 0:
    raise ValueError("Cannot compute normalization statistics: the training set is empty")

channel_mean = channel_sum / pixel_count
# Var[x] = E[x^2] - E[x]^2 (population variance); clamp guards against tiny
# negative values caused by floating-point round-off
channel_var = (channel_sq_sum / pixel_count - channel_mean ** 2).clamp(min=0.0)

# float32 numpy arrays, one entry per channel
train_mean = channel_mean.numpy().astype(np.float32)
train_std = channel_var.sqrt().numpy().astype(np.float32)

In [None]:
# Training transform: light augmentation (random crop with padding, horizontal
# flip), then tensor conversion and normalization with the training-set statistics
transform = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(train_mean, train_std)
])

# Test transform, try to get as much detail as possible.
# RandomAdjustSharpness applies the adjustment with probability p (default 0.5),
# which would make test-time preprocessing - and therefore the predictions -
# differ from run to run. p=1.0 sharpens every image deterministically.
# NOTE(review): training images are not sharpened, so this is a train/test
# preprocessing mismatch - confirm that it actually helps accuracy.
test_transform = transforms.Compose([
    transforms.RandomAdjustSharpness(5, p=1.0),
    transforms.ToTensor(),
    transforms.Normalize(train_mean, train_std)
])

train = TrainDataset(root=train_folder_path, dataframe=df, transform=transform)
test = TestDataset(root=test_folder_path, transform=test_transform)

train_loader = DataLoader(train, batch_size=batch_size, shuffle=True)  # shuffle training examples
test_loader = DataLoader(test, batch_size=100, shuffle=False)  # don't shuffle test set

In [None]:
# This is a wide resnet 28x10

def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
    """Build a biased 3x3 convolution whose padding equals its dilation.

    Args:
        in_planes: Number of input channels.
        out_planes: Number of output channels.
        stride: Convolution stride.
        groups: Number of channel groups.
        dilation: Kernel dilation; also used as the padding.

    Returns:
        The configured ``nn.Conv2d`` layer.
    """
    conv_options = dict(
        kernel_size=3,
        stride=stride,
        padding=dilation,
        dilation=dilation,
        groups=groups,
        bias=True,
    )
    return nn.Conv2d(in_planes, out_planes, **conv_options)


def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
    """Build a biased 1x1 (pointwise) convolution.

    Args:
        in_planes: Number of input channels.
        out_planes: Number of output channels.
        stride: Convolution stride.

    Returns:
        The configured ``nn.Conv2d`` layer.
    """
    pointwise = nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=1,
        stride=stride,
        bias=True,
    )
    return pointwise


# One residual block with a shortcut connection.
# Main path order: bn -> relu -> conv -> dropout -> bn -> relu -> conv
class ResBlock(nn.Module):
    """Residual block: two 3x3 convolutions plus a shortcut.

    Args:
        in_planes: Number of input channels.
        out_planes: Number of output channels.
        dropout_rate: Dropout probability applied after the first convolution.
        identity: If true, the shortcut is a 1x1 convolution; otherwise the
            input is passed through unchanged whenever that is shape-compatible.
        stride: Stride of the second 3x3 convolution (and of the 1x1 shortcut
            convolution when one is used).
    """

    def __init__(self, in_planes, out_planes, dropout_rate, identity, stride=1):
        super().__init__()
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = conv3x3(in_planes, out_planes)
        self.relu1 = nn.ReLU()
        self.dropout = nn.Dropout(p=dropout_rate)
        self.bn2 = nn.BatchNorm2d(out_planes)
        self.relu2 = nn.ReLU()
        self.conv2 = conv3x3(out_planes, out_planes, stride=stride)

        # Shortcut branch. A pass-through shortcut can only be added to the main
        # path when channel count and spatial size are unchanged; otherwise a
        # 1x1 projection is required even if the caller did not ask for one
        # (previously that case failed at the addition in forward()).
        needs_projection = identity or stride != 1 or in_planes != out_planes
        self.identity = nn.Sequential()
        if needs_projection:
            self.identity = nn.Sequential(
                conv1x1(in_planes, out_planes, stride=stride),
            )

    def forward(self, x):
        """Apply the main path to ``x`` and add the shortcut of ``x``."""
        shortcut = self.identity(x)

        out = self.bn1(x)
        out = self.relu1(out)
        out = self.conv1(out)
        out = self.dropout(out)
        out = self.bn2(out)
        out = self.relu2(out)
        out = self.conv2(out)

        # out-of-place add: does not overwrite the convolution output in place
        return out + shortcut


# Layer count: each conv group has 4 ResBlocks x 2 convolutions = 8 convolutions;
# 3 groups x 8 = 24, plus 3 shortcut ("identity") convolutions and 1 initial
# convolution, for a total of 28
class WideResnet28x10(nn.Module):
    """Wide ResNet with three conv groups of widths 160, 320 and 640.

    Args:
        num_classes: Size of the final linear layer's output.
    """

    def __init__(self, num_classes):
        super().__init__()
        # channel count fed into the next ResBlock; updated by conv_group()
        self.in_planes = 16

        # base widths 16 / 32 / 64 multiplied by the widening factor 10
        self.conv1 = conv3x3(3, 16)
        self.layer1 = self.conv_group(160)
        self.layer2 = self.conv_group(320, 2)
        self.layer3 = self.conv_group(640, 2)
        # Default momentum (0.1), consistent with the BatchNorm layers inside
        # ResBlock. In PyTorch the running statistics are updated as
        # (1 - momentum) * running + momentum * batch, so the previous 0.9
        # let the most recent batch dominate the statistics used in eval mode.
        self.bn1 = nn.BatchNorm2d(640)
        self.relu1 = nn.ReLU()
        # Global average pooling. Equals AvgPool2d(8) for 32x32 inputs (8x8
        # final feature map) but also accepts other input resolutions.
        self.avg = nn.AdaptiveAvgPool2d(1)
        self.linear = nn.Linear(640, num_classes)

    def conv_group(self, out_planes, stride=1):
        """Build one group of four ResBlocks and advance ``self.in_planes``.

        The first block changes the channel count (and applies ``stride``)
        using a 1x1 shortcut convolution; the remaining three keep the shape.
        """
        # 1 "identity" convolution for each group
        layers = [ResBlock(self.in_planes, out_planes, 0.3, True, stride)]
        self.in_planes = out_planes

        for _ in range(3):
            layers.append(ResBlock(self.in_planes, out_planes, 0.3, False))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Return one row of ``num_classes`` outputs per input image."""
        x = self.conv1(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.bn1(x)
        x = self.relu1(x)
        x = self.avg(x)
        x = x.view(x.size(0), -1)  # flatten (N, 640, 1, 1) -> (N, 640)
        x = self.linear(x)

        return x



In [None]:
# Build the network with the configured number of classes (instead of a second
# hard-coded 10 that could drift from the config cell) and move it to the device
net = WideResnet28x10(num_classes)
net = net.to(device)

In [None]:
# Loss function: cross entropy between the network outputs and the class labels
criterion = nn.CrossEntropyLoss()

# Optimizer: stochastic gradient descent with momentum and weight decay
# (alternative learning rate noted by the author: lr = 0.02)
sgd_options = {"lr": 0.001, "momentum": 0.9, "weight_decay": 5e-4}
optimizer = optim.SGD(net.parameters(), **sgd_options)

In [None]:
log_every = 50  # report the average loss once per this many mini-batches

for epoch in range(epochs):  # loop over the dataset multiple times
    # Make sure dropout / batch-norm are in training mode, even if this cell is
    # re-run after the prediction cell has switched the network to eval mode.
    net.train()

    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):
        # move the batch to the selected device
        inputs = inputs.to(device)
        labels = labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics: average loss over the last `log_every` batches.
        # Using (i + 1) ensures exactly `log_every` batches were accumulated
        # before dividing (testing i % 50 == 0 fired after a single batch and
        # reported that batch's loss divided by 50).
        running_loss += loss.item()
        if (i + 1) % log_every == 0:
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / log_every:.3f}')
            running_loss = 0.0

print('Finished Training')

In [None]:
# Persist the trained parameters (the state dict) to disk
PATH = './cifar_net_wide.pth'

trained_weights = net.state_dict()
torch.save(trained_weights, PATH)

In [None]:
# Make predictions on the test set.
# `labels` collects the image ids produced by TestDataset (file names without
# extension); `predicted` collects the matching predicted class indices.
labels = []
predicted = []

# Switch to inference mode and disable gradient tracking once, outside the loop,
# instead of repeating both on every batch
net.eval()
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs = inputs.to(device)  # move images to the selected device
        outputs = net(inputs)
        _, predict = torch.max(outputs, 1)  # index of the largest output per image
        labels.extend(list(targets))
        predicted.extend(predict.tolist())

In [None]:
# Write the submission file: one row per test image (id, predicted label).
# A dedicated name is used so the training label table `df` is not overwritten;
# re-running the dataset cells after this one would otherwise pick up the
# predictions as training labels.
submission = pd.DataFrame(data={'id': list(labels), 'label': predicted})
submission.to_csv('out_wide.csv', index=False)