- You can find the full source code from
[github](https://github.com/Kitsunetic/dacon-hand-gesture-public.git).
- You can also download the pretrained weights from
[here](https://github.com/Kitsunetic/dacon-hand-gesture-public/releases/tag/weights).
- This notebook contains the full code for dataset generation and model training.
If you only want to run inference and reproduce the results, it is highly recommended to use the
[notebook](https://github.com/Kitsunetic/dacon-hand-gesture-public/blob/master/inference.ipynb)
instead.
- Even if you want to run the training steps from scratch,
it is highly recommended to clone the GitHub repository and run `main.py`,
because this notebook may contain unexpected errors introduced while compressing the original source code into a single notebook.

# 0. Prerequisites

## 0-1. Directory Structure

```
+ data
  + ori (original dataset)
    + train
      - *.png
    + test
      - *.png
    - hand_gesture_pose.csv
    - sample_submission.csv
  + crop512_9 (the new dataset)
    + train
      - 000_00_000.png
      - 000_00_000.pth
      - ...
    + test
      - 000_00.png
      - 000_00.pth
      - ...
+ const
  - __init__.py
  - flip.py
  - label_names.py
- utils.py
- main.ipynb (this file)
- inference.ipynb (inference notebook file)
- main.py (the original training script)
```

## 0-2. Python Libraries

- Albumentations
- opencv-python
- imageio
- numpy
- pandas
- timm
- torch==1.7.0 with cuda toolkit 11.2.2, cudnn8
- pyaml
- adabelief_pytorch
- scikit-learn
- tqdm

In [None]:
import gc
import json
import math
import random
import re
import shutil
import sys
from collections import defaultdict
from dataclasses import dataclass
from multiprocessing import Pool
from os import PathLike
from pathlib import Path
from typing import Any, Tuple, List

import albumentations as A
import cv2
import imageio
import numpy as np
import pandas as pd
import timm
import torch
import torch.nn as nn
import torch.nn.functional as F
import yaml
from adabelief_pytorch import AdaBelief
from albumentations.pytorch import ToTensorV2
from sklearn.metrics import classification_report, f1_score
from sklearn.model_selection import StratifiedKFold
from timm.models.layers import Conv2dSame
from timm.models.nfnet import ScaledStdConv2dSame
from torch import Tensor
from torch.optim import AdamW
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm

Please refer to the [github link](https://github.com/Kitsunetic/dacon-hand-gesture-public.git)
for these three Python script files. They define constants related to the dataset and basic utility functions.
I omitted them here because they would make the notebook unnecessarily long.

- [*const/flip.py*](https://github.com/Kitsunetic/dacon-hand-gesture-public/blob/master/const/flip.py)
- [*const/label_names.py*](https://github.com/Kitsunetic/dacon-hand-gesture-public/blob/master/const/label_names.py)
- [*utils.py*](https://github.com/Kitsunetic/dacon-hand-gesture-public/blob/master/utils.py)

In [None]:
from const.flip import id_flip
from const.label_names import id_to_label, label_names
from utils import AverageMeter, CustomLogger, make_result_dir, seed_everything, tqdm_kwargs

# 1. Dataset

Here we build a new dataset by cropping the original dataset located in `./data/ori`, and write it to the `./data/crop512_9` directory.
The new dataset contains both `*.png` and `*.pth` files.
You can ignore the `*.pth` files; they are not used in this notebook.

First, I cropped and resized the original images to $512 \times 512$ using a bounding box derived from the keypoint annotations in the `*.json` files.
However, this dataset contains some annotation errors.
I manually inspected the data to find which data indexes are mis-annotated.
For those cases the bounding box is computed from the keypoint pixels drawn in the image instead, an idea that came from 게으름뱅이's codeshare.

In [None]:
# Output size (width, height) handed to cv2.resize for every cropped frame.
dsize = (512, 512)
# Margin, in pixels, added on every side of the keypoint bounding box before cropping.
crop_padding = 120
# Largest long-side / short-side ratio a crop may have before elastic_resize widens it.
ratio_limit = 1.2
# NOTE(review): not referenced by the code visible in this notebook; the training
# code reads Config.len_sequence instead.
seq_len = 5

# Directory indexes whose keypoint annotations contain errors; process_image
# falls back to find_bbox for frames in these directories.
wrong_data = [312, 317, 318, 327, 340, 343, 475, 543, 619, 622, 750, 746]

In [None]:
# Per-channel mean and std used by process_image to standardize frames.
# Reshaped to (1, 1, 3) so they broadcast over an (H, W, 3) image array.
imagenet_mean = np.array([0.485, 0.456, 0.406]).reshape(1, 1, 3)
imagenet_std = np.array([0.229, 0.224, 0.225]).reshape(1, 1, 3)

In [None]:
# Start from a clean output tree: remove whatever a previous run left behind,
# then recreate one directory per split.
out_dir = Path("./data/crop512_9")
if out_dir.exists():
    shutil.rmtree(out_dir)

train_out_dir = out_dir / "train"
train_out_dir.mkdir(parents=True, exist_ok=True)

test_out_dir = out_dir / "test"
test_out_dir.mkdir(parents=True, exist_ok=True)

In [None]:
def elastic_resize(im, bbox, dsize, ratio_limit):
    """Crop ``im`` to ``bbox`` and resize it to ``dsize`` with bounded distortion.

    When the box's long-side / short-side ratio is at most ``ratio_limit`` the
    crop is resized directly. Otherwise the short side is widened to
    ``int(long_side / ratio_limit)`` pixels (split evenly on both sides, shifted
    when it would cross an image border, and finally clamped to the image)
    before cropping and resizing.

    Args:
        im: Image array indexed as ``im[y, x]``.
        bbox: ``(x1, y1, x2, y2)`` crop box. It is only read; the caller's
            object is left untouched.
        dsize: ``(w, h)`` output size passed to ``cv2.resize``.
        ratio_limit: Maximum allowed long/short ratio of the crop (float).

    Returns:
        The resized crop returned by ``cv2.resize``.
    """
    # Unpack into locals so widening the box never writes back into ``bbox``.
    x1, y1, x2, y2 = bbox
    w = x2 - x1
    h = y2 - y1

    if h != w:
        tall = h > w
        long_side, short_side = (h, w) if tall else (w, h)

        if long_side / short_side > ratio_limit:
            # Widen along x for tall boxes, along y for wide boxes.
            lo, hi, limit = (x1, x2, im.shape[1]) if tall else (y1, y2, im.shape[0])

            grow = int(long_side / ratio_limit) - short_side
            lo -= math.ceil(grow / 2)
            hi += math.floor(grow / 2)

            # Shift the widened range back inside the image when it crosses a border.
            if lo < 0:
                hi += -lo
                lo = 0
            elif hi > limit:
                lo -= hi - limit
                hi = limit

            # The shift itself can overshoot the opposite border; clamp both ends.
            lo = max(0, lo)
            hi = min(hi, limit)

            if tall:
                x1, x2 = lo, hi
            else:
                y1, y2 = lo, hi

    return cv2.resize(im[y1:y2, x1:x2], dsize)

In [None]:
def find_bbox(im, u):
    """Compute a padded bounding box around the marker pixels drawn in ``im``.

    Marker pixels are those that are exactly ``[255, 0, 0]`` or ``[0, 255, 0]``.
    Referred to 게으름뱅이's codeshare:
    https://dacon.io/competitions/official/235805/codeshare/3373

    Args:
        im: Image array of shape ``(H, W, 3)``.
        u: Padding in pixels added on every side of the tight marker box.

    Returns:
        ``np.int64`` array ``(x1, y1, x2, y2)``, clipped to the image bounds.

    Raises:
        ValueError: If the image contains no marker pixel.
    """
    height, width = im.shape[:2]
    mask = (im == [255, 0, 0]).all(axis=-1) | (im == [0, 255, 0]).all(axis=-1)

    rows, cols = mask.nonzero()
    if rows.size == 0:
        raise ValueError("find_bbox: image contains no pure red or pure green keypoint pixels")

    # x is clipped to the image width and y to the image height, so the box
    # never extends past the image on either axis.
    bbox = np.round(
        np.array(
            (
                np.clip(cols.min() - u, 0, width),
                np.clip(rows.min() - u, 0, height),
                np.clip(cols.max() + u, 0, width),
                np.clip(rows.max() + u, 0, height),
            ),
            dtype=np.float64,
        )
    ).astype(np.int64)

    return bbox

In [None]:
def process_image(impath: Path, keypoints: np.ndarray):
    """Load one frame, crop it around the hand and return it in two forms.

    Frames whose parent directory index is listed in ``wrong_data`` get their
    box from ``find_bbox``; all others use the min/max of the first two
    keypoint columns, padded by ``crop_padding`` and clipped to 1920 x 1080.

    Args:
        impath: Path of the frame; its parent directory name must be an integer.
        keypoints: 2-D array whose columns 0 and 1 are read as x and y.

    Returns:
        ``(cropped, standardized)``: the resized crop as returned by
        ``elastic_resize`` and a float32 tensor of the standardized crop with
        the channel axis moved first.
    """
    frame = imageio.imread(impath)
    margin = crop_padding

    annotation_is_broken = int(impath.parent.name) in wrong_data
    if annotation_is_broken:
        box = find_bbox(frame, margin)
    else:
        xs = keypoints[:, 0]
        ys = keypoints[:, 1]
        corners = (
            np.clip(xs.min() - margin, 0, 1920),
            np.clip(ys.min() - margin, 0, 1080),
            np.clip(xs.max() + margin, 0, 1920),
            np.clip(ys.max() + margin, 0, 1080),
        )
        box = np.round(np.array(corners, dtype=np.float32)).astype(np.int64)

    cropped = elastic_resize(frame, box, dsize, ratio_limit)

    # Scale to [0, 1], then standardize with the module-level mean/std.
    scaled = cropped.astype(np.float32) / 255.0
    standardized = (scaled - imagenet_mean) / imagenet_std
    tensor = torch.from_numpy(standardized).permute(2, 0, 1).type(torch.float32)

    return cropped, tensor

## 1-1. Generate Training Dataset

Generate the training dataset in parallel with the `multiprocessing` module.

The training image file names follow the format `{dir index}_{image index}_{label index}.png`;
e.g. `001_02_003.png` is the image `2.png` in directory `001`, and its label index is `3`.

In [None]:
def process_dir_train(dirpath: Path):
    """Convert every annotated frame of one training directory.

    Reads ``<dirpath>/<dirname>.json``, maps its first ``action`` entry through
    ``id_to_label`` and, for each annotation, writes the cropped image
    (``.png``) and its standardized tensor (``.pth``) to ``train_out_dir`` as
    ``{dir index}_{frame index}_{label index}``.
    """
    annotation_file = dirpath / f"{dirpath.name}.json"
    with open(annotation_file) as fp:
        meta = json.load(fp)

    dir_index = int(dirpath.name)
    label = torch.tensor(id_to_label[meta["action"][0]], dtype=torch.long)

    for frame_index, annotation in enumerate(meta["annotations"]):
        frame_path = dirpath / f"{frame_index}.png"
        cropped, standardized = process_image(frame_path, np.array(annotation["data"]))

        stem = f"{dir_index:03d}_{frame_index:02d}_{label.item():03d}"
        imageio.imwrite(train_out_dir / f"{stem}.png", cropped)
        torch.save(standardized, train_out_dir / f"{stem}.pth")

In [None]:
# Every entry directly under the original training split, in sorted order.
dirs = sorted(Path("./data/ori/train").glob("*"))

In [None]:
len(dirs)  # the original author reports 649 training directories

In [None]:
# Fan the directories out over a worker pool; results arrive in completion
# order and are only used to advance the progress bar.
with Pool() as pool, tqdm(total=len(dirs), ncols=100, file=sys.stdout) as t:
    for _ in pool.imap_unordered(process_dir_train, dirs):
        t.update()

## 1-2. Generate Test Dataset

The test image file names follow the format `{dir index}_{image index}.png`, e.g. `001_02.png`.
It is basically the same as the training format, but without the label index.

In [None]:
def process_dir_test(dirpath: Path):
    """Convert every annotated frame of one test directory.

    Reads ``<dirpath>/<dirname>.json`` and, for each annotation, writes the
    cropped image (``.png``) and its standardized tensor (``.pth``) to
    ``test_out_dir`` as ``{dir index}_{frame index}``.
    """
    annotation_file = dirpath / f"{dirpath.name}.json"
    with open(annotation_file) as fp:
        meta = json.load(fp)

    dir_index = int(dirpath.name)

    for frame_index, annotation in enumerate(meta["annotations"]):
        frame_path = dirpath / f"{frame_index}.png"
        cropped, standardized = process_image(frame_path, np.array(annotation["data"]))

        stem = f"{dir_index:03d}_{frame_index:02d}"
        imageio.imwrite(test_out_dir / f"{stem}.png", cropped)
        torch.save(standardized, test_out_dir / f"{stem}.pth")

In [None]:
# Every entry directly under the original test split, in sorted order.
dirs = sorted(Path("./data/ori/test").glob("*"))

In [None]:
len(dirs)  # the original author reports 217 test directories

In [None]:
# Same fan-out as for the training split, using the test-directory worker.
with Pool() as pool, tqdm(total=len(dirs), ncols=100, file=sys.stdout) as t:
    for _ in pool.imap_unordered(process_dir_test, dirs):
        t.update()

# 2. Define Optimizer and Loss Function

In [None]:
class FocalLoss(nn.Module):
    """Focal loss built on top of per-sample cross entropy.

    Each sample's cross entropy ``ce`` is weighted by ``(1 - exp(-ce)) ** gamma``
    and the weighted values are averaged over the batch.

    Source: https://dacon.io/competitions/official/235585/codeshare/1796
    """

    def __init__(self, gamma=2.0, eps=1e-7):
        super().__init__()
        self.gamma = gamma
        # Stored but not read by forward().
        self.eps = eps
        self.ce = nn.CrossEntropyLoss(reduction="none")

    def forward(self, input, target):
        """Return the mean focal loss of ``input`` logits against ``target``."""
        per_sample_ce = self.ce(input, target)
        # exp(-ce) recovers the probability assigned to the target class.
        target_prob = torch.exp(-per_sample_ce)
        focal_weight = (1 - target_prob) ** self.gamma
        return (focal_weight * per_sample_ce).mean()

In [None]:
class SAM(torch.optim.Optimizer):
    """Sharpness-Aware Minimization wrapper around a base optimizer.

    One update takes two forward/backward passes: ``first_step`` moves the
    parameters along the gradient by ``rho`` (normalized by the global gradient
    norm), and ``second_step`` undoes that move and lets the base optimizer
    step with the gradients computed at the perturbed point.

    Source: https://github.com/davda54/sam/blob/main/sam.py
    """

    def __init__(self, params, base_optimizer, rho=0.05, **kwargs):
        assert rho >= 0.0, f"Invalid rho, should be non-negative: {rho}"

        super().__init__(params, dict(rho=rho, **kwargs))

        # The base optimizer is built on this wrapper's groups, and the wrapper
        # then adopts the base optimizer's groups so both see the same objects.
        self.base_optimizer = base_optimizer(self.param_groups, **kwargs)
        self.param_groups = self.base_optimizer.param_groups

    @torch.no_grad()
    def first_step(self, zero_grad=False):
        """Perturb each parameter to ``w + e(w)`` and remember ``e(w)``."""
        total_norm = self._grad_norm()
        for group in self.param_groups:
            step_scale = group["rho"] / (total_norm + 1e-12)

            for param in group["params"]:
                if param.grad is not None:
                    perturbation = param.grad * step_scale.to(param)
                    param.add_(perturbation)  # climb to the local maximum "w + e(w)"
                    self.state[param]["e_w"] = perturbation

        if zero_grad:
            self.zero_grad()

    @torch.no_grad()
    def second_step(self, zero_grad=False):
        """Restore ``w`` and apply the base optimizer's update."""
        for group in self.param_groups:
            for param in group["params"]:
                if param.grad is not None:
                    param.sub_(self.state[param]["e_w"])  # get back to "w" from "w + e(w)"

        self.base_optimizer.step()  # do the actual "sharpness-aware" update

        if zero_grad:
            self.zero_grad()

    @torch.no_grad()
    def step(self, closure=None):
        """Run both steps; ``closure`` must perform a full forward-backward pass."""
        assert closure is not None, "Sharpness Aware Minimization requires closure, but it was not provided"
        closure = torch.enable_grad()(closure)  # the closure should do a full forward-backward pass

        self.first_step(zero_grad=True)
        closure()
        self.second_step()

    def _grad_norm(self):
        """Return the L2 norm over the gradients of all parameters that have one."""
        # put everything on the same device, in case of model parallelism
        shared_device = self.param_groups[0]["params"][0].device

        per_param_norms = []
        for group in self.param_groups:
            for param in group["params"]:
                if param.grad is not None:
                    per_param_norms.append(param.grad.norm(p=2).to(shared_device))

        return torch.norm(torch.stack(per_param_norms), p=2)

# 3. Define Hyper-parameters and Configuration

In [None]:
# Root directory that holds the datasets.
DATA_DIR = Path("./data")
# Sub-directory of DATA_DIR whose train/ and test/ PNG files make_datasets reads.
DATA_NAME = "crop512_9"
# Number of output classes, taken from the label table imported from const.label_names.
N_CLASSES = len(label_names)

In [None]:
@dataclass
class Config:
    """Hyper-parameters and run settings for one training experiment.

    ``__post_init__`` shortens the run when ``debug`` is set and derives
    ``num_workers`` from the batch size when it is left as ``None``.
    """

    # experiment
    exp_num: str = "001"
    ver_num: int = None
    result_dir_root: PathLike = Path("results/exp")
    result_dir: PathLike = None
    seed: int = 867243624
    debug: bool = False

    # network
    model_name: str = "tf_efficientnetv2_l_in21ft1k"
    checkpoint_path: PathLike = None
    len_sequence: int = 5
    pretrained: bool = True

    # criterion
    criterion: str = "focal"  # ce, focal

    # training
    num_folds: int = 5
    fold: int = 1
    # Annotated so that it is a real dataclass field: it can be passed to the
    # constructor and is included in the dumped YAML.
    earlystop_limit: int = 10
    epochs: int = 100
    finetune: bool = True
    finetune_step1_epochs: int = 2
    finetune_step2_epochs: int = 4

    # optimizer / scheduler
    optimizer_name: str = "AdaBelief"
    lr: float = 1e-3
    weight_decay: float = 0.01
    scheduler: Any = ReduceLROnPlateau

    # Sharpness-Aware Minimization for Efficiently Improving Generalization [2020]
    sam: bool = True  # not used together with Lookahead; SAM has the higher priority

    # Lookahead Optimizer: k steps forward, 1 step back [2019]
    look_ahead: bool = False
    look_ahead_k: int = 5
    look_ahead_alpha: float = 0.5

    # dataloader
    batch_size: int = 50
    num_workers: int = None

    # dataset
    in_memory: bool = True

    def __post_init__(self):
        # Debug runs train for a single epoch without the staged fine-tuning.
        if self.debug:
            self.epochs = 1
            self.finetune = False

        if self.num_workers is None:
            self.num_workers = self.batch_size * self.len_sequence // 10

    def to_yaml(self, target):
        """Write all fields to the YAML file at ``target``.

        Values that are not plain scalars (paths, classes, ...) are stored as
        their ``str()`` form; scalars and ``None`` keep their native YAML type.
        """
        scalar_types = (bool, int, float, str, type(None))
        data = {
            key: value if isinstance(value, scalar_types) else str(value)
            for key, value in self.__dict__.items()
        }
        with open(str(target), "w") as f:
            yaml.dump(data, f)

# 4. Prepare Items for Training

## 4-1. Construct NN

I converted the input image sequence into a single multi-channel image by stacking the frames,
i.e. 5 images of shape `(3, height, width)` become a single image of shape `(15, height, width)`.
I then replaced the first convolution of the pre-trained network so that it can handle a 15-channel input image.
After that, I filled the new convolution's weight with stacked copies of the original weight to exploit the pre-trained weights.
For example, the original convolution's weight has shape `(out_channels, 3, kernel_height, kernel_width)`.
I repeated it 5 times and stacked the copies to form a weight of shape `(out_channels, 15, kernel_height, kernel_width)`.

In [None]:
class Net(nn.Module):
    """timm backbone adapted to a stack of ``len_sequence`` RGB frames.

    The backbone's ``classifier`` is replaced by a fresh linear layer with
    ``n_classes`` outputs, and its ``conv_stem`` by a convolution that accepts
    ``3 * len_sequence`` input channels, initialized from the original stem
    weight. The backbone must therefore expose ``classifier`` and ``conv_stem``.

    Args:
        name: Model name passed to ``timm.create_model``.
        pretrained: Passed to ``timm.create_model``.
        n_classes: Output size of the new classifier.
        len_sequence: Number of RGB frames concatenated along the channel axis.
    """

    def __init__(
        self,
        name: str = "resnet",
        pretrained: bool = True,
        n_classes: int = 1000,
        len_sequence: int = 5,
    ):
        super().__init__()

        self.backbone = timm.create_model(name, pretrained=pretrained)

        # Modules trained on their own during fine-tuning step 1 (see freeze()).
        self.tuning_modules: List[nn.Module] = []

        self.freeze_step = 3

        embedding_size = self.backbone.classifier.in_features
        self.backbone.classifier = nn.Linear(embedding_size, n_classes)
        self.tuning_modules.append(self.backbone.classifier)

        with torch.no_grad():
            c1: Conv2dSame = self.backbone.conv_stem
            w = c1.weight.data

            # NOTE(review): c2 uses Conv2dSame's default `bias` setting instead
            # of mirroring c1's; left as is so existing checkpoints keep the
            # same state-dict keys.
            c2 = Conv2dSame(3 * len_sequence, c1.out_channels, c1.kernel_size, c1.stride)
            # The dataset concatenates whole frames along the channel axis
            # (R,G,B,R,G,B,...), so the RGB kernel must be tiled as a block.
            # repeat_interleave would yield R,R,...,G,G,...,B,B,... and pair
            # most input channels with the kernel of a different colour.
            c2.weight.data = w.repeat(1, len_sequence, 1, 1)

            self.backbone.conv_stem = c2

    def forward(self, x: Tensor):
        """Run the backbone on ``x`` and return its output."""
        return self.backbone(x)

    def freeze(self, step=3):
        """Switch which parameters require gradients.

        * ``step == 1``: only ``tuning_modules`` are trainable.
        * ``step == 2``: everything except ``tuning_modules`` is trainable.
        * any other value: the whole backbone is trainable.

        Calling it again with the current step is a no-op; the initial step is 3.
        """
        if self.freeze_step != step:
            self.freeze_step = step

            if step == 1:
                self.backbone.requires_grad_(False)
                for m in self.tuning_modules:
                    m.requires_grad_(True)
            elif step == 2:
                self.backbone.requires_grad_(True)
                for m in self.tuning_modules:
                    m.requires_grad_(False)
            else:
                self.backbone.requires_grad_(True)

## 4-2. Load Dataset

I randomly applied affine transform, Gaussian blur, and image compression augmentations during training.
Horizontal flip augmentation was also applied randomly, replacing the label's left/right attribute accordingly,
i.e. a left-hand gesture becomes a right-hand gesture when the image is flipped horizontally,
and a gesture made with both hands keeps its label even when it is flipped horizontally.

In [None]:
class FileLoader:
    """Image reader that can optionally preload every file into memory.

    With ``in_memory=True`` all ``files`` are read once in the constructor and
    later lookups are served from that cache; otherwise each lookup reads the
    file from disk.
    """

    def __init__(self, in_memory: bool, files: List[Path]) -> None:
        self.in_memory = in_memory
        self.files = files

        if not self.in_memory:
            return

        progress = tqdm(self.files, ncols=100, file=sys.stdout, desc="in-memory loading...")
        self.data = {path: imageio.imread(path) for path in progress}

    def __len__(self):
        """Return the number of files this loader was given."""
        return len(self.files)

    def __getitem__(self, file: Path):
        """Return the image stored at ``file``, from the cache when preloaded."""
        return self.data[file] if self.in_memory else imageio.imread(file)

In [None]:
class GestureDataset(Dataset):
    """Dataset of fixed-length frame sequences stacked along the channel axis.

    Args:
        items: Sequence of ``(files, diridx)`` or ``(files, diridx, label)``
            tuples; the first item decides whether labels are present.
        fileloader: Object indexed by file path that returns the image array.
        augmentation: When true, adds the random image augmentations and the
            random horizontal flip.
    """

    def __init__(self, items, fileloader: "FileLoader", augmentation=False):
        super().__init__()

        self.items = items
        self.fileloader = fileloader
        self.has_label = len(self.items[0]) == 3
        self.augmentation = augmentation

        t = []
        if augmentation:
            t.append(A.Affine(scale=(0.9, 1.1), translate_px=(-40, 40), rotate=(-15, 15), shear=(-10, 10)))
            t.append(A.GaussianBlur())
            t.append(A.ImageCompression())
        t.append(A.Normalize())
        t.append(ToTensorV2())
        self.transform = A.Compose(transforms=t)

    def _p(self):
        """Return ``True`` with probability about one half.

        A method rather than a lambda stored on the instance, so the dataset
        can be pickled for DataLoader workers started with ``spawn``.
        """
        return random.random() > 0.5

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        """Return ``(ims, diridx[, label])`` for item ``idx``.

        ``ims`` is the channel-wise concatenation of the transformed frames;
        the transform is invoked once per frame.
        """
        item = self.items[idx]
        files = item[0]
        diridx = torch.tensor(item[1], dtype=torch.long)
        ims = torch.cat([self.transform(image=self.fileloader[file])["image"] for file in files])

        if self.has_label:
            label = item[2]

            # flipping
            if self.augmentation:
                ims, label = self.augment(ims, label)

            label = torch.tensor(label, dtype=torch.long)

            return ims, diridx, label
        else:
            return ims, diridx

    def fliplr(self, ims: Tensor, label: int):
        """Flip ``ims`` along its last axis and remap the label via ``id_flip``.

        Labels without an entry in ``id_flip`` are returned with the images
        unchanged.
        """
        if label in id_flip:
            new_label = id_flip[label]
            new_ims = torch.flip(ims, dims=(2,))
            return new_ims, new_label
        else:
            return ims, label

    def augment(self, ims: Tensor, label: int):
        """Randomly apply the label-aware horizontal flip."""
        # fliplr
        if self._p():
            ims, label = self.fliplr(ims, label)

        return ims, label

In [None]:
def _sequence_windows(files, len_sequence):
    """Split ``files`` into every window of ``len_sequence`` consecutive files.

    A list shorter than ``len_sequence`` yields one window, padded by
    repeating its last file.
    """
    count = len(files)
    if count >= len_sequence:
        return [files[start : start + len_sequence] for start in range(count - len_sequence + 1)]
    padding = [files[-1]] * (len_sequence - count)
    return [files + padding]


def make_datasets(config: Config):
    """Build the train, validation and test data loaders.

    PNG files under ``DATA_DIR / DATA_NAME`` are grouped by the directory
    index in the first three characters of the file stem, and each group is
    cut into sliding windows of ``config.len_sequence`` frames. Training
    windows are split with a stratified K-fold; fold ``config.fold`` (1-based)
    becomes the validation set.

    Returns:
        ``(dl_train, dl_valid, dl_test)``.
    """
    print("Load datasets ...")

    # load train dataset
    files_train = sorted((DATA_DIR / DATA_NAME / "train").glob("*.png"))
    files_test = sorted((DATA_DIR / DATA_NAME / "test").glob("*.png"))
    if config.debug:
        files_train = files_train[:1000]

    fileloader = FileLoader(in_memory=config.in_memory, files=files_train + files_test)

    data = defaultdict(list)
    labels = {}
    for file in files_train:
        # train filename: {diridx:3d}_{fileidx:2d}_{label:3d}
        diridx = int(file.stem[:3])
        data[diridx].append(file)
        labels[diridx] = int(file.stem[-3:])

    items_file = []
    items_dir = []
    items_label = []
    for diridx, files in data.items():
        for window in _sequence_windows(files, config.len_sequence):
            items_file.append(window)
            items_dir.append(diridx)
            items_label.append(labels[diridx])

    # NOTE(review): the split is over windows, not directories, so overlapping
    # windows of one directory can land in both the train and validation folds.
    skf = StratifiedKFold(config.num_folds, shuffle=True, random_state=config.seed)
    tidx, vidx = list(skf.split(items_file, items_label))[config.fold - 1]

    items_train = [(items_file[i], items_dir[i], items_label[i]) for i in tidx]
    items_valid = [(items_file[i], items_dir[i], items_label[i]) for i in vidx]

    ds_train = GestureDataset(items_train, fileloader=fileloader, augmentation=True)
    ds_valid = GestureDataset(items_valid, fileloader=fileloader, augmentation=False)

    dl_kwargs = dict(batch_size=config.batch_size, num_workers=config.num_workers, pin_memory=True)
    dl_train = DataLoader(ds_train, **dl_kwargs, shuffle=True)
    dl_valid = DataLoader(ds_valid, **dl_kwargs, shuffle=False)

    # load test dataset
    data = defaultdict(list)
    for file in files_test:
        # test filename: {diridx:3d}_{fileidx:2d}
        diridx = int(file.stem[:3])
        data[diridx].append(file)

    # Same windowing as for training, so the last window of each test
    # sequence is included as well.
    items_test = [
        (window, diridx)
        for diridx, files in data.items()
        for window in _sequence_windows(files, config.len_sequence)
    ]

    ds_test = GestureDataset(items_test, fileloader=fileloader, augmentation=False)
    dl_test = DataLoader(ds_test, **dl_kwargs, shuffle=False)

    return dl_train, dl_valid, dl_test

## 4-3. Make Trainer

In [None]:
class GestureTrainerOutput:
    """Running loss, accuracy and prediction record for one epoch loop."""

    def __init__(self) -> None:
        self._loss = AverageMeter()
        self._correct = 0
        self._total = 0
        self._preds = []
        self._targets = []

        class_ids = range(N_CLASSES)
        self._labels = list(class_ids)
        self._target_names = [label_names[i]["name"] for i in class_ids]

    @torch.no_grad()
    def update(self, loss: Tensor, preds: Tensor, labels: Tensor):
        """Fold one batch into the running statistics.

        ``preds`` and ``labels`` may be 1-D class indexes or higher-rank
        scores; the latter are reduced with ``argmax(dim=1)``.
        """
        batch_size = preds.size(0)
        self._loss.update(loss.item(), batch_size)

        predicted = preds.argmax(dim=1) if preds.dim() != 1 else preds
        expected = labels.argmax(dim=1) if labels.dim() != 1 else labels

        self._correct += (predicted == expected).sum().item()
        self._total += batch_size

        self._preds.extend(predicted.tolist())
        self._targets.extend(expected.tolist())

    @property
    def acc(self):
        """Accuracy in percent; ``0`` before any sample has been seen."""
        if self._total != 0:
            return self._correct / self._total * 100
        return 0

    @property
    def loss(self):
        """Current value reported by the loss meter."""
        return self._loss()

    @property
    def f1(self):
        """Macro-averaged F1 over all class indexes."""
        return f1_score(self._targets, self._preds, labels=self._labels, average="macro")

    @property
    def report(self):
        """scikit-learn classification report with the class names."""
        return classification_report(
            self._targets,
            self._preds,
            labels=self._labels,
            target_names=self._target_names,
        )


class GestureTrainer:
    """Owns the model, optimizer, data loaders and the train/validate/submit loops.

    Args:
        config: Run configuration; ``config.result_dir`` must already be set
            because the log files are opened there.
    """

    def __init__(self, config: Config) -> None:
        self.config = config
        self.log = CustomLogger(config.result_dir / "log.log")
        self.log_rpt = CustomLogger(config.result_dir / "report_train.log")
        self.log_rpv = CustomLogger(config.result_dir / "report_valid.log")

        self.epoch = 1
        self.best_loss = math.inf
        self.earlystop_cnt = 0

        self.model = Net(
            name=config.model_name,
            n_classes=N_CLASSES,
            pretrained=config.pretrained,
            len_sequence=config.len_sequence,
        ).cuda()

        if config.checkpoint_path is not None:
            self.model.load_state_dict(torch.load(config.checkpoint_path))

        self.model = nn.DataParallel(self.model)

        # criterion
        if config.criterion == "ce":
            self.criterion = nn.CrossEntropyLoss().cuda()
        elif config.criterion == "focal":
            self.criterion = FocalLoss().cuda()
        else:
            raise NotImplementedError(config.criterion)

        # optimizer
        OptimizerClass = {
            "AdamW": AdamW,
            "AdaBelief": AdaBelief,
        }[config.optimizer_name]
        if config.sam:
            self.optimizer = SAM(
                self.model.parameters(),
                base_optimizer=OptimizerClass,
                lr=config.lr,
                weight_decay=config.weight_decay,
            )
        else:
            self.optimizer = OptimizerClass(
                self.model.parameters(),
                lr=config.lr,
                weight_decay=config.weight_decay,
            )
            if config.look_ahead:
                # No Lookahead class is defined or imported in this file; fail
                # with an explicit message instead of a bare NameError.
                try:
                    lookahead_cls = Lookahead
                except NameError as err:
                    raise NotImplementedError(
                        "look_ahead=True requires a Lookahead optimizer class, which is not defined in this file"
                    ) from err
                self.optimizer = lookahead_cls(
                    optimizer=self.optimizer,
                    k=config.look_ahead_k,
                    alpha=config.look_ahead_alpha,
                )
        # NOTE: config.scheduler is not consulted; the scheduler is always ReduceLROnPlateau.
        self.scheduler = ReduceLROnPlateau(self.optimizer, factor=0.5, patience=3, verbose=True)

        self.tdl, self.vdl, self.dl_test = make_datasets(config)

    def save(self):
        """Write the unwrapped model's state dict to ``<result_dir>/best.pth``."""
        ckpt_path = self.config.result_dir / "best.pth"
        self.log.info("Save checkpoint:", ckpt_path)

        state_dict = self.model.module.state_dict()

        torch.save(state_dict, ckpt_path)

    def train_loop(self, dl: DataLoader):
        """Train for one pass over ``dl`` and return the collected statistics.

        With ``config.sam`` each batch takes two forward/backward passes;
        statistics are recorded from the first pass only.
        """
        o = GestureTrainerOutput()

        tqdm_desc = f"Train [{self.epoch:02d}/{self.config.epochs:02d}]"

        with tqdm(total=len(dl.dataset), **tqdm_kwargs, desc=tqdm_desc) as t:
            for images, diridxes, labels in dl:
                images_ = images.cuda()
                labels_ = labels.cuda()
                n = images.size(0)

                self.optimizer.zero_grad()

                logits_ = self.model(images_)
                loss = self.criterion(logits_, labels_)

                o.update(loss, logits_, labels_)
                t.set_postfix_str(f"loss:{o.loss:.6f}, acc:{o.acc:.2f}")

                loss.backward()
                if self.config.sam:
                    # SAM: perturb the weights, recompute the gradients at the
                    # perturbed point, then restore and update.
                    self.optimizer.first_step(zero_grad=True)
                    logits2_ = self.model(images_)
                    loss2 = self.criterion(logits2_, labels_)
                    loss2.backward()
                    self.optimizer.second_step(zero_grad=True)
                else:
                    self.optimizer.step()

                t.update(n)

        return o

    @torch.no_grad()
    def valid_loop(self, dl: DataLoader):
        """Evaluate on ``dl`` without gradients and return the statistics."""
        o = GestureTrainerOutput()

        tqdm_desc = f"Valid [{self.epoch:02d}/{self.config.epochs:02d}]"

        with tqdm(total=len(dl.dataset), **tqdm_kwargs, desc=tqdm_desc) as t:
            for images, diridxes, labels in dl:
                images_ = images.cuda()
                labels_ = labels.cuda()
                n = images.size(0)

                logits_ = self.model(images_)
                loss = self.criterion(logits_, labels_)

                o.update(loss, logits_, labels_)
                t.set_postfix_str(f"loss:{o.loss:.6f}, acc:{o.acc:.2f}")

                t.update(n)

        return o

    @torch.no_grad()
    def callback(self, to: GestureTrainerOutput, vo: GestureTrainerOutput):
        """End-of-epoch bookkeeping.

        Logs the metrics and reports, steps the plateau scheduler on the
        validation loss, saves a checkpoint when the validation loss improved
        and otherwise increments the early-stopping counter.
        """
        print()

        self.log.info(
            f"ep[{self.epoch:03d}/{self.config.epochs:03d}]",
            f"loss[{to.loss:.6f};{vo.loss:.6f}]",
            f"acc[{to.acc:.2f};{vo.acc:.2f}]",
            f"f1[{to.f1:.4f};{vo.f1:.4f}]",
        )

        _t = f"ep[{self.epoch:03d}/{self.config.epochs:03d}]"
        self.log_rpt.info("TRAIN", _t, "\n", to.report)
        self.log_rpv.info("VALID", _t, "\n", vo.report)

        if isinstance(self.scheduler, ReduceLROnPlateau):
            self.scheduler.step(vo.loss)

        if self.best_loss > vo.loss:
            self.best_loss = vo.loss
            self.earlystop_cnt = 0
            self.save()
        else:
            self.earlystop_cnt += 1

        self.log.flush()
        self.log_rpt.flush()
        self.log_rpv.flush()

    def fit(self):
        """Train until ``config.epochs`` is reached or early stopping triggers.

        With ``config.finetune`` the first ``finetune_step1_epochs`` epochs use
        freeze step 1, epochs up to ``finetune_step2_epochs`` use step 2, and
        later epochs train everything.
        """
        seed_everything(self.config.seed)

        for self.epoch in range(self.epoch, self.config.epochs + 1):
            if self.config.finetune:
                if self.epoch <= self.config.finetune_step1_epochs:
                    self.model.module.freeze(step=1)
                elif self.epoch <= self.config.finetune_step2_epochs:
                    self.model.module.freeze(step=2)
                else:
                    self.model.module.freeze(step=3)

            # Training
            self.model.train()
            to = self.train_loop(self.tdl)

            # Validation (valid_loop and callback disable gradients themselves)
            self.model.eval()
            vo = self.valid_loop(self.vdl)
            self.callback(to, vo)

            # Decide right after the counter is updated, so no further epoch
            # is trained whose weights would never be validated or saved.
            stop = self.earlystop_cnt >= self.config.earlystop_limit
            if stop:
                self.log.info("Early Stopping")

            self.log.flush()
            self.log_rpt.flush()
            self.log_rpv.flush()

            if stop:
                break

    @torch.no_grad()
    def submit(self):
        """Predict the test set with the best checkpoint and write two CSVs.

        Logits of all windows of one test directory are aggregated in two
        ways: ``sm`` averages the per-window softmax outputs, ``ms`` applies
        softmax to the averaged logits.
        """
        # load best checkpoint
        checkpoint_path = self.config.result_dir / "best.pth"
        self.log.info("Load checkpoint", checkpoint_path)
        checkpoint = torch.load(checkpoint_path)
        self.model.module.load_state_dict(checkpoint)

        self.model.eval()

        # Collect the logits of every window, grouped by directory index.
        ret = defaultdict(list)
        with tqdm(total=len(self.dl_test.dataset), ncols=100, file=sys.stdout, desc="submission") as t:
            for images, diridxes in self.dl_test:
                logits = self.model(images.cuda()).cpu()

                for logit, diridx in zip(logits, diridxes):
                    ret[diridx.item()].append(logit)

                    t.update()

        out_sm = defaultdict(list)
        out_ms = defaultdict(list)
        for diridx, logits in ret.items():
            out_sm["Image_Path"].append(f"./test\\{diridx}")
            out_ms["Image_Path"].append(f"./test\\{diridx}")

            logits = torch.stack(logits)
            logit_sm = logits.softmax(dim=1).mean(dim=0)
            logit_ms = logits.mean(dim=0).softmax(dim=0)

            # One column per original label id below 196 that id_to_label maps
            # to a model output index.
            for k in range(196):
                if k in id_to_label:
                    out_sm[f"Label_{k}"].append(logit_sm[id_to_label[k]].item())
                    out_ms[f"Label_{k}"].append(logit_ms[id_to_label[k]].item())

        df_sm = pd.DataFrame(out_sm)
        df_ms = pd.DataFrame(out_ms)

        out_df_path = str(self.config.result_dir / f"exp{self.config.exp_num}_ver{self.config.ver_num}_%s.csv")
        print("Write result to", out_df_path % "_")
        df_sm.to_csv(out_df_path % "sm", index=False)
        df_ms.to_csv(out_df_path % "ms", index=False)  # ms was better

# 5. Train Neural Network

The output log and weight files will be written to `./results/exp/exp001/version_{config.ver_num}/`.
`config.ver_num` is updated automatically each time you run this script.

I made my final ensembled submission by averaging 17 other inference submissions, each trained with a different seed and fold value.

In [None]:
def main():
    """Train one model with the settings below and write its submission files.

    Creates the result directory, stores a copy of the running script (when
    there is one) and the parameters there, then trains, predicts the test
    set and releases the trainer and cached GPU memory.
    """
    config = Config(
        debug=False,
        finetune=True,
        model_name="tf_efficientnetv2_l_in21ft1k",
        batch_size=18,
        sam=True,
        pretrained=True,
        optimizer_name="AdaBelief",
        fold=1,
        seed=1,
        num_workers=6,
        in_memory=True,
        lr=1e-3,
    )

    config.result_dir = make_result_dir(config)

    # __file__ exists when this runs as a script but not inside a notebook
    # kernel; skip the source snapshot there instead of raising NameError.
    script_path = globals().get("__file__")
    if script_path is not None:
        shutil.copy(script_path, config.result_dir / Path(script_path).name)
    config.to_yaml(config.result_dir / "params.yaml")

    trainer = GestureTrainer(config)
    trainer.fit()
    trainer.submit()

    del trainer
    gc.collect()
    torch.cuda.empty_cache()