In [1]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
# import kagglehub
# toygarr_camus_dataset_path = kagglehub.dataset_download('toygarr/camus-dataset')
# toygarr_camus_subject_based_path = kagglehub.dataset_download('toygarr/camus-subject-based')

# print('Data source import complete.')


In [2]:
# Standard
import os
import sys
import datetime
import matplotlib.pyplot as plt
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)


import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler
from tensorflow.keras import backend as keras
import tensorflow.keras.backend as K
from tensorflow import argmax
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.utils import plot_model
from sklearn.model_selection import train_test_split

# Utils
import h5py

**Note:** In this notebook, I did replication of https://github.com/albergcg/camus_challenge and I tried something different that I was curious about. I trained model step by step without data leakage.

If you encounter with any error, you can find the dataset from following links:

ready-to-use version of camus: https://www.kaggle.com/datasets/toygarr/camus-dataset<br/>
subject-based splitted version of camus: https://www.kaggle.com/datasets/toygarr/camus-subject-based<br/>
original dataset: https://humanheart-project.creatis.insa-lyon.fr/database/#collection/6373703d73e9f0047faa1bc8

In [3]:
f = h5py.File("segment_data/image_dataset.hdf5", "r")
print("1")

1


In [4]:
frames2ch = f["train 2ch frames"][:,:,:,:]
masks2ch = f["train 2ch masks"][:,:,:,:]

In [5]:
train_frames, test_frames, train_masks, test_masks = train_test_split(frames2ch, masks2ch)

In [6]:

def multiclass_dice(y_true, y_pred, smooth=1e-2, num_classes=4):
    '''
    Multiclass Dice score. Ignores background pixel label 0
    Pass to model as metric during compile statement
    '''
    y_true_f = K.flatten(K.one_hot(K.cast(y_true, 'int32'), num_classes=4)[...,1:])
    y_pred_f = K.flatten(K.one_hot(argmax(y_pred, axis=3), num_classes=4)[...,1:])
    intersect = K.sum(y_true_f * y_pred_f, axis=-1)
    denom = K.sum(y_true_f + y_pred_f, axis=-1)
    return K.mean((2. * intersect / (denom + smooth)))


def dice_lv(y_true, y_pred, smooth=1e-2, num_classes=4):
    '''
    Multiclass Dice score. Ignores background pixel label 0
    Pass to model as metric during compile statement
    '''
    y_true_f = K.flatten(K.one_hot(K.cast(y_true, 'int32'), num_classes=4)[...,1:2])
    y_pred_f = K.flatten(K.one_hot(argmax(y_pred, axis=3), num_classes=4)[...,1:2])
    intersect = K.sum(y_true_f * y_pred_f, axis=-1)
    denom = K.sum(y_true_f + y_pred_f, axis=-1)
    return K.mean((2. * intersect / (denom + smooth)))

def dice_la(y_true, y_pred, smooth=1e-2, num_classes=4):
    '''
    Multiclass Dice score. Ignores background pixel label 0
    Pass to model as metric during compile statement
    '''
    y_true_f = K.flatten(K.one_hot(K.cast(y_true, 'int32'), num_classes=4)[...,3:4])
    y_pred_f = K.flatten(K.one_hot(argmax(y_pred, axis=3), num_classes=4)[...,3:4])
    intersect = K.sum(y_true_f * y_pred_f, axis=-1)
    denom = K.sum(y_true_f + y_pred_f, axis=-1)
    return K.mean((2. * intersect / (denom + smooth)))

def dice_myo(y_true, y_pred, smooth=1e-2, num_classes=4):
    '''
    Multiclass Dice score. Ignores background pixel label 0
    Pass to model as metric during compile statement
    '''
    y_true_f = K.flatten(K.one_hot(K.cast(y_true, 'int32'), num_classes=4)[...,2:3])
    y_pred_f = K.flatten(K.one_hot(argmax(y_pred, axis=3), num_classes=4)[...,2:3])
    intersect = K.sum(y_true_f * y_pred_f, axis=-1)
    denom = K.sum(y_true_f + y_pred_f, axis=-1)
    return K.mean((2. * intersect / (denom + smooth)))

In [7]:
def generalized_dice_loss(y_true, y_pred, smooth=1e-2, num_classes=4):

    y_true_f = K.flatten(K.one_hot(K.cast(y_true, 'int32'), num_classes=4)[...,1:])
    y_pred_f = K.flatten(y_pred[...,1:])
    intersect = K.sum(y_true_f * y_pred_f, axis=-1)
    denom = K.sum(y_true_f + y_pred_f, axis=-1)
    return 1.0-K.mean((2. * intersect / (denom + smooth)))

In [8]:
import tensorflow as tf
from tensorflow.keras import layers, models


def double_conv2d(x, filters):
    x = layers.Conv2D(filters, kernel_size=3, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)

    x = layers.Conv2D(filters, kernel_size=3, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    return x


def CamusUnet2(input_shape=(384, 384, 1), num_classes=4):
    inputs = layers.Input(shape=input_shape)

    # Encoder
    d1 = double_conv2d(inputs, 48)
    p1 = layers.MaxPooling2D((2, 2))(d1)

    d2 = double_conv2d(p1, 96)
    p2 = layers.MaxPooling2D((2, 2))(d2)

    d3 = double_conv2d(p2, 192)
    p3 = layers.MaxPooling2D((2, 2))(d3)

    d4 = double_conv2d(p3, 384)
    p4 = layers.MaxPooling2D((2, 2))(d4)

    d5 = double_conv2d(p4, 768)

    # Decoder
    u1 = layers.Conv2DTranspose(384, (2, 2), strides=(2, 2), padding='same')(d5)
    u1 = layers.Concatenate()([u1, d4])
    u1 = double_conv2d(u1, 384)

    u2 = layers.Conv2DTranspose(192, (2, 2), strides=(2, 2), padding='same')(u1)
    u2 = layers.Concatenate()([u2, d3])
    u2 = double_conv2d(u2, 192)

    u3 = layers.Conv2DTranspose(96, (2, 2), strides=(2, 2), padding='same')(u2)
    u3 = layers.Concatenate()([u3, d2])
    u3 = double_conv2d(u3, 96)

    u4 = layers.Conv2DTranspose(48, (2, 2), strides=(2, 2), padding='same')(u3)
    u4 = layers.Concatenate()([u4, d1])
    u4 = double_conv2d(u4, 48)

    # Output layer
    outputs = layers.Conv2D(num_classes, (1, 1), activation='softmax')(u4)

    return models.Model(inputs, outputs)


In [9]:
model = CamusUnet2(input_shape=(384, 384, 1), num_classes=4)
model.summary()


In [10]:
earlystop = EarlyStopping(monitor='val_multiclass_dice', min_delta=0, patience=5,
                          verbose=1, mode="max", restore_best_weights = True)

reduce_lr = ReduceLROnPlateau(monitor='val_multiclass_dice', factor=0.2, patience=2,
                              verbose=1, mode="max", min_lr=1e-2)

In [11]:
model.compile(optimizer=Adam(learning_rate=1e-2), loss="sparse_categorical_crossentropy", metrics=[multiclass_dice, "accuracy"])

In [None]:
history = model.fit(x=train_frames,
                    y=train_masks,
                    validation_data=[test_frames, test_masks],
                    batch_size=8,
                    epochs=30,
                    callbacks=[earlystop, reduce_lr])

Epoch 1/30
[1m 5/85[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m36:15[0m 27s/step - accuracy: 0.4372 - loss: 1.6125 - multiclass_dice: 0.1893

In [None]:
def display_image_grid(test_frames, test_masks, predicted_masks=None):
    cols = 3 if predicted_masks else 2
    rows = 10
    figure, ax = plt.subplots(nrows=rows, ncols=cols, figsize=(10, 24))
    for i in range(10):
        img = test_frames[i,:,:,0]
        mask = test_masks[i,:,:,0]

        ax[i, 0].imshow(img, cmap='gray')
        ax[i, 1].imshow(mask, interpolation="nearest")

        ax[i, 0].set_title("Image")
        ax[i, 1].set_title("Ground truth mask")

        ax[i, 0].set_axis_off()
        ax[i, 1].set_axis_off()

        if predicted_masks:
            predicted_mask = predicted_masks[i]
            ax[i, 2].imshow(predicted_mask)
            ax[i, 2].set_title("Predicted mask")
            ax[i, 2].set_axis_off()
    plt.tight_layout()
    plt.show()


predicted_masks = []
for i in range(10):
    prediction = model.predict(test_frames[i:i+1,:,:,:])
    prediction = prediction.reshape([384, 384, 4])
    y = tf.convert_to_tensor(prediction)
    predicted_masks.append(tf.math.argmax(prediction, axis = 2))

display_image_grid(test_frames, test_masks, predicted_masks=predicted_masks)

In [None]:
def plot_graphs(history, string):
    plt.plot(history.history[string])
    plt.plot(history.history['val_'+string])
    plt.xlabel("Epochs")
    plt.ylabel(string)
    plt.legend([string, 'val_'+string])
    plt.show()

plot_graphs(history, "accuracy")
plot_graphs(history, "loss")
plot_graphs(history, "multiclass_dice")

# For 4ch

In [None]:
frames4ch = f["train 4ch frames"][:,:,:,:]
masks4ch = f["train 4ch masks"][:,:,:,:]

In [None]:
train_frames, test_frames, train_masks, test_masks = train_test_split(frames4ch, masks4ch)

In [None]:
train_frames.shape

In [None]:
earlystop = EarlyStopping(monitor='val_multiclass_dice', min_delta=0, patience=5,
                          verbose=1, mode="max", restore_best_weights = True)

reduce_lr = ReduceLROnPlateau(monitor='val_multiclass_dice', factor=0.2, patience=2,
                              verbose=1, mode="max", min_lr=1e-5)

In [None]:
model.compile(optimizer=Adam(lr=1e-3), loss="sparse_categorical_crossentropy", metrics=[multiclass_dice, "accuracy"])

history = model.fit(x=train_frames,
                    y=train_masks,
                    validation_data=[test_frames, test_masks],
                    batch_size=5,
                    epochs=50,
                    callbacks=[earlystop, reduce_lr])

In [None]:
def display_image_grid(test_frames, test_masks, predicted_masks=None):
    cols = 3 if predicted_masks else 2
    rows = 10
    figure, ax = plt.subplots(nrows=rows, ncols=cols, figsize=(10, 24))
    for i in range(10):
        img = test_frames[i,:,:,0]
        mask = test_masks[i,:,:,0]

        ax[i, 0].imshow(img, cmap='gray')
        ax[i, 1].imshow(mask, interpolation="nearest")

        ax[i, 0].set_title("Image")
        ax[i, 1].set_title("Ground truth mask")

        ax[i, 0].set_axis_off()
        ax[i, 1].set_axis_off()

        if predicted_masks:
            predicted_mask = predicted_masks[i]
            ax[i, 2].imshow(predicted_mask)
            ax[i, 2].set_title("Predicted mask")
            ax[i, 2].set_axis_off()
    plt.tight_layout()
    plt.show()


predicted_masks = []
for i in range(10):
    prediction = model.predict(test_frames[i:i+1,:,:,:])
    prediction = prediction.reshape([384, 384, 4])
    y = tf.convert_to_tensor(prediction)
    predicted_masks.append(tf.math.argmax(prediction, axis = 2))

display_image_grid(test_frames, test_masks, predicted_masks=predicted_masks)

In [None]:
def plot_graphs(history, string):
    plt.plot(history.history[string])
    plt.plot(history.history['val_'+string])
    plt.xlabel("Epochs")
    plt.ylabel(string)
    plt.legend([string, 'val_'+string])
    plt.show()

plot_graphs(history, "accuracy")
plot_graphs(history, "loss")
plot_graphs(history, "multiclass_dice")