In [5]:
import os

In [13]:
# !nvidia-smi

In [3]:
# Expose only the GPU with index 4 to this process. This cell sits above the
# torch import so the variable is already set when torch is loaded.
os.environ["CUDA_VISIBLE_DEVICES"]="4"

In [None]:
from pathlib import Path
from tqdm.notebook import tqdm
from matplotlib import pyplot as plt
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchvision.transforms.v2 as T
from torch import nn
import cv2
from pylab import imshow
import torch.nn.functional as F
from numpy import array
import random

from torch.utils import data
import PIL.Image
import albumentations as A
import albumentations.pytorch.transforms
import glob

In [5]:
# Use the GPU when torch can see one, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Bare expression so the notebook displays the selected device.
DEVICE

device(type='cuda')

In [6]:
def read_csv(filename):
    """Read a keypoint CSV into a ``{name: coords}`` mapping.

    The first line is treated as a header and skipped.  Each following line
    is split on commas: the first field becomes the key and the remaining
    fields are parsed as floats into a 1-D ``float64`` array.

    Blank lines (for example a trailing empty line at the end of the file)
    are ignored, and an empty file yields an empty dict.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        dict mapping the first column of every row to a ``float64`` array
        holding the rest of that row.

    Raises:
        ValueError: If a non-key field cannot be parsed as a float.
    """
    res = {}
    with open(filename) as fhandle:
        # Skip the header; the default keeps an empty file from raising
        # StopIteration.
        next(fhandle, None)
        for line in fhandle:
            line = line.rstrip('\r\n')
            if not line.strip():
                # A blank line would otherwise create a '' key with an
                # empty coordinate array.
                continue
            name, *values = line.split(',')
            res[name] = array([float(x) for x in values], dtype='float64')
    return res

In [6]:
# Training inputs are resolved relative to the current working directory.
train_images_path = Path().absolute().joinpath(
    'public_tests', '00_test_img_input', 'train', 'images'
)
train_gt_path = Path().absolute().joinpath(
    'public_tests', '00_test_img_input', 'train', 'gt.csv'
)

# Fail early, reporting the resolved path, when either location is missing.
assert train_images_path.exists(), train_images_path.absolute()
assert train_gt_path.exists(), train_gt_path.absolute()

# Mapping from the first CSV column to the array of remaining values per row.
train_gt = read_csv(train_gt_path)

In [9]:
class MyCustomDataset(data.Dataset):
    """Train/validation split of the labelled images with keypoint targets.

    All entries found directly under ``root_dir`` are sorted, shuffled with a
    seeded RNG and cut into a leading "train" part and a trailing "valid"
    part, so both modes see a disjoint, reproducible split.  Labels are
    looked up in the module-level ``train_gt`` mapping by file name.

    Args:
        mode: ``"train"`` or ``"valid"``; selects which side of the split
            this dataset serves.
        root_dir: Directory containing the images.
        train_fraction: Fraction of the files assigned to the train split.
        split_seed: Seed of the RNG used to shuffle the file list.
        transform: Callable invoked as ``transform(image=..., keypoints=...)``
            that returns a mapping with ``"image"`` and ``"keypoints"``.

    Raises:
        RuntimeError: If ``mode`` is neither ``"train"`` nor ``"valid"``.
        KeyError: If a file under ``root_dir`` has no entry in ``train_gt``.
    """

    def __init__(self, mode, root_dir=train_images_path,
                 train_fraction=0.8, split_seed=42, transform=None,):
        rng = random.Random(split_seed)

        # Sort first so the shuffle depends only on the seed, not on the
        # order in which the filesystem happens to list the directory.
        cls_paths = sorted(glob.glob(f"{root_dir}/*"))
        split = int(train_fraction * len(cls_paths))
        rng.shuffle(cls_paths)

        if mode == "train":
            cls_paths = cls_paths[:split]
        elif mode == "valid":
            cls_paths = cls_paths[split:]
        else:
            raise RuntimeError(f"Invalid mode: {mode!r}")

        # Path(...).name extracts the file name with the platform's own
        # separator rules; splitting on '/' breaks where glob returns
        # backslash-separated paths.
        labels = [train_gt[Path(sample).name] for sample in cls_paths]

        self._len = len(cls_paths)
        self._paths = cls_paths
        self._labels = np.array(labels)

        if transform is None:
            # NOTE(review): DEFAULT_TRANSFORM is not defined in the notebook
            # cells visible here, so omitting `transform` raises NameError
            # unless it is defined elsewhere - confirm.
            transform = DEFAULT_TRANSFORM
        self._transform = transform
        self._img_size = NETWORK_SIZE[0]

    def __len__(self):
        """Return the number of samples in this split."""
        return len(self._paths)

    def __getitem__(self, index):
        """Load one sample.

        Returns:
            ``(image, label)`` where ``image`` is the transformed image and
            ``label`` is the flattened array of transformed keypoints.

        Raises:
            RuntimeError: If the image file cannot be read.
        """
        img_path = self._paths[index]
        label = self._labels[index]

        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread reports failure by returning None instead of
            # raising; fail here with the offending path rather than with an
            # opaque error inside cvtColor.
            raise RuntimeError(f"Could not read image: {img_path!r}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Labels are stored flat as x1, y1, x2, y2, ...; the transform
        # expects one (x, y) row per keypoint.
        tr = self._transform(image=image, keypoints=label.reshape(-1, 2))
        image = tr["image"]
        # np.asarray accepts both an ndarray and a list of (x, y) tuples.
        label = np.asarray(tr["keypoints"]).flatten()

        return image, label

In [11]:
# Input resolution fed to the network, passed to A.Resize as (height, width).
NETWORK_SIZE = (224, 224)
# Number of samples per batch in every DataLoader of this notebook.
BATCH_SIZE = 16

# Per-channel statistics used by the normalisation step below.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Deterministic preprocessing: resize, scale pixels to [0, 1], normalise,
# then convert to a torch tensor.
common_transforms = [
    A.Resize(height=NETWORK_SIZE[0], width=NETWORK_SIZE[1]),
    A.ToFloat(max_value=255),
    A.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD, max_pixel_value=1.0),
    A.pytorch.transforms.ToTensorV2(),
]

# Keypoints are given as (x, y) pairs and are kept even when they fall
# outside the image after a transform.
MyTransform = A.Compose(
    common_transforms,
    keypoint_params=A.KeypointParams(format='xy', remove_invisible=False),
)

In [12]:
# Both splits share the same preprocessing pipeline.
ds_train = MyCustomDataset(transform=MyTransform, mode="train")
ds_valid = MyCustomDataset(transform=MyTransform, mode="valid")

# Settings common to both loaders.
_loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=os.cpu_count())

# Training batches are reshuffled on every pass and the last incomplete batch
# is dropped; validation keeps a fixed order and every sample.
dl_train = data.DataLoader(ds_train, shuffle=True, drop_last=True, **_loader_kwargs)
dl_valid = data.DataLoader(ds_valid, shuffle=False, drop_last=False, **_loader_kwargs)

In [13]:
class MyBlock(nn.Module):
    """Residual block with a projected shortcut.

    The main path is conv3x3 -> BN -> ReLU -> conv3x3 -> BN; the shortcut is
    conv1x1 -> BN so it can change the channel count.  Both paths are summed
    and passed through a final ReLU.  Spatial size is preserved.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by the block.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        main_layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
        ]
        shortcut_layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=1),
            nn.BatchNorm2d(out_channels),
        ]

        self.block = nn.Sequential(*main_layers)
        self.identity = nn.Sequential(*shortcut_layers)
        self.act = nn.ReLU()

    def forward(self, x):
        """Return ``act(block(x) + identity(x))``."""
        main_out = self.block(x)
        shortcut_out = self.identity(x)
        return self.act(main_out + shortcut_out)

In [14]:
class MyResNetLike(nn.Sequential):
    """Small residual CNN that regresses ``num_classes`` values per image.

    Four ``MyBlock`` stages (3 -> 64 -> 128 -> 256 -> 512 channels), each
    followed by a 2x2 max-pool, feed a flatten layer and a two-layer
    fully-connected head.  Because this is an ``nn.Sequential``, the layers
    run in the order in which they are assigned below.

    The flattened feature size is derived from the module-level
    ``NETWORK_SIZE``, so inputs must have that spatial size.

    Args:
        num_classes: Size of the output vector (number of regressed values).
    """

    def __init__(self, num_classes):
        super().__init__()

        self.bl1   = MyBlock(3, 64)
        self.pool1 = nn.MaxPool2d(kernel_size=2)

        self.bl2   = MyBlock(64, 128)
        self.pool2 = nn.MaxPool2d(kernel_size=2)

        self.bl3   = MyBlock(128, 256)
        self.pool3 = nn.MaxPool2d(kernel_size=2)

        self.bl4   = MyBlock(256, 512)
        self.pool4 = nn.MaxPool2d(kernel_size=2)

        self.flatten = nn.Flatten(start_dim=1, end_dim=-1)

        # Four 2x2 poolings shrink each spatial side by a factor of 16; use
        # both sides of NETWORK_SIZE so a non-square input size also works.
        flat_features = 512 * (NETWORK_SIZE[0] // 16) * (NETWORK_SIZE[1] // 16)
        self.fc1 = nn.Linear(flat_features, 128)

        # Output head: without it the network ended at the 128-wide fc1 and
        # ignored `num_classes`, so predictions could not match the targets.
        self.fc_act = nn.ReLU()
        self.fc2 = nn.Linear(128, num_classes)

In [15]:
# from https://stackoverflow.com/questions/71998978/early-stopping-in-pytorch
class EarlyStopping:
    """Tracks a monitored value and signals when it has stopped improving.

    A value counts as an improvement only when it is lower than the best
    value seen so far by more than ``min_delta``.  Once the number of
    consecutive non-improving steps reaches ``patience``, ``done`` becomes
    True and stays True.

    Args:
        min_delta: Minimum decrease required to count as an improvement.
        patience: Number of consecutive non-improving steps that triggers
            the stop signal.
    """

    def __init__(self, *, min_delta=1, patience=0):
        self.min_delta = min_delta
        self.patience = patience
        self.best = float("inf")
        self.wait = 0
        self.done = False

    def step(self, current):
        """Record one new value and return whether training should stop."""
        improved = current < self.best - self.min_delta
        if improved:
            # New best value: remember it and restart the patience counter.
            self.best = current
            self.wait = 0
        else:
            self.wait += 1
            if self.wait >= self.patience:
                self.done = True

        return self.done

In [16]:
def train(num_epochs):
    """Train a fresh ``MyResNetLike`` and return it.

    Each epoch runs one optimisation pass over ``dl_train`` and one
    evaluation pass over ``dl_valid``, printing the mean of the per-batch MSE
    losses for both.  The validation loss drives a ``ReduceLROnPlateau``
    scheduler (patience 3) and an ``EarlyStopping`` check (patience 10) that
    can end training before ``num_epochs`` is reached.

    Args:
        num_epochs: Maximum number of epochs to run.

    Returns:
        The model in the state it had after the last executed epoch.
    """
    model = MyResNetLike(num_classes=28).to(DEVICE)
    loss_fn = torch.nn.MSELoss().to(DEVICE)
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=3)
    early_stopping = EarlyStopping(patience=10)

    for epoch in range(num_epochs):
        # ---- optimisation pass over the training split ----
        model = model.train()
        batch_losses = []
        bar = tqdm(total=len(dl_train), desc=f"Epoch {epoch}", leave=False)
        for inputs, targets in dl_train:
            inputs = inputs.to(DEVICE)
            targets = targets.to(DEVICE).float()

            loss = loss_fn(model(inputs), targets)
            # Keep a detached copy so the stored value holds no graph.
            batch_losses.append(loss.detach())

            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            bar.update()
        bar.close()

        train_loss = torch.stack(batch_losses).mean()
        print(f"Epoch {epoch},", f"train_loss: {train_loss.item():.8f}")

        # ---- evaluation pass over the validation split ----
        model = model.eval()
        batch_losses = []
        bar = tqdm(total=len(dl_valid), desc=f"Epoch {epoch}", leave=False)
        with torch.no_grad():
            for inputs, targets in dl_valid:
                inputs = inputs.to(DEVICE)
                targets = targets.to(DEVICE).float()

                loss = loss_fn(model(inputs), targets)
                batch_losses.append(loss.detach())

                bar.update()
        bar.close()

        valid_loss = torch.stack(batch_losses).mean()
        print(f"Epoch {epoch},", f"valid_loss: {valid_loss.item():.8f}")

        # The validation loss steers both the LR schedule and early stopping.
        scheduler.step(valid_loss)
        if early_stopping.step(valid_loss):
            break

    return model

In [None]:
# Train for up to 100 epochs; train() may stop earlier through its
# early-stopping check.
model = train(num_epochs=100)
# Note from a previous run: took ~60 epochs.

In [8]:
# Test inputs and reference files are resolved relative to the current
# working directory.
test_images_path = Path().absolute().joinpath(
    'public_tests', '00_test_img_input', 'test', 'images'
)
test_shapes_path = Path().absolute().joinpath(
    'public_tests', '00_test_img_gt', 'img_shapes.csv'
)
test_gt_path = Path().absolute().joinpath(
    'public_tests', '00_test_img_gt', 'gt.csv'
)

# Fail early, reporting the resolved path, when any location is missing.
assert test_images_path.exists(), test_images_path.absolute()
assert test_shapes_path.exists(), test_shapes_path.absolute()
assert test_gt_path.exists(), test_gt_path.absolute()

In [19]:
class MyCustomTestDataset(data.Dataset):
    """Unlabelled images for inference, served in sorted path order.

    Args:
        root_dir: Directory containing the images.
        transform: Callable invoked as ``transform(image=...)`` that returns
            a mapping with an ``"image"`` entry.
    """

    def __init__(self, root_dir=test_images_path,
                 transform=None):
        paths = sorted(glob.glob(f"{root_dir}/*"))

        self._len = len(paths)
        self._paths = paths
        if transform is None:
            # NOTE(review): DEFAULT_TRANSFORM is not defined in the notebook
            # cells visible here, so omitting `transform` raises NameError
            # unless it is defined elsewhere - confirm.
            transform = DEFAULT_TRANSFORM
        self._transform = transform

    def __len__(self):
        """Return the number of images."""
        return len(self._paths)

    def __getitem__(self, index):
        """Load one image.

        Returns:
            ``(image, file_name, (width, height))`` where ``image`` is the
            transformed image and ``(width, height)`` is the size of the
            image before the transform was applied.

        Raises:
            RuntimeError: If the image file cannot be read.
        """
        img_path = self._paths[index]

        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread reports failure by returning None instead of
            # raising; fail here with the offending path rather than with an
            # opaque error inside cvtColor.
            raise RuntimeError(f"Could not read image: {img_path!r}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Remember the original size so predictions can be scaled back.
        height, width = image.shape[:2]
        image = self._transform(image=image)["image"]

        # Path(...).name extracts the file name with the platform's own
        # separator rules instead of assuming '/'.
        return image, Path(img_path).name, (width, height)

In [21]:
# Test images go through the same preprocessing pipeline as the training data.
ds_test = MyCustomTestDataset(transform=MyTransform)

# Fixed order and no dropped samples, so every test image gets a prediction.
dl_test = data.DataLoader(
    dataset=ds_test,
    num_workers=os.cpu_count(),
    batch_size=BATCH_SIZE,
    drop_last=False,
    shuffle=False,
)

In [24]:
# Module-level accumulators filled by test(): one predicted coordinate vector
# and one file name per test image, in matching order.
points = []
names = []

def test(fitted_model):
    """Predict keypoints for every image in ``dl_test``.

    Predictions are made in network-input coordinates and scaled back to the
    original image size: even indices (x) by ``width / NETWORK_SIZE[1]`` and
    odd indices (y) by ``height / NETWORK_SIZE[0]``.  Results are stored in
    the module-level ``points`` and ``names`` lists, which are emptied first
    so that calling this function again does not append duplicate rows.

    Args:
        fitted_model: Trained model; it is switched to eval mode and moved
            to ``DEVICE``.
    """
    model = fitted_model
    model.eval()
    model.to(DEVICE)

    # Start from a clean slate: without this a second call would add every
    # test image to the output a second time.
    points.clear()
    names.clear()

    net_height, net_width = NETWORK_SIZE

    progress_test = tqdm(
        total=len(dl_test),
        leave=True,
    )
    try:
        with torch.no_grad():
            for x_batch, x_path, x_shape in dl_test:
                x_batch = x_batch.to(DEVICE)
                # The dataset yields (width, height); after collation these
                # are two per-batch sequences.
                widths, heights = x_shape

                prediction = model(x_batch)
                for i in range(len(x_batch)):
                    prediction_val = prediction[i].cpu().detach().numpy()
                    prediction_val[::2] *= widths[i].item() / net_width
                    prediction_val[1::2] *= heights[i].item() / net_height
                    points.append(prediction_val)
                    names.append(x_path[i])

                progress_test.update()
    finally:
        # Close the bar even when inference fails part-way through.
        progress_test.close()


In [25]:
# Run inference on the test loader; predictions and file names are collected
# in the module-level `points` and `names` lists.
test(model)

  0%|          | 0/375 [00:00<?, ?it/s]

In [26]:
# Write one row per test image: the file name followed by its predicted
# coordinates, under a fixed 14-keypoint header.
with open('output.csv', mode='w', newline='', encoding='utf-8') as file:
    file.write('filename,x1,y1,x2,y2,x3,y3,x4,y4,x5,y5,x6,y6,x7,y7,x8,y8,x9,y9,x10,y10,x11,y11,x12,y12,x13,y13,x14,y14\n')
    for row_name, row_points in zip(names, points):
        row_values = ','.join(map(str, row_points))
        file.write(f"{row_name},{row_values}\n")

In [10]:
def read_img_shapes(filename):
    """Read an image-shape CSV into a ``{name: (n_rows, n_cols)}`` mapping.

    The first line is treated as a header and skipped.  Each following line
    must hold a name followed by exactly two integers.  Blank lines are
    ignored, and an empty file yields an empty dict.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        dict mapping the first column of every row to a tuple of two ints.

    Raises:
        ValueError: If a row does not contain exactly two integer fields
            after the name.
    """
    img_shapes = {}
    with open(filename) as fhandle:
        # Skip the header; the default keeps an empty file from raising
        # StopIteration.
        next(fhandle, None)
        for line in fhandle:
            line = line.rstrip('\r\n')
            if not line.strip():
                # A blank line would otherwise fail the two-value unpack.
                continue
            # Separate local name so the `filename` parameter is not
            # overwritten while the file is still being read.
            image_name, *dims = line.split(',')
            n_rows, n_cols = map(int, dims)
            img_shapes[image_name] = (n_rows, n_cols)
    return img_shapes

In [11]:
# Reference keypoints and original image shapes for the test set.
gt = read_csv(test_gt_path)
img_shapes = read_img_shapes(test_shapes_path)
# Predictions written to output.csv, read back through the same CSV reader.
detected = read_csv('output.csv')

In [12]:
# Mean squared keypoint error, with every coordinate difference expressed as
# a percentage of the image width (x) or height (y).
error = 0.0
all_found = True
for filename, gt_coords in gt.items():
    all_found = filename in detected
    if not all_found:
        # A missing prediction aborts the evaluation with an error message.
        res = f'Error, keypoints for "{filename}" not found'
        break

    coords = detected[filename]
    n_rows, n_cols = img_shapes[filename]

    diff = np.subtract(coords, gt_coords)
    diff[0::2] /= n_cols   # x differences relative to image width
    diff[1::2] /= n_rows   # y differences relative to image height
    diff *= 100
    error += np.square(diff).mean()
error /= len(gt)

if all_found:
    res = f'Ok, error {error:.4f}'

print(res)

Ok, error 4.8415
