In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import glob 
import os
import random
import numpy as np
from PIL import Image
from tqdm import tqdm
import torch.utils.data as data
import torchvision
from torchvision import models, transforms

In [2]:
torch.manual_seed(119)
np.random.seed(119)
random.seed(119)

In [3]:
class ImageTransfrom():
    def __init__(self, mean, std):
        self.data_transform = {
            'train': transforms.Compose([
                transforms.RandomCrop(32, padding=4),
                transforms.RandomHorizontalFlip(),
                transforms.ToTensor(),
                transforms.Normalize(mean, std)
            ]),
            'val': transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize(mean, std)
            ])
        }
    
    def __call__(self, img, phase='train'):
        return self.data_transform[phase](img)

In [4]:
def make_datapath_list(phase='train'):
    rootpath = './data/'
    target_path = os.path.join(rootpath+phase+'/*.png')
    print(target_path)
    
    path_list = []
    
    for path in glob.glob(target_path):
        path_list.append(path)
    
    return path_list

In [5]:
train_list = make_datapath_list(phase='train')
val_list = make_datapath_list(phase='val')

./data/train/*.png
./data/val/*.png


In [6]:
class TrafficDataset(data.Dataset):
    def __init__(self, file_list, transform=None, phase='train'):
        self.file_list = file_list
        self.transform = transform
        self.phase = phase
        self.labels = {}

        if phase == 'train':
            with open('./data/annotations/train.txt') as f:
                for line in f:
                    line = line.strip()
                    self.labels[line.split()[0]] = line.split()[1]

        elif phase == 'val':
            with open('./data/annotations/val.txt') as f:
                for line in f:
                    line = line.strip()
                    self.labels[line.split()[0]] = line.split()[1]
            
    
    def __getitem__(self, index):
        img_path = self.file_list[index]
        img = Image.open(img_path)

        img_transformed = self.transform(img, self.phase)

        if self.phase=='train':
            label_index = img_path[13:]
            label = int(self.labels[label_index])


        elif self.phase=='val':
            label_index = img_path[11:]
            label = int(self.labels[label_index])

        return img_transformed, label

    def __len__(self):
        return len(self.file_list)

In [7]:
train_dataset = TrafficDataset(file_list=train_list, transform=ImageTransfrom(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)), phase='train')
val_dataset = TrafficDataset(file_list=val_list, transform=ImageTransfrom(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)), phase='val')

In [8]:
batch_size = 256

train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

dataloaders = {'train': train_dataloader, 'val': val_dataloader}

In [9]:
batch_iterator = iter(dataloaders['train'])
inputs, labels = next(batch_iterator)
print(inputs.size())
print(labels)

torch.Size([256, 3, 32, 32])
tensor([ 7, 17,  4, 11, 27, 32, 14, 35, 15, 11, 32, 31, 37, 16, 23, 34, 18, 13,
        13,  6, 35, 36,  4, 33, 19, 24, 35, 37, 29,  6, 39, 35,  1,  3, 40, 12,
         5,  5, 22,  8, 40,  8, 29, 31, 15, 41, 12,  0, 35, 36, 35, 16, 28, 27,
        29,  8, 17,  6, 25,  4, 13, 11, 42,  0, 42,  3, 35, 32, 30,  9, 19, 41,
        20, 35,  1, 26,  8, 40, 16, 14,  0, 11, 42, 18, 17,  2, 41, 29, 26, 25,
        29, 19, 11,  4,  4, 34, 32, 15,  7, 26, 25,  3, 29, 18, 26, 19, 17, 13,
        40, 15, 33, 10, 21, 33, 42, 23,  1, 28, 28, 21, 30,  6,  3, 14, 18,  7,
        37, 10, 39, 30, 10, 39, 29,  2,  7, 39,  3,  4,  4,  5, 11, 30,  6,  5,
         3, 42, 31, 15, 11, 25, 33, 39,  5,  2, 19, 30,  8, 37, 18, 40, 19,  1,
        39,  4, 22, 28, 26, 40, 28, 26, 14, 15,  9, 11, 26, 14, 16, 28, 32, 28,
        10, 38, 26, 38,  5, 37, 35, 19, 14, 22, 27,  8,  2, 21,  1, 40, 10, 19,
        25, 12, 29, 41,  1, 18,  5, 26, 11, 13, 28, 14,  9,  7, 23, 38, 17, 39,
        17,

---

## Basic ResNet Model

In [10]:
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")

EPOCHS = 300

In [11]:
class BasicBlock(nn.Module):
    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shorcut = nn.Sequential()
        if stride != 1 or in_planes != planes:
            self.shorcut = nn.Sequential(
                nn.Conv2d(in_planes, planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes)
            )
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shorcut(x)
        out = F.relu(out)
        return out

In [12]:
class ResNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 16

        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.layer1 = self._make_layer(16, 2, stride=1)
        self.layer2 = self._make_layer(32, 2, stride=2)
        self.layer3 = self._make_layer(64, 2, stride=2)
        self.linear = nn.Linear(64, num_classes)

    def _make_layer(self, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(BasicBlock(self.in_planes, planes, stride))
            self.in_planes = planes
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = F.avg_pool2d(out, 8)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

In [13]:
net = ResNet(num_classes=43).to(DEVICE)
EPOCHS = 300
optimizer = optim.SGD(net.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)
criterion = nn.CrossEntropyLoss()

In [14]:
def train_model(net, dataloaders_dict, criterion, optimizer, num_epochs):
    
    torch.backends.cudnn.benchmark = True
    
    for epoch in range(num_epochs):
        
        if (epoch+1)  % 10 == 0:
            print('Epoch {}/{}'.format(epoch+1, num_epochs))
            print('-------------')

        for phase in ['train', 'val']:
            if phase == 'train':
                net.train()
            else:
                net.eval()

            epoch_loss = 0.0
            epoch_corrects = 0

            if (epoch == 0) and (phase == 'train'):
                continue

            for inputs, labels in dataloaders_dict[phase]:
                inputs = inputs.to(DEVICE)
                labels = labels.to(DEVICE)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = net(inputs)
                    loss = criterion(outputs, labels)
                    _, preds = torch.max(outputs, 1)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                
                epoch_loss += loss.item() * inputs.size(0)
                epoch_corrects += torch.sum(preds == labels.data)
            
            if phase == 'train':
                scheduler.step()

            epoch_loss = epoch_loss / len(dataloaders_dict[phase].dataset)
            epoch_acc = epoch_corrects.double() / len(dataloaders_dict[phase].dataset)

            if (epoch+1)  % 10 == 0:
                print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

In [15]:
train_model(net, dataloaders, criterion, optimizer, num_epochs=EPOCHS)

Epoch 10/300
-------------
train Loss: 0.0786 Acc: 0.9784
val Loss: 0.5067 Acc: 0.8794
Epoch 20/300
-------------
train Loss: 0.0553 Acc: 0.9859
val Loss: 0.2241 Acc: 0.9293
Epoch 30/300
-------------
train Loss: 0.0472 Acc: 0.9881
val Loss: 0.3014 Acc: 0.9195
Epoch 40/300
-------------
train Loss: 0.0490 Acc: 0.9876
val Loss: 0.4696 Acc: 0.8642
Epoch 50/300
-------------
train Loss: 0.0443 Acc: 0.9888
val Loss: 0.2671 Acc: 0.9349
Epoch 60/300
-------------
train Loss: 0.0059 Acc: 0.9997
val Loss: 0.0923 Acc: 0.9762
Epoch 70/300
-------------
train Loss: 0.0063 Acc: 0.9998
val Loss: 0.0966 Acc: 0.9719
Epoch 80/300
-------------
train Loss: 0.0066 Acc: 0.9998
val Loss: 0.0955 Acc: 0.9741
Epoch 90/300
-------------
train Loss: 0.0077 Acc: 0.9995
val Loss: 0.1173 Acc: 0.9637
Epoch 100/300
-------------
train Loss: 0.0094 Acc: 0.9992
val Loss: 0.1214 Acc: 0.9698
Epoch 110/300
-------------
train Loss: 0.0046 Acc: 1.0000
val Loss: 0.1042 Acc: 0.9728
Epoch 120/300
-------------
train Loss: 0

In [17]:
save_path = './weights/ResNet.pth'
torch.save(net.state_dict(), save_path)

---