In [13]:
import numpy as np
from PIL import Image
import os, io

from sklearn.model_selection import train_test_split

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import tensorflow.keras.backend as K
import tensorflow as tf
import datetime
import matplotlib.pyplot as plt

In [22]:
# Define Dice coefficient metric
def dice_coefficient(y_true, y_pred):
    smooth = 1.0
    y_true_f = K.flatten(K.cast(y_true, 'float32'))
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)

# Define Dice loss
def dice_loss(y_true, y_pred):
    return 1 - dice_coefficient(y_true, y_pred)

In [23]:
def load_tif_data(data_dir):
    images, masks = [], []
    for patient_dir in os.listdir(data_dir):
        patient_path = os.path.join(data_dir, patient_dir)
        if not os.path.isdir(patient_path):
            continue
        # Get all .tif files in the patient directory
        tif_files = [f for f in os.listdir(patient_path) if f.endswith('.tif') and 'mask' not in f]
        for img_file in tif_files:
            img_path = os.path.join(patient_path, img_file)
            # Construct corresponding mask filename
            mask_file = img_file.replace('.tif', '_mask.tif')
            mask_path = os.path.join(patient_path, mask_file)
            
            if not os.path.exists(mask_path):
                continue  # Skip if mask file doesn’t exist
            
            # Load image and mask
            img = np.array(Image.open(img_path).convert('L'))  # Grayscale
            mask = np.array(Image.open(mask_path).convert('L'))
            
            # Normalize image to [0, 1]
            img = img / 255.0
            # Binarize mask (0 or 1)
            mask = (mask > 0).astype(np.uint8)
            
            # Add channel dimension (256x256x1)
            images.append(img[..., np.newaxis])
            masks.append(mask[..., np.newaxis])
    
    return np.array(images), np.array(masks)

In [16]:
data_dir = './dataset/'  # Update with your dataset path
images, masks = load_tif_data(data_dir)

In [24]:
images_train, images_temp, masks_train, masks_temp = train_test_split(
    images, masks, test_size=0.3, random_state=42
)
images_val, images_test, masks_val, masks_test = train_test_split(
    images_temp, masks_temp, test_size=0.5, random_state=42
)

print(f"Train: {len(images_train)}, Val: {len(images_val)}, Test: {len(images_test)}")

Train: 2750, Val: 589, Test: 590


In [25]:
# Define TumorSegNet (same as before, included for completeness)
def TumorSegNet(input_shape=(256, 256, 1)):
    inputs = Input(input_shape, name='Input_Layer')
    # Encoder
    c1 = Conv2D(64, 3, activation='relu', padding='same', name='Conv1_1')(inputs)
    c1 = Conv2D(64, 3, activation='relu', padding='same', name='Conv1_2')(c1)
    p1 = MaxPooling2D((2, 2), name='MaxPool1')(c1)
    c2 = Conv2D(128, 3, activation='relu', padding='same', name='Conv2_1')(p1)
    c2 = Conv2D(128, 3, activation='relu', padding='same', name='Conv2_2')(c2)
    p2 = MaxPooling2D((2, 2), name='MaxPool2')(c2)
    c3 = Conv2D(256, 3, activation='relu', padding='same', name='Conv3_1')(p2)
    c3 = Conv2D(256, 3, activation='relu', padding='same', name='Conv3_2')(c3)
    p3 = MaxPooling2D((2, 2), name='MaxPool3')(c3)
    # Bottleneck
    c4 = Conv2D(512, 3, activation='relu', padding='same', name='Bottleneck_Conv1')(p3)
    c4 = Conv2D(512, 3, activation='relu', padding='same', name='Bottleneck_Conv2')(c4)
    c4 = Dropout(0.5, name='Bottleneck_Dropout')(c4)
    # Decoder
    u3 = UpSampling2D((2, 2), name='UpSample3')(c4)
    u3 = concatenate([u3, c3], name='Concat3')
    c5 = Conv2D(256, 3, activation='relu', padding='same', name='Conv5_1')(u3)
    c5 = Conv2D(256, 3, activation='relu', padding='same', name='Conv5_2')(c5)
    u2 = UpSampling2D((2, 2), name='UpSample2')(c5)
    u2 = concatenate([u2, c2], name='Concat2')
    c6 = Conv2D(128, 3, activation='relu', padding='same', name='Conv6_1')(u2)
    c6 = Conv2D(128, 3, activation='relu', padding='same', name='Conv6_2')(c6)
    u1 = UpSampling2D((2, 2), name='UpSample1')(c6)
    u1 = concatenate([u1, c1], name='Concat1')
    c7 = Conv2D(64, 3, activation='relu', padding='same', name='Conv7_1')(u1)
    c7 = Conv2D(64, 3, activation='relu', padding='same', name='Conv7_2')(c7)
    outputs = Conv2D(1, 1, activation='sigmoid', name='Output_Layer')(c7)
    return Model(inputs, outputs, name='TumorSegNet')

# Instantiate and compile model
model = TumorSegNet(input_shape=(256, 256, 1))
model.compile(optimizer='adam', loss=dice_loss, metrics=[dice_coefficient, 'accuracy'])

# Data augmentation
datagen = ImageDataGenerator(
    rotation_range=10,
    horizontal_flip=True,
    fill_mode='nearest'
)

# TensorBoard callback
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = TensorBoard(
    log_dir=log_dir,
    histogram_freq=1,
    write_graph=True,
    write_images=True,
    update_freq='epoch',
    profile_batch=0
)

# Custom callback to log sample predictions to TensorBoard
class PredictionLogger(tf.keras.callbacks.Callback):
    def __init__(self, val_images, val_masks):
        super(PredictionLogger, self).__init__()
        self.val_images = val_images[:5]  # Log first 5 validation images
        self.val_masks = val_masks[:5]

    def on_epoch_end(self, epoch, logs=None):
        preds = self.model.predict(self.val_images)
        preds = (preds > 0.5).astype(np.uint8)
        
        # Create figure with images, true masks, and predictions
        fig = plt.figure(figsize=(15, 5 * len(self.val_images)))
        for i in range(len(self.val_images)):
            plt.subplot(len(self.val_images), 3, i * 3 + 1)
            plt.imshow(self.val_images[i].squeeze(), cmap='gray')
            plt.title('MRI Image')
            plt.axis('off')
            plt.subplot(len(self.val_images), 3, i * 3 + 2)
            plt.imshow(self.val_masks[i].squeeze(), cmap='gray')
            plt.title('True Mask')
            plt.axis('off')
            plt.subplot(len(self.val_images), 3, i * 3 + 3)
            plt.imshow(preds[i].squeeze(), cmap='gray')
            plt.title(f'Predicted Mask (Epoch {epoch + 1})')
            plt.axis('off')
        
        # Log figure to TensorBoard
        writer = tf.summary.create_file_writer(log_dir)
        with writer.as_default():
            tf.summary.image("Validation Predictions", plot_to_image(fig), step=epoch)
        plt.close(fig)

# Convert matplotlib figure to TensorBoard image format
def plot_to_image(figure):
    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    plt.close(figure)
    buf.seek(0)
    image = tf.image.decode_png(buf.getvalue(), channels=4)
    image = tf.expand_dims(image, 0)
    return image

# Other callbacks
callbacks = [
    EarlyStopping(patience=10, restore_best_weights=True, monitor='val_loss'),
    ModelCheckpoint('TumorSegNet_best.keras', save_best_only=True, monitor='val_dice_coefficient'),
    tensorboard_callback,
    PredictionLogger(images_val, masks_val)
]

# Train the model
history = model.fit(
    datagen.flow(images_train, masks_train, batch_size=16),
    validation_data=(images_val, masks_val),
    epochs=50,
    callbacks=callbacks
)

Epoch 1/50


  self._warn_if_super_not_called()


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step/step - accuracy: 0.9233 - dice_coefficient: 0.0067 - loss: 0.9933
[1m172/172[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2939s[0m 17s/step - accuracy: 0.9236 - dice_coefficient: 0.0067 - loss: 0.9933 - val_accuracy: 0.9911 - val_dice_coefficient: 6.4429e-05 - val_loss: 0.9999
Epoch 2/50
[1m  3/172[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m48:06[0m 17s/step - accuracy: 0.9862 - dice_coefficient: 7.8753e-05 - loss: 0.9999

KeyboardInterrupt: 