In [1]:
import os, torch, json
import cv2 as cv
import numpy as np
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from math import floor

In [2]:
MAPPING_FILE = "./train/mapping.txt"
CLASSES_FILE = "./train/classes.txt"
IMAGES_PATH = "./train/images"

INPUT_SIZE = (64, 64)

In [3]:
def resize_image(a: np.ndarray, size: tuple, preserve_aspect_ratio: bool = True, debug: bool = False):
    assert len(a.shape) == 3, "input matrix should have a shape HxWx3"
    assert a.shape[-1] == 3, "input matrix should be a 3 channels image"

    (height, width, _) = a.shape

    (output_height, output_width) = size
    assert  output_height > 0 and output_width > 0

    aspect_ratio = 1.0
    if preserve_aspect_ratio:
        aspect_ratio = float(width) / float(height)
    
    (adjusted_height, adjusted_width) = size
    if aspect_ratio > 1.0:
        adjusted_height = int(output_height * (aspect_ratio - 1))
    elif aspect_ratio < 1.0:
        adjusted_width = int(output_width * aspect_ratio)
    
    offset_x, offset_y = floor(abs(output_width - adjusted_width) / 2), floor(abs(output_height - adjusted_height) / 2)
    assert offset_x >= 0 and offset_y >= 0, "offset_x and offset_y should be equal or greater than zero"
    
    if debug:
        print(f"Input:\theight={height} width={width} (ratio: {aspect_ratio})")
        print(f"Adj:\theight={adjusted_height} width={adjusted_width}")
        print(f"Output:\theight={output_height} width={output_width}")
        print(f"Offset:\tx={offset_x} y={offset_y}")

    result = cv.resize(a.astype('float32'), (adjusted_width, adjusted_height), interpolation=cv.INTER_NEAREST).astype(int)
    result = np.pad(result, [(offset_y, offset_y), (offset_x, offset_x), (0, 0)], mode="constant")

    # Because of round values we can get errors between 1 and -1 pixels
    if result.shape[0] != output_height:
        delta = abs(result.shape[0] - output_height)
        result = np.pad(result, [(0, delta), (0, 0), (0, 0)], mode="constant")
    if result.shape[1] != output_width:
        delta = abs(result.shape[1] - output_width)
        result = np.pad(result, [(0, 0), (0, delta), (0, 0)], mode="constant")
    
    assert result.shape[:-1] == size, "output should match expected size"
    
    return result.astype(np.double)

In [4]:
class VGG19(nn.Module):
    def __init__(self, image_size=(64, 64), *args, **kwargs):
        super().__init__(*args, **kwargs)
        height, width = image_size
        
        # Block 1
        self.block1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=(3, 3)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2), stride=2)
        ).to("cuda")

        # Block 2
        self.block2 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(3, 3)),
            nn.ReLU()
        ).to("cuda")

        # Block 3
        self.block3 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=(3, 3)),
            nn.ReLU(),
            nn.Conv2d(in_channels=256, out_channels=256, kernel_size=(3, 3)),
            nn.ReLU()
        ).to("cuda")

        # Block 4
        self.block4 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=(3, 3)),
            nn.ReLU(),
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=(3, 3)),
            nn.ReLU()
        ).to("cuda")

        # Block 5
        self.block5 = nn.Sequential(
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=(3, 3)),
            nn.ReLU(),
            nn.Conv2d(in_channels=512, out_channels=512, kernel_size=(3, 3)),
            nn.ReLU()
        ).to("cuda")

        # FCL
        self.fcl = nn.Sequential(
            nn.Linear(17, 4096),
            nn.Linear(4096, 4096),
            nn.Linear(4096, 2),
            nn.Softmax(dim=1)
        ).to("cuda")
    
    def forward(self, x: torch.Tensor):
        x = x.transpose(-1, -3)
        
        return self.fcl(
            self.block5(
                self.block4(
                    self.block3(
                        self.block2(
                            self.block1(x)
                        )
                    )
                )
            )
        )

In [5]:
classes = None
x = []
y = []
with open(CLASSES_FILE, "r") as cf:
    nb_classes = len(cf.readlines())
    classes = torch.Tensor(np.diag((nb_classes, nb_classes)) / nb_classes)

In [6]:
class SegmentDataset(Dataset):
    def __init__(self, input_size: tuple = (64, 64)):
        super().__init__()
        self.x = []
        self.y = []
        with open(MAPPING_FILE, "r") as mf:
            annotations = mf.readlines()
            nb_images = len(annotations)
            for i in range(0, nb_images):
                [img_name, classe] = annotations[i].split(" ")
                self.x.append(
                    torch.Tensor(
                        resize_image(
                            cv.imread(f"{IMAGES_PATH}/{img_name}"),
                            input_size,
                            False
                        )
                    ).to("cuda")
                )
                self.y.append(classes[:, int(classe.replace("\n", ""))])
    
        assert len(x) == len(y)
    
    def __getitem__(self, index):
        return (self.x[index], self.y[index])

    def __len__(self):
        return len(self.x)

In [None]:
def train(model, dataset, loss_fn, optimizer, epochs=2000, batch_size=16):
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
    
    for i in range(epochs):
        for x, y in dataloader:
            optimizer.zero_grad()
            result = model(x)
            print(x.shape, result.shape, y.shape)
            loss = loss_fn(result, y)
            loss.backward()

        optimizer.step()

        print(f'epoch: {i:3} loss: {loss.item():10.8f}')

    print(f'epoch: {i:3} loss: {loss.item():10.10f}')

In [8]:
dataset = SegmentDataset(INPUT_SIZE)
model = VGG19(INPUT_SIZE).to("cuda")
optimizer = torch.optim.Adam(model.parameters())

train(model, dataset, nn.CrossEntropyLoss().to("cuda"), optimizer)

torch.Size([16, 64, 64, 3]) torch.Size([16, 512, 17, 2]) torch.Size([16, 2])


RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:0 and cpu! (when checking argument for argument target in method wrapper_CUDA__nll_loss2d_forward)