In [1]:
import os
import sys
import cv2
import wget
import glob
import wandb
import shutil
import numpy as np
import numpy.testing as npt
import torch
import torchvision
import matplotlib.pyplot as plt
from torchvision.transforms import v2
from torchvision.datasets import VisionDataset
from torch.utils.data import Subset, DataLoader
from torchvision.datasets.utils import check_integrity, download_and_extract_archive, download_url, verify_str_arg
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import shutil
from PIL import Image,ImageFile
import glob
import os
import sys
import torch
from torch import Tensor
import torch.testing as torch_testing
from torch.utils.data import Subset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms.functional as TF
import torchvision
import torchvision.transforms as transforms
import numpy as np
import numpy.testing as npt
from functools import reduce
import matplotlib.pyplot as plt
import os
import pickle
import cv2
from dataclasses import dataclass
import tqdm
from sklearn.model_selection import train_test_split
from torch.utils.data.dataloader import default_collate
import pprofile
from torchvision.models import resnet18, ResNet18_Weights
from sklearn.metrics import confusion_matrix
import seaborn as sn
import pandas as pd

In [2]:
# torch.multiprocessing.set_start_method('spawn')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Add this line if you don't want to use GPU
device = torch.device('cpu')
torch.set_default_device(device)

`segmentation mini-project` CNN architecture, training loop, perform hyperparameter search and evaluate the best segmentation module. Inspired by the [U-Net](https://arxiv.org/abs/1505.04597) architecture.

In [3]:
def upsample_block(x, number_of_filters, filter_size, stride = 2):
    output_channels = x.shape[1] // 2
    x = torch.nn.ConvTranspose2d(in_channels=x.shape[1], out_channels=output_channels, kernel_size=filter_size, stride=stride)(x)
    x = torch.nn.BatchNorm2d(x.shape[1])(x)
    x = torch.nn.ReLU()(x)
    return x

in_layer = torch.rand((32, 32, 128, 128))

filter_size = 4
number_of_filters = 16

expected_shapes = [
    (32, 16, 258, 258),
    (32, 16, 512, 512),
    (32, 16, 1020, 1020)
]
strides = [2, 4, 8]

for expected_shape, stride in zip(expected_shapes, strides):
    x = upsample_block(in_layer, number_of_filters, filter_size, stride)
    print('in shape: ', in_layer.shape, ' upsample with filter size ', filter_size, '; stride ', stride, ' -> out shape ', x.shape)
    npt.assert_array_equal(x.shape, expected_shape)

in shape:  torch.Size([32, 32, 128, 128])  upsample with filter size  4 ; stride  2  -> out shape  torch.Size([32, 16, 258, 258])
in shape:  torch.Size([32, 32, 128, 128])  upsample with filter size  4 ; stride  4  -> out shape  torch.Size([32, 16, 512, 512])
in shape:  torch.Size([32, 32, 128, 128])  upsample with filter size  4 ; stride  8  -> out shape  torch.Size([32, 16, 1020, 1020])


`down-sampling` class Encoder. Each block: 2 convolution layers with 3 filter_size and a non liniar ReLu. Blocks separated by max pooling with size of 2 and stride of 2.

Parameters:
`channel_numbers`: list of integers, the number of channels used for each encoder block.

`retrun`: list of tensors, the output of each encoder block.

Diagram used in tests:
<img src="https://miro.medium.com/max/1400/1*J3t2b65ufsl1x6caf6GiBA.png" style="width:40vw; margin-left:20vw"/>

In [4]:
class DoubleConvolution(nn.Module):
    def __init__(self, in_channels: int, out_channels: int):
        super(DoubleConvolution, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )
    
    def forward(self, x: Tensor) -> Tensor:
        return self.conv(x)

in_layer = torch.rand((1, 1, 512, 512)) # 1 number of images, 1 channel, 572x572 pixels
expected_shape = (1, 64, 512, 512)
model = DoubleConvolution(1, 64)
in_layer = model(in_layer)
print('in shape: ', in_layer.shape, ' out shape ', in_layer.shape)
npt.assert_array_equal(in_layer.shape, expected_shape)

in shape:  torch.Size([1, 64, 512, 512])  out shape  torch.Size([1, 64, 512, 512])


In [14]:
class DownSamplingNeuralNetwork(nn.Module): # Encoder
    def __init__(self, in_channels=1, features=[64,128,256,512]):
        super(DownSamplingNeuralNetwork, self).__init__()
        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        for feature in features:
            self.downs.append(DoubleConvolution(in_channels, feature))
            in_channels = feature

    def forward(self, x):
        skips = []
        for down in self.downs:
            x = down(x)
            skips.append(x)
            x = self.pool(x)
        return skips, x

# Test the down sampling network
in_layer = torch.rand((1, 1, 512, 512)) # 1 number of images, 1 channel, 572x572 pixels
expected_shape = (1, 512, 64, 64)
model = DownSamplingNeuralNetwork(in_channels=1,features=[64,128,256,512])
skips, _ = model(in_layer)
print('in shape: ', in_layer.shape, ' out shape ', skips[-1].shape)
npt.assert_equal(skips[-1].shape, expected_shape)

in shape:  torch.Size([1, 1, 512, 512])  out shape  torch.Size([1, 512, 64, 64])


`up-sapling` class Decoder.

Constructor parameters:`depth`: the depth of each decoder module

Forward parameters:

`x`: the input feature map
`encoder_activations`: a list of activations from the encoder (for the skip connections)

`Forward function`:

1. up-sampling operation normalization and ReLU
2. crop the activation map (use `CenterCrop`): to be the same size as the decoder block
3. concatenate these two activation maps (on the channel dimension, 1)
4. apply an encoder block on the result
5. pass the result to the next decoder block

In [15]:
class UpSamplingNeuralNetwork(nn.Module):
    def __init__(self, out_channels=1, features=[64,128,256,512]):
        super(UpSamplingNeuralNetwork, self).__init__()
        self.ups = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        
        # Up part of UNET
        for feature in reversed(features):
            self.ups.append(
                nn.ConvTranspose2d(feature*2, feature, kernel_size=2, stride=2)
            )
            self.ups.append(DoubleConvolution(feature*2, feature))
        
        self.bottleneck = DoubleConvolution(features[-1], features[-1]*2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)
    
    def forward(self, x: Tensor, skip_connections) -> Tensor:
        # x = self.pool(x)
        x = self.bottleneck(x)
        skip_connections = skip_connections[::-1]
        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx//2]
            if x.shape != skip_connection.shape:
                x = TF.resize(x, size=skip_connection.shape[2:])
            concat_skip = torch.cat((skip_connection, x), dim=1)
            x = self.ups[idx+1](concat_skip)
        return self.final_conv(x)

in_layer = torch.rand((1, 512, 64, 64))
expected_shape = (1, 1, 512, 512)
model = UpSamplingNeuralNetwork()
output = model(in_layer, skips)
print('in shape: ', in_layer.shape, ' out shape ', output.shape)
npt.assert_equal(output.shape, expected_shape)



in shape:  torch.Size([1, 512, 64, 64])  out shape  torch.Size([1, 1, 512, 512])


In [16]:
class UNET_BY_UP_AND_DOWN(nn.Module):
    def __init__(self, in_channels=1, out_channels=1, features=[64,128,256,512]):
        super(UNET_BY_UP_AND_DOWN, self).__init__()
        self.downs = DownSamplingNeuralNetwork(in_channels, features)
        self.ups = UpSamplingNeuralNetwork(out_channels, features)
    
    def forward(self, x):
        skips, x = self.downs(x)
        return self.ups(x, skips)

in_layer = torch.rand((1, 1, 512, 512))
model = UNET_BY_UP_AND_DOWN(in_channels=1, out_channels=1)
output = model(in_layer)
print(output.shape)
print(in_layer.shape)
assert output.shape == in_layer.shape

torch.Size([1, 1, 512, 512])
torch.Size([1, 1, 512, 512])


`UNET` single class - simpler approach

In [17]:
class UNET(nn.Module):
    def __init__(self, in_channels=1, out_channels=1, features=[64,128,256,512]):
        super(UNET, self).__init__()
        self.downs = nn.ModuleList()
        self.ups = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        
        # Down part of UNET
        for feature in features:
            self.downs.append(DoubleConvolution(in_channels, feature))
            in_channels = feature
        
        # Up part of UNET
        for feature in reversed(features):
            self.ups.append(
                nn.ConvTranspose2d(feature*2, feature, kernel_size=2, stride=2)
            )
            self.ups.append(DoubleConvolution(feature*2, feature))
        
        self.bottleneck = DoubleConvolution(features[-1], features[-1]*2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)
    
    def forward(self, x: Tensor) -> Tensor:
        skip_connections = []
        
        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = self.pool(x)
        
        x = self.bottleneck(x)
        skip_connections = skip_connections[::-1]
        
        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx//2]

            if x.shape != skip_connection.shape:
                x = TF.resize(x, size=skip_connection.shape[2:])

            concat_skip = torch.cat((skip_connection, x), dim=1)
            x = self.ups[idx+1](concat_skip)
        
        return self.final_conv(x)

in_layer = torch.rand((1, 1, 512, 512))
model = UNET(in_channels=1, out_channels=1)
output = model(in_layer)
print(output.shape)
print(in_layer.shape)
assert output.shape == in_layer.shape

torch.Size([1, 1, 512, 512])
torch.Size([1, 1, 512, 512])


`pre-trained` model: pre-trained weights on `ImageNet` and "freeze these weights during the training process (set `required_grad=False` for those tensors).

`Problem`: We neet to create the skip connections required by the U-Net architecture we need access to the feature maps of some intermediate layers in the network and these are not accessible by default.