# Dataset creation 

In this npotebook we create the `Dataset` object from pytorch.

The `Dataset` we want to create is based on metadata we have buolt, and is respobsible to load the two `indoor` and `outdoor` elements and mixing them on the fly.  

In [None]:
import os
import torch
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils

from src.utils.const import *

# Ignore warnings
import warnings
warnings.filterwarnings("ignore")

plt.ion()   # interactive mode

from pathlib import Path
from typing import List

## Inversion dataset

In [3]:
class InvDataset(torch.utils.data.Dataset):

    def __init__(self, metapath: Path = BASEPATH / Path('data/processed/meta'),
                       inversionspath: Path = BASEPATH / Path('data/interim/inversions'),
                       task: str = '2-class',
                       only_real: bool = True, 
                       remove: List = []) -> None:

        self.metapath = metapath
        # tasks that can be conducted are:
        # - '2-class'     - classification [mine / clutter]
        # - '3-class'   - classification [mine / clutter / arch]
        # - 'fg-class'  - fine-grain classification [vs50, wood-cylinder, ...]
        self.task = task
        self.only_real = only_real

        self.indoorpath = inversionspath / Path('indoor')
        self.outdoorpath = inversionspath / Path('outdoor')

        self.csv = pd.read_csv(metapath / Path('mix.csv'))
        # filter out some
        self.csv = self.csv[~self.csv['in_id'].isin(remove)]
        
        
    def __getitem__(self, index):
        # Returns (xb, yb) pair, after applying all transformations on the audio file.
        row = self.csv.iloc[index]
        in_inv = np.load(self.indoorpath / f"{row['in_file_name']}.npy")
        in_inv = torch.from_numpy(in_inv).double()
        
        try: inv_noise = np.load(self.indoorpath / f"{row['in_file_name']}.npy".replace(str(row['in_id']).zfill(2), '00'))
        except:
            try: inv_noise = np.load(self.indoorpath / f"{row['in_file_name'].replace('bas', 'low')}.npy".replace(str(row['in_id']).zfill(2), '00'))
            except:
                print(f"no noise signal {row['in_file_name'].replace(str(row['in_id']).zfill(2), '00')} exists for {row['in_file_name']}")
                inv_noise = np.zeros_like(in_inv)
        inv_noise = torch.from_numpy(inv_noise).double()

        out_inv = np.load(self.outdoorpath / f"{row['out_file_name']}.npy")
        out_inv = torch.from_numpy(out_inv).double()

        mix_inv =  (in_inv - inv_noise) + out_inv

        mix_inv = mix_inv.T

    
        def twoclass(label:str)->int:
            # 1 -> mine
            # 0 -> other
            return 1 if label == 'mine' else 0

        def threeclass(label:str)->int:
            # 2 -> archeology 
            # 1 -> mine
            # 0 -> other
            return 1 if label == 'mine' else 2 if label == 'archeology' else 0

        def fgclass(label:str)->int:
            # 14 -> knife
            # 13 -> terracotta
            # ...
            # 1 -> pmn1
            # 0 -> none
            return int(label)

        if self.task == '2-class':
            label = twoclass(row['in_name'])
        elif self.task == '3-class':
            label = threeclass(row['in_name'])
        elif self.task == 'fg-class':
            label = fgclass(row['in_id'])
        else:
            raise ValueError(f'task {self.task} is not supported !')
        
        mix = torch.real(mix_inv) if self.only_real else mix_inv
        mix = torch.nan_to_num(mix)
        return mix.float(), label

    def __sizeof__(self) -> int:
        return len(self.csv)
    
    def __len__(self) -> int:
        return len(self.csv)
        

In [4]:
train_data = InvDataset()

In [5]:
for x, yb in train_data:
    break

In [6]:
SIZE = train_data.__sizeof__()

## Holgraphic dataset

In [3]:
from utils.data import task_manager, get_holo_noise

In [74]:
class HoloDataset(torch.utils.data.Dataset):

    def __init__(self, metapath: Path = BASEPATH / Path('data/processed/meta'),
                       hologramspath: Path = BASEPATH / Path('data/interim/holograms'),
                       task: str = '2-class',
                       only_real: bool = True, 
                       remove: List = []) -> None:

        self.metapath = metapath
        # tasks that can be conducted are:
        # - '2-class'     - classification [mine / clutter]
        # - '3-class'   - classification [mine / clutter / arch]
        # - 'fg-class'  - fine-grain classification [vs50, wood-cylinder, ...]
        self.task = task
        self.only_real = only_real

        self.indoorpath = hologramspath / Path('indoor')
        self.outdoorpath = hologramspath / Path('outdoor')

        self.csv = pd.read_csv(metapath / Path('mix.csv'))
        # filter out some
        self.csv = self.csv[~self.csv['in_id'].isin(remove)]
        
        
    def __getitem__(self, index):
        # Returns (xb, yb) pair, after applying all transformations on the audio file.
        row = self.csv.iloc[index]
        in_holo = np.load(self.indoorpath / f"{row['in_file_name']}.npy")
        in_holo = torch.from_numpy(in_holo).double()
        
        holo_noise = get_holo_noise(self.indoorpath, row['in_file_name'], row['in_id'], in_holo.shape)
        holo_noise = torch.from_numpy(holo_noise).double()

        out_holo = np.load(self.outdoorpath / f"{row['out_file_name']}.npy")
        out_holo = torch.from_numpy(out_holo).double()

        print(in_holo.shape , holo_noise.shape, out_holo.shape)
        if out_holo.shape[-1] == 61:
            return None, None
        mix_holo =  ((in_holo - holo_noise) + out_holo)

        tasks = ['2-class','3-class','fg-class']

        if self.task in tasks:
            label = task_manager(self.task, row)
        else:
            raise ValueError(f'task {self.task} is not supported !')
        
        mix = torch.real(mix_holo) if self.only_real else mix_holo
        mix = mix.unsqueeze(0)
        mix = torch.nan_to_num(mix).T
        return mix.float(), label

    def __sizeof__(self) -> int:
        return len(self.csv)
    
    def __len__(self) -> int:
        return len(self.csv)


In [75]:
train_data = HoloDataset(task='fg-class')

In [76]:
count = 0
for x, yb in train_data:
    if count > 10: break

torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 62])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 61])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 62])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 61])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 61])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 62])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 62])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 62])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 61])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 62])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 62])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 61])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 61])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 61])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 61])
torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([5

FileNotFoundError: [Errno 2] No such file or directory: '/Users/emanuelevivoli/asmara/data/interim/holograms/outdoor/36_295_6.npy'

In [49]:
SIZE = train_data.__sizeof__()

### Step 3: Build a classification model

In [50]:
from torch.utils.data import random_split

In [60]:
# We use folds 1,2,3 for training, 4 for validation, 5 for testing.
test_data, val_data, train_data = random_split(HoloDataset(task='2-class'), [int(SIZE*0.1), int(SIZE*0.2), SIZE-(int(SIZE*0.1) + int(SIZE*0.2))])


In [61]:
train_loader = \
    torch.utils.data.DataLoader(train_data, batch_size=8, shuffle=True)

In [62]:
val_loader = torch.utils.data.DataLoader(val_data, batch_size=8)

In [63]:
test_loader = torch.utils.data.DataLoader(test_data, batch_size=8)

In [64]:
import pytorch_lightning as pl
from torchmetrics import functional

In [65]:
from torch import Tensor, nn
from torch.nn import functional as F

In [66]:
class UNet(pl.LightningModule):
    """Pytorch Lightning implementation of U-Net.

    Paper: `U-Net: Convolutional Networks for Biomedical Image Segmentation
    <https://arxiv.org/abs/1505.04597>`_

    Paper authors: Olaf Ronneberger, Philipp Fischer, Thomas Brox

    Implemented by:

        - `Annika Brundyn <https://github.com/annikabrundyn>`_
        - `Akshay Kulkarni <https://github.com/akshaykvnit>`_

    Args:
        num_classes: Number of output classes required
        input_channels: Number of channels in input images (default 3)
        num_layers: Number of layers in each side of U-net (default 5)
        features_start: Number of features in first layer (default 64)
        bilinear: Whether to use bilinear interpolation (True) or transposed convolutions (default) for upsampling.
    """

    def __init__(
        self,
        num_classes: int,
        input_channels: int = 3,
        num_layers: int = 5,
        features_start: int = 64,
        bilinear: bool = False,
    ):

        if num_layers < 1:
            raise ValueError(f"num_layers = {num_layers}, expected: num_layers > 0")

        super().__init__()
        self.num_layers = num_layers

        layers = [DoubleConv(input_channels, features_start)]

        feats = features_start
        for _ in range(num_layers - 1):
            layers.append(Down(feats, feats * 2))
            feats *= 2

        for _ in range(num_layers - 1):
            layers.append(Up(feats, feats // 2, bilinear))
            feats //= 2

        layers.append(nn.Conv2d(feats, num_classes, kernel_size=1))

        self.layers = nn.ModuleList(layers)

    def forward(self, x: Tensor) -> Tensor:
        xi = [self.layers[0](x)]
        # Down path
        for layer in self.layers[1 : self.num_layers]:
            xi.append(layer(xi[-1]))
        # Up path
        for i, layer in enumerate(self.layers[self.num_layers : -1]):
            xi[-1] = layer(xi[-1], xi[-2 - i])
        return self.layers[-1](xi[-1])

    def training_step(self, batch, batch_idx):
        # Very simple training loop
        x, y = batch
        y_hat = self(x)
        loss = F.cross_entropy(y_hat, y)
        self.log('train_loss', loss, on_step=True)
        return loss
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        y_hat = torch.argmax(y_hat, dim=1)
        acc = functional.accuracy(y_hat, y)
        self.log('val_acc', acc, on_epoch=True, prog_bar=True)
        return acc
        
    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer


class DoubleConv(nn.Module):
    """[ Conv2d => BatchNorm => ReLU ] x 2."""

    def __init__(self, in_ch: int, out_ch: int):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
        )

    def forward(self, x: Tensor) -> Tensor:
        return self.net(x)


class Down(nn.Module):
    """Downscale with MaxPool => DoubleConvolution block."""

    def __init__(self, in_ch: int, out_ch: int):
        super().__init__()
        self.net = nn.Sequential(nn.MaxPool2d(kernel_size=2, stride=2), DoubleConv(in_ch, out_ch))

    def forward(self, x: Tensor) -> Tensor:
        return self.net(x)


class Up(nn.Module):
    """Upsampling (by either bilinear interpolation or transpose convolutions) followed by concatenation of feature
    map from contracting path, followed by DoubleConv."""

    def __init__(self, in_ch: int, out_ch: int, bilinear: bool = False):
        super().__init__()
        self.upsample = None
        if bilinear:
            self.upsample = nn.Sequential(
                nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True),
                nn.Conv2d(in_ch, in_ch // 2, kernel_size=1),
            )
        else:
            self.upsample = nn.ConvTranspose2d(in_ch, in_ch // 2, kernel_size=2, stride=2)

        self.conv = DoubleConv(in_ch, out_ch)

    def forward(self, x1: Tensor, x2: Tensor) -> Tensor:
        x1 = self.upsample(x1)

        # Pad x1 to the size of x2
        diff_h = x2.shape[2] - x1.shape[2]
        diff_w = x2.shape[3] - x1.shape[3]

        x1 = F.pad(x1, [diff_w // 2, diff_w - diff_w // 2, diff_h // 2, diff_h - diff_h // 2])

        # Concatenate along the channels axis
        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)

In [67]:
pl.seed_everything(0)

Global seed set to 0


0

In [69]:
# Test that the network works on a single mini-batch
audionet = UNet(num_classes = 2, input_channels = 1)
xb, yb = next(iter(train_loader))
audionet(xb).shape

torch.Size([52, 62]) torch.Size([52, 62]) torch.Size([52, 61])


RuntimeError: The size of tensor a (62) must match the size of tensor b (61) at non-singleton dimension 1

<torch.utils.data.dataloader.DataLoader at 0x7fb3efd21700>

In [23]:
trainer = pl.Trainer(gpus=0, max_epochs=25)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [24]:
trainer.fit(audionet, train_loader, val_loader)

Missing logger folder: /Users/emanuelevivoli/asmara/src/notebooks/lightning_logs

  | Name   | Type       | Params
--------------------------------------
0 | layers | ModuleList | 31.1 M
--------------------------------------
31.1 M    Trainable params
0         Non-trainable params
31.1 M    Total params
124.238   Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

ValueError: Either `preds` and `target` both should have the (same) shape (N, ...), or `target` should be (N, ...) and `preds` should be (N, C, ...).

# to implement

A list of models to implement are:
- [3D U-NET++](https://github.com/wolny/pytorch-3dunet)
- [Attention-UNet](https://github.com/ozan-oktay/Attention-Gated-Networks)
- [FetalCPSeg](https://github.com/wulalago/FetalCPSeg) `DSRNet`
- [SEFCN]()


Then there is a repo with all
- [Segmentation models](https://github.com/qubvel/segmentation_models.pytorch)

With the following architetcures:

- Unet 
- Unet++ 
- MAnet 
- Linknet 
- FPN 
- PSPNet
- PAN 
- DeepLabV3 
- DeepLabV3+ 