In [None]:
# Environment setup: install the Python dependencies and an editable checkout
# of mmsegmentation.
!pip install ftfy
# Install PyTorch (pinned to 1.13.1)
!pip install torch==1.13.1
# Alternative conda-based PyTorch install, kept for reference:
# !conda install pytorch==1.12.0 torchvision==0.13.0 torchaudio==0.12.0 cudatoolkit=11.3 -c pytorch
# Install mim
!pip install -U openmim
# Install mmengine
!mim install mmengine
# Install MMCV
# !pip wheel mmcv-full==1.4.0
!mim install mmcv

# Clone the main branch of mmsegmentation from scratch and install it in
# editable mode.
# NOTE: `%cd` changes the kernel's working directory, so the relative
# `configs/...` paths used later resolve inside the mmsegmentation checkout.
!rm -rf mmsegmentation
!git clone -b main https://github.com/open-mmlab/mmsegmentation.git 
%cd mmsegmentation
!pip install -e .

In [None]:
!mim download mmsegmentation --config pspnet_r50-d8_4xb2-40k_cityscapes-512x1024 --dest .
!mim download mmsegmentation --config configs/ccnet/ccnet_r50-d8_4xb4-80k_ade20k-512x512.py --dest .

In [None]:
from glob import glob
from sklearn.metrics import f1_score
import cv2
from tqdm import tqdm
import os

from torch import nn
import torch.functional as F
import numpy as np
import cv2
from tqdm import tqdm

import numpy as np
from PIL import Image
from mmseg.apis import init_model, inference_model, show_result_pyplot
from mmengine import Config
import mmcv

from torchvision import transforms

import torch
import torch.nn.functional as F

In [None]:
# Root folder of the validation split. calculate_model_metrics globs
# `images/*` below it and derives each mask path by replacing `images`
# with `masks` in the image path.
PATH_TO_VALIDATION = '/kaggle/input/leaders-of-digital-segmentation/patches_val/'



In [None]:
# Compute the F1 metric of a model on the validation set

def calculate_model_metrics(model):
    """Compute the pixel-wise F1 score of ``model`` on the validation images.

    Every file matching ``<PATH_TO_VALIDATION>/images/*`` is passed (as a path)
    to ``model``. The ground truth is read with ``cv2.imread`` from the path
    obtained by replacing ``images`` with ``masks``; only its first channel is
    used.

    Args:
        model: Callable that takes an image path and returns a per-pixel
            prediction, either a ``torch.Tensor`` or anything accepted by
            ``np.asarray``.
            NOTE(review): ``f1_score`` needs discrete labels, so the callable
            presumably has to return already-thresholded masks - confirm.

    Returns:
        The result of ``sklearn.metrics.f1_score`` over all pixels of all
        validation images.

    Raises:
        FileNotFoundError: If no validation image is found or a mask file
            cannot be read.
    """
    image_paths = sorted(glob(os.path.join(PATH_TO_VALIDATION, 'images/*')))
    if not image_paths:
        raise FileNotFoundError(
            f'No validation images found under {PATH_TO_VALIDATION}')

    real_masks = []
    pred_masks = []
    for img_path in tqdm(image_paths):
        mask_path = img_path.replace('images', 'masks')

        # cv2.imread signals a missing/unreadable file by returning None
        mask = cv2.imread(mask_path)
        if mask is None:
            raise FileNotFoundError(f'Could not read mask: {mask_path}')

        pred = model(img_path)
        # Torch tensors must be detached and moved to the CPU first;
        # NumPy arrays are used as they are.
        if hasattr(pred, 'detach'):
            pred = pred.detach().cpu().numpy()
        pred_masks.append(np.asarray(pred).ravel())

        # The mask is already a NumPy array, so no tensor conversion applies.
        real_masks.append(mask[:, :, 0].ravel())

    return f1_score(np.concatenate(real_masks), np.concatenate(pred_masks))

In [None]:
import numpy as np
from PIL import Image
from mmseg.apis import init_model, inference_model, show_result_pyplot
from mmengine import Config
import mmcv

from torchvision import transforms

import torch
import torch.nn.functional as F

# Prefer CUDA when it is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Checkpoint files loaded further down for the U-Net, PSPNet and CCNet models
UNET_CHECKPOINT_PATH = '/kaggle/input/checkpoints-sat-snimkis/checkpoint_epoch3_4800.pth'
MMCV_CHECKPOINT_PATH = '/kaggle/input/checkpoints-sat-snimkis/mmseq_pspnet_2000.pth'
CCNET_CHECKPOINT_PATH = '/kaggle/input/checkpoints-sat-snimkis/ccnet_2000.pth'

# mmsegmentation config files (PSPNet and CCNet), given as relative paths
MMCV_CONF_PATH = 'configs/pspnet/pspnet_r50-d8_4xb2-40k_cityscapes-512x1024.py'
CCNET_CONF_PATH = 'configs/ccnet/ccnet_r50-d8_4xb4-80k_ade20k-512x512.py'

# create config for mmcv
def create_config_from_file(
        conf_path: str = 'configs/pspnet/pspnet_r50-d8_4xb2-40k_cityscapes-512x1024.py',
        load_from: str = 'pspnet_r50-d8_512x1024_40k_cityscapes_20200605_003338-2966598c.pth',
        num_classes: int = 2,
        crop_size: tuple = (256, 256),
    ):
    """Load an mmsegmentation config file and adapt it to this notebook.

    The returned config uses plain ``BN`` normalisation in the backbone and in
    both heads, the requested crop size and number of classes, simplified
    train/test pipelines, and fixed schedule, logging and seed settings.

    Args:
        conf_path: Path to the mmsegmentation config file to start from.
        load_from: Checkpoint path stored in ``cfg.load_from``.
        num_classes: Number of classes set on the decode and auxiliary heads.
        crop_size: ``(height, width)`` stored in ``cfg.crop_size`` and in the
            model's data preprocessor.

    Returns:
        The modified ``mmengine.Config`` object.
    """
    cfg = Config.fromfile(conf_path)

    # Since we use only one GPU, BN is used instead of SyncBN
    cfg.norm_cfg = dict(type='BN', requires_grad=True)
    cfg.crop_size = crop_size
    cfg.model.data_preprocessor.size = cfg.crop_size
    cfg.model.backbone.norm_cfg = cfg.norm_cfg
    cfg.model.decode_head.norm_cfg = cfg.norm_cfg
    cfg.model.auxiliary_head.norm_cfg = cfg.norm_cfg
    # Modify the number of classes of the model in the decode/auxiliary head
    cfg.model.decode_head.num_classes = num_classes
    cfg.model.auxiliary_head.num_classes = num_classes

    # Modify dataset type and path
    cfg.dataset_type = 'StanfordBackgroundDataset'
    cfg.data_root = ''

    cfg.train_dataloader.batch_size = 8

    # Pipelines without any resize/crop/flip augmentation
    cfg.train_pipeline = [
        dict(type='LoadImageFromFile'),
        dict(type='LoadAnnotations'),
        dict(type='PackSegInputs')
    ]

    cfg.test_pipeline = [
        dict(type='LoadImageFromFile'),
        dict(type='LoadAnnotations'),
        dict(type='PackSegInputs')
    ]

    # Load the pretrained weights
    cfg.load_from = load_from

    # Set up working dir to save files and logs.
    cfg.work_dir = './work_dirs/tutorial'

    # The training loop type comes from the loaded config; only its iteration
    # limits and the hook intervals are overridden here.
    cfg.train_cfg.max_iters = 10000
    cfg.train_cfg.val_interval = 1000
    cfg.default_hooks.logger.interval = 100
    cfg.default_hooks.checkpoint.interval = 1000

    # Set seed to facilitate reproducing the result
    cfg['randomness'] = dict(seed=0)

    return cfg



In [None]:
""" Parts of the U-Net model """
# Baseline U-Net implementation
class DoubleConv(nn.Module):
    """Two consecutive 3x3 convolutions, each followed by BatchNorm and ReLU."""

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        # Fall back to the output width when no intermediate width is given
        hidden_channels = mid_channels or out_channels
        # The layer order fixes the Sequential indices (0-5) and therefore the
        # state_dict keys, so it must not be changed.
        layers = [
            nn.Conv2d(in_channels, hidden_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(hidden_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]
        self.double_conv = nn.Sequential(*layers)

    def forward(self, x):
        """Apply both conv/BN/ReLU stages to ``x``."""
        features = self.double_conv(x)
        return features


class Down(nn.Module):
    """Encoder stage: max-pool with kernel size 2, then a DoubleConv."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        pool = nn.MaxPool2d(2)
        convs = DoubleConv(in_channels, out_channels)
        # Keep pooling at index 0 and the convs at index 1 (state_dict keys)
        self.maxpool_conv = nn.Sequential(pool, convs)

    def forward(self, x):
        """Downscale ``x`` and run it through the double convolution."""
        pooled_features = self.maxpool_conv(x)
        return pooled_features


class Up(nn.Module):
    """Decoder stage: upscale, pad to the skip tensor's size, concat, DoubleConv."""

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()
        half_channels = in_channels // 2

        if not bilinear:
            # Learned upsampling: the transposed conv halves the channel count
            self.up = nn.ConvTranspose2d(in_channels, half_channels, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)
        else:
            # Fixed bilinear upsampling; the conv block uses a narrower middle layer
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels, half_channels)

    def forward(self, x1, x2):
        """Upscale ``x1``, align it with ``x2`` and fuse both tensors."""
        upscaled = self.up(x1)
        # Dimensions 2 and 3 are the two spatial axes being aligned
        gap_y = x2.size(2) - upscaled.size(2)
        gap_x = x2.size(3) - upscaled.size(3)
        before_x, before_y = gap_x // 2, gap_y // 2

        # F.pad takes the padding of the last dimension first
        upscaled = F.pad(upscaled, [before_x, gap_x - before_x,
                                    before_y, gap_y - before_y])
        merged = torch.cat([x2, upscaled], dim=1)
        return self.conv(merged)


class OutConv(nn.Module):
    """Final 1x1 convolution mapping ``in_channels`` to ``out_channels``."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        """Project ``x`` onto the output channels."""
        projected = self.conv(x)
        return projected

In [None]:
""" Full assembly of the parts to form the complete network """

class UNet(nn.Module):
    """U-Net built from DoubleConv, Down, Up and OutConv stages.

    Args:
        n_channels: Number of channels of the input image.
        n_classes: Number of output channels produced by the final 1x1 conv.
        bilinear: If true, the decoder uses bilinear upsampling (and the
            channel counts are halved by ``factor``); otherwise it uses
            transposed convolutions.
    """

    def __init__(self, n_channels, n_classes, bilinear=False):
        super().__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear
        # Toggled by use_checkpointing(). It is a plain attribute, so the
        # state_dict keys are not affected.
        self._checkpointing = False

        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        factor = 2 if bilinear else 1
        self.down4 = Down(512, 1024 // factor)
        self.up1 = Up(1024, 512 // factor, bilinear)
        self.up2 = Up(512, 256 // factor, bilinear)
        self.up3 = Up(256, 128 // factor, bilinear)
        self.up4 = Up(128, 64, bilinear)
        self.outc = OutConv(64, n_classes)

    def _run(self, stage, *inputs):
        """Call ``stage`` on ``inputs``, through activation checkpointing if enabled."""
        if self._checkpointing:
            # Local import: the file-level imports do not bring this name in.
            from torch.utils.checkpoint import checkpoint
            # The non-reentrant variant also works when the inputs themselves
            # do not require grad (as is the case for the first stage).
            return checkpoint(stage, *inputs, use_reentrant=False)
        return stage(*inputs)

    def forward(self, x):
        """Return the raw logits for input batch ``x``."""
        x1 = self._run(self.inc, x)
        x2 = self._run(self.down1, x1)
        x3 = self._run(self.down2, x2)
        x4 = self._run(self.down3, x3)
        x5 = self._run(self.down4, x4)
        x = self._run(self.up1, x5, x4)
        x = self._run(self.up2, x, x3)
        x = self._run(self.up3, x, x2)
        x = self._run(self.up4, x, x1)
        logits = self._run(self.outc, x)
        return logits

    def use_checkpointing(self):
        """Enable activation checkpointing for every stage of the network.

        The previous implementation called the ``torch.utils.checkpoint``
        *module* as if it were a function, which raises ``TypeError``. The
        sub-modules are now left untouched and ``forward`` routes each stage
        through ``torch.utils.checkpoint.checkpoint`` instead.
        """
        self._checkpointing = True

In [None]:

# Build all three models on the shared `device` instead of a hard-coded
# 'cuda:0', so the notebook also runs on CPU-only machines.
cfg = create_config_from_file(CCNET_CONF_PATH, load_from=CCNET_CHECKPOINT_PATH)
ccnet_model = init_model(cfg, CCNET_CHECKPOINT_PATH, str(device))

cfg = create_config_from_file(MMCV_CONF_PATH, load_from=MMCV_CHECKPOINT_PATH)
pspnet_model = init_model(cfg, MMCV_CHECKPOINT_PATH, str(device))

unet_model = UNet(3, 1, False).to(device)
# map_location lets a checkpoint saved on a GPU be loaded on the CPU as well
unet_model.load_state_dict(torch.load(UNET_CHECKPOINT_PATH, map_location=device))
# Trailing semicolon keeps the cell from printing the whole module repr
unet_model.eval();

In [None]:
# preprocess functions and predictions for using in blending
def unet_preprocess(pil_img, is_mask):
    """Turn an image into a 256x256 tensor for the U-Net.

    The input is converted with ``np.asarray``, passed through
    ``transforms.ToTensor`` and resized to ``(256, 256)``. When ``is_mask`` is
    true the tensor is additionally multiplied by 255 in place
    (presumably to undo ToTensor's scaling of 8-bit input - confirm).
    """
    to_model_input = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((256, 256)),
    ])
    tensor = to_model_input(np.asarray(pil_img))

    if is_mask:
        tensor *= 255

    return tensor


def predict_img(net, img, device):
    """Run ``net`` on a single image and return its sigmoid output.

    The image is preprocessed with ``unet_preprocess``, given a leading batch
    dimension and moved to ``device`` as float32. The network output is moved
    to the CPU, passed through a sigmoid, squeezed and returned as a NumPy
    array.
    """
    net.eval()
    batch = unet_preprocess(np.array(img), is_mask=False).unsqueeze(0)
    batch = batch.to(device=device, dtype=torch.float32)
    with torch.no_grad():
        logits = net(batch).cpu()
        probabilities = torch.sigmoid(logits)
    return probabilities.squeeze().numpy()


def read_image_from_mmcv(img_path):
    """Read ``img_path`` with ``mmcv.imread`` and return the result."""
    image = mmcv.imread(img_path)
    return image


def read_image_to_numpy(img_path):
    """Open ``img_path`` with PIL and return its contents as a NumPy array."""
    pil_image = Image.open(img_path)
    return np.array(pil_image)

def _mmseg_class_probs(model, img_path, get_buildings=True):
    """Run an mmsegmentation ``model`` on one image and return softmax scores.

    The image is read with ``mmcv.imread``, the model's ``seg_logits`` are
    soft-maxed over dimension 0, and, when ``get_buildings`` is true, only
    index 0 along that dimension is returned.
    NOTE(review): which class index 0 stands for is not visible here - confirm
    against the training labels.
    """
    img = mmcv.imread(img_path)
    probs = torch.softmax(inference_model(model, img).seg_logits.data, dim=0)
    return probs[0] if get_buildings else probs


def mmcv_predict(img_path, get_buildings=True):
    """Predict with the already-initialised module-level ``pspnet_model``."""
    return _mmseg_class_probs(pspnet_model, img_path, get_buildings)


def ccnet_predict(img_path, get_buildings=True):
    """Predict with the already-initialised module-level ``ccnet_model``."""
    return _mmseg_class_probs(ccnet_model, img_path, get_buildings)

def unet_predict(img, device=device):
    """Predict with the module-level ``unet_model`` by delegating to ``predict_img``."""
    return predict_img(net=unet_model, img=img, device=device)



In [None]:

class Blending:
    """Weighted blend of several per-pixel predictors.

    Args:
        predict_functions: Callables that take the output of the matching
            reader and return a 2-D prediction (``torch.Tensor`` or array).
        weights_list: One weight per predictor; the weights must sum to 1.
        read_picture_functions: One reader per predictor. Each reader is called
            with the original ``img`` passed to :meth:`predict`.

    Raises:
        ValueError: If the three sequences differ in length.
        AssertionError: If the weights do not sum to 1.
    """

    def __init__(self, predict_functions, weights_list, read_picture_functions):
        # zip() would silently drop the surplus entries of the longer lists
        if not (len(predict_functions) == len(weights_list) == len(read_picture_functions)):
            raise ValueError(
                "predict_functions, weights_list and read_picture_functions "
                "must have the same length")
        self.predict_functions = predict_functions
        self.weights_list = weights_list
        assert abs(sum(self.weights_list) - 1) <= 1e-8, "weights_list must sum to 1"
        self.read_picture_functions = read_picture_functions

    def predict(self, img):
        """Return the weighted sum of ``1 - prediction`` over all predictors.

        Args:
            img: Either a path to an image or an image array.

        Returns:
            A float array whose shape equals the first two dimensions of the
            input image.

        Raises:
            ValueError: If ``img`` is neither a ``str`` nor an ``np.ndarray``.
        """
        if isinstance(img, str):
            reference = np.array(Image.open(img))
        elif isinstance(img, np.ndarray):
            reference = img
        else:
            raise ValueError(f"Type of img: {type(img)}. The type is not known")

        height, width = reference.shape[0], reference.shape[1]
        res_mask = np.zeros((height, width))
        for weight, pred_func, read_pic_func in zip(
                self.weights_list, self.predict_functions, self.read_picture_functions):
            # Every reader gets the ORIGINAL input; `img` is not overwritten, so
            # a later reader never receives an earlier reader's output.
            model_input = read_pic_func(img)
            pred_mask = pred_func(model_input)

            # Predictors may return torch tensors or NumPy arrays
            if hasattr(pred_mask, 'detach'):
                pred_mask = pred_mask.detach().cpu().numpy()
            pred_mask = np.asarray(pred_mask)

            # Resize only when the prediction does not already match the input
            if pred_mask.shape[:2] != (height, width):
                pred_mask = cv2.resize(
                    pred_mask,
                    dsize=(width, height),
                    interpolation=cv2.INTER_CUBIC)

            res_mask += weight * (1 - pred_mask)

        return res_mask

In [None]:
# create submission for blending

# Side length of the square window that predict_full slices out of each image
PATCH_SIZE = 256
# Glob pattern of the test images that the submission loop iterates over
TEST_DATA_PATH = '/kaggle/input/leaders-of-digital-segmentation-test/images/*'


def predict_full(blend_obj, path):
    """Predict a whole image by sliding a half-overlapping window over it.

    Windows of ``PATCH_SIZE`` x ``PATCH_SIZE`` pixels are taken every
    ``PATCH_SIZE // 2`` pixels; windows at the border may be smaller. Each
    window is passed to ``blend_obj.predict`` and the result is accumulated.

    Args:
        blend_obj: Object with a ``predict(patch)`` method returning a 2-D
            array of the same height and width as ``patch``.
        path: Path of the image, read with ``cv2.imread``.

    Returns:
        ``(mask, count)``: the summed predictions and the number of windows
        that covered each pixel. ``mask / count`` is the averaged prediction.

    Raises:
        FileNotFoundError: If the image cannot be read.
    """
    img = cv2.imread(path)
    # cv2.imread signals a missing/unreadable file by returning None
    if img is None:
        raise FileNotFoundError(f'Could not read image: {path}')

    height, width = img.shape[0], img.shape[1]
    mask = np.zeros((height, width))
    count = np.zeros((height, width))

    # Stride and limits are derived from PATCH_SIZE rather than repeating the
    # literals 128 and 256, so changing PATCH_SIZE keeps them consistent.
    stride = PATCH_SIZE // 2
    row_limit = (height + PATCH_SIZE - 1) // PATCH_SIZE * PATCH_SIZE
    col_limit = (width + PATCH_SIZE - 1) // PATCH_SIZE * PATCH_SIZE

    for i in tqdm(range(0, row_limit, stride)):
        for j in range(0, col_limit, stride):
            patch_img = img[i:i + PATCH_SIZE, j:j + PATCH_SIZE]
            # Windows starting beyond the image border are empty
            if patch_img.size == 0:
                continue
            predicted = blend_obj.predict(patch_img)
            mask[i:i + PATCH_SIZE, j:j + PATCH_SIZE] += predicted
            count[i:i + PATCH_SIZE, j:j + PATCH_SIZE] += 1

    return mask, count


# predict_full requires a blending object as its first argument; the previous
# call `predict_full(fn)` omitted it and raised TypeError.
# NOTE(review): the equal weights below are a placeholder - replace them with
# the tuned blending weights.
# predict_full hands over already-loaded patches, so the readers are identity
# functions. unet_predict returns a NumPy array; it is wrapped as a tensor so
# that all three predictors return the same type.
blend_model = Blending(
    predict_functions=[
        mmcv_predict,
        ccnet_predict,
        lambda patch: torch.from_numpy(unet_predict(patch)),
    ],
    weights_list=[1 / 3, 1 / 3, 1 / 3],
    read_picture_functions=[lambda patch: patch] * 3,
)

for fn in sorted(glob(TEST_DATA_PATH)):
    mask, count = predict_full(blend_model, fn)
    # The averaged prediction is written next to the notebook, with `image`
    # replaced by `mask` in the file name.
    cv2.imwrite(os.path.basename(fn).replace('image', 'mask'), mask / count)