# Load Packages

In [13]:
import os
import glob
import random
import pandas as pd
import numpy as np
from numpy import linalg
from PIL import Image
import matplotlib.pyplot as plt
import torch
import torchvision
from torchvision import transforms
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from utils import averageMeter

# Setup Data

In [11]:
# Dataset setup: find every labelled image on disk and split the shuffled
# list of label files into five roughly equal folds (one test, four train).
# train_transforms = transforms.ToTensor()
# val_transforms = transforms.ToTensor()

# TODO: INCOMPLETE, have not loaded labels
# Presumably the image height / width in pixels -- TODO confirm.
h_size, w_size = 2384, 4240

# Root folder of the edited recordings and one sub-folder name per day (10..23).
drive_path = "D:\\Edited\\1\\"
day_folders = []
for i in range(10, 24):
    day_folders.append(f"{drive_path}1807{i} - Edited")

# Every CSV found one directory level below the root counts as a labelled image.
box_paths = glob.glob("D:\\Edited\\1\\*\\*.csv")
set_len = len(box_paths)
print(f"Number of labeled images: {set_len}")
subset_len = set_len // 5
# train_len = set_len - test_len

# Shuffle in place, then slice consecutive folds; the last fold takes the remainder.
random.shuffle(box_paths)
test_names, train_names1, train_names2, train_names3 = (
    box_paths[k * subset_len:(k + 1) * subset_len] for k in range(4)
)
train_names4 = box_paths[4 * subset_len:]

# Each label file "<name>.csv" is paired with the image "<name>.jpg".
# Only the first training fold is loaded here.
train_dataset = np.array(
    [np.array(Image.open(csv_path[:-4] + ".jpg")) for csv_path in train_names1]
)
test_dataset = np.array(
    [np.array(Image.open(csv_path[:-4] + ".jpg")) for csv_path in test_names]
)

# # unit-test
# train_loader = DataLoader(train_dataset, batch_size=4)
# value = next(iter(train_loader))
# data = value[0]
# target = value[1]
# print(data.shape)
# print(target.shape)

Number of labeled images: 2738


# Training and Testing Functions

In [None]:
# TODO: INCOMPLETE, change to Euclidean norm-based accuracy? since no longer a discrete class prediction
def compute_accuracy(output, target):
    """Return the fraction of samples whose arg-max prediction equals the target.

    Args:
        output: Tensor of scores; the prediction for each sample is the index
            of the largest value along dimension 1.
        target: Tensor of reference indices compared element-wise against the
            predictions; its first dimension is used as the sample count.

    Returns:
        A 0-dim floating-point tensor equal to
        ``(number of matching entries) / target.size(0)``.
    """
    predict = torch.argmax(output, dim=1)
    n_samples = target.size(0)
    n_correct = torch.sum(predict == target)
    # Cast the integer count to float before dividing: older PyTorch releases
    # truncate (or reject) `/` on integer tensors, which would yield 0 or fail.
    return n_correct.float() / n_samples

In [None]:
def train(data_loader, model, optimizer, epoch, criterion, device='cpu'):
    """Train ``model`` for one pass over ``data_loader`` and print a summary.

    Args:
        data_loader: Iterable of batches; element 0 of each batch is the input
            and element 1 is the target.
        model: Module to optimise; switched to training mode.
        optimizer: Optimiser that is zeroed and stepped once per batch.
        epoch: Epoch number, used only in the printed summary.
        criterion: Callable ``criterion(output, target)`` returning the loss.
        device: Device the batch tensors are moved to.

    Returns:
        Tuple ``(losses.avg, accuracy.avg)`` taken from the two meters
        (presumably batch-size-weighted means -- confirm in ``averageMeter``).
    """
    model.train()

    losses, accuracy = averageMeter(), averageMeter()
    for batch in data_loader:
        inputs = batch[0].to(device)
        labels = batch[1].to(device)
        batch_size = inputs.size(0)

        # Clear gradients left over from the previous step, then run forward.
        optimizer.zero_grad()
        preds = model(inputs)

        # Track accuracy for this batch.
        batch_acc = compute_accuracy(preds, labels)
        accuracy.update(batch_acc.item(), batch_size)

        # Loss, backpropagation and parameter update.
        batch_loss = criterion(preds, labels)
        batch_loss.backward()
        optimizer.step()

        losses.update(batch_loss.item(), batch_size)

    print(f"[Epoch: {epoch}]\t lr: {optimizer.param_groups[0]['lr']:.4g}\t \
      loss_train: {losses.avg:.4f}\tacc_train: {accuracy.avg:.4f}")
    return losses.avg, accuracy.avg

In [None]:
@torch.no_grad()
def test(data_loader, model, criterion, device='cpu'):
    """Evaluate ``model`` on ``data_loader`` with gradient tracking disabled.

    Args:
        data_loader: Iterable of batches; element 0 of each batch is the input
            and element 1 is the target.
        model: Module to evaluate; switched to evaluation mode.
        criterion: Callable ``criterion(output, target)`` returning the loss.
        device: Device the batch tensors are moved to.

    Returns:
        Tuple ``(losses.avg, accuracy.avg)`` taken from the two meters
        (presumably batch-size-weighted means -- confirm in ``averageMeter``).
    """
    model.eval()

    losses, accuracy = averageMeter(), averageMeter()
    for batch in data_loader:
        inputs = batch[0].to(device)
        labels = batch[1].to(device)
        batch_size = inputs.size(0)

        preds = model(inputs)

        # Accuracy first, then loss, each accumulated with the batch size.
        batch_acc = compute_accuracy(preds, labels)
        accuracy.update(batch_acc.item(), batch_size)

        batch_loss = criterion(preds, labels)
        losses.update(batch_loss.item(), batch_size)

    return losses.avg, accuracy.avg

# CNN Classifier

In [None]:
# TODO: untested, should be updated to increase accuracy as necessary
class CNN(nn.Module):
    """Five-stage convolutional network producing 4 outputs per image.

    Every stage is an unpadded 3x3 convolution followed by ReLU; stages 2-5
    are additionally followed by 2x2 average pooling. For a single-channel
    2384 x 4240 input the feature map evolves as:

        conv1         ->  4 x 2382 x 4238
        conv2 + pool  -> 16 x 1190 x 2118
        conv3 + pool  -> 32 x  594 x 1058
        conv4 + pool  -> 32 x  296 x  528
        conv5 + pool  -> 32 x  147 x  263   (= 1237152 features)

    The final linear layer is sized for exactly that feature count, so other
    input resolutions require replacing ``self.out``.

    NOTE(review): ``conv1`` expects 1 input channel -- confirm the images are
    converted to single-channel tensors before being fed to the model.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 4, 3)
        self.conv2 = nn.Conv2d(4, 16, 3)
        self.conv3 = nn.Conv2d(16, 32, 3)
        self.conv4 = nn.Conv2d(32, 32, 3)
        self.conv5 = nn.Conv2d(32, 32, 3)

        # Stateless layers shared by all stages.
        self.relu = nn.ReLU()
        self.avgpool = nn.AvgPool2d(2)

        # 32 channels * 147 * 263 spatial positions after the fifth stage.
        self.out = nn.Linear(1237152, 4)

    def forward(self, x):
        """Map a batch of images ``(N, 1, H, W)`` to a ``(N, 4)`` output."""
        # Stage 1: convolution + ReLU only (no pooling).
        x = self.relu(self.conv1(x))

        # Stages 2-5: convolution + ReLU + 2x2 average pooling. Each stage
        # consumes the previous stage's output; the last one uses conv5.
        x = self.avgpool(self.relu(self.conv2(x)))
        x = self.avgpool(self.relu(self.conv3(x)))
        x = self.avgpool(self.relu(self.conv4(x)))
        x = self.avgpool(self.relu(self.conv5(x)))

        # Flatten everything except the batch dimension; the final feature
        # map is what ``self.out`` was sized for.
        flat = torch.flatten(x, 1)
        return self.out(flat)

### Script to Train

In [None]:
def train_CNN(batch_size=64, lr=0.1, n_epoch=30, eval_epoch=5):
    """Train a fresh ``CNN`` with SGD and MSE loss, then plot the curves.

    Args:
        batch_size: Batch size of the training loader (the test loader uses
            a fixed batch size of 128).
        lr: Learning rate passed to ``torch.optim.SGD``.
        n_epoch: Number of training epochs.
        eval_epoch: The model is evaluated on the test loader after every
            ``eval_epoch``-th epoch.

    Returns:
        None. Per-epoch results are printed and a two-panel figure (loss on
        the left, accuracy on the right) is created.
    """
    # Fix the seeds and cuDNN settings for repeatable runs.
    torch.manual_seed(1337)
    torch.cuda.manual_seed(1337)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    # setup model
    model = CNN()
    device = 'cpu'
    if torch.cuda.is_available():
        model = model.cuda()
        device = 'cuda'

    # setup data loaders (datasets come from the module-level setup cell)
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
    )
    test_loader = DataLoader(
        test_dataset,
        batch_size=128,
        shuffle=False,
        drop_last=False,
    )

    # setup optimizer and loss function (criterion)
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    criterion = nn.MSELoss()

    # training and evaluation
    losses_train = []
    accs_train = []
    losses_test = []
    accs_test = []
    # Epoch indices at which the test metrics were actually measured, so the
    # test curves line up with the training curves on the x axis.
    eval_epochs = []
    for ep in range(n_epoch):

        # train an epoch and get the loss
        loss_train, acc_train = train(train_loader, model, optimizer, ep, criterion, device)
        losses_train.append(loss_train)
        accs_train.append(acc_train)

        if (ep + 1) % eval_epoch == 0:
            # evaluate current model and get the loss
            loss_test, acc_test = test(test_loader, model, criterion, device)
            losses_test.append(loss_test)
            accs_test.append(acc_test)
            eval_epochs.append(ep)
            print(f'[val]\tloss_test: {loss_test:.4f}\tacc_test: {acc_test:.4f}')

    # plot the training/testing loss and accuracy
    train_epochs = np.arange(n_epoch)
    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(8, 3))
    ax[0].plot(train_epochs, losses_train, color='b', label='train_loss')
    ax[0].plot(eval_epochs, losses_test, color='r', label='test_loss')
    ax[0].legend()

    ax[1].plot(train_epochs, accs_train, color='b', label='train_acc')
    ax[1].plot(eval_epochs, accs_test, color='r', label='test_acc')
    ax[1].legend()

In [None]:
# Run training with the default hyperparameters
# (batch_size=64, lr=0.1, n_epoch=30, eval_epoch=5).
train_CNN()