In [1]:
import os
from PIL import Image
from IPython.display import display

import torch
import torch.nn.functional as F

from torch import nn
from torch import optim

from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# Print tensors with plain decimals instead of scientific notation.
torch.set_printoptions(sci_mode=False)
# Keep float32 as the default floating-point dtype. `set_default_dtype` is the
# supported replacement for the deprecated `set_default_tensor_type`.
torch.set_default_dtype(torch.float32)

# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
class SignsDataset(Dataset):
    """Image dataset read from ``<base_path>/<prefix>_signs``.

    Only ``.jpg`` files are used. The class label of each image is parsed from
    the first character of its file name, so that character must be a digit.

    NOTE(review): a later cell defines ``SignsDataset`` again; when the
    notebook is run top to bottom that later definition replaces this one.

    Args:
        base_path: Directory that contains the ``<prefix>_signs`` folder.
        prefix: Split name used to build the folder name (default ``'train'``).
        transform: Optional callable applied to each PIL image before it is
            returned.
    """

    def __init__(self, base_path, prefix='train', transform=None):
        path = os.path.join(base_path, f'{prefix}_signs')
        # Filter once and derive BOTH lists from the same filtered listing so
        # that filenames[i] and targets[i] always describe the same image.
        # Sorting makes the sample order independent of os.listdir's order.
        files = sorted(file for file in os.listdir(path) if file.endswith('.jpg'))

        self.filenames = [os.path.join(path, file) for file in files]
        self.targets = [int(file[0]) for file in files]
        # Backward-compatible alias for the original (misspelled) attribute name.
        self.tagets = self.targets
        self.transform = transform

    def __len__(self):
        """Return the number of ``.jpg`` images found in the split folder."""
        return len(self.filenames)

    def __getitem__(self, index):
        """Return ``(image, label)`` for the sample at ``index``."""
        image = Image.open(self.filenames[index])
        if self.transform:
            image = self.transform(image)

        return image, self.targets[index]

In [3]:
class SignsDataset(Dataset):
    """Image dataset read from ``<base_dir>/<split_name>_signs``.

    Only ``.jpg`` files are used. The class label of each image is parsed from
    the first character of its file name, so that character must be a digit.

    Args:
        base_dir: Directory that contains the ``<split_name>_signs`` folder.
        split_name: Split used to build the folder name (default ``'train'``).
        transform: Optional callable applied to each PIL image before it is
            returned.
    """

    def __init__(self, base_dir, split_name='train', transform=None):
        path = os.path.join(base_dir, '{}_signs'.format(split_name))
        # Filter once and derive BOTH lists from the same filtered listing so
        # that filenames[i] and targets[i] always describe the same image.
        # Sorting makes the sample order independent of os.listdir's order.
        files = sorted(file for file in os.listdir(path) if file.endswith('.jpg'))

        self.filenames = [os.path.join(path, file) for file in files]
        self.targets = [int(file[0]) for file in files]
        self.transform = transform

    def __len__(self):
        """Return the number of ``.jpg`` images found in the split folder."""
        return len(self.filenames)

    def __getitem__(self, index):
        """Return ``(image, label)`` for the sample at ``index``."""
        image = Image.open(self.filenames[index])

        if self.transform:
            image = self.transform(image)

        return image, self.targets[index]

In [4]:
# Location of the dataset splits and the mini-batch size shared by all loaders.
DATA_DIR = '../datasets/64x64_SIGNS/'
BATCH_SIZE = 32

# Training pipeline: random horizontal flip (data augmentation), then scale
# pixels to [0, 1] and normalise each channel to [-1, 1].
transform = transforms.Compose(
  [transforms.RandomHorizontalFlip(), #data augmentation
   transforms.ToTensor(),
   transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))
  ]
)

# Evaluation pipeline: same tensor conversion and normalisation but NO random
# augmentation, so validation/test metrics are deterministic and comparable
# between epochs.
eval_transform = transforms.Compose(
  [transforms.ToTensor(),
   transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))
  ]
)

# Only the training loader shuffles; evaluation order does not affect the
# averaged metrics, so it is kept fixed.
trainloader = DataLoader( SignsDataset(DATA_DIR, 'train', transform), batch_size=BATCH_SIZE, shuffle=True )
valloader = DataLoader( SignsDataset(DATA_DIR, 'val', eval_transform), batch_size=BATCH_SIZE, shuffle=False )
testloader = DataLoader( SignsDataset(DATA_DIR, 'test', eval_transform), batch_size=BATCH_SIZE, shuffle=False )

# Lookup table used by the training loop: split name -> DataLoader.
dataloaders = {
    'train' : trainloader,
    'val' : valloader,
    'test' : testloader,
}

In [5]:
class Model(nn.Module):
    """Small CNN classifier: three conv/batch-norm/pool stages and two linear layers.

    Each stage halves the spatial size with a 2x2 max-pool. ``fc1`` expects an
    8x8 feature map after the three stages, i.e. 3-channel 64x64 inputs.

    Args:
        n_channels: Number of output channels of the first convolution; the
            second and third convolutions use 2x and 4x this value.
    """

    def __init__(self, n_channels):
        super(Model, self).__init__()
        self.n_channels = n_channels

        self.conv1 = nn.Conv2d(3, self.n_channels, 3, 1, 1)
        self.bn1 = nn.BatchNorm2d(self.n_channels)

        # Each BatchNorm2d must match the channel count produced by the
        # convolution it follows (2x and 4x n_channels respectively).
        self.conv2 = nn.Conv2d(self.n_channels, self.n_channels*2, 3, 1, 1)
        self.bn2 = nn.BatchNorm2d(self.n_channels*2)

        self.conv3 = nn.Conv2d(self.n_channels*2, self.n_channels*4, 3, 1, 1)
        self.bn3 = nn.BatchNorm2d(self.n_channels*4)

        self.fc1 = nn.Linear(self.n_channels*4*8*8, self.n_channels*4)
        self.fcbn1 = nn.BatchNorm1d(self.n_channels*4)

        # Six output classes.
        self.fc2 = nn.Linear(self.n_channels*4, 6)


    def forward(self, x):
        """Return per-class log-probabilities of shape ``(batch, 6)``.

        Args:
            x: Image batch of shape ``(batch, 3, 64, 64)``.
        """
        x = self.bn1( self.conv1(x) )
        x = F.relu( F.max_pool2d(x, 2) )

        x = self.bn2( self.conv2(x) )
        x = F.relu( F.max_pool2d(x, 2) )

        x = self.bn3( self.conv3(x) )
        x = F.relu( F.max_pool2d(x, 2) )

        # Flatten the (4*n_channels, 8, 8) feature maps into one vector per sample.
        x = x.view(-1, self.n_channels*4*8*8)

        x = F.relu( self.fcbn1( self.fc1(x) ) )
        # Follow the module's train/eval mode so dropout is disabled by
        # model.eval(); a hard-coded training=True would keep it active
        # during validation and inference.
        x = F.dropout(x, p=0.8, training=self.training)
        x = self.fc2(x)

        x = F.log_softmax(x, dim=1)

        return x

In [6]:
class RunningMetric():
    """Accumulates a summed value and a sample count; calling it yields their ratio."""

    def __init__(self):
        # Nothing has been accumulated yet.
        self.n_data = 0
        self.data = 0

    def update(self, val, size):
        """Add ``val`` to the running sum and ``size`` to the running count."""
        self.n_data += size
        self.data += val

    def __call__(self):
        """Return the accumulated sum divided by the accumulated count."""
        count = float(self.n_data)
        return self.data/count

In [7]:
def train_and_evaluate(model, optimizer, loss_fn, dataloaders, device, n_epochs=10, lr=0.001):
    """Train ``model`` and evaluate it on the validation split after every epoch.

    Args:
        model: Network to optimise; switched between train and eval mode per phase.
        optimizer: Optimizer stepped during the training phase. The learning
            rate of every parameter group is overwritten with ``lr``.
        loss_fn: Callable ``loss_fn(outputs, targets)`` returning a scalar loss.
        dataloaders: Mapping with at least the keys ``'train'`` and ``'val'``.
        device: Device the batches are moved to before the forward pass.
        n_epochs: Number of passes over the training data.
        lr: Learning rate written into the optimizer's parameter groups.

    Returns:
        The same ``model`` object after training.
    """
    for group in optimizer.param_groups:
        group['lr'] = lr

    for epoch_idx in range(n_epochs):
        print(f'Epoch {epoch_idx + 1} / {n_epochs} \n', '_'*45)

        for phase in ('train', 'val'):
            is_training = phase == 'train'

            if is_training:
                model.train()
            else:
                model.eval()

            # Sample-weighted running averages for this phase.
            loss_meter = RunningMetric()
            acc_meter = RunningMetric()

            for inputs, targets in dataloaders[phase]:
                inputs = inputs.to(device)
                targets = targets.to(device)
                optimizer.zero_grad()

                # Gradients are only tracked while training.
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    predictions = torch.max(outputs, 1)[1]
                    loss = loss_fn(outputs, targets)
                    if is_training:
                        loss.backward()
                        optimizer.step()

                n_samples = inputs.shape[0]
                # The loss is weighted by batch size so the result is a per-sample mean.
                loss_meter.update(loss.item()*n_samples, n_samples)
                acc_meter.update(torch.sum(predictions == targets).float(), n_samples)

            print(phase)
            print('Loss:  {:.4f}     Acc:  {:.3%}\n'.format(loss_meter(), acc_meter()))

    return model

In [8]:
from torchvision import models

# Instantiate torchvision's VGG-16 with pretrained weights; it is used below as
# the starting point for a 6-class classifier.
# NOTE(review): `pretrained=True` is deprecated in newer torchvision releases in
# favour of the `weights=` argument; confirm against the installed version.
vgg = models.vgg16(pretrained=True)

In [9]:
# The final child module of VGG-16 is its fully-connected classifier block.
last_sequential_layer = [*vgg.children()][-1]
last_sequential_layer

Sequential(
  (0): Linear(in_features=25088, out_features=4096, bias=True)
  (1): ReLU(inplace=True)
  (2): Dropout(p=0.5, inplace=False)
  (3): Linear(in_features=4096, out_features=4096, bias=True)
  (4): ReLU(inplace=True)
  (5): Dropout(p=0.5, inplace=False)
  (6): Linear(in_features=4096, out_features=1000, bias=True)
)

In [10]:
# Separate the classifier into "every layer except the output layer" and the
# output layer itself (popped off the end of the list).
list_of_layers = [*last_sequential_layer.children()]
last_layer = list_of_layers.pop()
display(list_of_layers)
display(last_layer)

[Linear(in_features=25088, out_features=4096, bias=True),
 ReLU(inplace=True),
 Dropout(p=0.5, inplace=False),
 Linear(in_features=4096, out_features=4096, bias=True),
 ReLU(inplace=True),
 Dropout(p=0.5, inplace=False)]

Linear(in_features=4096, out_features=1000, bias=True)

In [11]:
# Input width of the original output layer; the replacement head must accept
# the same number of features.
in_features = last_layer.in_features
in_features

4096

In [12]:
# New output layer mapping the classifier features to the 6 sign classes.
vgg.fc = nn.Linear(in_features, 6)
# Explicitly mark the new layer's parameters as trainable. Assigning a plain
# `requires_grad` attribute on a Module does not touch its parameters;
# `requires_grad_()` sets the flag on each of them.
vgg.fc.requires_grad_(True)

vgg.fc

Linear(in_features=4096, out_features=6, bias=True)

In [13]:
# Rebuild the classifier: the original layers (without the 1000-way output)
# followed by the new 6-way output layer.
vgg.classifier = nn.Sequential(*list_of_layers, vgg.fc)
vgg.classifier

Sequential(
  (0): Linear(in_features=25088, out_features=4096, bias=True)
  (1): ReLU(inplace=True)
  (2): Dropout(p=0.5, inplace=False)
  (3): Linear(in_features=4096, out_features=4096, bias=True)
  (4): ReLU(inplace=True)
  (5): Dropout(p=0.5, inplace=False)
  (6): Linear(in_features=4096, out_features=6, bias=True)
)

In [14]:
# Custom CNN with 32 base channels, moved to the selected device.
model = Model(32).to(device)
# Negative log-likelihood loss; expects log-probabilities as input (which
# `Model.forward` produces via log_softmax).
loss_fn = nn.NLLLoss()
# SGD with momentum over the parameters of `model`.
# NOTE(review): this optimizer only tracks `model`'s parameters; it will not
# update any other network that is passed to the training loop with it.
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum = 0.9)

In [15]:
# Pull a single mini-batch from the training loader to sanity-check the data.
inputs, targets = next(iter(dataloaders["train"]))
# NOTE(review): this displays the full image tensor; showing `inputs.shape`
# would be a more compact check.
targets, inputs


(tensor([3, 0, 3, 0, 3, 3, 3, 5, 5, 3, 0, 4, 2, 4, 2, 0, 5, 4, 3, 5, 1, 3, 1, 5,
         2, 0, 5, 5, 3, 5, 4, 2]),
 tensor([[[[ 0.7412,  0.7647,  0.7882,  ...,  0.7882,  0.7882,  0.7882],
           [ 0.7412,  0.7647,  0.7882,  ...,  0.7882,  0.7882,  0.7882],
           [ 0.7412,  0.7647,  0.7882,  ...,  0.7882,  0.7882,  0.7882],
           ...,
           [ 0.5843,  0.5843,  0.5765,  ...,  0.0588,  0.3725,  0.4431],
           [ 0.5843,  0.5843,  0.5765,  ..., -0.1686,  0.1451,  0.4118],
           [ 0.5843,  0.5843,  0.5765,  ..., -0.3255, -0.0745,  0.3098]],
 
          [[ 0.7882,  0.7725,  0.7647,  ...,  0.7490,  0.7490,  0.7490],
           [ 0.7882,  0.7725,  0.7647,  ...,  0.7490,  0.7490,  0.7490],
           [ 0.7882,  0.7725,  0.7647,  ...,  0.7490,  0.7490,  0.7490],
           ...,
           [ 0.5451,  0.5451,  0.5373,  ..., -0.2157,  0.1059,  0.1686],
           [ 0.5451,  0.5451,  0.5373,  ..., -0.4353, -0.1216,  0.1451],
           [ 0.5451,  0.5451,  0.5373,  ..., -

In [16]:
# Move the adapted VGG to the compute device and run the sample batch through
# it; the index of the largest output per row is the predicted class.
vgg = vgg.to(device)
_, preds = vgg(inputs.to(device)).max(dim=1)
preds

tensor([1, 3, 4, 2, 1, 2, 1, 4, 2, 2, 4, 1, 4, 4, 0, 5, 3, 5, 1, 1, 2, 1, 5, 2,
        4, 3, 2, 1, 2, 2, 0, 1], device='cuda:0')

In [17]:
# Confirm the new head emits one score per class for every sample in the batch.
vgg(inputs.to(device)).shape

torch.Size([32, 6])

In [None]:
# VGG's classifier ends in a plain Linear layer, so it emits raw logits.
# CrossEntropyLoss applies log-softmax itself; NLLLoss would need
# log-probabilities as input.
vgg_loss_fn = nn.CrossEntropyLoss()
# The optimizer must own the parameters of the network being trained. An
# optimizer built from another model's parameters would leave VGG's weights
# unchanged.
vgg_optimizer = optim.SGD(vgg.parameters(), lr=1e-3, momentum=0.9)

train_and_evaluate(vgg, vgg_optimizer, vgg_loss_fn, dataloaders, device, n_epochs=100)
