## FHNW bverI - HS2023

In [None]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# Segmentation

## Learning Objectives

- Semantic segmentation: design the architecture, train and evaluate models
- Upsampling: know and apply the techniques
- Instance segmentation: apply pre-trained models, understand and evaluate their outputs

## Setup

Below we install and load the required Python packages. We then set the paths for accessing the data and specify an output folder.

Mount your Google Drive or define a data path for storing data and results.

In [None]:
import os
from pathlib import Path

In [None]:
try:
    import google.colab
    IN_COLAB = True
except ImportError:
    # google.colab is only importable inside a Colab runtime
    IN_COLAB = False

print(f"In colab: {IN_COLAB}")

In [None]:
if IN_COLAB:
    from google.colab import drive
    drive.mount('/content/drive')

Adjust the following paths if necessary.

In [None]:
if IN_COLAB:
    DATA_PATH = Path('/content/drive/MyDrive/bverI/data')
else:
    DATA_PATH = Path('../data')

Install the packages that are not part of the base Colab environment.

In [None]:
if IN_COLAB:
    os.system("pip install torchshow torchinfo gdown")

In [None]:
import gdown
from IPython.display import display
from matplotlib import pyplot as plt
import numpy as np
from PIL import Image
import seaborn as sns
import torch
import torchshow as ts
from tqdm.notebook import tqdm

## Semantic Segmentation

Here you will train a _fully convolutional network_ (FCN) for _semantic segmentation_.

### Stanford Background Dataset

We look at the [Stanford Background Dataset](http://dags.stanford.edu/projects/scenedataset.html). It contains various scenes with _semantic segmentation_ annotations.

First, we download the dataset.


In [None]:
import gdown

file_id = "1bXWW8v-vASZ6dUv2CchhrbvyQU4uE2dk"
url = f"https://drive.google.com/uc?id={file_id}"

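# Make sure the target directory exists before downloading into it
DATA_PATH.mkdir(parents=True, exist_ok=True)
download_path = DATA_PATH / "stanford_background_dataset.zip"
if not download_path.exists():
    gdown.download(url, str(download_path), quiet=False)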

In [None]:
if not (DATA_PATH / "stanford_background_dataset").exists():
    CMD = f"unzip {str(download_path)} -d {DATA_PATH}"
    os.system(CMD)

Now let's look at one of the images:

`DATA_PATH.joinpath("stanford_background_dataset/images/0000047.jpg")`

In [None]:
img_sbds = Image.open(
    DATA_PATH.joinpath("stanford_background_dataset/images/0000047.jpg"))
label_path = DATA_PATH.joinpath(
    "stanford_background_dataset/labels/0000047.regions.txt")
display(img_sbds)

We now define a `torch.utils.data.Dataset` that returns an image together with its _segmentation map_.

In [None]:
from pathlib import Path
import numpy as np
import torch
from PIL import Image
from torch.utils.data import Dataset


class StanfordBackgroundDataset(Dataset):
    """
    Dataset class for the Stanford Background Dataset.
    """

    def __init__(self, root_path: Path, transform=None, transform_labels=None):
        """
        Initializes the dataset.

        Args:
            root_path (Path): Path to the dataset directory.
            transform (callable, optional): Transformation function for images.
            transform_labels (callable, optional): Transformation function for labels.
        """
        self.root_path = root_path
        self.transform = transform
        self.transform_labels = transform_labels
        # Sort so that the index-to-image mapping is the same on every machine
        self.image_paths = sorted(root_path.joinpath("images").glob("*.jpg"))
        self.classes = ["sky", "tree", "road", "grass", "water", "building", "mountain", "foreground object"]

    def __len__(self):
        """ Returns the number of items in the dataset. """
        return len(self.image_paths)

    def __getitem__(self, idx):
        """
        Retrieves an item by its index.

        Args:
            idx (int): Index of the item.

        Returns:
            tuple: Tuple containing the image, label mask, and label image.
        """
        image_path = self.image_paths[idx]
        image = np.array(Image.open(image_path))

        label_path = self.root_path.joinpath(f"labels/{image_path.stem}.regions.txt")
        labels = self._parse_regions(label_path)

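        # One-hot encode the label map into one binary mask per class.
        # scatter_ needs valid int64 indices, so negative labels are clipped to class 0.
        mask = torch.zeros(len(self.classes), *labels.shape)
        labels_tensor = torch.tensor(labels, dtype=torch.long).unsqueeze(0)
        labels_clipped = torch.clip(labels_tensor, 0, len(self.classes) - 1)
        label_masks = mask.scatter_(0, labels_clipped, 1)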

        if self.transform:
            image = self.transform(image)

        if self.transform_labels:
            label_masks = self.transform_labels(label_masks)
            labels_tensor = self.transform_labels(labels_tensor)

        return image, label_masks, labels_tensor

    def _parse_regions(self, path):
        """
        Parses the region labels from a file.

        Args:
            path (Path): Path to the label file.

        Returns:
            np.ndarray: Array of labels.
        """
        with open(path, "r") as file:
            lines = [list(map(int, line.split())) for line in file]
        return np.array(lines)
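
The one-hot step in `__getitem__` can be tried in isolation. The following is a minimal sketch on a made-up 2x3 label map with three classes; it assumes that negative values mark pixels without an annotation, which is why they are clipped before `scatter_` and tracked separately in a `valid` mask (a name introduced here for illustration only).

```python
import torch

# Hypothetical 2x3 label map; -1 stands for a pixel without annotation.
labels = torch.tensor([[0, 1, 1],
                       [2, -1, 0]])
num_classes = 3

# scatter_ needs non-negative int64 indices, so -1 is clipped to class 0.
index = labels.clip(0, num_classes - 1).unsqueeze(0)  # shape (1, H, W)
one_hot = torch.zeros(num_classes, *labels.shape).scatter_(0, index, 1)

# Boolean mask of the pixels that carry a real annotation.
valid = labels >= 0

print(one_hot)
print(valid)
```

Every pixel ends up in exactly one class mask. Because of the clipping, unannotated pixels are counted as class 0, so a mask like `valid` may be useful when computing a loss or a metric.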

Instantiate the dataset using the `StanfordBackgroundDataset` class. Then select the first sample and visualize the image, the segmentation map, and the masks. Use `torchshow.show`.

Check that the data looks correct.

In [None]:
root_path = DATA_PATH.joinpath("stanford_background_dataset")

# YOUR CODE HERE
raise NotImplementedError()

### Fully-Convolutional Network

Implement an FCN with an encoder-decoder architecture. Complete the classes accordingly.

In [None]:
from torch import nn
from torch.nn import functional as F

class EncoderDecoder(nn.Module):
    """ Encoder-Decoder """
    def __init__(self, encoder, decoder, num_initial_channels, num_input_channels, num_output_channels):
        super().__init__()
        self.input = nn.Conv2d(3, num_initial_channels, kernel_size=(3, 3), stride=1, padding=1)
        self.encoder = encoder
        self.decoder = decoder
        self.output = nn.Conv2d(num_input_channels,  num_output_channels, kernel_size=(1, 1), stride=1, padding=0)

    def forward(self, x):
        # YOUR CODE HERE
        raise NotImplementedError()
        return x

class EncoderBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.conv = nn.Conv2d(in_channels, in_channels, kernel_size=(3, 3), stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels, out_channels, kernel_size=(3, 3), stride=2, padding=1)
    def forward(self, x):
        x = self.conv(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        return x

class DecoderBlock(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.conv = nn.Conv2d(in_channels, in_channels, kernel_size=(3, 3), stride=1, padding=1)
        # Transposed Convolution Layer
        # YOUR CODE HERE
        raise NotImplementedError()
    def forward(self, x):
        x = self.conv(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        return x

class Encoder(nn.Module):
    def __init__(self, num_channels):
        super().__init__()
        self.layers_ = nn.ModuleList()
        for i, (in_channels, out_channels) in enumerate(zip(num_channels, num_channels[1:])):
            self.layers_.append(EncoderBlock(in_channels=in_channels, out_channels=out_channels))
            
    def forward(self, x):
        for layer in self.layers_:
            x = layer(x)
        return x

class Decoder(nn.Module):
    def __init__(self, num_channels):
        super().__init__()
        self.layers_ = nn.ModuleList()
        for i, (in_channels, out_channels) in enumerate(zip(num_channels, num_channels[1:])):
            self.layers_.append(DecoderBlock(in_channels=in_channels, out_channels=out_channels))

    def forward(self, x):
        for layer in self.layers_:
            x = layer(x)
        return x


Check the architecture, for example that the output shape is correct. We want to produce a separate mask for each class.

In [None]:
from torchvision import transforms

transf = transforms.Compose([
    transforms.ToTensor(),
    transforms.CenterCrop((200, 300)),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    transforms.ConvertImageDtype(torch.float),
])

transf_labels = transforms.Compose([
    transforms.CenterCrop((200, 300))
])

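# Crop the labels the same way as the images so that the shapes are comparable
ds = StanfordBackgroundDataset(root_path, transform=transf, transform_labels=transf_labels)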

example_image, example_masks, example_labels = ds[0]

# Create / instantiate your model and process an example image with it

# YOUR CODE HERE
raise NotImplementedError()
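
Before running a model, the expected output size can be worked out by hand. The sketch below applies the standard size formulas for a strided convolution and a transposed convolution along one axis. The encoder values (kernel 3, stride 2, padding 1) are taken from `EncoderBlock`; the decoder values (kernel 2, stride 2) and the number of three blocks are assumptions made only for this illustration.

```python
def conv_out(size, kernel=3, stride=2, padding=1):
    """Output length of a strided convolution along one axis."""
    return (size + 2 * padding - kernel) // stride + 1


def conv_transpose_out(size, kernel=2, stride=2, padding=0, output_padding=0):
    """Output length of a transposed convolution along one axis (dilation 1)."""
    return (size - 1) * stride - 2 * padding + kernel + output_padding


def round_trip(size, num_blocks):
    """Downsample num_blocks times, then upsample the same number of times."""
    for _ in range(num_blocks):
        size = conv_out(size)
    bottleneck = size
    for _ in range(num_blocks):
        size = conv_transpose_out(size)
    return bottleneck, size


print(round_trip(200, 3))  # height of the cropped images
print(round_trip(300, 3))  # width of the cropped images
```

Under these assumptions the height returns to 200, but the width comes back as 304 instead of 300, because 75 is rounded up to 38 in the third encoder block. Possible remedies are a crop size divisible by 8, or cropping or interpolating the output to the label size.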

### Model Training and Metrics

Now we train the model and monitor its progress.

In [None]:
from torchvision import transforms

batch_size=4

transf = transforms.Compose([
    transforms.ToTensor(),
    transforms.CenterCrop((200, 300)),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    transforms.ConvertImageDtype(torch.float),
])

transf_labels = transforms.Compose([
    transforms.CenterCrop((200, 300))
])

ds = StanfordBackgroundDataset(root_path, transform=transf, transform_labels=transf_labels)

ds_loader = torch.utils.data.DataLoader(ds, batch_size=batch_size, shuffle=True, num_workers=1)


Complete the training loop where necessary.

In [None]:
import torch.optim as optim

torch.manual_seed(123)

# Parameters
num_epochs = 8

# create model
# YOUR CODE HERE
raise NotImplementedError()

# Create Loss-Function and Optimizer
# YOUR CODE HERE
raise NotImplementedError()

pbar = tqdm(total=num_epochs * len(ds_loader))

step = 0
for epoch in range(0, num_epochs):
    running_loss = 0.0
    running_acc = 0.0
    for i, data in enumerate(ds_loader):
        
        images, label_masks, label_images = data
        
        # Forward-Pass
        # YOUR CODE HERE
        raise NotImplementedError()
        
        # Optimize
        # YOUR CODE HERE
        raise NotImplementedError()
        
        # Calculate Pixel-Accuracy
        # YOUR CODE HERE
        raise NotImplementedError()

        # print statistics
        running_loss += loss.item()
        running_acc += pixel_acc
        step += 1
        print_every = 10
        if (i % print_every) == (print_every - 1):
            desc = f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / print_every:.3f} acc: {running_acc / print_every:.3f}'
            _ = pbar.update(print_every)
            _ = pbar.set_description(desc)
            running_loss = 0.0
            running_acc = 0.0
pbar.close()

print('Finished Training')
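
The loss and the pixel accuracy used in the loop can be checked on random tensors first. This is a sketch under assumptions, not the reference solution: it uses `nn.CrossEntropyLoss` with `ignore_index=-1` so that pixels labeled -1 are excluded, and it computes the accuracy only over labeled pixels. The tensor shapes mimic a batch from `ds_loader`; the values are random.

```python
import torch
from torch import nn

torch.manual_seed(0)

# Stand-ins for one batch: logits (N, C, H, W) and a label image (N, 1, H, W).
num_classes = 8
logits = torch.randn(2, num_classes, 4, 6)
label_images = torch.randint(-1, num_classes, (2, 1, 4, 6))

# CrossEntropyLoss takes raw logits and integer targets of shape (N, H, W);
# ignore_index=-1 excludes unlabeled pixels from the loss.
loss_fn = nn.CrossEntropyLoss(ignore_index=-1)
targets = label_images.squeeze(1).long()
loss = loss_fn(logits, targets)

# Pixel accuracy: share of labeled pixels whose arg-max class matches the label.
predictions = logits.argmax(dim=1)
valid = targets >= 0
pixel_acc = (predictions[valid] == targets[valid]).float().mean().item()

print(f"loss: {loss.item():.3f}  pixel accuracy: {pixel_acc:.3f}")
```

Storing `pixel_acc` as a plain Python float matches how the loop accumulates `running_acc`.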


Visualize the prediction for one image and compare it with the annotated _segmentation map_.

You can use `torchshow` for the visualization.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

## Upsampling

Here we look at several upsampling techniques.

In [None]:
to_upsample = torch.tensor(
    [[1, 2], [3, 4]]).unsqueeze(0).to(torch.float)

to_upsample_2 = torch.concat(
    [to_upsample, to_upsample], dim=2)

to_upsample_2 = torch.concat(
    [to_upsample_2, to_upsample_2], dim=1)

    
def display_arrays(arrays: list[np.ndarray], titles: list[str]):
    """ Display Arrays """
    num_arrays = len(arrays)
    kwargs = {'annot': True, 'cbar': False, 'vmin': 0, 'vmax': 10, 'xticklabels': False, 'yticklabels': False}
    fig, ax = plt.subplots(figsize=(3 * num_arrays, 3), ncols=num_arrays)
    
    # handle single and multi-array plots
    if num_arrays > 1:
        axes = ax.flatten()
    else:
        axes = [ax]
    
    for i, (array, title) in enumerate(zip(arrays, titles)):
        sns.heatmap(array, **kwargs, ax=axes[i]).set(
            title=f"{title} - Shape {array.shape}")

    plt.show()

display_arrays([np.array(to_upsample[0, :, :])], ["input"])
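
A parameter-free baseline for upsampling is interpolation. The sketch below applies `torch.nn.functional.interpolate` to the same 2x2 values as above, once with nearest-neighbor and once with bilinear interpolation. Choosing `align_corners=False` is an assumption for this illustration; other settings give different in-between values.

```python
import torch
from torch.nn import functional as F

# F.interpolate expects (N, C, H, W) for 2D data, so two leading axes are added.
x = torch.tensor([[1.0, 2.0], [3.0, 4.0]]).unsqueeze(0).unsqueeze(0)

# Nearest neighbor repeats each value; bilinear blends neighboring values.
nearest = F.interpolate(x, scale_factor=2, mode="nearest")
bilinear = F.interpolate(x, scale_factor=2, mode="bilinear", align_corners=False)

print(nearest[0, 0])
print(bilinear[0, 0])
```

Nearest-neighbor upsampling turns every input value into a 2x2 block, while the bilinear result changes smoothly between 1 and 4.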

### Unpooling

Test max pooling and max unpooling with switches (the indices returned by the pooling layer). Try different parameters and inspect the result.

In [None]:
from torch import nn

to_upsample_3 = torch.clone(to_upsample_2)
to_upsample_3[0, 0, 0] = 16

pool = nn.MaxPool2d(2, stride=2, return_indices=True)
unpool = nn.MaxUnpool2d(2, stride=2)
output, indices = pool(to_upsample_3)
unpooled = unpool(output, indices)

display_arrays(arrays=[
    np.array(to_upsample_3[0, :, :]),
    np.array(output[0, :, :]),
    np.array(unpooled[0, :, :])],
    titles=["input", "pooled", "unpooled"]
)
    

### Transposed Convolution

Try different parameters for the _transposed convolution_. Create two more variants and visualize the results.

In [None]:
from torch.nn import functional as F

weight = torch.tensor([[1, 2, 3], [0, 1, 2], [0, 1, 2]]).unsqueeze(0).unsqueeze(0).to(torch.float)
weight.shape

input_ = to_upsample_2

out = F.conv_transpose2d(
    input=input_,
    weight=weight,
    stride=2,
    padding=0,
    output_padding=0)

# YOUR CODE HERE
raise NotImplementedError()

arrays_to_plot = [np.array(x) for x in [
    input_[0], weight[0, 0], out[0], out2[0], out3[0]]]

display_arrays(
    arrays=arrays_to_plot,
    titles=["Input", "Filter", "Output", "Output2", "Output3"])
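
The arithmetic of a transposed convolution is easiest to follow with a kernel of ones. In the sketch below, which uses a made-up 2x2 input and a 3x3 kernel of ones, each input value stamps a scaled copy of the kernel into the output, and overlapping stamps are summed. The second call with `padding=1` is only one possible variant.

```python
import torch
from torch.nn import functional as F

x = torch.tensor([[1.0, 2.0], [3.0, 4.0]]).reshape(1, 1, 2, 2)  # (N, C, H, W)
kernel = torch.ones(1, 1, 3, 3)  # (C_in, C_out, kH, kW)

# stride sets the distance between stamps: (2 - 1) * 2 + 3 = 5 output rows/cols.
out = F.conv_transpose2d(x, kernel, stride=2)

# padding removes that many rows/cols from every border of the output.
out_padded = F.conv_transpose2d(x, kernel, stride=2, padding=1)

print(out[0, 0])
print(out_padded[0, 0])
```

The center of the 5x5 output is 10, because all four stamps overlap there (1 + 2 + 3 + 4). With `padding=1` the outer border is removed and a 3x3 output remains.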


## Instance Segmentation with _Mask R-CNN_

In this exercise you apply instance segmentation using a pre-trained model.

First we download two images to try out instance segmentation. Then we read the images with `Pillow`.

In [None]:
files = [
    {'id': '18zuHwfojUUpmkrQttEtuaNW-MQ0QOoAH',  'name': 'ducks.jpg'},
    {'id': '1-UWVWqTpE80Qxh36hPuKkuQZj5BT3hXr', 'name': 'dogs.jpg'}
]

for file in files:
    url = f"https://drive.google.com/uc?id={file['id']}"
    download_path = DATA_PATH / file['name']
    if not download_path.exists():
        gdown.download(url, str(download_path), quiet=False)

In [None]:
img_dogs = Image.open(DATA_PATH.joinpath("dogs.jpg"))
img_ducks = Image.open(DATA_PATH.joinpath("ducks.jpg"))

Apply a pre-trained model from the _Mask R-CNN_ family provided by [torchvision](https://pytorch.org/vision/stable/models.html#instance-segmentation). Initialize the model and run a forward pass over the two example images.

In [None]:
from torchvision.models.detection import maskrcnn_resnet50_fpn, MaskRCNN_ResNet50_FPN_Weights

weights = MaskRCNN_ResNet50_FPN_Weights.DEFAULT
model = maskrcnn_resnet50_fpn(weights=weights)
# Named `preprocess` so it does not shadow the `torchvision.transforms` module imported above
preprocess = weights.transforms()

# YOUR CODE HERE
raise NotImplementedError()

Inspect the output of _Mask R-CNN_. How many objects did the model find? What does the output contain?

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

Draw the detected objects onto the image. Try different values for `score_threshold` and `proba_threshold`. What do these values mean?

In [None]:
from torchvision.utils import draw_segmentation_masks

score_threshold = .75
proba_threshold = 0.5

boolean_masks = [
    out['masks'][out['scores'] > score_threshold] > proba_threshold
    for out in output
]

images_with_masks = [
    draw_segmentation_masks(torch.tensor(np.array(img)).permute(2, 0, 1), mask.squeeze(1))
    for img, mask in zip(images_list, boolean_masks)
]
ts.show(images_with_masks)
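
The effect of the two thresholds can be followed on a tiny, made-up output dictionary. In this sketch the scores and mask probabilities are invented: `score_threshold` decides which detections are kept at all, and `proba_threshold` decides which pixels of a kept detection belong to the instance.

```python
import torch

# Hypothetical output for one image: 3 detections on a 2x2 image.
out = {
    "scores": torch.tensor([0.95, 0.80, 0.30]),
    "masks": torch.tensor([
        [[[0.9, 0.6], [0.2, 0.1]]],
        [[[0.4, 0.7], [0.8, 0.3]]],
        [[[0.9, 0.9], [0.9, 0.9]]],
    ]),  # shape (3, 1, 2, 2), per-pixel foreground probabilities
}

score_threshold = 0.75  # keep only detections with a high enough score
proba_threshold = 0.5   # pixels above this probability belong to the instance

keep = out["scores"] > score_threshold
boolean_masks = out["masks"][keep] > proba_threshold

print(keep.sum().item(), "instances kept")
print(boolean_masks.squeeze(1).int())
```

The third detection is dropped because of its low score, even though all of its mask probabilities are high. Raising `proba_threshold` shrinks the remaining masks; lowering it grows them.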