## SETUP


In [1]:
!pip install numpy pandas matplotlib seaborn 
!pip install scikit-learn albumentations monai 
!pip install torch torchvision opencv-python


Defaulting to user installation because normal site-packages is not writeable
Collecting scikit-learn
  Downloading scikit_learn-1.5.2-cp311-cp311-win_amd64.whl.metadata (13 kB)
Collecting albumentations
  Downloading albumentations-1.4.21-py3-none-any.whl.metadata (31 kB)
Collecting monai
  Downloading monai-1.4.0-py3-none-any.whl.metadata (11 kB)
Collecting joblib>=1.2.0 (from scikit-learn)
  Downloading joblib-1.4.2-py3-none-any.whl.metadata (5.4 kB)
Collecting threadpoolctl>=3.1.0 (from scikit-learn)
  Downloading threadpoolctl-3.5.0-py3-none-any.whl.metadata (13 kB)
Collecting albucore==0.0.20 (from albumentations)
  Downloading albucore-0.0.20-py3-none-any.whl.metadata (5.3 kB)
Collecting eval-type-backport (from albumentations)
  Downloading eval_type_backport-0.2.0-py3-none-any.whl.metadata (2.2 kB)
Collecting stringzilla>=3.10.4 (from albucore==0.0.20->albumentations)
  Downloading stringzilla-3.10.10-cp311-cp311-win_amd64.whl.metadata (81 kB)
Collecting simsimd>=5.9.2 (from a

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from segmentation_models_pytorch import UnetPlusPlus
import numpy as np
import pydicom
import cv2
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split


## MRI Data

In [None]:
class MRIDataset(Dataset):
    """Paired DICOM-image / mask-image dataset for binary segmentation.

    Every item is an ``(image, mask)`` pair of ``float32`` tensors. Both carry a
    leading channel axis, with or without a ``transform``, so batches from the
    training and validation datasets have the same rank.

    :param image_paths: Sequence of DICOM file paths.
    :param mask_paths: Sequence of mask image paths, aligned index-by-index
        with ``image_paths``.
    :param transform: Optional callable invoked as
        ``transform(image=<2-D array>, mask=<2-D array>)`` that returns a
        mapping with ``"image"`` and ``"mask"`` entries.
    :param image_size: ``(width, height)`` handed to ``cv2.resize``.
    :param max_intensity: Divisor applied to the raw pixel values
        (4095 is the full scale of a 12-bit image).
    :raises ValueError: If the two path sequences differ in length.
    """

    def __init__(self, image_paths, mask_paths, transform=None,
                 image_size=(256, 256), max_intensity=4095.0):
        if len(image_paths) != len(mask_paths):
            raise ValueError(
                f"image_paths ({len(image_paths)}) and mask_paths "
                f"({len(mask_paths)}) must have the same length"
            )
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.transform = transform
        self.image_size = tuple(image_size)
        self.max_intensity = float(max_intensity)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        # Image: DICOM -> array, resized, then scaled as float32.
        image_path = self.image_paths[idx]
        image = pydicom.dcmread(image_path).pixel_array
        image = cv2.resize(image, self.image_size)
        image = image.astype(np.float32) / np.float32(self.max_intensity)

        # Mask: cv2.imread returns None instead of raising when it cannot read.
        mask_path = self.mask_paths[idx]
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        if mask is None:
            raise OSError(f"cv2.imread could not read mask file: {mask_path}")
        # Nearest-neighbour keeps label values intact; interpolating a mask
        # would create intermediate grey levels along object borders.
        mask = cv2.resize(mask, self.image_size, interpolation=cv2.INTER_NEAREST)
        mask = mask.astype(np.float32) / 255.0

        # Augmentation operates on the 2-D arrays.
        if self.transform is not None:
            augmented = self.transform(image=image, mask=mask)
            image, mask = augmented["image"], augmented["mask"]

        # Copy into contiguous float32 memory (a flip may hand back a
        # negatively-strided view) and add the channel axis afterwards.
        image = np.ascontiguousarray(image, dtype=np.float32)
        mask = np.ascontiguousarray(mask, dtype=np.float32)
        if image.ndim == 2:
            image = image[np.newaxis, ...]
        if mask.ndim == 2:
            mask = mask[np.newaxis, ...]

        return torch.from_numpy(image), torch.from_numpy(mask)


## Dataset


In [None]:
# Placeholder directories holding the DICOM slices and their PNG masks.
image_dir = "path_to_images"
mask_dir = "path_to_masks"

# Collect the files of each kind in lexicographic order; images and masks are
# paired purely by their position in these two sorted lists.
image_paths = sorted(
    os.path.join(image_dir, name)
    for name in os.listdir(image_dir)
    if name.endswith(".dcm")
)
mask_paths = sorted(
    os.path.join(mask_dir, name)
    for name in os.listdir(mask_dir)
    if name.endswith(".png")
)

# 80/20 train/validation split with a fixed seed.
train_images, val_images, train_masks, val_masks = train_test_split(
    image_paths,
    mask_paths,
    test_size=0.2,
    random_state=42,
)

# Augmentations (applied to the training set only).
from albumentations import Compose, HorizontalFlip, RandomBrightnessContrast
augmentations = [
    HorizontalFlip(p=0.5),
    RandomBrightnessContrast(p=0.2),
]
transform = Compose(augmentations)

# Datasets.
train_dataset = MRIDataset(train_images, train_masks, transform=transform)
val_dataset = MRIDataset(val_images, val_masks)

# Data loaders: shuffle only the training batches.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)


## U-NET ++

In [None]:
from segmentation_models_pytorch import UnetPlusPlus

class UNetPlusPlusModel(nn.Module):
    """``UnetPlusPlus`` network whose output is passed through a sigmoid.

    :param encoder_name: Encoder backbone name, e.g. ``'efficientnet-b4'``.
    :param encoder_weights: Pretrained weights, e.g. ``'imagenet'``, or ``None``.
    :param in_channels: Number of input channels (1 for grayscale MRI).
    :param num_classes: Number of output classes; forwarded as ``classes``.

    ``forward`` returns the sigmoid of the wrapped network's output, so the
    result is already in ``[0, 1]``.
    """

    def __init__(self, encoder_name="resnet34", encoder_weights="imagenet", in_channels=1, num_classes=1):
        super().__init__()
        network_kwargs = {
            "encoder_name": encoder_name,
            "encoder_weights": encoder_weights,
            "in_channels": in_channels,
            "classes": num_classes,
        }
        self.model = UnetPlusPlus(**network_kwargs)
        # Final activation for binary segmentation.
        self.final_activation = nn.Sigmoid()

    def forward(self, x):
        raw_output = self.model(x)
        return self.final_activation(raw_output)

# Model initialisation.
# Use the GPU when PyTorch can see one; otherwise stay on the CPU instead of
# raising from an unconditional .cuda() call.
model = UNetPlusPlusModel(
    encoder_name="efficientnet-b0",  # encoder backbone
    encoder_weights="imagenet",      # pretrained encoder weights
    in_channels=1,                   # MRI slices are grayscale
    num_classes=1                    # binary segmentation
).to("cuda" if torch.cuda.is_available() else "cpu")


## Dropout i Batch Normalization

In [None]:
class UNetPlusPlusWithDropout(nn.Module):
    """``UnetPlusPlus`` followed by dropout, batch normalisation and a sigmoid.

    :param encoder_name: Encoder backbone name.
    :param encoder_weights: Pretrained encoder weights, or ``None``.
    :param in_channels: Number of input channels.
    :param num_classes: Number of output classes; also the channel count of
        the ``BatchNorm2d`` layer applied to the network output.
    :param dropout_rate: Probability passed to ``nn.Dropout``.
    """

    def __init__(self, encoder_name="resnet34", encoder_weights="imagenet", in_channels=1, num_classes=1, dropout_rate=0.5):
        super().__init__()
        network_kwargs = {
            "encoder_name": encoder_name,
            "encoder_weights": encoder_weights,
            "in_channels": in_channels,
            "classes": num_classes,
        }
        self.model = UnetPlusPlus(**network_kwargs)
        # Regularisation layers applied to the network output.
        self.dropout = nn.Dropout(dropout_rate)
        self.batch_norm = nn.BatchNorm2d(num_classes)
        self.final_activation = nn.Sigmoid()

    def forward(self, x):
        # Output of the wrapped network -> dropout -> batch norm -> sigmoid.
        regularised = self.batch_norm(self.dropout(self.model(x)))
        return self.final_activation(regularised)

# Model initialisation.
# Use the GPU when PyTorch can see one; otherwise stay on the CPU instead of
# raising from an unconditional .cuda() call.
model = UNetPlusPlusWithDropout(
    encoder_name="resnet50",
    encoder_weights="imagenet",
    in_channels=1,
    num_classes=1,
    dropout_rate=0.3
).to("cuda" if torch.cuda.is_available() else "cpu")


## Attention

In [None]:
class AttentionBlock(nn.Module):
    """Multiplicative gate built from two 1x1 convolutions and a sigmoid.

    :param in_channels: Channel count of the input tensor.
    :param out_channels: Channel count produced by both convolutions.

    ``forward`` multiplies the input element-wise by the gate values.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # conv1 -> conv2 -> sigmoid gives the gate applied to the input.
        gate = self.sigmoid(self.conv2(self.conv1(x)))
        return x * gate

class UNetPlusPlusWithAttention(nn.Module):
    """``UnetPlusPlus`` whose output goes through an ``AttentionBlock`` and a sigmoid.

    :param encoder_name: Encoder backbone name.
    :param encoder_weights: Pretrained encoder weights, or ``None``.
    :param in_channels: Number of input channels.
    :param num_classes: Number of output classes; also used as both channel
        counts of the attention block.
    """

    def __init__(self, encoder_name="resnet34", encoder_weights="imagenet", in_channels=1, num_classes=1):
        super().__init__()
        network_kwargs = {
            "encoder_name": encoder_name,
            "encoder_weights": encoder_weights,
            "in_channels": in_channels,
            "classes": num_classes,
        }
        self.model = UnetPlusPlus(**network_kwargs)
        self.attention = AttentionBlock(num_classes, num_classes)
        self.final_activation = nn.Sigmoid()

    def forward(self, x):
        # Output of the wrapped network -> attention gate -> sigmoid.
        gated = self.attention(self.model(x))
        return self.final_activation(gated)

# Model initialisation.
# Use the GPU when PyTorch can see one; otherwise stay on the CPU instead of
# raising from an unconditional .cuda() call.
model = UNetPlusPlusWithAttention(
    encoder_name="resnet101",
    encoder_weights="imagenet",
    in_channels=1,
    num_classes=1
).to("cuda" if torch.cuda.is_available() else "cpu")


## Trening

In [None]:
class DiceLoss(nn.Module):
    """Soft Dice loss computed over all elements of the batch.

    :param from_logits: When ``True`` (the default, matching the previous
        behaviour) ``inputs`` are passed through ``torch.sigmoid`` before the
        Dice coefficient is computed. Pass ``False`` when the model already
        outputs probabilities, otherwise the sigmoid is applied twice.
    """

    def __init__(self, from_logits=True):
        super().__init__()
        self.from_logits = from_logits

    def forward(self, inputs, targets, smooth=1):
        """Return ``1 - dice`` for ``inputs`` against ``targets``.

        :param inputs: Model output (logits or probabilities, see ``from_logits``).
        :param targets: Ground-truth tensor with the same number of elements.
        :param smooth: Constant added to numerator and denominator.
        """
        if self.from_logits:
            inputs = torch.sigmoid(inputs)
        # reshape (not view) so non-contiguous tensors are accepted as well.
        inputs = inputs.reshape(-1)
        targets = targets.reshape(-1)
        intersection = (inputs * targets).sum()
        dice = (2. * intersection + smooth) / (inputs.sum() + targets.sum() + smooth)
        return 1 - dice


# The model wrappers defined in this notebook (UNetPlusPlusModel,
# UNetPlusPlusWithDropout, UNetPlusPlusWithAttention) all end in nn.Sigmoid,
# so the loss must not apply a second sigmoid to their output.
criterion = DiceLoss(from_logits=False)
optimizer = optim.Adam(model.parameters(), lr=1e-4)


In [None]:
def train_model(model, train_loader, val_loader, epochs=20, loss_fn=None, opt=None):
    """Train ``model`` and report the mean train/validation loss per epoch.

    :param model: Module to train; batches are moved to the device of its
        first parameter.
    :param train_loader: Iterable of ``(images, masks)`` training batches.
    :param val_loader: Iterable of ``(images, masks)`` validation batches.
    :param epochs: Number of passes over ``train_loader``.
    :param loss_fn: Loss callable ``loss_fn(outputs, masks)``. Defaults to the
        notebook-level ``criterion``.
    :param opt: Optimizer stepping ``model``'s parameters. Defaults to the
        notebook-level ``optimizer``.
    :return: List with one ``{"epoch", "train_loss", "val_loss"}`` dict per
        epoch. A loss is ``nan`` when its loader produced no batches.
    """
    # Fall back to the notebook globals so existing calls keep working.
    if loss_fn is None:
        loss_fn = criterion
    if opt is None:
        opt = optimizer

    # Follow the model's device rather than assuming CUDA is present.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    history = []
    for epoch in range(epochs):
        model.train()
        train_total, train_batches = 0.0, 0
        for images, masks in train_loader:
            images, masks = images.to(device), masks.to(device)
            outputs = model(images)
            loss = loss_fn(outputs, masks)
            opt.zero_grad()
            loss.backward()
            opt.step()
            train_total += loss.item()
            train_batches += 1

        # Validation
        model.eval()
        val_total, val_batches = 0.0, 0
        with torch.no_grad():
            for images, masks in val_loader:
                images, masks = images.to(device), masks.to(device)
                val_total += loss_fn(model(images), masks).item()
                val_batches += 1

        # Count batches instead of calling len() so an empty loader cannot
        # trigger a ZeroDivisionError.
        train_loss = train_total / train_batches if train_batches else float("nan")
        val_loss = val_total / val_batches if val_batches else float("nan")

        print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
        history.append({"epoch": epoch + 1, "train_loss": train_loss, "val_loss": val_loss})

    return history

# Run training on the model and loaders defined in the cells above.
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    epochs=20,
)


## Encoders: testing and visualization

In [None]:
def test_encoders(encoders, data_loader, base_model_class):
    """
    Testuje różne encodery i oblicza średni wynik Dice.
    
    :param encoders: Lista nazw encoderów do przetestowania.
    :param data_loader: Loader danych walidacyjnych/testowych.
    :param base_model_class: Klasa modelu (np. UnetPlusPlus z segmentation_models_pytorch).
    :return: Lista słowników z wynikami {"encoder": encoder, "dice_score": avg_dice}.
    """
    results = []
    for encoder in encoders:
        print(f"Testing encoder: {encoder}")
        
        # Tworzenie modelu z wybranym encoderem
        model = base_model_class(encoder_name=encoder, classes=1, activation=None)
        model = model.cuda()
        model.load_state_dict(torch.load(f"best_model_{encoder}.pth"))
        
        model.eval()
        dice_scores = []
        
        with torch.no_grad():
            for images, masks in data_loader:
                images, masks = images.cuda(), masks.cuda()
                outputs = model(images)
                preds = torch.sigmoid(outputs).cpu().numpy()
                preds = (preds > 0.5).astype(np.uint8)
                
                # Obliczanie metryki Dice
                dice_score = (2 * np.sum(preds * masks.cpu().numpy()) + 1e-6) / \
                             (np.sum(preds) + np.sum(masks.cpu().numpy()) + 1e-6)
                dice_scores.append(dice_score)
        
        avg_dice = np.mean(dice_scores)
        print(f"Average Dice Score for {encoder}: {avg_dice:.4f}")
        results.append({"encoder": encoder, "dice_score": avg_dice})
    
    return results


In [None]:
def visualize_predictions(model, data_loader, num_images=2):
    """
    Plot input, ground truth and thresholded prediction for a few samples.

    :param model: Trained model; its output is passed through a sigmoid and
        thresholded at 0.5.
    :param data_loader: Iterable of ``(images, masks)`` validation/test batches.
    :param num_images: Total number of examples to visualize (default 2).
        Iteration stops as soon as that many figures have been shown.
    """
    model.eval()

    # Follow the model's device rather than assuming CUDA is present.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    shown = 0
    with torch.no_grad():
        for images, masks in data_loader:
            if shown >= num_images:
                return
            outputs = model(images.to(device))
            preds = torch.sigmoid(outputs).cpu().numpy()
            preds = (preds > 0.5).astype(np.uint8)
            images_np = images.cpu().numpy()
            masks_np = masks.cpu().numpy()

            for i in range(len(images_np)):
                if shown >= num_images:
                    return
                panels = (
                    ("Input Image", images_np[i]),
                    ("Ground Truth", masks_np[i]),
                    ("Prediction", preds[i]),
                )
                fig, axes = plt.subplots(1, 3, figsize=(10, 5))
                for ax, (title, panel) in zip(axes, panels):
                    ax.set_title(title)
                    ax.imshow(panel.squeeze(), cmap='gray')
                plt.show()
                # Release the figure so repeated calls do not accumulate them.
                plt.close(fig)
                shown += 1


In [None]:
from segmentation_models_pytorch import UnetPlusPlus

# Encoders to evaluate.
# NOTE(review): each one needs a checkpoint file best_model_<encoder>.pth in the
# working directory; nothing in the visible notebook writes these files.
encoders = ["resnet34", "efficientnet-b0", "vgg16"]

# Evaluate every encoder on the validation set.
results = test_encoders(encoders, val_loader, UnetPlusPlus)

# Pick the encoder with the highest mean Dice score.
best_encoder = max(results, key=lambda x: x["dice_score"])["encoder"]
print(f"Best encoder: {best_encoder}")

# Rebuild the best model. in_channels=1 matches the single-channel images the
# dataset yields (the library default is 3), and no pretrained encoder weights
# are requested because the checkpoint below replaces every weight.
best_model_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
best_model = UnetPlusPlus(
    encoder_name=best_encoder,
    encoder_weights=None,
    in_channels=1,
    classes=1,
    activation=None,
)
best_model.load_state_dict(
    torch.load(f"best_model_{best_encoder}.pth", map_location=best_model_device)
)
best_model = best_model.to(best_model_device)

# Visualize predictions of the best model.
visualize_predictions(best_model, val_loader)
