# Vesuvius Challenge Inference baseline

This notebook is an example of how to run inference with pretrained models and submit the results.

## Imports

In [None]:
import json
import logging
import os
from collections import defaultdict
from dataclasses import dataclass
from io import StringIO
from pathlib import Path
from typing import Union, get_type_hints

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from dotenv import load_dotenv
from torch import nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm

## Initial structures

In [None]:
%%writefile .env

LOG_LEVEL = INFO
ENVIRONMENT = kaggle
TILE_SIZE = 16

BATCH_SIZE = 32

In [None]:
# Load the variables declared in the .env file before the configuration is built.
load_dotenv()


# Configuration keys that relate to training; not referenced in the visible code.
TRAINING_KEYS = [
    "BATCH_SIZE",
    "EPOCHS",
    "LEARNING_RATE",
    "PATIENCE",
    "CV_FOLDS",
    "FOLD_IDX",
    "TILE_SIZE",
]

# Dataset root directory for each supported value of the ENVIRONMENT setting.
dataset_path_map = dict(
    local="dataset",
    kaggle="/kaggle/input/vesuvius-challenge-ink-detection",
)


class AppConfigError(Exception):
    """Signals that a configuration value is missing or cannot be cast to its type."""

    pass


class AppConfig:
    """
    General configuration class for the project.

    Maps environment variables to class attributes: every annotated field
    declared below is looked up in the mapping passed to the constructor,
    cast to its annotated type and stored on the instance. A field declared
    without a default value is required.
    """

    SEED: int = 777
    LOG_LEVEL: str
    ENVIRONMENT: str
    TILE_SIZE: int
    BATCH_SIZE: int
    EPOCHS: int = 200
    MODEL: str = ""
    LEARNING_RATE: float = 1e-3
    CHECKPOINTS_DIR: Path = Path(".")
    PATIENCE: int = 10
    CV_FOLDS: int = 5
    FOLD_IDX: int = -1
    WANDB_API_KEY: str = ""

    def __init__(self, env):
        """Populate the annotated fields from ``env``.

        Args:
            env: Mapping of setting names to raw values (for example
                ``os.environ``); only its ``get`` method is used.

        Raises:
            AppConfigError: If a required field is absent, or a value cannot
                be cast to the annotated type.
        """
        # Resolve the hints once, on the class: this avoids recomputing them
        # for every field and does not rely on reading ``__annotations__``
        # through the instance.
        type_hints = get_type_hints(type(self))

        for field, var_type in type_hints.items():
            # Raise AppConfigError if required field not supplied
            default_value = getattr(self, field, None)
            if default_value is None and env.get(field) is None:
                raise AppConfigError(f"The {field} field is required")

            # Cast env var value to expected type and raise AppConfigError on failure
            raw_value = env.get(field, default_value)
            try:
                if var_type is bool:
                    value = self._parse_bool(raw_value)
                else:
                    value = var_type(raw_value)
            except ValueError as err:
                # Report the value that was actually used; indexing ``env``
                # here would raise KeyError when the default was the culprit.
                raise AppConfigError(
                    f'Unable to cast value of "{raw_value}" to type "{var_type}" for "{field}" field'
                ) from err

            setattr(self, field, value)

    @staticmethod
    def _parse_bool(value) -> bool:
        """Interpret ``value`` as a boolean flag.

        Accepts real booleans and the case-insensitive strings
        ``1/true/yes/y/on`` and ``0/false/no/n/off`` (or an empty string).

        Raises:
            ValueError: If the value matches none of the accepted spellings.
        """
        if isinstance(value, bool):
            return value
        text = str(value).strip().lower()
        if text in {"1", "true", "yes", "y", "on"}:
            return True
        if text in {"0", "false", "no", "n", "off", ""}:
            return False
        raise ValueError(f"Cannot interpret {value!r} as a boolean")

    @property
    def NUM_WORKERS(self) -> int:
        """Defines the number of workers for the DataLoader"""
        return os.cpu_count() or 0

    @property
    def DEVICE(self) -> torch.device:
        """Defines the device to use for training"""
        return torch.device("cuda" if torch.cuda.is_available() else "cpu")

    @property
    def DATASET_PATH(self) -> Path:
        """Defines the path to the dataset, selected by the ENVIRONMENT setting"""
        return Path(dataset_path_map[self.ENVIRONMENT])

    @property
    def WANDB_PROJECT(self) -> str:
        """Defines the project name used for Weights & Biases"""
        return "Vesuvius Challenge"

    def __repr__(self):
        """Return the class name followed by all settings as indented, sorted JSON."""
        attrs = {
            **vars(self),
            **{
                prop_name: str(getattr(self, prop_name))
                for prop_name in dir(self)
                if isinstance(getattr(type(self), prop_name, None), property)
            },
        }

        # Remove private attributes
        attrs.pop("WANDB_API_KEY", None)

        # ``default=str`` is deliberate: the only non-JSON value stored on the
        # instance is the CHECKPOINTS_DIR path, which is rendered as text.
        attrs_str = json.dumps(attrs, indent=4, sort_keys=True, default=str)
        return f"{type(self).__name__}({attrs_str})"

    def __getitem__(self, key):
        """Allow ``config["NAME"]`` as an alias for ``config.NAME``."""
        return getattr(self, key)

# Shared configuration object for the notebook, built from the process environment.
Config = AppConfig(os.environ)

In [None]:
# Module logger whose level is taken from the LOG_LEVEL setting.
logger = logging.getLogger(__name__)
logger.setLevel(Config.LOG_LEVEL)
stream_handler = logging.StreamHandler()
# NOTE(review): the tqdm class itself is installed as the handler's stream,
# presumably so records go through tqdm.write and do not break progress bars — confirm.
stream_handler.setStream(tqdm)
logger.addHandler(stream_handler)

## DataSet

In [None]:
class VesuviusOriginalDataSetTest(Dataset):
    """This dataset uses original data from Vesuvius Challenge.
    No optimizations, processing, and separations applied.

    A fragment is cut into ``Config.TILE_SIZE`` x ``Config.TILE_SIZE`` tiles;
    only tiles that overlap the fragment mask are kept. Each item is the
    stack of all slices for one tile together with the tile's top-left
    (row, column) offset in the full image.
    """

    def __init__(self, fragment_path):
        """Load every masked tile of the fragment stored at ``fragment_path``."""
        self.voxels_data = None
        self.masked_idxs = None
        self._load_data(fragment_path)

    def _load_data(self, fragment_path):
        """Fill ``voxels_data`` and ``masked_idxs`` from the fragment directory."""
        logger.info("Loading the data...")

        self.voxels_data, masked_idxs = self._load_fragment(fragment_path)
        # int32 rather than int16: pixel offsets above 32767 would overflow
        # int16 on large fragments. The reshape keeps a (0, 2) shape when no
        # tile is masked.
        self.masked_idxs = np.array(masked_idxs, dtype=np.int32).reshape(-1, 2)

    def _load_fragment(self, fragment_path: Path):
        """Read the mask and all slices of a fragment.

        Returns:
            A ``(voxels_data, masked_idxs)`` pair: a uint8 array of shape
            (tiles, slices, TILE_SIZE, TILE_SIZE) and the list of (row, col)
            offsets of the kept tiles.

        Raises:
            FileNotFoundError: If the mask or one of the slices cannot be read.
        """
        fragment_path = Path(fragment_path)
        slice_paths = sorted((fragment_path / "surface_volume").glob("*.tif"))

        mask_path = fragment_path / "mask.png"
        mask_img = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)
        if mask_img is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise FileNotFoundError(f"Unable to read fragment mask: {mask_path}")
        masked_idxs = self._get_masked_idxs(mask_img.astype(bool))

        voxels_data = np.empty(
            (len(masked_idxs), len(slice_paths), Config.TILE_SIZE, Config.TILE_SIZE),
            dtype=np.uint8,
        )
        for i, slice_path in enumerate(tqdm(slice_paths, leave=False)):
            # In this case, we use cv2 to load image, because it's faster than PIL
            slice_img = cv2.imread(str(slice_path), cv2.IMREAD_UNCHANGED)
            if slice_img is None:
                raise FileNotFoundError(f"Unable to read fragment slice: {slice_path}")

            # Convert to uint8 to save memory usage
            slice_data = self._to_uint8(slice_img)

            voxels_data[:, i, :, :] = self._split_slice(slice_data, masked_idxs)

        return voxels_data, masked_idxs

    @staticmethod
    def _to_uint8(slice_img):
        """Scale a slice down by 255 and store it as uint8 without wrap-around.

        For 16-bit input the quotient reaches 256/257 on the brightest pixels;
        casting that straight to uint8 wraps them to 0/1, so the quotient is
        saturated at 255 first. All other pixels keep the value they had
        before this fix.
        NOTE(review): if the pretrained weights were produced with the
        wrapping conversion, confirm the saturated pixels do not matter.
        """
        return np.minimum(slice_img // 255, 255).astype(np.uint8)

    def _get_masked_idxs(self, mask):
        """
        Returns list of tuples with indexes of tiles with data.
        Basically, the idea of this function is to pre calculate the indexes of tiles with data,
        so it would be possible to pre-allocate memory for the fragments and then just fill it with data.
        This approach is much faster and less memory consuming than just appending to the list and concatenating then.
        """
        mask_idxs = []
        for i in range(0, mask.shape[0], Config.TILE_SIZE):
            for j in range(0, mask.shape[1], Config.TILE_SIZE):
                if mask[i : i + Config.TILE_SIZE, j : j + Config.TILE_SIZE].any():
                    mask_idxs.append((i, j))
        return mask_idxs

    def _split_slice(self, slice_data, masked_idxs):
        """Cut one slice into the tiles listed in ``masked_idxs``.

        Tiles that run past the right or bottom image border are zero-padded
        to the full tile size.
        """
        tiles = np.empty((len(masked_idxs), Config.TILE_SIZE, Config.TILE_SIZE), dtype=np.uint8)
        for k, (i, j) in enumerate(masked_idxs):
            tile = slice_data[i : i + Config.TILE_SIZE, j : j + Config.TILE_SIZE]
            if tile.shape != (Config.TILE_SIZE, Config.TILE_SIZE):
                tile = np.pad(
                    tile,
                    (
                        (0, Config.TILE_SIZE - tile.shape[0]),
                        (0, Config.TILE_SIZE - tile.shape[1]),
                    ),
                    "constant",
                    constant_values=0,
                )

            tiles[k] = tile

        return tiles

    def __len__(self) -> int:
        """Number of masked tiles in the fragment."""
        return self.voxels_data.shape[0]

    def __getitem__(self, index):
        """Return ``(voxel, tile_coords)`` for one tile.

        ``voxel`` is a float32 tensor scaled to [0, 1] with a leading channel
        axis added; ``tile_coords`` is the tile's (row, col) offset.
        """
        voxel = (self.voxels_data[index] / 255.0).astype(np.float32)
        tile_coords = self.masked_idxs[index]
        return torch.from_numpy(voxel).unsqueeze(0), tile_coords

## Models

In [None]:
class InkDetector(torch.nn.Module):
    """3D-convolutional network that outputs one raw score per input volume.

    The encoder is three Conv3d -> ReLU -> BatchNorm3d stages followed by
    global average pooling and flattening; the decoder is a three-layer
    fully connected head ending in a single output unit.
    """

    def __init__(self):
        super().__init__()

        # (out_channels, kernel_size, stride, padding) of each encoder stage.
        stage_specs = [(16, 3, 2, 1), (32, 3, 2, 1), (64, 3, 2, 1)]

        encoder_layers = []
        channels = 1  # the input volume has a single channel
        for out_channels, kernel, step, pad in stage_specs:
            encoder_layers += [
                nn.Conv3d(channels, out_channels, kernel_size=kernel, stride=step, padding=pad),
                nn.ReLU(inplace=True),
                nn.BatchNorm3d(out_channels),
            ]
            channels = out_channels
        # Collapse the remaining spatial extent to one feature vector per sample.
        encoder_layers += [nn.AdaptiveAvgPool3d(1), nn.Flatten()]

        self.encoder = nn.Sequential(*encoder_layers)

        hidden = 128
        self.decoder = nn.Sequential(
            nn.Linear(channels, hidden),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, hidden),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, 1),
        )

    def forward(self, x):
        """Encode ``x`` into a feature vector and map it to a single output value."""
        return self.decoder(self.encoder(x))

## Models loading

In [None]:
@dataclass
class PretrainedModel:
    """A pretrained architecture together with the location of its fold weights."""

    # Label identifying the model.
    name: str
    # NOTE(review): annotated as nn.Module, but the registry below stores the
    # class itself and the prediction loop instantiates it.
    model_class: nn.Module
    # Directory whose entries are loaded as per-fold state dicts.
    folds_path: Path


# Root directory under which the pretrained weights are mounted.
input_base = Path("../input")
pretrained_models = [
    PretrainedModel(
        name="InkDetector-pure",
        model_class=InkDetector,
        folds_path=input_base / "inknetv1",
    ),
]

## Prediction

In [None]:
# Predict an ink mask for every test fragment: each fold checkpoint of each
# pretrained model produces a tile-wise binary mask, and the masks are then
# combined by thresholding the fraction of positive votes per pixel.
result = {}
tile_size = Config.TILE_SIZE
device = Config.DEVICE
# Sorted so fragments are processed (and later submitted) in a stable order.
for fragment_path in sorted((Config.DATASET_PATH / 'test').iterdir()):
    if not fragment_path.is_dir():
        # Skip stray files that may sit next to the fragment directories.
        continue

    image_size = cv2.imread(str(fragment_path / 'mask.png')).shape[:2]

    dataset = VesuviusOriginalDataSetTest(fragment_path)
    dataloader = DataLoader(
        dataset,
        batch_size=Config.BATCH_SIZE,
        num_workers=Config.NUM_WORKERS
    )

    ink_masks = []
    for pretrained_model in pretrained_models:
        model = pretrained_model.model_class().to(device)
        for fold_path in pretrained_model.folds_path.iterdir():
            ink_mask = np.zeros(image_size, dtype=bool)

            model.load_state_dict(torch.load(fold_path, map_location=device))
            model.eval()

            # No gradients are needed for inference; this saves memory and time.
            with torch.no_grad():
                for x, tile_coords in tqdm(dataloader):
                    # The batch must live on the same device as the model,
                    # otherwise the forward pass fails on GPU.
                    # NOTE(review): the model ends in a plain Linear layer, so
                    # 0.5 is applied to raw outputs — confirm this matches training.
                    batch_pred = model(x.to(device)).cpu().numpy()[:, 0] > 0.5

                    # One prediction per tile: paint the whole tile with it.
                    for (row, col), pred in zip(tile_coords.tolist(), batch_pred):
                        ink_mask[row:row + tile_size, col:col + tile_size] = pred

            ink_masks.append(ink_mask)

    if not ink_masks:
        raise FileNotFoundError("No fold checkpoints were found for any pretrained model")

    # Voting: a pixel is ink when more than 20% of the fold models predict ink.
    result_mask = np.array(ink_masks).mean(axis=0) > 0.2
    result[fragment_path.stem] = result_mask

    plt.imshow(result_mask)
    plt.show()

In [None]:
# Show the mask left in `result_mask` by the last iteration of the prediction loop.
plt.imshow(result_mask)

## Submission

In [None]:
def combined_rle(img, img_id):
    """Run-length encode a binary mask into a submission row.

    The mask is flattened in row-major order and every run of non-zero
    pixels is written as ``start length`` with a 1-based start index.

    Args:
        img: Array-like mask; non-zero entries are treated as ink. It is
            not modified.
        img_id: Identifier stored under the ``"Id"`` key.

    Returns:
        ``{"Id": img_id, "Predicted": "<start length start length ...>"}``;
        the string is empty when the mask has no ink.
    """
    pixels = np.asarray(img).astype(bool).ravel()
    # Pad with a zero on both sides so runs touching the first or last pixel
    # are encoded too, instead of overwriting those two pixels with 0.
    padded = np.pad(pixels, 1)
    # Positions where the value changes: even entries start a run, odd entries
    # are one past its end (both already 1-based thanks to the padding).
    runs = np.flatnonzero(padded[1:] != padded[:-1]) + 1
    runs[1::2] -= runs[::2]  # turn each end position into a run length
    predicted = " ".join(map(str, runs.tolist()))
    return {"Id": img_id, "Predicted": predicted}

In [None]:
# Build one {"Id", "Predicted"} row per predicted fragment mask.
submission = [combined_rle(pred_mask, name) for name, pred_mask in result.items()]

submission_df = pd.DataFrame(submission)
submission_df

In [None]:
# Save the submission table as CSV in the Kaggle working directory, without the index column.
submission_df.to_csv("/kaggle/working/submission.csv", index=False)