# Road Segmentation Project


In [None]:
# Mounts Google Drive at /content/drive (Colab only).
# Can skip in Jupyter
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Constants shared by the patch extraction, metric and submission code below.
PATCH_SIZE = 16  # pixels per side of square patches
# NOTE(review): VAL_SIZE is not referenced in the visible part of the notebook;
# the splits below use train_test_split(test_size=...) instead -- confirm it is still needed.
VAL_SIZE = 10  # size of the validation set (number of images)
CUTOFF = 0.25  # minimum average brightness for a mask patch to be classified as containing road

In [None]:
import math
import os
import re
import cv2
import numpy as np
import matplotlib.pyplot as plt
from glob import glob
from random import sample
from PIL import Image
from sklearn.model_selection import train_test_split

## Helper functions
These are general utility functions for visualization and submission, taken directly from the provided project notebook.

In [None]:

def load_all_from_path(path):
    """Load every ``.png`` file directly inside ``path`` into one float array.

    Files are read in sorted filename order, stacked along a new leading axis
    and rescaled from [0, 255] to floats in the interval [0., 1.].

    Args:
        path: directory containing equally sized ``.png`` images.

    Returns:
        np.ndarray of dtype float32 with shape (n_images, H, W) plus a
        trailing channel axis when the files have one.

    Raises:
        ValueError: if ``path`` contains no ``.png`` file.
    """
    filenames = sorted(glob(path + '/*.png'))
    if not filenames:
        # np.stack would fail with an unrelated-looking message on an empty list
        raise ValueError(f"no .png files found in {path!r}")

    frames = []
    for filename in filenames:
        # the context manager releases the file handle once the pixels are copied
        with Image.open(filename) as img:
            frames.append(np.array(img))
    return np.stack(frames).astype(np.float32) / 255.


def show_first_n(imgs, masks, n=5):
    """Plot the first ``n`` images (top row) above their masks (bottom row).

    Args:
        imgs: indexable sequence of images accepted by ``imshow``.
        masks: indexable sequence of masks, aligned with ``imgs``.
        n: maximum number of image/mask pairs to draw.
    """
    imgs_to_draw = min(n, len(imgs))  # honour the caller's n instead of a fixed 5
    if imgs_to_draw <= 0:
        return  # nothing to draw; plt.subplots cannot create zero columns
    # squeeze=False keeps axs two-dimensional even when a single column is drawn
    fig, axs = plt.subplots(2, imgs_to_draw, figsize=(18.5, 6), squeeze=False)
    for i in range(imgs_to_draw):
        axs[0, i].imshow(imgs[i])
        axs[1, i].imshow(masks[i])
        axs[0, i].set_title(f'Image {i}')
        axs[1, i].set_title(f'Mask {i}')
        axs[0, i].set_axis_off()
        axs[1, i].set_axis_off()
    plt.show()

def image_to_patches(images, masks=None):
    """Cut images (and optionally masks) into PATCH_SIZE x PATCH_SIZE tiles.

    Args:
        images: array of shape (n_images, H, W, C); only the first three
            channels are kept. H and W must be multiples of PATCH_SIZE.
        masks: optional array holding one segmentation mask per image.

    Returns:
        ``patches`` of shape (n_patches, PATCH_SIZE, PATCH_SIZE, 3), ordered
        image by image and row by row. When ``masks`` is given, a tuple
        ``(patches, labels)`` where ``labels`` is a float32 vector that is 1.0
        for patches whose mean mask value exceeds CUTOFF and 0.0 otherwise.
    """
    n_images, h, w = images.shape[:3]
    assert (h % PATCH_SIZE) + (w % PATCH_SIZE) == 0  # make sure images can be patched exactly

    rows, cols = h // PATCH_SIZE, w // PATCH_SIZE
    # (image, patch row, pixel row, patch column, pixel column, channel)
    grid_shape = (n_images, rows, PATCH_SIZE, cols, PATCH_SIZE, -1)

    rgb = images[:, :, :, :3]
    # swapping axes 2 and 3 groups the two pixel axes of each patch together
    tiled = np.moveaxis(rgb.reshape(grid_shape), 2, 3)
    patches = tiled.reshape(-1, PATCH_SIZE, PATCH_SIZE, 3)
    if masks is None:
        return patches

    mask_tiles = np.moveaxis(masks.reshape(grid_shape), 2, 3)
    road_fraction = np.mean(mask_tiles, (-1, -2, -3))  # mean mask value per patch
    labels = (road_fraction > CUTOFF).reshape(-1).astype(np.float32)
    return patches, labels


def show_patched_image(patches, labels, h_patches=25, w_patches=25):
    """Draw patches on an ``h_patches`` x ``w_patches`` grid, in row-major order.

    Patches with a non-zero label get their red channel raised to the label
    value, which paints road patches red.
    """
    fig, axs = plt.subplots(h_patches, w_patches, figsize=(18.5, 18.5))
    for idx, (patch, label) in enumerate(zip(patches, labels)):
        row, col = divmod(idx, w_patches)
        ax = axs[row, col]
        # element-wise maximum against (label, 0, 0) only ever lifts the red channel
        red_floor = np.array([label.item(), 0., 0.])
        ax.imshow(np.maximum(patch, red_floor))
        ax.set_axis_off()
    plt.show()

def create_submission(test_pred, test_filenames, submission_filename):
    """Write patch-level predictions to a CSV file with an ``id,prediction`` header.

    Args:
        test_pred: sequence of 2D arrays of patch labels, one per image, in
            the order of the sorted filenames.
        test_filenames: image paths containing ``satimage_<number>``.
        submission_filename: path of the CSV file to create.
    """
    with open(submission_filename, 'w') as out:
        out.write('id,prediction\n')
        for filename, patch_grid in zip(sorted(test_filenames), test_pred):
            img_number = int(re.search(r"satimage_(\d+)", filename).group(1))
            n_rows, n_cols = patch_grid.shape[0], patch_grid.shape[1]
            for row in range(n_rows):
                for col in range(n_cols):
                    # ids carry the pixel offset of the patch: column offset first, then row offset
                    line = "{:03d}_{}_{},{}\n".format(
                        img_number, col*PATCH_SIZE, row*PATCH_SIZE, int(patch_grid[row, col]))
                    out.write(line)

In [None]:
# Dataset root; the commented-out line is the Google Drive location used on Colab.
#ROOT_PATH = "/content/drive/MyDrive/Colab Notebooks/ethz-cil-road-segmentation-2024"
ROOT_PATH = "ethz-cil-road-segmentation-2024"
# training images, keeping only the first three channels
images = load_all_from_path(os.path.join(ROOT_PATH, 'training', 'images'))[:, :, :, :3]
# ground-truth masks, loaded with the same scaling to [0., 1.]
masks = load_all_from_path(os.path.join(ROOT_PATH, 'training', 'groundtruth'))

## Helper Functions for image processing
These are the utility functions used for image processing, taken directly from the provided project notebook.

In [None]:
import torch
from torch import nn
from torch.utils.tensorboard import SummaryWriter
from tqdm.notebook import tqdm

def np_to_tensor(x, device):
    """Wrap a np.array in a torch tensor placed on ``device``.

    For ``device == 'cpu'`` the tensor stays on the CPU; for any other device
    it is made contiguous, pinned and copied over with a non-blocking transfer.
    """
    tensor = torch.from_numpy(x)
    if device == 'cpu':
        return tensor.cpu()
    # pinned host memory is what allows the non_blocking copy below
    pinned = tensor.contiguous().pin_memory()
    return pinned.to(device=device, non_blocking=True)


class ImageDataset(torch.utils.data.Dataset):
    """Dataset over the module-level train/validation arrays, served by index.

    The data is read from the globals ``train_images`` / ``train_masks`` or
    ``val_images`` / ``val_masks``, which must exist before construction.

    Args:
        is_train: selects the split. Accepts a bool, or one of the strings
            'training' / 'train' (training split) and 'validation' / 'val' /
            'valid' / 'eval' (validation split). Before this fix every
            non-empty string was truthy, so ``ImageDataset('validation', ...)``
            silently served the training data.
        device: device the returned tensors are moved to.
        use_patches: if True, samples are patches with one label each;
            otherwise whole images with their masks.
        resize_to: size passed to ``cv2.resize`` when ``use_patches`` is False
            and the images do not already have that size.

    Raises:
        ValueError: if ``is_train`` is a string that names no known split.
    """

    _TRAIN_NAMES = ('training', 'train')
    _VAL_NAMES = ('validation', 'val', 'valid', 'eval')

    def __init__(self, is_train, device, use_patches=True, resize_to=(400, 400)):
        # stored as a real bool so that `if dataset.is_train` means what it says
        self.is_train = self._wants_training_split(is_train)
        self.device = device
        self.use_patches = use_patches
        self.resize_to=resize_to
        self.x, self.y, self.n_samples = None, None, None
        self._load_data()

    @staticmethod
    def _wants_training_split(is_train):
        # translates the bool-or-name flag into a bool (True = training split)
        if isinstance(is_train, str):
            name = is_train.strip().lower()
            if name in ImageDataset._TRAIN_NAMES:
                return True
            if name in ImageDataset._VAL_NAMES:
                return False
            raise ValueError(f"unknown dataset split {is_train!r}")
        return bool(is_train)

    def _load_data(self):  # not very scalable, but good enough for now
        self.x = train_images if self.is_train else val_images
        self.y = train_masks if self.is_train else val_masks
        if self.use_patches:  # split each image into patches
            self.x, self.y = image_to_patches(self.x, self.y)
        elif self.resize_to != (self.x.shape[1], self.x.shape[2]):  # resize images
            self.x = np.stack([cv2.resize(img, dsize=self.resize_to) for img in self.x], 0)
            self.y = np.stack([cv2.resize(mask, dsize=self.resize_to) for mask in self.y], 0)
        self.x = np.moveaxis(self.x, -1, 1)  # pytorch works with CHW format instead of HWC
        self.n_samples = len(self.x)

    def _preprocess(self, x, y):
        # to keep things simple we will not apply transformations to each sample,
        # but it would be a very good idea to look into preprocessing
        return x, y

    def __getitem__(self, item):
        # indexing y with [item] (a list) keeps a leading axis of length 1 on the target
        return self._preprocess(np_to_tensor(self.x[item], self.device), np_to_tensor(self.y[[item]], self.device))

    def __len__(self):
        return self.n_samples


def show_val_samples(x, y, y_hat, segmentation=False):
    """Training callback that plots up to five samples with their predictions.

    The layout is chosen from the shapes: when ``x`` and ``y`` share their
    last two dimensions the targets are treated as segmentation masks and
    drawn as images, otherwise they are treated as per-sample class labels
    and written into the titles.

    Args:
        x: np.array of inputs in channel-first layout.
        y: np.array of targets.
        y_hat: np.array of predictions, shaped like ``y``.
        segmentation: unused; kept so existing callers keep working.
    """
    imgs_to_draw = min(5, len(x))
    if imgs_to_draw == 0:
        return  # plt.subplots cannot create zero columns
    # squeeze=False keeps axs two-dimensional, so a final batch holding a
    # single sample no longer breaks the axs[row, col] indexing
    if x.shape[-2:] == y.shape[-2:]:  # segmentation
        fig, axs = plt.subplots(3, imgs_to_draw, figsize=(18.5, 12), squeeze=False)
        for i in range(imgs_to_draw):
            axs[0, i].imshow(np.moveaxis(x[i], 0, -1))
            # single-channel maps are repeated three times to display them as grey RGB
            axs[1, i].imshow(np.concatenate([np.moveaxis(y_hat[i], 0, -1)] * 3, -1))
            axs[2, i].imshow(np.concatenate([np.moveaxis(y[i], 0, -1)]*3, -1))
            axs[0, i].set_title(f'Sample {i}')
            axs[1, i].set_title(f'Predicted {i}')
            axs[2, i].set_title(f'True {i}')
            axs[0, i].set_axis_off()
            axs[1, i].set_axis_off()
            axs[2, i].set_axis_off()
    else:  # classification
        fig, axs = plt.subplots(1, imgs_to_draw, figsize=(18.5, 6), squeeze=False)
        for i in range(imgs_to_draw):
            axs[0, i].imshow(np.moveaxis(x[i], 0, -1))
            axs[0, i].set_title(f'True: {np.round(y[i]).item()}; Predicted: {np.round(y_hat[i]).item()}')
            axs[0, i].set_axis_off()
    plt.show()

In [None]:
def train(train_dataloader, eval_dataloader, model, loss_fn, metric_fns, optimizer, n_epochs):
    """Train ``model`` for ``n_epochs`` epochs and validate after each one.

    Per-epoch means of the loss and of every metric are logged to
    tensorboard under './tensorboard/net', printed, and plotted as loss
    curves once training has finished. After each epoch the last processed
    batch is visualised with ``show_val_samples``.

    Args:
        train_dataloader: iterable of ``(x, y)`` training batches.
        eval_dataloader: iterable of ``(x, y)`` validation batches.
        model: the network to optimise.
        loss_fn: callable ``loss_fn(y_hat, y)`` returning a scalar tensor.
        metric_fns: dict mapping a metric name to a callable
            ``fn(y_hat, y)`` returning a scalar tensor.
        optimizer: optimizer that updates the parameters of ``model``.
        n_epochs: number of passes over ``train_dataloader``.
    """
    logdir = './tensorboard/net'
    writer = SummaryWriter(logdir)  # tensorboard writer (can also log images)

    history = {}  # collects metrics at the end of each epoch

    def _mean(values):
        # an empty dataloader leaves the list empty; report NaN instead of dividing by zero
        return sum(values) / len(values) if values else float('nan')

    try:
        for epoch in range(n_epochs):  # loop over the dataset multiple times

            # initialize metric list
            metrics = {'loss': [], 'val_loss': []}
            for k in metric_fns:
                metrics[k] = []
                metrics['val_'+k] = []

            pbar = tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{n_epochs}')
            # training
            model.train()
            for (x, y) in pbar:
                optimizer.zero_grad()  # zero out gradients
                y_hat = model(x)  # forward pass
                loss = loss_fn(y_hat, y)
                loss.backward()  # backward pass
                optimizer.step()  # optimize weights

                # log partial metrics
                metrics['loss'].append(loss.item())
                for k, fn in metric_fns.items():
                    metrics[k].append(fn(y_hat, y).item())
                pbar.set_postfix({k: sum(v)/len(v) for k, v in metrics.items() if len(v) > 0})

            # validation
            model.eval()
            with torch.no_grad():  # do not keep track of gradients
                for (x, y) in eval_dataloader:
                    y_hat = model(x)  # forward pass
                    loss = loss_fn(y_hat, y)

                    # log partial metrics
                    metrics['val_loss'].append(loss.item())
                    for k, fn in metric_fns.items():
                        metrics['val_'+k].append(fn(y_hat, y).item())

            # summarize metrics, log to tensorboard and display
            history[epoch] = {k: _mean(v) for k, v in metrics.items()}
            for k, v in history[epoch].items():
                writer.add_scalar(k, v, epoch)
            print(' '.join(['\t- '+str(k)+' = '+str(v)+'\n ' for (k, v) in history[epoch].items()]))
            # x, y and y_hat still hold the last batch that was processed
            show_val_samples(x.detach().cpu().numpy(), y.detach().cpu().numpy(), y_hat.detach().cpu().numpy())
    finally:
        writer.close()  # flush pending events even when training is interrupted

    print('Finished Training')
    # plot loss curves
    plt.plot([v['loss'] for v in history.values()], label='Training Loss')
    plt.plot([v['val_loss'] for v in history.values()], label='Validation Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epochs')
    plt.legend()
    plt.show()

# Baseline 1 - U-Net
This is the provided baseline U-Net with F1 score of 86%.

In [None]:
class Block(nn.Module):
    """Two 3x3 convolutions with batch normalization in between.

    Layer order: conv -> ReLU -> batch norm -> conv -> activation. The first
    activation is always a ReLU; only the final one follows ``activation``
    ('RELU' selects ``nn.ReLU``, any other value selects ``nn.ELU``).
    """

    def __init__(self, in_ch, out_ch, activation='RELU'):
        super().__init__()
        if activation == 'RELU':
            self.activation = nn.ReLU()
        else:
            self.activation = nn.ELU()
        layers = [
            nn.Conv2d(in_channels=in_ch, out_channels=out_ch, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(out_ch),
            nn.Conv2d(in_channels=out_ch, out_channels=out_ch, kernel_size=3, padding=1),
            self.activation,
        ]
        self.block = nn.Sequential(*layers)

    def forward(self, x):
        return self.block(x)


class UNet(nn.Module):
    """UNet-like architecture for single class semantic segmentation.

    Args:
        chs: channel counts of the encoder, input channels first; the decoder
            walks the same sequence backwards and stops before the input entry.
        activation: 'RELU' builds the blocks with their default activation,
            any other value builds them with 'ELU'.
    """

    def __init__(self, chs=(3,64,128,256,512,1024), activation='RELU'):
        super().__init__()
        enc_chs = chs  # number of channels in the encoder
        dec_chs = chs[::-1][:-1]  # number of channels in the decoder
        block_kind = 'RELU' if activation == 'RELU' else 'ELU'
        enc_pairs = list(zip(enc_chs[:-1], enc_chs[1:]))
        dec_pairs = list(zip(dec_chs[:-1], dec_chs[1:]))

        self.enc_blocks = nn.ModuleList([Block(c_in, c_out, block_kind) for c_in, c_out in enc_pairs])  # encoder blocks
        self.dec_blocks = nn.ModuleList([Block(c_in, c_out, block_kind) for c_in, c_out in dec_pairs])  # decoder blocks

        self.pool = nn.MaxPool2d(2)  # pooling layer (can be reused as it will not be trained)
        self.upconvs = nn.ModuleList([nn.ConvTranspose2d(c_in, c_out, 2, 2) for c_in, c_out in dec_pairs])  # deconvolution
        self.head = nn.Sequential(nn.Conv2d(dec_chs[-1], 1, 1), nn.Sigmoid()) # 1x1 convolution for producing the output
        self.activation = activation

    def forward(self, x):
        # encoder: every block but the last feeds a skip connection and is followed by pooling
        skips = []
        for enc_block in self.enc_blocks[:-1]:
            x = enc_block(x)
            skips.append(x)
            x = self.pool(x)
        x = self.enc_blocks[-1](x)
        # decoder: upsample, concatenate the matching skip (deepest first), refine
        for dec_block, upconv, skip in zip(self.dec_blocks, self.upconvs, reversed(skips)):
            x = upconv(x)
            x = torch.cat([x, skip], dim=1)
            x = dec_block(x)
        return self.head(x)  # reduce to 1 channel


def patch_accuracy_fn(y_hat, y):
    """Accuracy measured on patches (metric used on Kaggle for evaluation).

    Both tensors are averaged over PATCH_SIZE x PATCH_SIZE patches and
    thresholded at CUTOFF; the result is the fraction of patches on which the
    two thresholded maps agree.
    """
    h_patches = y.shape[-2] // PATCH_SIZE
    w_patches = y.shape[-1] // PATCH_SIZE
    grid = (-1, 1, h_patches, PATCH_SIZE, w_patches, PATCH_SIZE)

    def to_patch_labels(t):
        # axes -1 and -3 are the pixel axes inside each patch
        return t.reshape(grid).mean((-1, -3)) > CUTOFF

    agreement = to_patch_labels(y) == to_patch_labels(y_hat)
    return agreement.float().mean()

def accuracy_fn(y_hat, y):
    """Fraction of elements whose rounded prediction equals the rounded target."""
    hits = torch.eq(y_hat.round(), y.round())
    return hits.float().mean()

# Baseline 2 - ResU-Net --> Road Extraction by Deep Residual U-Net
This is the provided baseline U-Net with F1 score of 86%.

In [None]:
class ResidualConv(nn.Module):
    """Residual unit: a pre-activated double convolution plus a convolutional shortcut.

    The main path is BN -> ReLU -> 3x3 conv (``stride``, ``padding``) ->
    BN -> ReLU -> 3x3 conv; the shortcut is a 3x3 conv with the same stride
    followed by BN. ``forward`` returns the sum of both paths.
    """

    def __init__(self, input_dim, output_dim, stride, padding):
        super(ResidualConv, self).__init__()

        main_path = [
            nn.BatchNorm2d(input_dim),
            nn.ReLU(),
            nn.Conv2d(input_dim, output_dim, kernel_size=3, stride=stride, padding=padding),
            nn.BatchNorm2d(output_dim),
            nn.ReLU(),
            nn.Conv2d(output_dim, output_dim, kernel_size=3, padding=1),
        ]
        self.conv_block = nn.Sequential(*main_path)

        shortcut = [
            nn.Conv2d(input_dim, output_dim, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(output_dim),
        ]
        self.conv_skip = nn.Sequential(*shortcut)

    def forward(self, x):
        transformed = self.conv_block(x)
        bypass = self.conv_skip(x)
        return transformed + bypass
        
class ResUnet(nn.Module):
    """Residual U-Net: three encoder levels, a bridge and three decoder levels.

    Args:
        channel: number of channels of the input image.
        filters: channel widths of the three encoder levels and the bridge.

    The output is a single-channel map passed through a sigmoid.
    """

    def __init__(self, channel, filters=[64, 128, 256, 512]):
        super(ResUnet, self).__init__()
        f0, f1, f2, f3 = filters[0], filters[1], filters[2], filters[3]

        # first level works at full resolution, so it has its own plain-conv shortcut
        self.input_layer = nn.Sequential(
            nn.Conv2d(channel, f0, kernel_size=3, padding=1),
            nn.BatchNorm2d(f0),
            nn.ReLU(),
            nn.Conv2d(f0, f0, kernel_size=3, padding=1),
        )
        self.input_skip = nn.Sequential(
            nn.Conv2d(channel, f0, kernel_size=3, padding=1)
        )

        # encoder levels halve the resolution through their stride of 2
        self.residual_conv_1 = ResidualConv(f0, f1, 2, 1)
        self.residual_conv_2 = ResidualConv(f1, f2, 2, 1)

        self.bridge = ResidualConv(f2, f3, 2, 1)

        # each decoder level: 2x transposed-conv upsampling, then a residual
        # unit over the upsampled features concatenated with the skip features
        self.upsample_1 = nn.ConvTranspose2d(f3, f3, 2, 2)
        self.up_residual_conv1 = ResidualConv(f3 + f2, f2, 1, 1)

        self.upsample_2 = nn.ConvTranspose2d(f2, f2, 2, 2)
        self.up_residual_conv2 = ResidualConv(f2 + f1, f1, 1, 1)

        self.upsample_3 = nn.ConvTranspose2d(f1, f1, 2, 2)
        self.up_residual_conv3 = ResidualConv(f1 + f0, f0, 1, 1)

        self.output_layer = nn.Sequential(
            nn.Conv2d(f0, 1, 1, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        # Encode
        level1 = self.input_layer(x) + self.input_skip(x)
        level2 = self.residual_conv_1(level1)
        level3 = self.residual_conv_2(level2)
        # Bridge
        deep = self.bridge(level3)
        # Decode, consuming the encoder outputs from deepest to shallowest
        up = self.upsample_1(deep)
        up = self.up_residual_conv1(torch.cat([up, level3], dim=1))

        up = self.upsample_2(up)
        up = self.up_residual_conv2(torch.cat([up, level2], dim=1))

        up = self.upsample_3(up)
        up = self.up_residual_conv3(torch.cat([up, level1], dim=1))

        return self.output_layer(up)

# Upgrade 1 - Using Transfer Learning for the Encoder
In this variant of the U-Net architecture, the encoder is replaced with a pretrained VGG16 model.

In [None]:
from torchvision import models
class UNetVGG(nn.Module):
    """U-Net whose encoder is built from the layers of a VGG16 feature extractor.

    Args:
        vgg_features: list of the VGG16 feature layers, sliced into five
            encoder stages by fixed index ranges.
        chs: channel counts used to size the decoder (read in reverse order).

    The output is a single-channel map passed through a sigmoid.
    """

    def __init__(self, vgg_features, chs=(64,64,128,256,512,512)):
        super().__init__()
        self.enc1 = nn.Sequential(*vgg_features[0:5])   # Conv1 (2 conv layers + maxpool)
        self.enc2 = nn.Sequential(*vgg_features[5:10])  # Conv2 (2 conv layers + maxpool)
        self.enc3 = nn.Sequential(*vgg_features[10:17]) # Conv3 (3 conv layers + maxpool)
        self.enc4 = nn.Sequential(*vgg_features[17:24]) # Conv4 (3 conv layers + maxpool)
        self.enc5 = nn.Sequential(*vgg_features[24:31]) # Conv5 (3 conv layers + maxpool)
        # plain list used only for iteration; the stages are registered through enc1..enc5
        self.encoders = [self.enc1, self.enc2, self.enc3, self.enc4, self.enc5]

        dec_chs = chs[::-1]  # decoder channels in the reverse order
        up_pairs = zip(dec_chs[:-1], dec_chs[1:])
        self.upconvs = nn.ModuleList([nn.ConvTranspose2d(c_in, c_out, 2, 2) for c_in, c_out in up_pairs])  # deconvolution
        # blocks that follow a concatenation see twice the channels of their output
        self.dec_blocks = nn.ModuleList([Block(2*width, width) for width in dec_chs[1:-1]])
        # the last block has no skip connection to concatenate with
        self.dec_blocks.append(Block(64,64))
        self.head = nn.Sequential(nn.Conv2d(64, 1, 1), nn.Sigmoid())

    def forward(self, x):
        # encode, keeping the output of every stage
        stage_outputs = []
        features = x
        for stage in self.encoders:
            features = stage(features)
            stage_outputs.append(features)

        # decode: the deepest output is upsampled and joined with the stage-4 output
        features = self.upconvs[0](stage_outputs[-1])
        features = torch.cat((features, stage_outputs[3]), dim=1)
        features = self.dec_blocks[0](features)

        # remaining skips are consumed from deep to shallow
        remaining_skips = stage_outputs[::-1][2:]
        for dec_block, upconv, skip in zip(self.dec_blocks[1:-1], self.upconvs[1:-1], remaining_skips):
            features = upconv(features)
            features = torch.cat((features, skip), dim=1)
            features = dec_block(features)

        # final upsampling step has no skip connection
        features = self.upconvs[-1](features)
        features = self.dec_blocks[-1](features)
        return self.head(features)  # reduce to 1 channel


### Training


In [None]:
# Hyperparameters
N_EPOCHS = 20  # number of training epochs
BATCH_SIZE = 4  # samples per batch for the dataloaders
RESIZE = 384  # side length images are resized to before being fed to the networks

In [None]:
# Training
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# hold out 20% of the images for validation (fixed seed for reproducibility)
train_images, val_images, train_masks, val_masks = train_test_split(
    images, masks, test_size=0.2, random_state=42
)
train_patches, train_labels = image_to_patches(train_images, train_masks)
val_patches, val_labels = image_to_patches(val_images, val_masks)

# reshape the image to simplify the handling of skip connections and maxpooling
# The first argument is a truth flag: the strings 'training'/'validation' were
# both truthy, which made the validation dataset serve the training images.
train_dataset = ImageDataset(True, device, use_patches=False, resize_to=(RESIZE, RESIZE))
val_dataset = ImageDataset(False, device, use_patches=False, resize_to=(RESIZE, RESIZE))

train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=True)

In [None]:
# Build the VGG16-based U-Net and train it.
vgg16 = models.vgg16(pretrained=True)
# Freeze all the layers
for param in vgg16.features.parameters():
    param.requires_grad = False

# Unfreeze the layers from index 24 onwards so they are fine-tuned
# (this loop is active; comment it out to keep the whole encoder frozen)
for param in vgg16.features[24:].parameters():
    param.requires_grad = True

vgg_features = list(vgg16.features.children())
unet = UNetVGG(vgg_features)
model = unet.to(device)
loss_fn = nn.BCELoss()
metric_fns = {'acc': accuracy_fn, 'patch_acc': patch_accuracy_fn}
optimizer = torch.optim.Adam(model.parameters())
n_epochs = N_EPOCHS
# NOTE(review): n_epochs is assigned above but train() is called with a literal 1,
# so only a single epoch is run -- confirm whether n_epochs was intended here.
train(train_dataloader, val_dataloader, model, loss_fn, metric_fns, optimizer, 1)

In [None]:
# Creating the submission on the test set
test_path = os.path.join(ROOT_PATH, 'test', 'images')
test_filenames = (glob(test_path + '/*.png'))
test_images = load_all_from_path(test_path)
batch_size = test_images.shape[0]
size = test_images.shape[1:3]  # (height, width) of the original test images
# we also need to resize the test images. This might not be the best idea depending on their spatial resolution.
test_images = np.stack([cv2.resize(img, dsize=(RESIZE, RESIZE)) for img in test_images], 0)
test_images = test_images[:, :, :, :3]
test_images = np_to_tensor(np.moveaxis(test_images, -1, 1), device)
# inference only: eval mode for the batch-norm layers, and no autograd graph
# so that activations are not kept alive for every test image
model.eval()
with torch.no_grad():
    test_pred = [model(t).detach().cpu().numpy() for t in test_images.unsqueeze(1)]
test_pred = np.concatenate(test_pred, 0)
test_pred= np.moveaxis(test_pred, 1, -1)  # CHW to HWC
# cv2 expects dsize as (width, height), i.e. the reverse of `size`
test_pred = np.stack([cv2.resize(img, dsize=(size[1], size[0])) for img in test_pred], 0)  # resize to original shape
# now compute labels
test_pred = test_pred.reshape((-1, size[0] // PATCH_SIZE, PATCH_SIZE, size[1] // PATCH_SIZE, PATCH_SIZE))
test_pred = np.moveaxis(test_pred, 2, 3)
test_pred = np.round(np.mean(test_pred, (-1, -2)) > CUTOFF)
create_submission(test_pred, test_filenames, submission_filename='unet_submission.csv')

## Upgrade 1.2 - Using Bagging

In [None]:
# Parameters
N_ESTIMATORS = 3  # number of models trained for the bagging ensemble

In [None]:
from collections import Counter
from torchvision import models
bagging_models = []

for i in range(N_ESTIMATORS):
    # every estimator starts from a fresh copy of the pretrained encoder
    vgg16 = models.vgg16(pretrained=True)
    # Freeze all the layers
    for param in vgg16.features.parameters():
        param.requires_grad = False

    # Unfreeze the layers from index 24 onwards so they are fine-tuned
    for param in vgg16.features[24:].parameters():
        param.requires_grad = True

    vgg_features = list(vgg16.features.children())
    # A different seed per estimator gives each model its own train/validation
    # split; with one shared seed all estimators were trained on identical data.
    train_images, val_images, train_masks, val_masks = train_test_split(
        images, masks, test_size=0.2, random_state=42 + i
    )
    train_patches, train_labels = image_to_patches(train_images, train_masks)
    val_patches, val_labels = image_to_patches(val_images, val_masks)

    # reshape the image to simplify the handling of skip connections and maxpooling
    # The first argument is a truth flag: the strings 'training'/'validation' were
    # both truthy, which made the validation dataset serve the training images.
    train_dataset = ImageDataset(True, device, use_patches=False, resize_to=(RESIZE, RESIZE))
    val_dataset = ImageDataset(False, device, use_patches=False, resize_to=(RESIZE, RESIZE))

    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=True)
    bagging_models.append(UNetVGG(vgg_features).to(device))
    loss_fn = nn.BCELoss()
    metric_fns = {'acc': accuracy_fn, 'patch_acc': patch_accuracy_fn}
    optimizer = torch.optim.Adam(bagging_models[i].parameters())
    train(train_dataloader, val_dataloader, bagging_models[i], loss_fn, metric_fns, optimizer, N_EPOCHS)

In [None]:
# Creating the predictions
from collections import Counter
# One patch-level prediction array per estimator, combined by majority vote.
# Relies on `test_images`, `size` and `test_filenames` from the submission cell above.
predictions = []
for model in bagging_models:
    model.eval()  # inference mode for the batch-norm layers
    with torch.no_grad():  # no autograd graph is needed for prediction
        test_pred = [model(t).detach().cpu().numpy() for t in test_images.unsqueeze(1)]
    test_pred = np.concatenate(test_pred, 0)
    test_pred= np.moveaxis(test_pred, 1, -1)  # CHW to HWC
    # cv2 expects dsize as (width, height), i.e. the reverse of `size`
    test_pred = np.stack([cv2.resize(img, dsize=(size[1], size[0])) for img in test_pred], 0)  # resize to original shape
    # now compute labels
    test_pred = test_pred.reshape((-1, size[0] // PATCH_SIZE, PATCH_SIZE, size[1] // PATCH_SIZE, PATCH_SIZE))
    test_pred = np.moveaxis(test_pred, 2, 3)
    test_pred = np.round(np.mean(test_pred, (-1, -2)) > CUTOFF)
    predictions.append(test_pred)

# np.stack takes `axis` (the torch-style `dim` keyword raises a TypeError);
# stacking the whole list also keeps this correct for any N_ESTIMATORS
stacks = np.stack(predictions, axis=0).astype(int)
# a patch is road when strictly more than half of the estimators say so;
# ties resolve to 0, exactly as bincount(...).argmax() would
majority = (2 * stacks.sum(axis=0) > len(predictions)).astype(int)
# NOTE(review): this writes to the same file name as the single-model submission above
create_submission(majority, test_filenames, submission_filename='unet_submission.csv')

## Upgrade 2 - CGAN --> https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=8628717
The paper uses a simple U-Net architecture. I also tried transfer learning for this part, but it did not give a better score.

In [None]:
# Discriminator network for the GANS
class Discriminator(nn.Module):
    """Convolutional discriminator that scores a pair of channel-concatenated images.

    Four stride-2 convolution blocks (64, 128, 256, 512 filters) are followed
    by a zero pad and a final convolution that outputs one channel of
    unnormalised scores.

    Args:
        in_channels: total number of channels of ``img_A`` and ``img_B`` combined.
    """

    def __init__(self, in_channels=3):
        super().__init__()

        def discriminator_block(in_filters, out_filters, normalization=True):
            """Returns downsampling layers of each discriminator block"""
            conv = nn.Conv2d(in_filters, out_filters, 4, stride=2, padding=1)
            act = nn.LeakyReLU(0.2, inplace=True)
            if normalization:
                return [conv, nn.InstanceNorm2d(out_filters), act]
            return [conv, act]

        widths = (in_channels, 64, 128, 256, 512)
        layers = []
        for level, (c_in, c_out) in enumerate(zip(widths[:-1], widths[1:])):
            # only the very first block skips the normalization layer
            layers.extend(discriminator_block(c_in, c_out, normalization=level > 0))
        layers.append(nn.ZeroPad2d((1, 0, 1, 0)))
        layers.append(nn.Conv2d(512, 1, 4, padding=1, bias=False))
        self.model = nn.Sequential(*layers)

    def forward(self, img_A, img_B):
        # Concatenate image and condition image by channels to produce input
        pair = torch.cat((img_A, img_B), 1)
        return self.model(pair)

In [None]:
# Losses for the discriminator and the generator
class GeneratorLoss(nn.Module):
    """Generator objective: adversarial term plus ``alpha`` times an L1 term.

    The adversarial term is the BCE-with-logits of the discriminator's scores
    for the generated image against an all-ones target; the L1 term compares
    the generated image with the real one.
    """

    def __init__(self, alpha=100):
        super().__init__()
        self.alpha=alpha
        self.bce=nn.BCEWithLogitsLoss()
        self.l1=nn.L1Loss()

    def forward(self, fake, real, fake_pred):
        # the generator is rewarded when the discriminator scores its output as real
        adversarial = self.bce(fake_pred, torch.ones_like(fake_pred))
        reconstruction = self.l1(fake, real)
        return adversarial + self.alpha * reconstruction


class DiscriminatorLoss(nn.Module):
    """Discriminator objective: mean of the BCE-with-logits on fake and real scores.

    Scores for generated images are pushed towards 0, scores for real images
    towards 1.
    """

    def __init__(self):
        super().__init__()
        self.loss_fn = nn.BCEWithLogitsLoss()

    def forward(self, fake_pred, real_pred):
        loss_on_fake = self.loss_fn(fake_pred, torch.zeros_like(fake_pred))
        loss_on_real = self.loss_fn(real_pred, torch.ones_like(real_pred))
        return (loss_on_fake + loss_on_real)/2

In [None]:
def train_pix2pix(train_dataloader, eval_dataloader, generator, discriminator, g_loss, d_loss, metric_fns, g_optimizer, d_optimizer, n_epochs):
    """Adversarially train ``generator`` against ``discriminator``.

    Each step computes the generator loss and the discriminator loss from one
    generator forward pass, then updates the generator followed by the
    discriminator. Every 20th epoch both networks are checkpointed under
    'models/'. Per-epoch means are logged to tensorboard under
    './tensorboard/net', printed, and plotted once training has finished.

    Args:
        train_dataloader: iterable of ``(x, y)`` training batches.
        eval_dataloader: iterable of ``(x, y)`` validation batches.
        generator: network mapping ``x`` to a predicted ``y``.
        discriminator: network called as ``discriminator(candidate_y, x)``.
        g_loss: callable ``g_loss(fake, real, fake_pred)``.
        d_loss: callable ``d_loss(fake_pred, real_pred)``.
        metric_fns: dict mapping a metric name to ``fn(y_hat, y)``.
        g_optimizer: optimizer for the generator parameters.
        d_optimizer: optimizer for the discriminator parameters.
        n_epochs: number of passes over ``train_dataloader``.
    """
    import os  # local import: only needed to create the checkpoint directory

    logdir = './tensorboard/net'
    writer = SummaryWriter(logdir)  # tensorboard writer (can also log images)

    history = {}  # collects metrics at the end of each epoch

    def _mean(values):
        # an empty dataloader leaves the list empty; report NaN instead of dividing by zero
        return sum(values) / len(values) if values else float('nan')

    try:
        for epoch in range(n_epochs):  # loop over the dataset multiple times

            # initialize metric list
            metrics = {'g_loss': [], 'd_loss': [], 'val_loss': []}
            for k in metric_fns:
                metrics[k] = []
                metrics['val_'+k] = []

            pbar = tqdm(train_dataloader, desc=f'Epoch {epoch+1}/{n_epochs}')

            # Training
            generator.train()
            discriminator.train()
            for (x, y) in pbar:
                # Generator
                fake_image = generator(x)
                fake_pred = discriminator(fake_image, x)
                generator_loss = g_loss(fake_image, y, fake_pred)

                # Discriminator: reuse the generated image, detached so that the
                # discriminator loss cannot reach the generator. A second forward
                # pass would cost time and update the batch-norm statistics twice.
                fake_image = fake_image.detach()
                fake_pred = discriminator(fake_image, x)
                real_pred = discriminator(y, x)
                discriminator_loss = d_loss(fake_pred, real_pred)

                # Performing the parameter updates
                g_optimizer.zero_grad()
                generator_loss.backward()
                g_optimizer.step()

                # zero_grad also discards the discriminator gradients that the
                # generator's backward pass left behind
                d_optimizer.zero_grad()
                discriminator_loss.backward()
                d_optimizer.step()

                metrics['g_loss'].append(generator_loss.item())
                metrics['d_loss'].append(discriminator_loss.item())

                for k, fn in metric_fns.items():
                    metrics[k].append(fn(fake_image, y).item())
                pbar.set_postfix({k: sum(v)/len(v) for k, v in metrics.items() if len(v) > 0})

            if (epoch + 1) % 20 == 0:
                os.makedirs('models', exist_ok=True)  # torch.save does not create directories
                checkpoint_path = f'models/checkpoint_epoch_generator_{epoch + 1}.pth'
                torch.save({
                    'epoch': epoch + 1,
                    'model_state_dict': generator.state_dict(),
                    'optimizer_state_dict': g_optimizer.state_dict(),
                    # mean training loss of this epoch (previously the loss module itself was pickled)
                    'loss': _mean(metrics['g_loss']),
                }, checkpoint_path)
                checkpoint_path = f'models/checkpoint_epoch_discriminator_{epoch + 1}.pth'
                torch.save({
                    'epoch': epoch + 1,
                    'model_state_dict': discriminator.state_dict(),
                    'optimizer_state_dict': d_optimizer.state_dict(),
                    'loss': _mean(metrics['d_loss']),
                }, checkpoint_path)

            # validation
            generator.eval()
            discriminator.eval()
            with torch.no_grad():  # do not keep track of gradients
                for (x, y) in eval_dataloader:
                    y_hat = generator(x)  # forward pass
                    fake_pred = discriminator(y_hat, x)
                    loss = g_loss(y_hat, y, fake_pred)

                    # log partial metrics
                    metrics['val_loss'].append(loss.item())
                    for k, fn in metric_fns.items():
                        metrics['val_'+k].append(fn(y_hat, y).item())

            # summarize metrics, log to tensorboard and display
            history[epoch] = {k: _mean(v) for k, v in metrics.items()}
            for k, v in history[epoch].items():
                writer.add_scalar(k, v, epoch)
            print(' '.join(['\t- '+str(k)+' = '+str(v)+'\n ' for (k, v) in history[epoch].items()]))
            show_val_samples(x.detach().cpu().numpy(), y.detach().cpu().numpy(), y_hat.detach().cpu().numpy())
    finally:
        writer.close()  # flush pending events even when training is interrupted

    print('Finished Training')
    # plot loss curves
    plt.plot([v['d_loss'] for v in history.values()], label='Discriminator Loss')
    plt.plot([v['g_loss'] for v in history.values()], label='Generator Loss')
    plt.plot([v['val_loss'] for v in history.values()], label='Validation Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epochs')
    plt.legend()
    plt.show()

In [None]:
# Training with default Unet
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# hold out 10% of the images for validation (fixed seed for reproducibility)
train_images, val_images, train_masks, val_masks = train_test_split(
    images, masks, test_size=0.1, random_state=42
)
train_patches, train_labels = image_to_patches(train_images, train_masks)
val_patches, val_labels = image_to_patches(val_images, val_masks)

# reshape the image to simplify the handling of skip connections and maxpooling
# The first argument is a truth flag: the strings 'training'/'validation' were
# both truthy, which made the validation dataset serve the training images.
train_dataset = ImageDataset(True, device, use_patches=False, resize_to=(RESIZE, RESIZE))
val_dataset = ImageDataset(False, device, use_patches=False, resize_to=(RESIZE, RESIZE))

train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=3, shuffle=True)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=3, shuffle=True)

generator = UNet().to(device)
# 4 input channels: the 1-channel mask concatenated with the 3-channel image
discriminator = Discriminator(4).to(device)
generator_loss = GeneratorLoss()
discriminator_loss = DiscriminatorLoss()
metric_fns = {'acc': accuracy_fn, 'patch_acc': patch_accuracy_fn}
n_epochs = N_EPOCHS
g_optimizer = torch.optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
d_optimizer = torch.optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))
#train_pix2pix(train_dataloader, val_dataloader, generator, discriminator, generator_loss, discriminator_loss, metric_fns, g_optimizer, d_optimizer, 100)

In [None]:
# Training with UNETVGG
vgg16 = models.vgg16(pretrained=True)
# Freeze all the layers
for param in vgg16.features.parameters():
    param.requires_grad = False

# Unfreeze the layers from index 24 onwards so they are fine-tuned
for param in vgg16.features[24:].parameters():
    param.requires_grad = True

vgg_features = list(vgg16.features.children())

device = 'cuda' if torch.cuda.is_available() else 'cpu'
# hold out 10% of the images for validation (fixed seed for reproducibility)
train_images, val_images, train_masks, val_masks = train_test_split(
    images, masks, test_size=0.1, random_state=42
)
train_patches, train_labels = image_to_patches(train_images, train_masks)
val_patches, val_labels = image_to_patches(val_images, val_masks)

# reshape the image to simplify the handling of skip connections and maxpooling
# The first argument is a truth flag: the strings 'training'/'validation' were
# both truthy, which made the validation dataset serve the training images.
train_dataset = ImageDataset(True, device, use_patches=False, resize_to=(RESIZE, RESIZE))
val_dataset = ImageDataset(False, device, use_patches=False, resize_to=(RESIZE, RESIZE))

train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=3, shuffle=True)
val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size=3, shuffle=True)

generator = UNetVGG(vgg_features).to(device)
# 4 input channels: the 1-channel mask concatenated with the 3-channel image
discriminator = Discriminator(4).to(device)
generator_loss = GeneratorLoss()
discriminator_loss = DiscriminatorLoss()
metric_fns = {'acc': accuracy_fn, 'patch_acc': patch_accuracy_fn}
n_epochs = N_EPOCHS
g_optimizer = torch.optim.Adam(generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
d_optimizer = torch.optim.Adam(discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))
#train_pix2pix(train_dataloader, val_dataloader, generator, discriminator, generator_loss, discriminator_loss, metric_fns, g_optimizer, d_optimizer, 100)

In [None]:
# Picking the best checkpoint by the validation score
# This cell only loads 'model.pth' (mapped onto the CPU) into the generator and switches it
# to evaluation mode.
# NOTE(review): the file is presumably the checkpoint with the best validation score;
# the selection itself does not happen here.
checkpoint = torch.load('model.pth', map_location=torch.device('cpu'))
generator.load_state_dict(checkpoint['model_state_dict'])
generator.eval()

In [None]:
# Predict on the test set with the generator and write the patch-level submission file.
test_path = os.path.join(ROOT_PATH, 'test', 'images')
# Sorted so that the filenames line up with the images: load_all_from_path loads
# sorted(glob(...)), while a bare glob() returns files in arbitrary order.
test_filenames = sorted(glob(test_path + '/*.png'))
test_images = load_all_from_path(test_path)
batch_size = test_images.shape[0]
size = test_images.shape[1:3]  # original (height, width)
# we also need to resize the test images. This might not be the best ideas depending on their spatial resolution.
test_images = np.stack([cv2.resize(img, dsize=(RESIZE, RESIZE)) for img in test_images], 0)
test_images = test_images[:, :, :, :3]
test_images = np_to_tensor(np.moveaxis(test_images, -1, 1), device)
# Inference only: no autograd graph is needed, which keeps memory usage down.
with torch.no_grad():
    test_pred = [generator(t).detach().cpu().numpy() for t in test_images.unsqueeze(1)]
test_pred = np.concatenate(test_pred, 0)
test_pred = np.moveaxis(test_pred, 1, -1)  # CHW to HWC
# cv2.resize expects dsize as (width, height), i.e. the reverse of the numpy shape.
test_pred = np.stack([cv2.resize(img, dsize=(size[1], size[0])) for img in test_pred], 0)  # resize to original shape
# now compute labels: split every image into PATCH_SIZE x PATCH_SIZE patches and threshold
# the mean prediction of each patch (height and width are handled separately).
test_pred = test_pred.reshape((-1, size[0] // PATCH_SIZE, PATCH_SIZE, size[1] // PATCH_SIZE, PATCH_SIZE))
test_pred = np.moveaxis(test_pred, 2, 3)
test_pred = np.round(np.mean(test_pred, (-1, -2)) > CUTOFF)
create_submission(test_pred, test_filenames, submission_filename='pix2pix_elu_submission.csv')

## Upgrade 2.3 -- CGAN with DCED Framework --> Road Segmentation of Remotely-Sensed Images Using Deep Convolutional Neural Networks with Landscape Metrics and Conditional Random Fields
In this framework, the authors of the paper named above use four additional ideas:
1. Using the ELU activation function instead of ReLU
2. Using Gaussian smoothing and connected component labeling (CCL)
3. False road object removal with landscape metrics (LMs)
4. Road object sharpening with conditional random fields (CRFs)

In [None]:
import copy

# Defining the functions for the framework
def gaussian_smoothing(kernel_size, sigma=1):
    """Build a square 2D Gaussian kernel.

    Args:
        kernel_size: Nominal side length. The kernel covers the integer offsets
            from -(kernel_size // 2) to +(kernel_size // 2), so an even size
            yields kernel_size + 1 samples per side.
        sigma: Standard deviation of the Gaussian, in pixels.

    Returns:
        2D array holding the Gaussian density 1 / (2*pi*sigma^2) * exp(-r^2 / (2*sigma^2))
        sampled on the offset grid. The samples are not renormalized to sum to 1.
    """
    half_width = int(kernel_size) // 2
    offsets = np.arange(-half_width, half_width + 1)
    # Squared distance of every grid cell from the kernel centre
    squared_radius = offsets[:, np.newaxis] ** 2 + offsets[np.newaxis, :] ** 2
    amplitude = 1 / (2.0 * np.pi * sigma**2)
    return np.exp(-(squared_radius / (2.0 * sigma**2))) * amplitude

def connected_component_labeling(prediction, gaussian_filter, threshold=128):
    """Smooth a prediction, binarize it and label its connected components.

    Args:
        prediction: Prediction map; it is multiplied by 255 and cast to uint8,
            so values are expected to lie in [0, 1].
        gaussian_filter: 2D kernel the scaled prediction is filtered with.
        threshold: Gray level (0-255) above which a smoothed pixel counts as foreground.

    Returns:
        The tuple (num_labels, labels, stats, centroids) produced by
        cv2.connectedComponentsWithStats with 4-connectivity.
    """
    scaled = np.uint8(prediction * 255)
    smoothed = cv2.filter2D(scaled, -1, gaussian_filter)
    binary_image = cv2.threshold(np.uint8(smoothed), threshold, 255, cv2.THRESH_BINARY)[1]

    # Layout of stats --> https://stackoverflow.com/questions/35854197/how-to-use-opencvs-connectedcomponentswithstats-in-python
    return cv2.connectedComponentsWithStats(binary_image, connectivity=4)

def calculate_shape_index(stats):
    """Return the shape index of one connected component.

    Args:
        stats: One row of the stats returned by cv2.connectedComponentsWithStats.
            Entries 2 and 3 are read as bounding-box width and height, the last
            entry as the component's area.

    Returns:
        Bounding-box perimeter divided by 4 * sqrt(area).
    """
    width, height = stats[2], stats[3]
    area = stats[-1]
    bounding_perimeter = 2 * (width + height)
    return bounding_perimeter / (4 * math.sqrt(area))

def remove_noise(image, num_labels, labels, stats, threshold=1.25, isprint=False):
    """Erase connected components whose shape index is below a threshold.

    Args:
        image: Prediction to clean; it is deep-copied, the input is left untouched.
        num_labels: Number of labels (label 0 is skipped).
        labels: Label map assigning a component label to every pixel.
        stats: Per-label statistics; stats[label] is passed to calculate_shape_index.
        threshold: Components with a shape index below this value are removed.
        isprint: If True, print every label together with its shape index.

    Returns:
        Copy of image in which the pixels of all removed components are set to 0.
    """
    cleaned = copy.deepcopy(image)
    for component in range(1, num_labels):
        shape_index = calculate_shape_index(stats[component].tolist())
        if isprint:
            print(component, 'and', shape_index)
        if shape_index < threshold:
            # Blank out every pixel that belongs to this component
            cleaned[labels == component] = 0
    return cleaned

In [None]:
from scipy.ndimage import gaussian_filter

# Defining the class for the last step of DCED framework --> CRF
class CRF():
    """Fully connected CRF with Gaussian edge potentials, solved by mean-field inference.

    Two pairwise kernels are used: an appearance kernel (position + color) and a
    smoothness kernel (position only). Message passing is either exact (naive,
    quadratic in the number of pixels) or approximated in a downsampled 5D
    (row, col, color) feature space.

    Args:
        kernel_1_weight: Weight of the appearance kernel.
        kernel_2_weight: Weight of the smoothness kernel.
        alpha: Spatial standard deviation of the appearance kernel.
        beta: Color standard deviation of the appearance kernel.
        gamma: Spatial standard deviation of the smoothness kernel.
        efficient: If True, inference uses efficient_message_passing.
        spatial_downsampling: Spatial downsampling factor of the 5D feature space.
        range_downsampling: Color downsampling factor of the 5D feature space.
        iterations: Number of mean-field iterations run by inference.
    """

    def __init__(self, kernel_1_weight=10, kernel_2_weight=5, alpha=60, beta=10, gamma=1, efficient=False, spatial_downsampling=15, range_downsampling=15, iterations=3):
        self.kernel_1_weight = kernel_1_weight
        self.kernel_2_weight = kernel_2_weight
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
        self.efficient = efficient
        self.spatial_downsampling = spatial_downsampling
        self.range_downsampling = range_downsampling
        self.iterations = iterations

    def appearance_kernel(self, x_1, y_1, p_1, x_2, y_2, p_2):
        """Compute the appearance kernel between two pixels.

        The second pixel may also be given as arrays (coordinates of shape S,
        colors of shape S + (channels,)); the kernel is then evaluated for
        every position at once.

        Args:
            x_1: X coordinate of first pixel.
            y_1: Y coordinate of first pixel.
            p_1: Color vector of first pixel.
            x_2: X coordinate of second pixel.
            y_2: Y coordinate of second pixel.
            p_2: Color vector of second pixel.

        Returns:
            The output of the appearance kernel, using self.alpha as position
            and self.beta as color standard deviation.
        """
        # Cast to float so that unsigned integer images cannot wrap around.
        squared_diff = (np.asarray(p_1, dtype=float) - np.asarray(p_2, dtype=float)) ** 2.0
        # Sum over the channel axis only, which keeps the call broadcastable.
        color_distance = np.sum(squared_diff, axis=-1) if squared_diff.ndim else squared_diff
        return np.exp(
            -((x_1 - x_2) ** 2.0 + (y_1 - y_2) ** 2.0) / (2 * self.alpha ** 2.0)
            - color_distance / (2.0 * self.beta ** 2.0)
        )

    def smoothness_kernel(self, x_1, y_1, p_1, x_2, y_2, p_2):
        """Compute the smoothness kernel between two pixels.

        Args:
            x_1: X coordinate of first pixel.
            y_1: Y coordinate of first pixel.
            p_1: Color vector of first pixel (unused).
            x_2: X coordinate of second pixel.
            y_2: Y coordinate of second pixel.
            p_2: Color vector of second pixel (unused).

        Returns:
            The output of the smoothness kernel, using self.gamma as position
            standard deviation.
        """
        del p_1, p_2  # the smoothness kernel depends on the positions only
        return np.exp(
            -((x_1 - x_2) ** 2.0 + (y_1 - y_2) ** 2.0) / (2.0 * self.gamma ** 2.0)
        )

    def normalize(self, potentials):
        """Normalize potentials such that output is a valid pixelwise distribution.

        Args:
            potentials: Array of potentials. Shape (H,W,N).

        Returns:
            Probability array with same shape as potentials.
            Probabilities sum up to 1 at every slice (i,j,:); slices whose
            potentials sum to 0 are returned unchanged.
        """
        totals = np.sum(potentials, axis=-1, keepdims=True)
        # Avoid division by zero
        totals = np.where(totals == 0, 1, totals)
        return potentials / totals

    def message_passing(self, image, current_probabilities):
        """Perform exact "message passing" as the first step of the inference loop.

        For every pixel and class, the probabilities of all *other* pixels are
        summed, weighted by the kernel value between the two pixels.

        Args:
            image:
                Array of size ROWS x COLUMNS x CHANNELS, representing the image used to
                compute the kernel.
            current_probabilities:
                Array of size ROWS x COLUMNS x CLASSES, representing the current
                probabilities.

        Returns:
            Array of size ROWS x COLUMNS x CLASSES x KERNELS, representing the intermediate
            result of message passing for each kernel (0: appearance, 1: smoothness).
        """
        rows = image.shape[0]
        cols = image.shape[1]
        pixels = np.asarray(image, dtype=float)
        probabilities = np.asarray(current_probabilities, dtype=float)
        result = np.zeros(
            (
                probabilities.shape[0],
                probabilities.shape[1],
                probabilities.shape[2],
                2,  # 2 kernels
            ),
            dtype=float,
        )

        row_idx, col_idx = np.mgrid[0:rows, 0:cols]
        for i in range(rows):
            for j in range(cols):
                # Kernel values between pixel (i, j) and every pixel of the image
                kernels = (
                    self.appearance_kernel(i, j, pixels[i, j], row_idx, col_idx, pixels),
                    self.smoothness_kernel(i, j, pixels[i, j], row_idx, col_idx, pixels),
                )
                for kernel_id, kernel in enumerate(kernels):
                    kernel[i, j] = 0.0  # a pixel does not send a message to itself
                    # Messages are built from the current probabilities of each class
                    result[i, j, :, kernel_id] = np.tensordot(
                        kernel, probabilities, axes=([0, 1], [0, 1])
                    )
        return result

    def compatibility_transform(self, q_tilde):
        """Perform compatibility transform as part of the inference loop.

        Args:
            q_tilde:
                Array of size ROWS x COLUMNS x CLASSES x KERNELS, representing the
                intermediate result of message passing for each kernel. It is
                not modified.

        Returns:
            Array of size ROWS x COLUMNS x CLASSES, representing the result after combining
            the kernels and applying the label compatibility function (here: Potts model).
        """
        weights = np.array([self.kernel_1_weight, self.kernel_2_weight], dtype=float)
        # Weighted sum over the kernel axis -> ROWS x COLUMNS x CLASSES
        weighted = np.asarray(q_tilde, dtype=float) @ weights
        # Potts model: a label is penalized by the messages of all *other* labels.
        return np.sum(weighted, axis=-1, keepdims=True) - weighted

    def get_unary_potential(self, image):
        """Return the unary energy -log(p) of a probability array.

        Zero probabilities map to an infinite energy; the corresponding
        divide-by-zero warning of np.log is suppressed.
        """
        with np.errstate(divide='ignore'):
            return -np.log(image)

    def local_update(self, q_hat, unary_potential):
        """Perform local update as part of the inference loop.

        Args:
            q_hat:
                Array of size ROWS x COLUMNS x CLASSES, representing the intermediate result
                after combining the kernels and applying the label compatibility function.
            unary_potential:
                Array of size ROWS x COLUMNS x CLASSES, representing the prior energy for
                each pixel and class from a different source.
        Returns:
            Array of size ROWS x COLUMNS x CLASSES with unnormalized probabilities
            for each pixel and class. Each pixel is scaled so that its most
            likely class has value 1; the ratios between classes are unaffected.
        """
        energy = unary_potential + q_hat
        # Subtract the per-pixel minimum energy before exponentiating; otherwise large
        # energies underflow to 0 for every class and the distribution is lost.
        lowest = np.min(energy, axis=-1, keepdims=True)
        lowest = np.where(np.isfinite(lowest), lowest, 0.0)
        return np.exp(-(energy - lowest))

    def efficient_message_passing(self, image, current_probabilities):
        """Perform efficient "message passing" by downsampling and convolution in 5D.

        This assumes two kernels: an appearance kernel based on self.alpha and self.beta,
        and a smoothness kernel based on self.gamma. Pixels are binned into a
        (row, col, color, color, color) grid using self.spatial_downsampling and
        self.range_downsampling, the grid is Gaussian-filtered, and every pixel
        reads back the value of its own bin.

        Args:
            image:
                Array of size ROWS x COLUMNS x CHANNELS, representing the image used to
                compute the kernel. The first three channels are used and are
                expected to lie in [0, 255].
            current_probabilities:
                Array of size ROWS x COLUMNS x CLASSES, representing the current
                probabilities.

        Returns:
            Array of size ROWS x COLUMNS x CLASSES x KERNELS, representing the intermediate
            result of message passing for each kernel (0: appearance, 1: smoothness).
        """
        # Imported locally so the method keeps working even if a notebook cell rebinds
        # the module-level name `gaussian_filter` to something else.
        from scipy.ndimage import gaussian_filter as nd_gaussian_filter

        rows = image.shape[0]
        cols = image.shape[1]
        probabilities = np.asarray(current_probabilities, dtype=float)
        classes = probabilities.shape[2]
        color_range = 255

        ds_rows = int(np.ceil(rows / self.spatial_downsampling))
        ds_cols = int(np.ceil(cols / self.spatial_downsampling))
        ds_range = int(np.ceil(color_range / self.range_downsampling))

        result = np.zeros(
            (
                probabilities.shape[0],
                probabilities.shape[1],
                classes,
                2,
            ),
            dtype=float,
        )

        # Precompute, for every pixel, the index of its bin in the 5D feature space
        pixels = np.asarray(image, dtype=float)
        row_idx, col_idx = np.mgrid[0:rows, 0:cols]
        row_bins = np.round(row_idx / float(self.spatial_downsampling)).astype(int)
        col_bins = np.round(col_idx / float(self.spatial_downsampling)).astype(int)
        color_bins = np.round(pixels[..., :3] / float(self.range_downsampling)).astype(int)
        bins = (row_bins, col_bins, color_bins[..., 0], color_bins[..., 1], color_bins[..., 2])

        # One extra cell per axis because rounding can reach the upper bound
        space_shape = (ds_rows + 1, ds_cols + 1, ds_range + 1, ds_range + 1, ds_range + 1)

        # NOTE(review): the sigmas below are applied in *downsampled* coordinates without
        # being divided by the downsampling factors, so they do not match the pixel-unit
        # sigmas of the naive kernels -- confirm whether this is intended.
        kernel_sigmas = (
            [self.alpha, self.alpha, self.beta, self.beta, self.beta],  # appearance
            [self.gamma, self.gamma, 0, 0, 0],  # smoothness
        )

        for class_id in range(classes):
            # Downsample with a box filter while moving to the 5D space; np.add.at
            # accumulates correctly when several pixels fall into the same bin.
            feature_space = np.zeros(space_shape)
            np.add.at(feature_space, bins, probabilities[..., class_id])

            for kernel_id, sigma in enumerate(kernel_sigmas):
                filtered_feature_space = nd_gaussian_filter(feature_space, sigma=sigma)
                # Upsample with simple lookup (no interpolation for simplicity)
                result[..., class_id, kernel_id] = filtered_feature_space[bins]

        return result

    def inference(self, image, initial_probabilities):
        """Perform inference in fully connected CRF with Gaussian edge potentials.

        Args:
            image:
                Array of size ROWS x COLUMNS x CHANNELS, representing the image used for
                the features.
            initial_probabilities:
                Initial pixelwise probabilities for each class (ROWS x COLUMNS x CLASSES).
                Used to initialize the unary potential.
        Return:
            Array of size ROWS x COLS x CLASSES with the refined probabilities.
        """
        current_probabilities = initial_probabilities
        unary_potential = self.get_unary_potential(current_probabilities)

        for _ in range(self.iterations):
            if self.efficient:
                q_tilde = self.efficient_message_passing(image, current_probabilities)
            else:
                q_tilde = self.message_passing(image, current_probabilities)
            q_hat = self.compatibility_transform(q_tilde)
            unnormalized_probabilities = self.local_update(q_hat, unary_potential)
            current_probabilities = self.normalize(unnormalized_probabilities)
            print('Iteration completed')

        return current_probabilities
        
        

In [None]:
# Run the generator to obtain per-pixel predictions for the DCED post-processing below.
# NOTE(review): although this cell was written for "the submission on the test set",
# test_path points at the *training* images -- switch it to 'test' before submitting.
model = generator
test_path = os.path.join(ROOT_PATH, 'training', 'images')
# Sorted so that the filenames line up with load_all_from_path, which loads sorted(glob(...)).
test_filenames = sorted(glob(test_path + '/*.png'))
test_images = load_all_from_path(test_path)
# Keep a handle on the full-resolution images for the CRF step. test_images is rebound
# (not modified in place) below, so the data does not have to be read from disk twice.
test_images2 = test_images
batch_size = test_images.shape[0]
size = test_images.shape[1:3]  # original (height, width)
# we also need to resize the test images. This might not be the best ideas depending on their spatial resolution.
test_images = np.stack([cv2.resize(img, dsize=(RESIZE, RESIZE)) for img in test_images], 0)
test_images = test_images[:, :, :, :3]
test_images = np_to_tensor(np.moveaxis(test_images, -1, 1), device)
test_pred = [model(t).detach().cpu().numpy() for t in test_images.unsqueeze(1)]
test_pred = np.concatenate(test_pred, 0)
test_pred = np.moveaxis(test_pred, 1, -1)  # CHW to HWC
# cv2.resize expects dsize as (width, height), i.e. the reverse of the numpy shape.
test_pred = np.stack([cv2.resize(img, dsize=(size[1], size[0])) for img in test_pred], 0)  # resize to original shape
# Restore an explicit channel axis: (n_images, H, W, 1)
test_pred = test_pred.reshape(test_pred.shape[0], test_pred.shape[1], test_pred.shape[2], 1)

# Disabled: computing the patch labels from the post-processed `output` and writing the submission.
#output = output.reshape(output.shape[0], output.shape[1], output.shape[2])
#output = output.reshape((-1, size[0] // PATCH_SIZE, PATCH_SIZE, size[0] // PATCH_SIZE, PATCH_SIZE))
#output = np.moveaxis(output, 2, 3)
#output = np.round(np.mean(output, (-1, -2)) > CUTOFF)
#create_submission(output, test_filenames, submission_filename='pix2pix_dced_submission.csv')

In [None]:
# Applying gaussian blur + ccl + lm to every prediction
filter_size = 19 # Rule of thumb: size is 6 times standard deviation
# Deliberately not named `gaussian_filter`: that would rebind the
# scipy.ndimage.gaussian_filter function imported above to an array.
smoothing_kernel = gaussian_smoothing(filter_size, sigma=3)

# Index of the prediction whose components are color-coded for visual inspection
vis_index = 4

output = []
# Canvas for the color-coded components; every prediction shares the same height and width
output_image = np.zeros((test_pred.shape[1], test_pred.shape[2], 3), dtype=np.uint8)

for i, prediction in enumerate(test_pred):
    num_labels, labels, stats, centroids = connected_component_labeling(prediction, smoothing_kernel, threshold=128)
    if i == vis_index:
        # Paint every component (label 0 is skipped) in a random color
        for label in range(1, num_labels):
            mask = labels == label
            color = np.random.randint(0, 255, size=3)
            output_image[mask] = color
    # Landscape-metric filtering: drop components whose shape index is below 1.3
    lm_output = remove_noise(prediction, num_labels, labels, stats, 1.3)
    output.append(lm_output)

In [None]:
# Stack the per-image LM outputs into a single array and check its shape
output = np.array(output)
print(output.shape)

In [None]:
# Keep at most the first three channels and rescale the images from [0, 1]
# (as returned by load_all_from_path) to [0, 255].
# NOTE: this cell is not idempotent -- running it again multiplies by 255 a second time.
test_images2 = test_images2[:, :, :, :3]
test_images2 = test_images2*255

In [None]:
# Build the two-class probability map for the CRF:
# channel 0 holds the road probability, channel 1 its complement (non-road).
non_road_probabilities = 1 - output
combined_probabilities = np.concatenate((output, non_road_probabilities), axis=-1)
combined_probabilities.shape

In [None]:
# CRF using the downsampled ("efficient") message passing, default kernel parameters, 10 iterations
crf = CRF(efficient=True, iterations=10)

In [None]:
# Same CRF but without spatial downsampling.
# NOTE(review): with spatial_downsampling=1 the 5D feature space has
# (rows + 1) * (cols + 1) * 18**3 cells, which is very memory-hungry for full-size images.
crf2 = CRF(efficient=True, iterations=10, spatial_downsampling=1) #alpha=60, beta=10,

In [None]:
# Refine the probabilities of the first image with the CRF
crfed = crf.inference(test_images2[0], combined_probabilities[0])

In [None]:
# Same refinement with the CRF that uses no spatial downsampling
crfed_2 = crf2.inference(test_images2[0], combined_probabilities[0])

In [None]:
# Sanity check: largest value in the CRF output
np.max(crfed)

In [None]:
# Compare the processing stages for the first image:
# CRF without spatial downsampling, raw prediction, LM output and default CRF.
plt.figure(figsize=(12, 6))
plt.subplot(1, 4, 1)
# This panel shows the road channel of crfed_2, not the original image
plt.title('CRF Output (spatial_downsampling=1)')
plt.imshow(crfed_2[:,:,0])
plt.subplot(1, 4, 2)
plt.title('Ilk Output')  # raw generator prediction ("ilk" is presumably Turkish for "first")
plt.imshow(test_pred[0])
plt.subplot(1, 4, 3)
plt.title('LM Output')
plt.imshow(output[0])
# Select the fourth panel *before* setting its title; otherwise the title lands on panel 3
plt.subplot(1, 4, 4)
plt.title('CRF Output')
plt.imshow(crfed[:,:,0])
plt.show()

In [None]:
# Show the raw prediction, the LM-filtered prediction and the CRF result for the first image
plt.figure(figsize=(12, 6))
plt.subplot(1, 4, 1)
plt.title('Original Image')
plt.imshow(test_pred[0])
plt.subplot(1, 4, 2)
plt.title('Labeled Components')
plt.imshow(output[0])
plt.subplot(1, 4, 3)
plt.title('CRF Output')
# crfed has two class channels, which imshow cannot display; plot the road channel only
plt.imshow(crfed[:, :, 0])
plt.show()