# generate

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
os.chdir("/content/drive/My Drive/deeplearning_intro/project")

# initialization

In [3]:
from model import *
import os
import re
import torch
import random
from torch import nn
from PIL import Image
import matplotlib.pyplot as plt
import torchvision.transforms as T
from torch.utils.data import Dataset, DataLoader

In [25]:
label_dict = {'W': 0, 'A': 1, 'S': 2, 'D': 3, 'WA': 4, 'WD': 5, 'AS': 6, 'SD': 7, 'N': 8}
class custom_dataset(Dataset):
    """ Used for creating data loader """
    def __init__(self, data_path, input_transform):
        super(custom_dataset, self).__init__()
        self.data_path = data_path
        self.transform = input_transform

    def __getitem__(self, index):
        img_name = os.listdir(self.data_path)[index]
        key = re.findall(r'[a-zA-Z]+', img_name)[0]
        abs_img_path = os.path.join(self.data_path, img_name)
        img = process_each_pic(Image.open(abs_img_path))
        output_data = self.transform(img)
        output_label = label_dict[key]
        return output_data, output_label

    def __len__(self):
        return len(os.listdir(self.data_path))


def process_each_pic(img):
    img = img.convert('RGB')
    w, h = img.size
    output_img = Image.new('RGB', size=(max(w, h), max(w, h)), color=(0, 0, 0))
    length = int(abs(w - h) // 2)
    box = (length, 0) if w < h else (0, length)
    output_img.paste(img, box)
    return output_img


In [33]:
from torch import nn


class DriveNet(nn.Module):
    def __init__(self):
        super(DriveNet, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            nn.MaxPool2d(kernel_size=(5, 5), stride=(5, 5)),

            nn.Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(True),
            nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),

            nn.Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(True),
            nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),

            nn.Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
            nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),

            nn.Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
            nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),

            nn.Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(True),
            nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)),
        )

        self.fc = nn.Sequential(
            nn.Linear(512 * 4 * 4, 512),
            nn.ReLU(),
            nn.Linear(512, 9),
        )

    def forward(self, x):
        out = self.cnn(x)
        out = out.view(-1, 512 * 4 * 4)
        out = self.fc(out)
        return out


In [41]:
def train(net, criterion, optimizer, train_loader, device, max_epoch, is_tenCrop=False, scheduler=None):
    acc_list = []
    loss_list = []
    for epoch in range(max_epoch):
        temp_acc, temp_loss = train_once(net, criterion, optimizer, train_loader, device, is_tenCrop)
        acc_list.append(temp_acc)
        loss_list.append(temp_loss)
        print('epoch: {}'.format(epoch + 1))
        print('train loss: {:.6f}, train accuracy: {:.6f}'.format(temp_loss, temp_acc))
        if scheduler:
            scheduler.step()
    return acc_list, loss_list


def train_once(net, criterion, optimizer, loader, device, is_tenCrop=False):
    net.train()
    train_acc = 0
    train_loss = .0
    sample_num = .0
    for data, label in loader:
        optimizer.zero_grad()
        data, label = data.to(device), label.to(device)
        if is_tenCrop:
            batch_size, n_crops, chn, height, weight = data.size()
            data = data.view(-1, chn, height, weight)
            label = torch.repeat_interleave(label, repeats=n_crops, dim=0)
        prediction = net(data)
        loss = criterion(prediction, label)
        loss.backward()
        optimizer.step()
        train_acc += torch.sum((torch.argmax(prediction, dim=1) == label).type(torch.int)).item()
        train_loss += loss.item()
        sample_num += len(label)
        if device == 'cuda':
            torch.cuda.empty_cache()
    return train_acc / sample_num, train_loss / sample_num


def test2acc(net, loader, device, is_tenCrop=False):
    """
    Return the result of predicted class numbers, shape: (test_dataset_length, ), type: nd-array
    the result shows class numbers, not the probabilities of each class
    """
    net.eval()
    acc = 0.
    with torch.no_grad():
        for data, label in loader:
            data = data.to(device)
            label = label.to(device)
            if is_tenCrop:
                batch_size, n_crops, chn, height, weight = data.size()
                raw_pre = net(data.reshape(-1, chn, height, weight))
                temp = raw_pre.reshape(batch_size, n_crops, -1).mean(1)
            else:
                temp = net(data)
            acc += torch.sum((torch.argmax(temp, dim=1) == label).type(torch.int)).item()
    return acc / len(loader.dataset)

In [35]:
torch.backends.cudnn.benchmark = True
device = 'cuda' if torch.cuda.is_available() else 'cpu'
net = DriveNet().to(device)

In [36]:
train_set = custom_dataset('./dataset/train', T.ToTensor())
test_set = custom_dataset('./dataset/test', T.ToTensor())
train_loader = DataLoader(train_set, batch_size=16, num_workers=4, shuffle=True)
test_loader = DataLoader(test_set, batch_size=16, num_workers=4)

In [46]:
max_epoch = 10
lr = 1e-4
wd = 1e-5

In [47]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(net.parameters(), lr=lr, weight_decay=wd)

In [48]:
acc_list, loss_list = train(net, criterion, optimizer, train_loader, device, max_epoch)

epoch: 1
train loss: 0.018018, train accuracy: 0.962888
epoch: 2
train loss: 0.017406, train accuracy: 0.968286
epoch: 3
train loss: 0.016995, train accuracy: 0.970310
epoch: 4
train loss: 0.016490, train accuracy: 0.969636
epoch: 5
train loss: 0.016173, train accuracy: 0.971660
epoch: 6
train loss: 0.015655, train accuracy: 0.972335
epoch: 7
train loss: 0.015174, train accuracy: 0.974359
epoch: 8
train loss: 0.014814, train accuracy: 0.980432
epoch: 9
train loss: 0.014564, train accuracy: 0.979757
epoch: 10
train loss: 0.014210, train accuracy: 0.980432


In [55]:
acc = test2acc(net, test_loader_2, device)
print(acc)

0.6420940170940171


In [44]:
torch.save(net, './net_0.pkl')

In [54]:
test_loader_2 = DataLoader(custom_dataset('./xyx1', T.ToTensor()), batch_size=16, num_workers=4)

In [57]:
len(test_loader_2.dataset)

3744