### 4.3.2. U-Net - przygotowanie danych i trening

#### Importy i ustawienia środowiska

In [None]:
from pathlib import Path
import random
import cv2
import numpy as np
import tensorflow as tf
from sklearn.model_selection import GroupKFold
from tensorflow.keras import layers, models, optimizers, callbacks

#### Funkcja `load_image`
Funkcja odpowiedzialna za wczytanie obrazu i normalizację wartości pikseli do przedziału [0, 1].

In [None]:
def load_image(path):
    """Read an image file and scale its pixel values to the [0, 1] range.

    Args:
        path: Location of the image; converted to ``str`` before being
            handed to ``cv2.imread``.

    Returns:
        The decoded image as a ``float32`` array divided by 255, or ``None``
        (after printing a message) when ``cv2.imread`` could not read it.
    """
    raw = cv2.imread(str(path))
    if raw is not None:
        return raw.astype(np.float32) / 255.0

    # cv2.imread signals failure by returning None instead of raising.
    print("Nie można wczytać obrazu")
    return None

#### Funkcja `load_mask`
Funkcja odpowiedzialna za wczytanie obrazu maski i zastosowanie label smoothing, który poprawia stabilność uczenia i ułatwia generalizację modelu.

In [None]:
def load_mask(path, smooth=0.1):
    """Read a grayscale mask, scale it to [0, 1] and apply label smoothing.

    Args:
        path: Location of the mask image; converted to ``str`` before being
            handed to ``cv2.imread``.
        smooth: Label-smoothing strength. A scaled value ``v`` becomes
            ``v * (1 - smooth) + smooth / 2``.

    Returns:
        The smoothed mask with an extra trailing axis, or ``None`` (after
        printing a message) when the file could not be read.
    """
    raw = cv2.imread(str(path), cv2.IMREAD_GRAYSCALE)
    if raw is None:
        # cv2.imread signals failure by returning None instead of raising.
        print("Nie można wczytać maski")
        return None

    scaled = raw.astype(np.float32) / 255.0
    # Pull the targets away from the hard 0/1 extremes.
    softened = scaled * (1 - smooth) + (smooth / 2)
    return softened[..., np.newaxis]

#### Funkcja `augmentation`
Funkcja odpowiedzialna za przeprowadzenie odbicia poziomego, rotacji, zmiany jasności i kontrastu na zdjęciach

In [None]:
def augmentation(img, mask):
    """Randomly augment an image together with its mask.

    Three independent steps, each applied with probability 0.5:

    1. horizontal flip of both image and mask,
    2. rotation of both by an angle drawn from [-30, 30] degrees around the
       image centre,
    3. brightness/contrast change of the image only, clipped back to [0, 1].

    Args:
        img: Image array; its first two axes are taken as height and width.
        mask: Mask array transformed with the same geometry as ``img``.

    Returns:
        Tuple ``(img, mask)`` after the selected transformations.
    """
    # Step 1: mirror along the vertical axis (flip code 1).
    flip_horizontally = random.random() < 0.5
    if flip_horizontally:
        img, mask = cv2.flip(img, 1), cv2.flip(mask, 1)

    # Step 2: the same rotation matrix is used for image and mask so they stay aligned.
    rotate = random.random() < 0.5
    if rotate:
        degrees = random.uniform(-30, 30)
        height, width = img.shape[:2]
        out_size = (width, height)
        rotation = cv2.getRotationMatrix2D((width // 2, height // 2), degrees, 1.0)
        img = cv2.warpAffine(img, rotation, out_size)
        mask = cv2.warpAffine(mask, rotation, out_size)

    # Step 3: photometric change; the mask is left untouched.
    adjust_intensity = random.random() < 0.5
    if adjust_intensity:
        gain = random.uniform(0.8, 1.2)
        bias = random.uniform(-0.1, 0.1)
        img = np.clip(img * gain + bias, 0, 1)

    return img, mask

#### Funkcja `data_generator`
Funkcja odpowiedzialna za generowanie batchy zawierających pary obraz–maska wraz z augmentacjami.

In [None]:
def data_generator(pairs, batch_size=8, augment=True):
    """Yield an endless stream of ``(images, masks)`` batches.

    Every pass reshuffles the pairs, loads each image and mask, optionally
    augments them and stacks the results. Pairs whose image or mask cannot be
    loaded are skipped, so a batch may hold fewer than ``batch_size`` samples.

    Args:
        pairs: Sequence of ``(image_path, mask_path)`` tuples. It is copied,
            so the caller's sequence is never reordered.
        batch_size: Maximum number of samples per batch.
        augment: When true (the default), every sample goes through
            ``augmentation``; pass ``False`` to get the samples unchanged.

    Yields:
        Tuple ``(imgs_array, msks_array)`` stacked along a new first axis;
        every mask carries a trailing channel axis.

    Raises:
        ValueError: If a complete pass over ``pairs`` produced no sample
            (empty input or nothing could be loaded). Without this the
            generator would spin forever without yielding.
    """
    # Private copy: random.shuffle works in place and must not touch the caller's list.
    pairs = list(pairs)
    while True:
        random.shuffle(pairs)
        produced_any = False
        for start in range(0, len(pairs), batch_size):
            imgs, msks = [], []
            for img_path, msk_path in pairs[start:start + batch_size]:
                img = load_image(img_path)
                if img is None:
                    continue

                # load_mask returns None on failure, so test before indexing.
                msk = load_mask(msk_path)
                if msk is None:
                    continue

                # Augment the 2-D mask; the channel axis is restored below.
                msk2d = msk[..., 0]
                if augment:
                    img, msk2d = augmentation(img, msk2d)
                imgs.append(img)
                msks.append(np.expand_dims(msk2d, axis=-1))

            # np.stack raises on an empty list, so skip batches where
            # every sample failed to load.
            if not imgs:
                continue
            produced_any = True
            yield np.stack(imgs, axis=0), np.stack(msks, axis=0)

        if not produced_any:
            raise ValueError("Nie wczytano żadnej pary obraz-maska")

#### Funkcja `unet_model`
Budowa 5-poziomowego modelu U-Net (cztery poziomy enkodera i dekodera oraz bottleneck).

In [None]:
def unet_model(input_size=(256, 256, 3)):
    """Build a U-Net with four encoder stages, a bottleneck and four decoder stages.

    Every stage applies two 3x3 ReLU convolutions with ``same`` padding.
    Encoder stages use 64, 128, 256 and 512 filters, each followed by 2x2 max
    pooling; the bottleneck uses 1024 filters. Each decoder stage upsamples
    with a stride-2 transposed convolution, concatenates the matching encoder
    output and convolves again with 512, 256, 128 and 64 filters.

    Args:
        input_size: Shape passed to ``layers.Input``.

    Returns:
        A ``models.Model`` whose output is a single-channel 1x1 convolution
        with sigmoid activation.
    """
    def double_conv(tensor, filters):
        # Two consecutive 3x3 convolutions with the same filter count.
        for _ in range(2):
            tensor = layers.Conv2D(filters, 3, activation='relu', padding='same')(tensor)
        return tensor

    inputs = layers.Input(input_size)

    # Encoder: remember each stage's output for the skip connections.
    skips = []
    x = inputs
    for filters in (64, 128, 256, 512):
        x = double_conv(x, filters)
        skips.append(x)
        x = layers.MaxPooling2D(2)(x)

    # Bottleneck
    x = double_conv(x, 1024)

    # Decoder: walk the skip connections from deepest to shallowest.
    for filters, skip in zip((512, 256, 128, 64), reversed(skips)):
        x = layers.Conv2DTranspose(filters, 2, strides=2, padding='same')(x)
        x = layers.concatenate([x, skip])
        x = double_conv(x, filters)

    outputs = layers.Conv2D(1, 1, activation='sigmoid')(x)

    return models.Model(inputs, outputs)

#### Funkcja `iou_metric`
Funkcja wyliczająca miarę IoU

In [None]:
def iou_metric(y_true, y_pred, smooth=1e-6):
    """Intersection-over-Union of binarised targets and predictions.

    Args:
        y_true: Ground-truth mask values; thresholded at 0.5.
        y_pred: Predicted probabilities; rounded to 0/1.
        smooth: Small constant added to numerator and denominator so the
            ratio is defined when both masks are empty.

    Returns:
        Scalar tensor ``(intersection + smooth) / (union + smooth)`` computed
        over all elements of the inputs.
    """
    y_pred = tf.round(y_pred)
    # Targets produced by load_mask are label-smoothed (not exactly 0/1).
    # Using them directly biases the overlap, so the score could never
    # reach 1. Hard 0/1 targets are unaffected by the threshold.
    y_true = tf.cast(tf.cast(y_true, y_pred.dtype) > 0.5, y_pred.dtype)
    inter = tf.reduce_sum(y_true * y_pred)
    union = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) - inter
    return (inter + smooth) / (union + smooth)

#### Funkcja `dice_coef`
Funkcja wyliczająca miarę Dice

In [None]:
def dice_coef(y_true, y_pred, smooth=1e-6):
    """Dice coefficient of binarised targets and predictions.

    Args:
        y_true: Ground-truth mask values; thresholded at 0.5.
        y_pred: Predicted probabilities; rounded to 0/1.
        smooth: Small constant added to numerator and denominator so the
            ratio is defined when both masks are empty.

    Returns:
        Scalar tensor ``(2 * intersection + smooth) / (|y_true| + |y_pred| + smooth)``
        computed over all elements of the inputs.
    """
    y_pred = tf.round(y_pred)
    # Targets produced by load_mask are label-smoothed (not exactly 0/1).
    # Using them directly biases the overlap, so the score could never
    # reach 1. Hard 0/1 targets are unaffected by the threshold.
    y_true = tf.cast(tf.cast(y_true, y_pred.dtype) > 0.5, y_pred.dtype)
    inter = tf.reduce_sum(y_true * y_pred)
    return (2 * inter + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)

#### Funkcja `recall_metric`
Funkcja wyliczająca miarę recall

In [None]:
def recall_metric(y_true, y_pred, smooth=1e-6):
    """Recall (true positives over actual positives) of binarised masks.

    Args:
        y_true: Ground-truth mask values; thresholded at 0.5.
        y_pred: Predicted probabilities; rounded to 0/1.
        smooth: Small constant added to numerator and denominator so the
            ratio is defined when the target mask is empty.

    Returns:
        Scalar tensor ``(tp + smooth) / (positives + smooth)`` computed over
        all elements of the inputs.
    """
    y_pred = tf.round(y_pred)
    # Targets produced by load_mask are label-smoothed (not exactly 0/1).
    # Using them directly biases both the true-positive count and the
    # number of positives. Hard 0/1 targets are unaffected by the threshold.
    y_true = tf.cast(tf.cast(y_true, y_pred.dtype) > 0.5, y_pred.dtype)
    tp = tf.reduce_sum(y_true * y_pred)
    return (tp + smooth) / (tf.reduce_sum(y_true) + smooth)

#### Funkcja `bce_dice_loss`
Funkcja obliczająca stratę jako ważoną sumę binarnej entropii krzyżowej (waga 0,7) i straty Dice (waga 0,3).

In [None]:
def bce_dice_loss(y_true, y_pred):
    """Weighted sum of binary cross-entropy (0.7) and soft Dice loss (0.3).

    The Dice term is computed from the raw probabilities instead of rounded
    predictions. Rounding is piecewise constant, so a Dice term built on it
    contributes no gradient and would not influence training.

    Args:
        y_true: Ground-truth mask values.
        y_pred: Predicted probabilities.

    Returns:
        Tensor ``0.7 * bce + 0.3 * (1 - soft_dice)``; the scalar Dice term is
        broadcast over the cross-entropy values.
    """
    smooth = 1e-6  # keeps the ratio defined when both masks are empty
    bce = tf.keras.losses.binary_crossentropy(y_true, y_pred)

    # Differentiable (soft) Dice computed over all elements of the batch.
    y_true = tf.cast(y_true, y_pred.dtype)
    inter = tf.reduce_sum(y_true * y_pred)
    total = tf.reduce_sum(y_true) + tf.reduce_sum(y_pred)
    soft_dice = (2.0 * inter + smooth) / (total + smooth)

    return 0.7 * bce + 0.3 * (1.0 - soft_dice)

#### Przygotowanie listy par obraz–maska i identyfikatorów grup

In [None]:
# Collect (image, mask, stem) triples for every training image that has a
# mask named "<stem>.jpg"; images without such a mask are left out.
images_dir = Path("./processed/images_to_train/train/images")
masks_dir = Path("./processed/images_to_train/train/masks")

pairs = []
for image_file in sorted(images_dir.glob("*.*")):
    stem = image_file.stem
    mask_file = masks_dir / f"{stem}.jpg"
    if mask_file.exists():
        pairs.append((image_file, mask_file, stem))

# The group identifier of each pair is its file stem, in the same order as `pairs`.
groups = [stem for _, _, stem in pairs]

#### Cross‐validation i trening U-Net  
Pętla wykonuje 5-krotną walidację krzyżową z GroupKFold, trenując i walidując model U-Net w każdym foldzie oraz śledząc najlepszy model na podstawie współczynnika Dice.


In [None]:
# 5-fold grouped cross-validation: a fresh U-Net is trained per fold and the
# model with the highest validation Dice is kept in `best_model`.
batch_size = 8
gkf = GroupKFold(n_splits=5)
best_dice = -1.0
best_model = None

for fold, (train_idx, val_idx) in enumerate(gkf.split(pairs, groups=groups), start=1):
    # Drop the group identifier; the generator only needs (image, mask) paths.
    train_pairs = [pairs[i][:2] for i in train_idx]
    val_pairs = [pairs[i][:2] for i in val_idx]

    train_gen = data_generator(train_pairs, batch_size=batch_size)
    # NOTE(review): data_generator shuffles and augments every sample, so the
    # validation metrics below are computed on randomly augmented data.
    val_gen = data_generator(val_pairs, batch_size=batch_size)

    # Ceiling division: the generator also yields the final partial batch, so
    # this covers one full pass and can never be 0 for a non-empty fold
    # (floor division gave 0 steps for folds smaller than one batch).
    train_steps = -(-len(train_pairs) // batch_size)
    val_steps = -(-len(val_pairs) // batch_size)

    model = unet_model()
    model.compile(optimizer=optimizers.Adam(1e-4), loss=bce_dice_loss,
                  metrics=[iou_metric, dice_coef, recall_metric])

    # Dice and IoU are "higher is better"; the direction must be stated
    # explicitly because the callbacks cannot infer it from these custom
    # metric names.
    # NOTE(review): EarlyStopping's patience (4) is shorter than
    # ReduceLROnPlateau's (5), so training may stop before the learning rate
    # is ever reduced - confirm this is intended.
    fold_callbacks = [
        callbacks.ReduceLROnPlateau(monitor='val_dice_coef', mode='max',
                                    factor=0.5, patience=5),
        callbacks.EarlyStopping(monitor='val_iou_metric', mode='max',
                                patience=4, restore_best_weights=True),
    ]

    history = model.fit(train_gen, steps_per_epoch=train_steps,
                        validation_data=val_gen, validation_steps=val_steps,
                        epochs=50, callbacks=fold_callbacks)

    scores = model.evaluate(val_gen, steps=val_steps, return_dict=True)

    print(
        f"Fold {fold}: IoU={scores['iou_metric']:.4f}, Dice={scores['dice_coef']:.4f}, Recall={scores['recall_metric']:.4f}")

    if scores['dice_coef'] > best_dice:
        best_dice = scores['dice_coef']
        best_model = model

#### Zapisanie najlepszego modelu

In [None]:
# Write the best fold's model to disk and report the Dice score it achieved.
output_file = "unet_best_model.h5"
best_model.save(output_file)
summary = f"\nNajlepszy Dice Coef: {best_dice:.4f} → zapisano unet_best_model.h5"
print(summary)