In [1]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import load_img, img_to_array

2026-02-05 11:55:29.418494: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2026-02-05 11:55:29.453616: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2026-02-05 11:55:30.463553: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.


In [2]:
import os
print(os.getcwd())

/run/media/shaunakmishra25/DATA/CB/AI ML/BRISC/UNetpp


In [15]:
IMG_SIZE = 256
BATCH_SIZE = 2
EPOCHS = 25

BASE_PATH = "../archive/brisc2025/segmentation_task"

TRAIN_IMG = f"{BASE_PATH}/train/images"
TRAIN_MASK = f"{BASE_PATH}/train/masks"

TEST_IMG = f"{BASE_PATH}/test/images"
TEST_MASK = f"{BASE_PATH}/test/masks"

PRED_BASE = f"{BASE_PATH}/predictions/unetpp/test"

PRED_MASK_DIR = f"{PRED_BASE}/masks"
OVERLAY_DIR = f"{PRED_BASE}/overlays"

os.makedirs(PRED_MASK_DIR, exist_ok=True)
os.makedirs(OVERLAY_DIR, exist_ok=True)

In [17]:
def load_image_mask(img_path, mask_path):

    #Load image 
    img = load_img(img_path, target_size=(IMG_SIZE, IMG_SIZE))
    img = img_to_array(img).astype("float32") / 255.0

    #Load mask (grayscale)
    mask = load_img(
        mask_path,
        target_size=(IMG_SIZE, IMG_SIZE),
        color_mode="grayscale"
    )
    mask = img_to_array(mask).astype("float32") / 255.0

    #Binarize mask 
    mask = (mask > 0.5).astype("float32")

    return img, mask

In [20]:
def data_generator(img_dir, mask_dir, batch_size):

    img_files = sorted([
        f for f in os.listdir(img_dir)
        if f.lower().endswith((".jpg", ".jpeg", ".png"))
    ])

    while True:
        np.random.shuffle(img_files)

        for i in range(0, len(img_files), batch_size):

            batch_files = img_files[i:i + batch_size]
            imgs, masks = [], []

            for img_file in batch_files:

                img_path = os.path.join(img_dir, img_file)

                # mask name must match image stem
                mask_file = os.path.splitext(img_file)[0] + ".png"
                mask_path = os.path.join(mask_dir, mask_file)

                if not os.path.exists(mask_path):
                    print(f"Missing mask: {mask_file}")
                    continue

                img, mask = load_image_mask(img_path, mask_path)

                imgs.append(img)
                masks.append(mask)

            if len(imgs) > 0:
                yield np.array(imgs), np.array(masks)

In [23]:
def conv_block(x, filters):
    x = layers.Conv2D(filters, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)

    
    x = layers.Conv2D(filters, 3, padding="same")(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation("relu")(x)
    
    return x

In [26]:
def unetpp(input_shape=(IMG_SIZE, IMG_SIZE, 3)):

    inputs = layers.Input(input_shape)

    # ---------- Encoder ----------
    x0_0 = conv_block(inputs, 64)
    p0 = layers.MaxPooling2D()(x0_0)

    x1_0 = conv_block(p0, 128)
    p1 = layers.MaxPooling2D()(x1_0)

    x2_0 = conv_block(p1, 256)
    p2 = layers.MaxPooling2D()(x2_0)

    x3_0 = conv_block(p2, 512)
    p3 = layers.MaxPooling2D()(x3_0)

    x4_0 = conv_block(p3, 1024)

    # ---------- Decoder (nested) ----------

    x3_1 = conv_block(
        layers.Concatenate()([
            x3_0,
            layers.UpSampling2D()(x4_0)
        ]),
        512
    )

    x2_1 = conv_block(
        layers.Concatenate()([
            x2_0,
            layers.UpSampling2D()(x3_0)
        ]),
        256
    )

    x2_2 = conv_block(
        layers.Concatenate()([
            x2_0,
            x2_1,
            layers.UpSampling2D()(x3_1)
        ]),
        256
    )

    x1_1 = conv_block(
        layers.Concatenate()([
            x1_0,
            layers.UpSampling2D()(x2_0)
        ]),
        128
    )

    x1_2 = conv_block(
        layers.Concatenate()([
            x1_0,
            x1_1,
            layers.UpSampling2D()(x2_1)
        ]),
        128
    )

    x1_3 = conv_block(
        layers.Concatenate()([
            x1_0,
            x1_1,
            x1_2,
            layers.UpSampling2D()(x2_2)
        ]),
        128
    )

    x0_1 = conv_block(
        layers.Concatenate()([
            x0_0,
            layers.UpSampling2D()(x1_0)
        ]),
        64
    )

    x0_2 = conv_block(
        layers.Concatenate()([
            x0_0,
            x0_1,
            layers.UpSampling2D()(x1_1)
        ]),
        64
    )

    x0_3 = conv_block(
        layers.Concatenate()([
            x0_0,
            x0_1,
            x0_2,
            layers.UpSampling2D()(x1_2)
        ]),
        64
    )

    x0_4 = conv_block(
        layers.Concatenate()([
            x0_0,
            x0_1,
            x0_2,
            x0_3,
            layers.UpSampling2D()(x1_3)
        ]),
        64
    )

    outputs = layers.Conv2D(1, 1, activation="sigmoid")(x0_4)

    return models.Model(inputs, outputs)

In [29]:
import tensorflow as tf

def dice_coefficient(y_true, y_pred):
    smooth = 1e-6

    y_true_f = tf.reshape(y_true, [-1])
    y_pred_f = tf.reshape(y_pred, [-1])

    intersection = tf.reduce_sum(y_true_f * y_pred_f)

    return (2. * intersection + smooth) / (
        tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth
    )


def dice_loss(y_true, y_pred):
    return 1 - dice_coefficient(y_true, y_pred)


def bce_dice_loss(y_true, y_pred):
    bce = tf.keras.losses.binary_crossentropy(y_true, y_pred)
    return bce + dice_loss(y_true, y_pred)

In [30]:
model = unetpp()

model.compile(
    optimizer=Adam(1e-4),
    loss=bce_dice_loss,
    metrics=["accuracy", dice_coefficient]
)

model.summary()

In [12]:
import tensorflow as tf

print("TensorFlow version:", tf.__version__)
print("GPUs visible to TF:", tf.config.list_physical_devices("GPU"))


TensorFlow version: 2.20.0
GPUs visible to TF: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


In [32]:
from sklearn.model_selection import train_test_split

# Split TRAIN into Train / Val

all_train_files = sorted([
    f for f in os.listdir(TRAIN_IMG)
    if f.lower().endswith((".jpg", ".jpeg", ".png"))
])

train_files, val_files = train_test_split(
    all_train_files,
    test_size=0.2,
    random_state=42
)

print("Train samples:", len(train_files))
print("Val samples:", len(val_files))


# Generators from subsets

def subset_generator(img_dir, mask_dir, files, batch_size):

    while True:
        np.random.shuffle(files)

        for i in range(0, len(files), batch_size):

            batch = files[i:i + batch_size]
            imgs, masks = [], []

            for img_file in batch:

                img_path = os.path.join(img_dir, img_file)
                mask_file = os.path.splitext(img_file)[0] + ".png"
                mask_path = os.path.join(mask_dir, mask_file)

                if not os.path.exists(mask_path):
                    continue

                img, mask = load_image_mask(img_path, mask_path)

                imgs.append(img)
                masks.append(mask)

            if len(imgs) > 0:
                yield np.array(imgs), np.array(masks)


train_gen = subset_generator(
    TRAIN_IMG,
    TRAIN_MASK,
    train_files,
    BATCH_SIZE
)

val_gen = subset_generator(
    TRAIN_IMG, 
    TRAIN_MASK,
    val_files,
    BATCH_SIZE
)

from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

# Stop if validation stops improving
early_stop = EarlyStopping(
    monitor="val_loss",
    patience=5,
    restore_best_weights=True,
    verbose=1
)

# Save best model automatically
checkpoint = ModelCheckpoint(
    filepath="unetpp_best.keras",
    monitor="val_loss",
    save_best_only=True,
    verbose=1
)

Train samples: 3146
Val samples: 787


In [33]:
model.fit(
    train_gen,
    steps_per_epoch=len(train_files) // BATCH_SIZE,
    validation_data=val_gen,
    validation_steps=len(val_files) // BATCH_SIZE,
    epochs=EPOCHS,
    callbacks=[early_stop, checkpoint]
)

Epoch 1/25


2026-02-05 12:07:50.143250: I external/local_xla/xla/service/service.cc:163] XLA service 0x7fb49805ca20 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
2026-02-05 12:07:50.143412: I external/local_xla/xla/service/service.cc:171]   StreamExecutor device (0): NVIDIA GeForce RTX 4050 Laptop GPU, Compute Capability 8.9
2026-02-05 12:07:50.387704: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:269] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
2026-02-05 12:07:51.892370: I external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:473] Loaded cuDNN version 91801
2026-02-05 12:07:52.739786: W external/local_xla/xla/tsl/framework/bfc_allocator.cc:310] Allocator (GPU_0_bfc) ran out of memory trying to allocate 4.11GiB with freed_by_count=0. The caller indicates that this is not a failure, but this may mean that there could be performance gains if more memory were available.
2026-02-05 12:07:52.89072

[1m1573/1573[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 280ms/step - accuracy: 0.9729 - dice_coefficient: 0.2401 - loss: 0.8971
Epoch 1: val_loss improved from None to 0.92662, saving model to unetpp_best.keras

Epoch 1: finished saving model to unetpp_best.keras
[1m1573/1573[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m516s[0m 304ms/step - accuracy: 0.9845 - dice_coefficient: 0.3776 - loss: 0.7006 - val_accuracy: 0.9557 - val_dice_coefficient: 0.2475 - val_loss: 0.9266
Epoch 2/25
[1m1573/1573[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 281ms/step - accuracy: 0.9910 - dice_coefficient: 0.6443 - loss: 0.3964
Epoch 2: val_loss improved from 0.92662 to 0.37152, saving model to unetpp_best.keras

Epoch 2: finished saving model to unetpp_best.keras
[1m1573/1573[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m474s[0m 302ms/step - accuracy: 0.9915 - dice_coefficient: 0.6734 - loss: 0.3672 - val_accuracy: 0.9921 - val_dice_coefficient: 0.6722 - val_loss: 0.3715
Epoc

<keras.src.callbacks.history.History at 0x7fb5ae152c90>

In [34]:
def predict_and_save(model, img_dir, mask_dir):

    os.makedirs(PRED_MASK_DIR, exist_ok=True)
    os.makedirs(OVERLAY_DIR, exist_ok=True)

    img_files = sorted([
        f for f in os.listdir(img_dir)
        if f.lower().endswith((".jpg", ".jpeg", ".png"))
    ])

    for file in img_files:

        # image path
        img_path = os.path.join(img_dir, file)
        
        # mask name
        mask_file = os.path.splitext(file)[0] + ".png"
        mask_path = os.path.join(mask_dir, mask_file)

        # skip if mask missing 
        if not os.path.exists(mask_path):
            print(f"Skipping (mask not found): {file}")
            continue

        # load image & mask 
        img, true_mask = load_image_mask(img_path, mask_path)

        # prediction 
        pred = model.predict(img[np.newaxis, ...], verbose=0)[0]
        pred_mask = (pred > 0.5).astype(np.uint8) * 255

        # save predicted mask 
        cv2.imwrite(
            os.path.join(PRED_MASK_DIR, mask_file),
            pred_mask.squeeze()
        )

        # overlay 
        img_vis = cv2.imread(img_path)
        img_vis = cv2.resize(img_vis, (IMG_SIZE, IMG_SIZE))

        overlay = img_vis.copy()
        overlay[pred_mask.squeeze() == 255] = [0, 0, 255]  # red tumor

        combined = cv2.addWeighted(img_vis, 0.7, overlay, 0.3, 0)

        cv2.imwrite(
            os.path.join(OVERLAY_DIR, file),
            combined
        )

    print("Predictions, masks & overlays saved successfully.")


In [35]:
predict_and_save(model, TEST_IMG, TEST_MASK)

Predictions, masks & overlays saved successfully.


In [36]:
model.save("brisc2025_unetpp_latest.keras")

In [37]:
from tensorflow.keras.models import load_model

model = load_model(
    "brisc2025_unetpp_latest.keras",
    custom_objects={
        "bce_dice_loss": bce_dice_loss,
        "dice_coefficient": dice_coefficient
    }
)

In [38]:
from tqdm import tqdm
import os
import numpy as np

# Metric helpers (NumPy)

def dice_np(y_true, y_pred, smooth=1e-6):
    y_true = y_true.flatten()
    y_pred = y_pred.flatten()

    intersection = np.sum(y_true * y_pred)

    return (2. * intersection + smooth) / (
        np.sum(y_true) + np.sum(y_pred) + smooth
    )


def iou_np(y_true, y_pred, smooth=1e-6):
    y_true = y_true.flatten()
    y_pred = y_pred.flatten()

    intersection = np.sum(y_true * y_pred)
    union = np.sum(y_true) + np.sum(y_pred) - intersection

    return (intersection + smooth) / (union + smooth)


def precision_recall_np(y_true, y_pred):
    y_true = y_true.flatten()
    y_pred = y_pred.flatten()

    tp = np.sum((y_true == 1) & (y_pred == 1))
    fp = np.sum((y_true == 0) & (y_pred == 1))
    fn = np.sum((y_true == 1) & (y_pred == 0))

    precision = tp / (tp + fp + 1e-6)
    recall = tp / (tp + fn + 1e-6)

    return precision, recall


def accuracy_np(y_true, y_pred):
    y_true = y_true.flatten()
    y_pred = y_pred.flatten()

    correct = np.sum(y_true == y_pred)
    total = len(y_true)

    return correct / total


# Evaluate on test set

def evaluate_test_set(model, img_dir, mask_dir):

    image_files = sorted([
        f for f in os.listdir(img_dir)
        if f.lower().endswith((".jpg", ".jpeg", ".png"))
    ])

    dice_scores = []
    iou_scores = []
    precision_scores = []
    recall_scores = []
    accuracy_scores = []

    print(" Evaluating test set...")

    for file in tqdm(image_files):

        img_path = os.path.join(img_dir, file)

        mask_path = os.path.join(
            mask_dir,
            os.path.splitext(file)[0] + ".png"
        )

        if not os.path.exists(mask_path):
            continue

        img, gt_mask = load_image_mask(img_path, mask_path)

        pred = model.predict(img[np.newaxis, ...], verbose=0)[0]
        pred_bin = (pred > 0.5).astype(np.uint8)

        gt_bin = gt_mask.astype(np.uint8)

        dice_scores.append(dice_np(gt_bin, pred_bin))
        iou_scores.append(iou_np(gt_bin, pred_bin))

        p, r = precision_recall_np(gt_bin, pred_bin)
        precision_scores.append(p)
        recall_scores.append(r)

        accuracy_scores.append(
            accuracy_np(gt_bin, pred_bin)
        )

    print("\n===== TEST SET RESULTS =====")
    print(f"Mean Dice     : {np.mean(dice_scores):.4f}")
    print(f"Mean IoU      : {np.mean(iou_scores):.4f}")
    print(f"Mean Precision: {np.mean(precision_scores):.4f}")
    print(f"Mean Recall   : {np.mean(recall_scores):.4f}")
    print(f"Mean Accuracy : {np.mean(accuracy_scores):.4f}")

    return {
        "dice": dice_scores,
        "iou": iou_scores,
        "precision": precision_scores,
        "recall": recall_scores,
        "accuracy": accuracy_scores,
    }


# RUN EVALUATION
results = evaluate_test_set(
    model,
    TEST_IMG,
    TEST_MASK
)


 Evaluating test set...


100%|████████████████████████████████████████████████████████████████████████████████████████| 860/860 [01:27<00:00,  9.81it/s]


===== TEST SET RESULTS =====
Mean Dice     : 0.8178
Mean IoU      : 0.7460
Mean Precision: 0.8305
Mean Recall   : 0.8440
Mean Accuracy : 0.9947



