In [1]:
%matplotlib inline
import time
from glob import glob
import re
import os
from collections import namedtuple

import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

import torch
from torch import nn, optim
from torchvision import datasets, transforms
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset


### Settings

In [2]:
epochs = 50
batch_size = 10
learning_rate = 0.01

train_root = 'src_data/Train'
test_root = 'src_data/Test'

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

### DataSet

In [3]:

class TinyDataset(Dataset):
    
    def __init__(self, folder, labelfile, transform=None):
        image_pat = re.compile('.*\.({})'.format('|'.join(['jpg', 'png', 'jpeg', 'tif', 'ppm', 'bmp', 'tiff'])))
        self.folder = folder
        self.labelfile = labelfile
        self.transform = transform
    
        if not os.path.isfile(labelfile):
            print('{} not found'.format(labelfile))
            return
        
        self.samples = []
        
        with open(labelfile, 'r') as f:
            for line in f:
                items = line.split(' ')
                sample = {'image': os.path.join(self.folder, items[0]), 'brand': int(items[1]), 'type': int(items[2])}
                self.samples.append(sample)
    
    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        sample = self.samples[idx]
        if not os.path.isfile(sample['image']):
            print('{} not found'.format(sample['image']))
            return None
        
        image = Image.open(sample['image'])
        if self.transform:
            image = self.transform(image)
        tensor_sample = sample.copy()
        tensor_sample['image'] = image
        
        return tensor_sample

### Network

In [4]:
class Net(nn.Module):
    def __init__(self):
        super().__init__()
        
        #3x500x500 => 3x250x250
        self.conv_1 = nn.Conv2d(3, 3, kernel_size=(3, 3), padding=1)
        self.pool_1 = nn.MaxPool2d(kernel_size=(2, 2))
        
        #6x250x250 => 6x125x125
        self.conv_2 = nn.Conv2d(3, 6, kernel_size=(3, 3), padding=1)
        self.pool_2 = nn.MaxPool2d(kernel_size=(2, 2))
        
        self.drop = nn.Dropout2d(0.2)
        
        self.fc1 = nn.Linear(6*125*125, 100)
        
        self.fc2 = nn.Linear(100, 3)
        self.softmax1 = nn.Softmax(dim=1)
        self.fc3 = nn.Linear(100, 2)
        self.softmax2 = nn.Softmax(dim=1)
    
    def forward(self, x):
        x = self.conv_1(x)
        x = F.relu(x)
        x = self.pool_1(x)
        
        x = self.conv_2(x)
        x = F.relu(x)
        x = self.pool_2(x)
        
        x = x.view(x.shape[0], -1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.drop(x)
        
        x_brand = self.softmax1(self.fc2(x))
        x_type = self.softmax2(self.fc3(x))
        
        return x_brand, x_type
        
        

### Train

In [6]:
def compute_accuracy(model, loader, device):
    accuracy_brand = 0
    accuracy_type = 0
    for sample in loader:
        image = sample['image'].to(device)
        y_brand = sample['brand'].to(device)
        y_type = sample['type'].to(device)
        
        x_brand, x_type = model(image)
        x_brand = x_brand.view(-1, 3)
        x_type = x_type.view(-1, 2)
        
        _, pred_brand = torch.max(x_brand, 1)
        _, pred_type = torch.max(x_type, 1)
        
        correct_brand = torch.sum(pred_brand == y_brand)
        correct_type = torch.sum(pred_type == y_type)
        
        accuracy_brand += correct_brand
        accuracy_type += correct_type
        
        
    
    accuracy_brand = accuracy_brand.float() / len(loader.dataset)
    accuracy_type = accuracy_type.float() / len(loader.dataset)
    
    return accuracy_brand, accuracy_type
        
def train(model, optimizer, trainloader, testloader, device, criterion, epochs=10):

    start_time = time.time()
    for e in range(epochs):
        running_loss = 0.0

        for sample in trainloader:
            model.train()

            image, y_brand, y_type = sample
            image = sample['image'].to(device)
            y_brand = sample['brand'].to(device)
            y_type = sample['type'].to(device)
            
            optimizer.zero_grad()
            
            x_brand, x_type = model(image)

            x_brand = x_brand.view(-1, 3)
            x_type = x_type.view(-1, 2)

            _, pred_brand = torch.max(x_brand, 1)
            _, pred_type = torch.max(x_type, 1)

            loss = criterion(x_brand, y_brand) + criterion(x_type, y_type)
            
            loss.backward()
            optimizer.step()

            model.eval()

            running_loss += loss.item() * image.size(0)
        
        print('{}/{}: Time: {:.2f}mm, Train Loss: {:.4f}, Acc_brand: {:.2f}, Acc_type: {:.2f}'.format(e+1, epochs,
                                                                                      (time.time() - start_time)/60,
                                                                                      running_loss,
                                                                                      *compute_accuracy(model, trainloader, device)))
        print('{}/{}: Test Acc_brand: {:.2f} Acc_type: {:.2f}'.format(e+1, epochs, *compute_accuracy(model, testloader, device)))
        
        

### Main Process

In [None]:
train_transform = transforms.Compose([transforms.Resize((500, 500)),
                                      transforms.ToTensor(),
                                      transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
                               ])
test_transform = transforms.Compose([transforms.Resize((500, 500)),
                                    transforms.ToTensor(),
                                     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
                               ])
trainset = TinyDataset(train_root, os.path.join(train_root, 'Label.TXT'), transform=train_transform)
testset = TinyDataset(test_root, os.path.join(test_root, 'Label.TXT'), transform=test_transform)

trainloader = DataLoader(trainset, shuffle=True, batch_size=batch_size)
testloader = DataLoader(testset)

model = Net()
model.to(device)
optimizer = optim.Adam(model.parameters(), lr=0.03)

train(model, optimizer, trainloader, testloader, device, nn.CrossEntropyLoss(), epochs)

1/50: Train Loss: 122.2022, Acc_brand: 0.33, Acc_type: 0.50
1/50: Test Acc_brand: 0.33 Acc_type: 0.50
2/50: Train Loss: 121.8824, Acc_brand: 0.33, Acc_type: 0.50
2/50: Test Acc_brand: 0.33 Acc_type: 0.50
3/50: Train Loss: 121.8824, Acc_brand: 0.33, Acc_type: 0.50
3/50: Test Acc_brand: 0.33 Acc_type: 0.50
4/50: Train Loss: 121.8824, Acc_brand: 0.33, Acc_type: 0.50
4/50: Test Acc_brand: 0.33 Acc_type: 0.50
5/50: Train Loss: 121.8824, Acc_brand: 0.33, Acc_type: 0.50
5/50: Test Acc_brand: 0.33 Acc_type: 0.50
6/50: Train Loss: 121.8824, Acc_brand: 0.33, Acc_type: 0.50
6/50: Test Acc_brand: 0.33 Acc_type: 0.50
