In [1]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms.functional as TF

import pytorch_lightning as pl
from typing import Dict, List, Tuple



In [2]:
class DoubleConv(pl.LightningModule):
    """Two consecutive 3x3 convolution -> batch-norm -> ReLU stages.

    Both convolutions use stride 1 and padding 1, so the spatial size of the
    input is preserved; only the channel count changes
    (``in_channels`` -> ``out_channels`` -> ``out_channels``).
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()

        stages = []
        # First stage maps in_channels -> out_channels, second keeps out_channels.
        for stage_in in (in_channels, out_channels):
            stages += [
                # bias=False: the BatchNorm2d that follows has its own shift term.
                nn.Conv2d(stage_in, out_channels, kernel_size=3, stride=1, padding=1, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ]
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through both conv/batch-norm/ReLU stages and return the result."""
        return self.conv(x)

In [3]:
class UNET(pl.LightningModule):
    """U-Net built from ``DoubleConv`` blocks, wrapped as a LightningModule.

    The encoder applies one ``DoubleConv`` per entry in ``features`` followed
    by 2x2 max pooling; the decoder mirrors it with ``ConvTranspose2d``
    upsampling, concatenation with the matching encoder output (skip
    connection) and another ``DoubleConv``. A 1x1 convolution maps the last
    decoder output to ``out_channels``.

    Args:
        in_channels: number of channels of the input images.
        out_channels: number of channels produced by the final 1x1 conv.
        optimizer: callable (e.g. an optimizer class) that receives a list of
            parameter-group dicts and returns an optimizer.
        loss_fn: callable ``loss_fn(logits, target)`` returning the loss.
        features: channel widths of the encoder levels, shallow to deep.
        optimizer_params: optional list of parameter-group dicts (``lr`` etc.).
            The model's parameters are inserted as ``"params"`` of the first
            group. When omitted, a module-level ``optimizer_params`` variable
            is used if one exists (the behaviour earlier versions relied on);
            otherwise a single group with the optimizer's defaults is used.

    Raises:
        ValueError: if ``features`` is empty.
    """

    def __init__(
        self,
        in_channels,
        out_channels,
        optimizer,
        loss_fn,
        features=(64, 128, 256, 512),
        optimizer_params=None,
    ):
        super().__init__()

        features = list(features)
        if not features:
            raise ValueError("features must contain at least one channel size")

        self.loss_fn = loss_fn

        # The optimizer is NOT built here: at this point no layer is registered
        # yet, so self.parameters() would be empty and the optimizer would
        # never update anything. It is created in configure_optimizers().
        if optimizer_params is None:
            # NOTE(review): backward-compatibility fallback to the notebook-level
            # global the original code read implicitly; prefer passing
            # optimizer_params explicitly.
            optimizer_params = globals().get("optimizer_params")
        self._optimizer_factory = optimizer
        self._optimizer_params = optimizer_params
        self.optimizer = None  # set by configure_optimizers()

        # Running sums of per-batch Dice scores; reset at the end of each epoch.
        self.train_dice_score = 0
        self.val_dice_score = 0

        # architecture
        self.ups = nn.ModuleList()
        self.downs = nn.ModuleList()
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Down part of UNET - only left side
        for feature in features:
            self.downs.append(DoubleConv(in_channels, feature))
            in_channels = feature

        # Up part of UNET - only right side.
        # self.ups alternates [ConvTranspose2d, DoubleConv, ConvTranspose2d, ...].
        for feature in reversed(features):
            # Upsampling halves the channel count (feature*2 -> feature) ...
            self.ups.append(
                nn.ConvTranspose2d(feature * 2, feature, kernel_size=2, stride=2)
            )
            # ... and concatenation with the skip connection doubles it again.
            self.ups.append(DoubleConv(feature * 2, feature))

        self.bottleneck = DoubleConv(features[-1], features[-1] * 2)
        self.final_conv = nn.Conv2d(features[0], out_channels, kernel_size=1)

    def forward(self, x):
        """Return the raw (pre-sigmoid) output of the network for ``x``."""
        skip_connections = []

        for down in self.downs:
            x = down(x)
            skip_connections.append(x)
            x = self.pool(x)

        x = self.bottleneck(x)

        # Deepest encoder output is consumed first by the decoder.
        skip_connections = skip_connections[::-1]

        # Step by 2: even indices are the upsampling layers, odd indices the
        # DoubleConv applied after concatenation.
        for idx in range(0, len(self.ups), 2):
            x = self.ups[idx](x)
            skip_connection = skip_connections[idx // 2]

            # Pooling floors odd sizes, so the upsampled map can be smaller
            # than the skip connection; resize to match its spatial size.
            if x.shape[2:] != skip_connection.shape[2:]:
                x = TF.resize(x, size=skip_connection.shape[2:], antialias=True)

            concat_skip = torch.cat((skip_connection, x), dim=1)
            x = self.ups[idx + 1](concat_skip)

        return self.final_conv(x)

    def configure_optimizers(self):
        """Build the optimizer over this model's parameters and return it.

        The caller's ``optimizer_params`` dicts are copied, so they are not
        mutated. The created optimizer is also stored in ``self.optimizer``.
        """
        if self._optimizer_params:
            param_groups = [dict(group) for group in self._optimizer_params]
        else:
            param_groups = [{}]
        param_groups[0]["params"] = list(self.parameters())

        self.optimizer = self._optimizer_factory(param_groups)
        return self.optimizer

    def _shared_step(self, batch: Tuple[torch.Tensor, torch.Tensor]) -> Tuple[torch.Tensor, torch.Tensor]:
        """Compute the loss and the Dice score of the thresholded prediction for one batch."""
        x, y = batch
        logits = self(x.float())
        loss = self.loss_fn(logits, y)

        # Metric only: binarise sigmoid(logits) at 0.5, no gradients needed.
        with torch.no_grad():
            mask = (torch.sigmoid(logits) > 0.5).float()
            dice = (2 * (mask * y).sum()) / ((mask + y).sum() + 1e-8)
        return loss, dice

    def training_step(self, batch: Tuple[torch.Tensor, torch.Tensor], batch_idx) -> Dict[str, torch.Tensor]:
        """Run one training batch; logs ``train_loss`` and accumulates the Dice score."""
        loss, dice = self._shared_step(batch)
        self.train_dice_score += dice
        self.log("train_loss", loss, prog_bar=True)
        return {"loss": loss}

    def training_epoch_end(self, output: List[Dict[str, torch.Tensor]]):
        """Log the mean per-batch Dice score of the epoch and reset the accumulator."""
        dice_score = self.train_dice_score / max(len(output), 1)
        self.log("train_dice_score", dice_score, prog_bar=True)
        # Reset so the next epoch's mean is not inflated by this epoch's sum.
        self.train_dice_score = 0

    def validation_step(self, batch: Tuple[torch.Tensor, torch.Tensor], batch_idx: int) -> Dict[str, torch.Tensor]:
        """Run one validation batch; logs ``val_loss`` and accumulates the Dice score."""
        loss, dice = self._shared_step(batch)
        self.val_dice_score += dice
        self.log("val_loss", loss, prog_bar=True)
        return {"loss": loss}

    def validation_epoch_end(self, output: List[Dict[str, torch.Tensor]]):
        """Log the mean per-batch validation Dice score and reset the accumulator."""
        dice_score = self.val_dice_score / max(len(output), 1)
        self.log("val_dice_score", dice_score, prog_bar=True)
        # Reset so sanity-check / previous-epoch batches do not leak into the next mean.
        self.val_dice_score = 0