[Пример 1](https://www.kaggle.com/code/cornverburg/u-net-lung-segmentation-0-98-iou-99-5-accuracy)

[Учебный материал по сегментации и детекции](https://github.com/EPC-MSU/EduNet-lectures/blob/dev-2.3/out/L11_Segmentation_Detection.ipynb)

### Введение
В блокноте демонстрируется подход на основе глубокого обучения к семантической сегментации рентгеновских изображений легких. Цель состоит в том, чтобы классифицировать пиксели рентгеновских изображений на два класса: легкие и фон. Будем использовать архитектуру U-Net с dropout-регуляризацией.

### Набор данных
Набор данных состоит из рентгеновских изображений и соответствующих масок. Изображения и маски загружаются с использованием пользовательского класса LungDataset.

### Архитектура модели
Модель семантической сегментации построена с использованием архитектуры U-Net, сочетающей возможности понижающей дискретизации и сверточных операций. В качестве функции потерь применена Dice Loss, используется оптимизатор Adam. Классическая архитектура U-Net показана на рисунке.
![](https://camo.githubusercontent.com/6b548ee09b97874014d72903c891360beb0989e74b4585249436421558faa89d/68747470733a2f2f692e696d6775722e636f6d2f6a6544567071462e706e67)

### Обучение и валидация
Блокнот включает функции для обучения модели, визуализации примеров и вычисления метрик. Для предотвращения переобучения используется ранняя остановка (early stopping).

### Результаты
Загружается обученная модель, и визуализируются примеры из валидационного набора. Для оценки производительности модели вычисляются такие метрики, как accuracy, IoU, F1 score, precision, recall.

### Импорт необходимых библиотек

In [None]:
import os
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F

import matplotlib.pyplot as plt
from PIL import Image
from torch.utils.data import Dataset, DataLoader, Subset, random_split
from torchvision import transforms
from sklearn.model_selection import train_test_split
from tqdm import tqdm

try:
    import segmentation_models_pytorch as smp
except ImportError:
    !pip install segmentation-models-pytorch -q > /dev/null
    import segmentation_models_pytorch as smp

### Наборы данных, загрузка и предварительная обработка данных
Определим пользовательский dataset-класс LungDataset для загрузки и предварительной обработки рентгеновских изображений легких и соответствующих им масок. Набор данных разделен на обучающую и валидационную выборки. Также определим преобразования (transform) для изменения размера и нормализации изображений и масок.

Дополнительно определим класс TestDataset для того, чтобы загружать изображения из тестового набора (здесь возвращаются только изображения без масок).

In [None]:
class LungDataset(Dataset):
    def __init__(self, data_paths, transform=None, augmentations=None):
        self.data_paths = sorted(data_paths)
        self.transform = transform
        self.augmentations = augmentations
        
        self.images = []
        self.masks = []
        
        # Loop over data paths
        for data_path in self.data_paths:
            image_path = os.path.join(data_path, "images")
            mask_path = os.path.join(data_path, "masks")
            
            # Load images and masks
            images = [os.path.join(image_path, f) for f in os.listdir(image_path) if f.endswith(".png")]
            masks = [os.path.join(mask_path, f) for f in os.listdir(mask_path) if f.endswith(".png")]

            self.images.extend(sorted(images))
            self.masks.extend(sorted(masks))
            
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        image_path = self.images[idx]
        mask_path = self.masks[idx]

        # Open images and masks
        image = Image.open(image_path).convert("L")
        mask = Image.open(mask_path).convert("L")  # Convert to grayscale mask
        
        if self.transform:
            image, mask = self.transform(image, mask)
            
        if self.augmentations:
            image, mask = self.augmentations(image, mask)
        
        return image, mask

In [None]:
class TestDataset(Dataset):
    def __init__(self, data_paths, transform=None, augmentations=None):
        self.data_paths = sorted(data_paths)
        self.transform = transform
        self.augmentations = augmentations
        
        self.images = []
        
        # Loop over data paths
        for data_path in self.data_paths:
            image_path = os.path.join(data_path, "images")
            
            # Load images
            images = [os.path.join(image_path, f) for f in os.listdir(image_path) if f.endswith(".png")]

            self.images.extend(sorted(images))
            
    def __len__(self):
        return len(self.images)
    
    def __getitem__(self, idx):
        image_path = self.images[idx]

        # Open images
        image = Image.open(image_path).convert("L")
        
        if self.transform:
            image = self.transform(image)
            
        if self.augmentations:
            image = self.augmentations(image)
        
        return image

### Определим преобразования для изображений и масок

In [None]:
def transform(image, mask):
    image_transform = transforms.Compose([
        transforms.Resize(size=PATCH_SIZE, antialias=True),
        transforms.ToTensor()
    ])
    
    mask_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize(size=PATCH_SIZE, antialias=False)
    ])
    
    return image_transform(image), mask_transform(mask).type(torch.int)


def test_transform(image):
    image_transform = transforms.Compose([
        transforms.Resize(size=PATCH_SIZE, antialias=True),
        transforms.ToTensor()
    ])
    
    return image_transform(image)

### Создадим объекты классов Dataset и DataLoader

In [None]:
PATCH_SIZE = (256, 256)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 16


data_path = ["/kaggle/input/radiograph-segmentation-2025/Dataset/COVID",
             "/kaggle/input/radiograph-segmentation-2025/Dataset/Lung_Opacity",
             "/kaggle/input/radiograph-segmentation-2025/Dataset/Normal",
             "/kaggle/input/radiograph-segmentation-2025/Dataset/Viral Pneumonia"]

dataset = LungDataset(data_path, transform=transform, augmentations=None)


test_data_path = ["/kaggle/input/radiograph-segmentation-2025/Test/",]
test_dataset = TestDataset(test_data_path, transform=test_transform, augmentations=None)



# Define the sizes for each split
dataset_size = len(dataset)
val_size  = int(0.20 * dataset_size)
train_size = dataset_size - val_size


# Use random_split to create train and val datasets
train_dataset, val_dataset = random_split(dataset, [train_size, val_size], generator=torch.Generator().manual_seed(42))


# train_dataset.augmentations = train_augmentations

# Create DataLoader instances for each set
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

### Отобразим некоторые примеры

In [None]:
# Function to visualize images and masks
def visualize_samples(dataset, num_samples=5):
    # Visualize the images and masks
    fig, axes = plt.subplots(num_samples, 2, figsize=(8, 2 * num_samples))
    for i in range(num_samples):
        image, mask = dataset[np.random.randint(len(dataset))]
        
        # Display images
        axes[i, 0].imshow(image.cpu().permute(1,2,0))
        axes[i, 0].set_title(f'Sample {i + 1} - Image')
        axes[i, 0].axis('off')

        # Display masks
        axes[i, 1].imshow(mask.cpu().squeeze(), cmap='gray')
        axes[i, 1].set_title(f'Sample {i + 1} - Mask')
        axes[i, 1].axis('off')

    plt.tight_layout()
    plt.show()

# Visualize samples from the train dataset
visualize_samples(train_dataset)

### Определение модели
Далее определим архитектуру модели U-Net для семантической сегментации. Модель построена на архитектуре encoder-decoder с применением skip connections, что облегчает точную локализацию признаков. Для уменьшения переобучения добавлены dropout-слои.

### Определим UNet блоки (Up, DoubleConv, OutConv)

In [None]:
class DoubleConv(nn.Module):
    """(convolution => [BN] => ReLU) * 2"""

    def __init__(self, in_channels, out_channels, mid_channels=None, dropout_rate=0.1):
        super().__init__()
        if not mid_channels:
            mid_channels = out_channels
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout_rate),
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout_rate)
        )

    def forward(self, x):
        return self.double_conv(x)


class Down(nn.Module):
    """Downscaling with maxpool then double conv"""

    def __init__(self, in_channels, out_channels, dropout_rate=0.1):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels, dropout_rate=dropout_rate)
        )

    def forward(self, x):
        return self.maxpool_conv(x)


class Up(nn.Module):
    """Upscaling then double conv"""

    def __init__(self, in_channels, out_channels, bilinear=True, dropout_rate=0.1):
        super().__init__()

        # if bilinear, use the normal convolutions to reduce the number of channels
        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels, in_channels // 2, dropout_rate=dropout_rate)
        else:
            self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels, dropout_rate=dropout_rate)

    def forward(self, x1, x2):
        x1 = self.up(x1)
        # input is CHW
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])
        # if you have padding issues, see
        # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
        # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd
        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)


class OutConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)

### Определим U-Net модель

In [None]:
class UNet(nn.Module):
    def __init__(self, n_channels, n_classes, bilinear=False, dropout_rate=0.1):
        super(UNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear

        self.inc = (DoubleConv(n_channels, 64, dropout_rate=dropout_rate))
        self.down1 = (Down(64, 128, dropout_rate=dropout_rate))
        self.down2 = (Down(128, 256, dropout_rate=dropout_rate))
        self.down3 = (Down(256, 512, dropout_rate=dropout_rate))
        factor = 2 if bilinear else 1
        self.down4 = (Down(512, 1024 // factor, dropout_rate=dropout_rate))
        self.up1 = (Up(1024, 512 // factor, bilinear, dropout_rate=dropout_rate))
        self.up2 = (Up(512, 256 // factor, bilinear, dropout_rate=dropout_rate))
        self.up3 = (Up(256, 128 // factor, bilinear, dropout_rate=dropout_rate))
        self.up4 = (Up(128, 64, bilinear, dropout_rate=dropout_rate))
        self.outc = (OutConv(64, n_classes))

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        return logits

### Создадим объект модели UNet

In [None]:
unet = UNet(1, 2, bilinear=False, dropout_rate=0.1).to(DEVICE)

### Настройка обучения
Настроим процесс обучения, определяя функцию потерь, оптимизатор и планировщик скорости обучения (learning rate scheduler). Применим функцию потерь Dice для многоклассовой сегментации и оптимизатор Adam. Кроме того, настроим learning rate scheduler так, чтобы корректировать скорость обучения learning rate в зависимости от потери на валидационной выборке. Также определим критерий ранней остановки (early stopping) для предотвращения переобучения.

In [None]:
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Define constants
LEARNING_RATE = 0.0003
LR_FACTOR = 0.5
LR_PATIENCE = 2
EARLY_STOP_PATIENCE = 4
NUM_EPOCHS = 4

# Define the loss function, optimizer, and learning rate scheduler
criterion = smp.losses.DiceLoss('multiclass')
optimizer = optim.Adam(unet.parameters(), lr=LEARNING_RATE)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=LR_FACTOR, patience=LR_PATIENCE, verbose=False)

# Initialize early stopping parameters
early_stop_counter = 0
best_val_loss = float('inf')

### Обучим модель

In [None]:
from tqdm import tqdm
from torch.nn.parallel import DataParallel

def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=20, early_stop_patience=4):
    best_val_loss = float('inf')
    early_stop_counter = 0

    train_losses = []
    val_losses = []

    model = DataParallel(model)
    
    for epoch in range(1, num_epochs + 1):
        model.train()
        train_loss = 0.0

        for batch_idx, (images, masks) in tqdm(enumerate(train_loader), total=len(train_loader)):
            images, masks = images.to(DEVICE), masks.to(DEVICE, dtype=torch.long)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, masks)
            
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        avg_train_loss = train_loss / len(train_loader)
        train_losses.append(avg_train_loss)

        # Validation phase
        model.eval()
        val_loss = 0.0

        with torch.no_grad():
            for images, masks in val_loader:
                images, masks = images.to(DEVICE), masks.to(DEVICE, dtype=torch.long)
                outputs = model(images)
                loss = criterion(outputs, masks)
                val_loss += loss.item()

        avg_val_loss = val_loss / len(val_loader)
        val_losses.append(avg_val_loss)

        # Update learning rate scheduler
        scheduler.step(avg_val_loss)

        # Print and check for early stopping
        print(f'Epoch [{epoch}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}')

        if avg_val_loss < best_val_loss:
            torch.save(model.state_dict(), 'best_model.pth')  # Save the best model
            best_val_loss = avg_val_loss
            early_stop_counter = 0
        else:
            early_stop_counter += 1

        if early_stop_counter >= early_stop_patience:
            print(f'Early stopping after {early_stop_patience} epochs without improvement.')
            break

    return train_losses, val_losses

# Now, call the function with your specific parameters
train_model(unet, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=NUM_EPOCHS, early_stop_patience=EARLY_STOP_PATIENCE)

# Load the best model after training
unet = nn.DataParallel(unet) # this is necessary to get matching key dicts
unet.load_state_dict(torch.load('best_model.pth'))

### Отобразим примеры из валидационного набора

In [None]:
import matplotlib.pyplot as plt
import torchvision.transforms.functional as F

# Function to plot examples with predicted and true masks
def plot_examples(model, dataset, num_examples=5):
    model.eval()
    
    for i in range(num_examples):
        image, mask = dataset[i]
        
        with torch.no_grad():
            output = model(image.unsqueeze(0).to(DEVICE)).cpu()
        
        pred_mask = torch.argmax(output, dim=1)
        
        # Plot the images and masks
        plt.figure(figsize=(12, 4))
        
        plt.subplot(1, 3, 1)
        plt.imshow(image.permute(1,2,0), cmap='gray')
        plt.axis("off")
        plt.title('Image')
        
        plt.subplot(1, 3, 2)
        plt.imshow(mask.permute(1,2,0), cmap='gray')
        plt.axis("off")
        plt.title('True Mask')
        
        plt.subplot(1, 3, 3)
        plt.imshow(pred_mask.permute(1,2,0), cmap='gray')
        plt.axis("off")
        plt.title('Predicted Mask')
        
        plt.tight_layout()
        plt.show()

# Plot examples from the test dataloader
plot_examples(unet, val_dataset, num_examples=10)

### Рассчитаем метрики на валидационном наборе

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, jaccard_score, f1_score, precision_score, recall_score

def compute_metrics(model, dataloader):
    model.eval()
    
    all_true_masks = []
    all_pred_masks = []
    
    confusion = np.zeros((2,2))
    for images, masks in tqdm(dataloader, total=len(dataloader)):
        images, masks = images.to(DEVICE), masks.to(DEVICE)
        
        with torch.no_grad():
            outputs = model(images)
        
        # Convert probability maps to binary masks using a threshold
        pred_masks = torch.argmax(outputs, dim=1)
        
        true_masks_np = masks.cpu().detach().numpy().ravel()
        pred_masks_np = pred_masks.cpu().detach().numpy().ravel()
        
        confusion += confusion_matrix(true_masks_np, pred_masks_np)
    
    # Calculate metrics
    TN, FP, FN, TP = confusion.ravel()
    accuracy = (TP + TN) / (TP + TN + FP + FN)
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)
    f1_score = 2 * (precision * recall) / (precision + recall)
    jaccard_index = TP / (TP + FP + FN)

    print(f"  Accuracy : {accuracy:.4f}")
    print(f"  IoU      : {jaccard_index:.4f}")
    print(f"  F1 Score : {f1_score:.4f}")
    print(f"  Precision: {precision:.4f}")
    print(f"  Recall   : {recall:.4f}")

# Ensure DEVICE variable is defined
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Compute metrics for validation set
print("\nMetrics for Validation Set:")
compute_metrics(unet, val_loader)

### Predict на тестовой выборке и последующее формирование файла submission

In [None]:
# Generate predictions
pred_masks = []

unet.eval()
    
for image in test_dataset:

    with torch.no_grad():
        output = unet(image.unsqueeze(0).to(DEVICE)).cpu()

    pred_mask = torch.argmax(output, dim=1).cpu().detach().numpy()
    pred_masks.append(pred_mask)

In [None]:
len(pred_masks)

In [None]:
# RLE-кодировка / декодировка для submission
def encode_mask_to_rle(mask):
    '''
    mask: бинарная маска в виде numpy массива 
    1 - объект
    0 - фон
    Возвращает закодированную длину серии 
    '''
    pixels = mask.flatten()
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[::2]
    return ' '.join(str(x) for x in runs)

def decode_rle_to_mask(rle, height, width, viz = False):
    '''
    rle : длина серии в строковом формате (начальное значение, число элементов)
    height : высота изображения маски 
    width : ширина изображения маски
    Возвращает бинарную маску
    '''
    rle = np.array(rle.split(' ')).reshape(-1, 2)
    mask = np.zeros((height * width, 1, 3))
    if viz:
        color = np.random.rand(3)
    else:
        color = [1, 1, 1]
    for i in rle:
        mask[int(i[0]):int(i[0]) + int(i[1]), :, :] = color
        
    return cv2.cvtColor(mask.reshape(height, width, 3).astype(np.uint8), cv2.COLOR_BGR2GRAY)

In [None]:
# RLE-кодирование предсказаний
rle_pred = []
for i in range(len(pred_masks)):
    encoded = encode_mask_to_rle(pred_masks[i])
    rle_pred.append(encoded)

In [None]:
len(rle_pred)

In [None]:
# Данные для submission
import pandas as pd

pred = pd.DataFrame(data = {'id':range(len(rle_pred)), 
                          'target':rle_pred})

In [None]:
# Вывод данных
pred

In [None]:
# Создание csv-файла для submission
pred.to_csv('/kaggle/working/submission.csv', index=False)