In [1]:
import os
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# import data 
data = np.load('/kaggle/input/homework2cleaned/mars_no_aliens.npz')
training_set = data["training_set"]

X_train = training_set[:, 0]
y_train = training_set[:, 1]

X_test = data["test_set"]

#add dimention 
if X_train.ndim == 3:
    X_train = np.expand_dims(X_train, axis=-1)  # 转换为 (samples, height, width, 1)
if y_train.ndim == 3:
    y_train = np.expand_dims(y_train, axis=-1)  # 转换为 (samples, height, width, 1)

# split train and validation set
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.2, random_state=42
)
#y_val = tf.keras.utils.to_categorical(y_val, num_classes=5)


print(f"X_train shape: {X_train.shape}")  # should be (2092, 64, 128, 1)
print(f"X_val shape: {X_val.shape}")      # should be (523, 64, 128, 1)
print(f"y_train shape: {y_train.shape}")  # should be (2092, 64, 128, 1)
print(f"y_val shape: {y_val.shape}")      # should be (523, 64, 128, 1)


X_train shape: (2004, 64, 128, 1)
X_val shape: (501, 64, 128, 1)
y_train shape: (2004, 64, 128, 1)
y_val shape: (501, 64, 128, 1)


In [2]:
import tensorflow as tf
import numpy as np
import scipy.ndimage

# image size and batch size
IMAGE_SIZE = (64, 128)
BATCH_SIZE = 32
AUTOTUNE = tf.data.experimental.AUTOTUNE

def preprocess_image_and_mask(image, mask):
    image = tf.cast(image, tf.float32)/255.0
    mask = tf.cast(mask, tf.float32)
    image = tf.image.grayscale_to_rgb(image)
    # mask = tf.one_hot(tf.cast(mask, tf.int32), depth=5)
    return image, mask

def elastic_transform(image, mask, alpha, sigma, random_state=None):
    
    if random_state is None:
        random_state = np.random.RandomState(None)

    shape = image.shape[:2]  # Only height and width are needed

    # Generate random displacement fields
    dx = random_state.uniform(-1, 1, size=shape) * alpha
    dy = random_state.uniform(-1, 1, size=shape) * alpha

    # Smooth the displacement fields using Gaussian filter
    dx = scipy.ndimage.gaussian_filter(dx, sigma=sigma, mode="constant", cval=0)
    dy = scipy.ndimage.gaussian_filter(dy, sigma=sigma, mode="constant", cval=0)

    # Create coordinate grid
    x, y = np.meshgrid(np.arange(shape[1]), np.arange(shape[0]))
    indices = np.stack([(y + dy).flatten(), (x + dx).flatten()])

    # Apply transformation for each channel independently
    transformed_image = np.zeros_like(image)
    for i in range(image.shape[2]):  # Iterate over channels
        transformed_image[..., i] = scipy.ndimage.map_coordinates(
            image[..., i], indices, order=1, mode='reflect'
        ).reshape(shape)

    # Apply transformation for mask
    transformed_mask = scipy.ndimage.map_coordinates(
        mask[..., 0], indices, order=1, mode='reflect'
    ).reshape(shape + (1,))

    return transformed_image, transformed_mask

def elastic_transform_wrapper(image, mask, alpha=34, sigma=4):
    def numpy_transform(image, mask):
        image = image.numpy()
        mask = mask.numpy()
        transformed_image, transformed_mask = elastic_transform(image, mask, alpha, sigma)

        # Ensure mask shape has a single channel
        if transformed_mask.ndim == 2:  
            transformed_mask = np.expand_dims(transformed_mask, axis=-1)
        return transformed_image, transformed_mask

    transformed_image, transformed_mask = tf.py_function(
        func=numpy_transform,
        inp=[image, mask],
        Tout=[tf.float32, tf.float32]  
    )

    transformed_image.set_shape(image.shape)
    transformed_mask.set_shape(mask.shape)
    return transformed_image, transformed_mask


In [3]:

import numpy as np
import random

def extract_regions_with_mask(image, mask, target_label=4):
    """
    extract `mask == 4` region
    """
    regions = []
    mask_positions = np.where(mask == target_label)  
    if len(mask_positions[0]) > 0:
        x_min, x_max = mask_positions[0].min(), mask_positions[0].max()
        y_min, y_max = mask_positions[1].min(), mask_positions[1].max()
        cropped_region = image[x_min:x_max+1, y_min:y_max+1]
        cropped_mask = mask[x_min:x_max+1, y_min:y_max+1]
        regions.append((cropped_region, cropped_mask))
    return regions

def insert_region(image, mask, region, region_mask):
    """
    random insert
    """
    region_h, region_w, _ = region.shape
    img_h, img_w, _ = image.shape

    max_x = img_h - region_h
    max_y = img_w - region_w
    
    if max_x < 0 or max_y < 0:
        return image, mask
    
    start_x = random.randint(0, max_x)
    start_y = random.randint(0, max_y)
    
    updated_image = image.copy()
    updated_image[start_x:start_x+region_h, start_y:start_y+region_w] = region
    
    updated_mask = mask.copy()
    updated_mask[start_x:start_x+region_h, start_y:start_y+region_w] = region_mask
    
    return updated_image, updated_mask

def augment_with_fixed_regions(X_train, y_train, target_label=4, num_regions=1):
    """
    ensuse every pic contains `mask=4` region，and maintian the shape
    """
    images_with_target = [(img, msk) for img, msk in zip(X_train, y_train) if np.any(msk == target_label)]
    regions = []
    for image, mask in images_with_target:
        regions.extend(extract_regions_with_mask(image, mask, target_label))
    
    if not regions:
        print("No regions with mask=4 found.")
        return X_train, y_train
    
    augmented_X_train = []
    augmented_y_train = []
    
    for image, mask in zip(X_train, y_train):
        augmented_image = image.copy()
        augmented_mask = mask.copy()
        
        for _ in range(num_regions):
            region, region_mask = random.choice(regions)  
            augmented_image, augmented_mask = insert_region(augmented_image, augmented_mask, region, region_mask)
        
        augmented_X_train.append(augmented_image)
        augmented_y_train.append(augmented_mask)
    
    return np.array(augmented_X_train), np.array(augmented_y_train)


X_train_augmented, y_train_augmented = augment_with_fixed_regions(X_train, y_train, target_label=4, num_regions=1)
#y_train_one_hot = tf.keras.utils.to_categorical(y_train_augmented, num_classes=5)


In [4]:
import tensorflow as tf

# train dataset
original_train_dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
original_train_dataset = (
    original_train_dataset
    .map(preprocess_image_and_mask, num_parallel_calls=tf.data.AUTOTUNE)
    #.map(elastic_transform_wrapper,num_parallel_calls = tf.data.AUTOTUNE)
    .shuffle(10 * BATCH_SIZE)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)

# validation dataset
val_dataset = tf.data.Dataset.from_tensor_slices((X_val, y_val))
val_dataset = (
    val_dataset
    .map(preprocess_image_and_mask, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)
)


In [5]:
# Import other libraries
import os
import math
from PIL import Image
from keras import backend as K
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

NUM_CLASSES =5
seed = 42

# Import TensorFlow and Keras
import tensorflow as tf
from tensorflow import keras as tfk
from tensorflow.keras import layers as tfkl

In [33]:
def attention_weighted_fusion(skip_connection, upsampled_features, filters, name):
    # Adjust skip connection to match upsampled features dimensions
    skip_connection = conv1x1(skip_connection, filters, name=name + "_adjust_skip")
    upsampled_features = conv1x1(upsampled_features, filters, name=name + "_adjust_upsampled")

    # Ensure spatial dimensions match
    skip_shape = tf.shape(skip_connection)
    upsample_shape = tf.shape(upsampled_features)

    if skip_shape[1] != upsample_shape[1] or skip_shape[2] != upsample_shape[2]:
        upsampled_features = tf.image.resize(upsampled_features, [skip_shape[1], skip_shape[2]])

    # Attention mechanism
    gate = Conv2D(filters, kernel_size=1, padding='same', kernel_initializer=initializer, name=name + '_gate_conv')(upsampled_features)
    attention = Add(name=name + '_add')([skip_connection, gate])
    attention = tf.keras.layers.Activation('relu', name=name + '_activation')(attention)
    attention_weights = Conv2D(1, kernel_size=1, padding='same', activation='sigmoid', kernel_initializer=initializer, name=name + '_attention_weights')(attention)

    # Apply attention weights
    weighted_skip = Multiply(name=name + "_weighted_skip")([attention_weights, skip_connection])
    weighted_upsampled = Multiply(name=name + "_weighted_upsampled")([(1 - attention_weights), upsampled_features])

    # Combine skip and upsampled features
    fused_features = Add(name=name + "_fused_features")([weighted_skip, weighted_upsampled])
    return fused_features


In [6]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, LayerNormalization, BatchNormalization, Add, Multiply, Concatenate
from tensorflow.keras.models import Model

initializer = tf.keras.initializers.HeNormal()

# 1x1 convolution for dimension adjustment
def conv1x1(inputs, filters, name):
    return Conv2D(filters, kernel_size=1, padding='same', kernel_initializer=initializer, name=name)(inputs)

# U-Net block with Batch Normalization
def unet_block_with_bn(inputs, filters, name):
    x = Conv2D(filters, kernel_size=3, padding='same', kernel_initializer=initializer, name=name + 'conv1')(inputs)
    x = BatchNormalization(name=name + 'bn1')(x)
    x = tf.keras.layers.ReLU(name=name + 'relu1')(x)

    x = Conv2D(filters, kernel_size=3, padding='same', kernel_initializer=initializer, name=name + 'conv2')(x)
    x = BatchNormalization(name=name + 'bn2')(x)
    x = tf.keras.layers.ReLU(name=name + 'relu2')(x)

    x = Conv2D(filters, kernel_size=3, padding='same', kernel_initializer=initializer, name=name + 'conv3')(x)
    x = BatchNormalization(name=name + 'bn3')(x)
    x = tf.keras.layers.ReLU(name=name + 'relu3')(x)
    return x

# U-Net block with Layer Normalization
def unet_block_with_ln(inputs, filters, name):
    x = Conv2D(filters, kernel_size=3, padding='same', kernel_initializer=initializer, name=name + 'conv1')(inputs)
    x = LayerNormalization(name=name + 'ln1')(x)
    x = tf.keras.layers.ReLU(name=name + 'relu1')(x)

    x = Conv2D(filters, kernel_size=3, padding='same', kernel_initializer=initializer, name=name + 'conv2')(x)
    x = LayerNormalization(name=name + 'ln2')(x)
    x = tf.keras.layers.ReLU(name=name + 'relu2')(x)

    x = Conv2D(filters, kernel_size=3, padding='same', kernel_initializer=initializer, name=name + 'conv3')(x)
    x = LayerNormalization(name=name + 'ln3')(x)
    x = tf.keras.layers.ReLU(name=name + 'relu3')(x)
    return x

# SE Attention Module
def se_block(inputs, reduction=16, name="se_block"):
    filters = inputs.shape[-1]
    se = tf.keras.layers.GlobalAveragePooling2D(name=name + '_gap')(inputs)
    se = tf.keras.layers.LayerNormalization(name=name + '_ln1')(se)
    se = tf.keras.layers.Dense(filters // reduction, activation='relu', name=name + '_fc1')(se)
    se = tf.keras.layers.LayerNormalization(name=name + '_ln2')(se)
    se = tf.keras.layers.Dense(filters, activation='sigmoid', name=name + '_fc2')(se)
    se = tf.keras.layers.Reshape([1, 1, filters], name=name + '_reshape')(se)
    se = tf.keras.layers.Multiply(name=name + '_scale')([inputs, se])
    return se

# Bottleneck with SE Attention
def bottleneck_with_attention(inputs, filters, name):
    x = unet_block_with_ln(inputs, filters, name=name + "_unet_block")
    x = se_block(x, name=name + "_se_block")
    return x

# Dynamic Weighted Fusion
def dynamic_weighted_fusion(skip_connection, upsampled_features, name):
    skip_connection = conv1x1(skip_connection, upsampled_features.shape[-1], name=name + "_adjust_skip")
    skip_weight = tf.Variable(initial_value=0.5, trainable=True, name=name + "_skip_weight")
    fused_features = Add(name=name + "_fused_features")([
        skip_weight * skip_connection,
        (1 - skip_weight) * upsampled_features
    ])
    return fused_features

# Attention Gate
def attention_gate(encoder_features, decoder_features, filters, name):
    gate = Conv2D(filters, kernel_size=1, padding='same', kernel_initializer=initializer, name=name + '_gate_conv')(decoder_features)
    skip = Conv2D(filters, kernel_size=1, padding='same', kernel_initializer=initializer, name=name + '_skip_conv')(encoder_features)
    combined = Add(name=name + '_add')([gate, skip])
    combined = tf.keras.layers.Activation('relu', name=name + '_activation')(combined)
    combined = Conv2D(1, kernel_size=1, padding='same', activation='sigmoid', kernel_initializer=initializer, name=name + '_attention_weights')(combined)
    attention = Multiply(name=name + '_attention_multiply')([encoder_features, combined])
    return attention

"""
# Full U-Net Model
def get_unet_model(input_shape=(64, 128, 3), num_classes=5, seed=None):
    tf.random.set_seed(seed)
    input_layer = Input(shape=input_shape, name='input_layer')

    # Downsampling path
    down_block_1 = unet_block_with_bn(input_layer, 32, name='down_block1_')
    d1 = MaxPooling2D()(down_block_1)

    down_block_2 = unet_block_with_bn(d1, 64, name='down_block2_')
    d2 = MaxPooling2D()(down_block_2)

    down_block_3 = unet_block_with_bn(d2, 128, name='down_block3_')
    d3 = MaxPooling2D()(down_block_3)

    # Bottleneck
    bottleneck = bottleneck_with_attention(d3, 256, name='bottleneck')

    # Upsampling path
    u1_seg = UpSampling2D()(bottleneck)
    adjusted_skip1 = conv1x1(down_block_3, 128, name='adjust_skip1')
    u1_seg = dynamic_weighted_fusion(adjusted_skip1, u1_seg, name='fusion1')
    u1_seg = unet_block_with_ln(u1_seg, 128, name='up_block1_seg')

    u2_seg = UpSampling2D()(u1_seg)
    adjusted_skip2 = conv1x1(down_block_2, 64, name='adjust_skip2')
    u2_seg = dynamic_weighted_fusion(adjusted_skip2, u2_seg, name='fusion2')
    u2_seg = unet_block_with_ln(u2_seg, 64, name='up_block2_seg')

    u3_seg = UpSampling2D()(u2_seg)
    adjusted_skip3 = conv1x1(down_block_1, 32, name='adjust_skip3')
    u3_seg = dynamic_weighted_fusion(adjusted_skip3, u3_seg, name='fusion3')
    u3_seg = unet_block_with_ln(u3_seg, 32, name='up_block3_seg')

    # Output
    seg_output = Conv2D(num_classes, kernel_size=1, padding='same', activation="softmax", kernel_initializer=initializer, name='seg_output')(u3_seg)

    model = Model(inputs=input_layer, outputs=seg_output, name='UNet_with_Dynamic_Fusion')
    return model
"""

In [34]:
# Full U-Net Model
def get_unet_model(input_shape=(64, 128, 3), num_classes=5, seed=None):
    tf.random.set_seed(seed)
    input_layer = Input(shape=input_shape, name='input_layer')

    # Downsampling path
    down_block_1 = unet_block_with_bn(input_layer, 32, name='down_block1_')
    d1 = MaxPooling2D()(down_block_1)

    down_block_2 = unet_block_with_bn(d1, 64, name='down_block2_')
    d2 = MaxPooling2D()(down_block_2)

    down_block_3 = unet_block_with_bn(d2, 128, name='down_block3_')
    d3 = MaxPooling2D()(down_block_3)

    # Bottleneck
    bottleneck = bottleneck_with_attention(d3, 256, name='bottleneck')

    # Upsampling path
    u1_seg = UpSampling2D()(bottleneck)
    adjusted_skip1 = conv1x1(down_block_3, 128, name='adjust_skip1')
    u1_seg = dynamic_weighted_fusion(adjusted_skip1, u1_seg, name='fusion1')
    u1_seg = unet_block_with_ln(u1_seg, 128, name='up_block1_seg')

    u2_seg = UpSampling2D()(u1_seg)
    adjusted_skip2 = conv1x1(down_block_2, 64, name='adjust_skip2')
    u2_seg = dynamic_weighted_fusion(adjusted_skip2, u2_seg, name='fusion2')
    u2_seg = unet_block_with_ln(u2_seg, 64, name='up_block2_seg')

    u3_seg = UpSampling2D()(u2_seg)
    adjusted_skip3 = conv1x1(down_block_1, 32, name='adjust_skip3')
    u3_seg = dynamic_weighted_fusion(adjusted_skip3, u3_seg, name='fusion3')
    u3_seg = unet_block_with_ln(u3_seg, 32, name='up_block3_seg')

    # Output
    seg_output = Conv2D(num_classes, kernel_size=1, padding='same', activation="softmax", kernel_initializer=initializer, name='seg_output')(u3_seg)

    model = Model(inputs=input_layer, outputs=seg_output, name='UNet_with_Dynamic_Fusion')
    return model

In [None]:
"""
def focal_loss(alpha=0.25, gamma=2.0, class_weights=None, class_indexes=[1, 2, 3, 4]):
    def loss(y_true, y_pred):
        # Remove redundant channel dimension if present
        y_true = tf.squeeze(y_true, axis=-1)

        # Convert y_true to one-hot encoding
        num_classes = tf.shape(y_pred)[-1]
        y_true_one_hot = tf.one_hot(tf.cast(y_true, tf.int32), depth=num_classes)

        # Clip predictions to avoid log(0)
        y_pred = tf.clip_by_value(y_pred, tf.keras.backend.epsilon(), 1 - tf.keras.backend.epsilon())

        # Filter out unwanted classes (exclude background)
        y_true_one_hot = tf.gather(y_true_one_hot, class_indexes, axis=-1)
        y_pred = tf.gather(y_pred, class_indexes, axis=-1)

        # Compute cross-entropy loss
        cross_entropy = -y_true_one_hot * tf.math.log(y_pred)

        # Compute the modulating factor (focal loss term)
        weights = alpha * tf.math.pow(1 - y_pred, gamma)

        # Apply class-specific weights if provided
        if class_weights is not None:
            # Create a weight tensor for the specified class indexes
            class_weight_tensor = tf.constant(
                [class_weights.get(cls, 1.0) for cls in class_indexes],
                dtype=tf.float32
            )
            # Gather class weights for each pixel
            pixel_weights = tf.gather(class_weight_tensor, tf.argmax(y_true_one_hot, axis=-1))
            # Expand pixel_weights to match the last dimension of weights
            pixel_weights = tf.expand_dims(pixel_weights, axis=-1)
            # Broadcast pixel_weights to all classes
            pixel_weights = tf.tile(pixel_weights, [1, 1, 1, len(class_indexes)])
            weights = weights * pixel_weights

        # Compute the focal loss
        focal_loss = tf.reduce_sum(weights * cross_entropy, axis=-1)

        return tf.reduce_mean(focal_loss)
    return loss

"""

In [28]:
# focal loss(class weights)
def focal_loss(alpha=0.25, gamma_dict=None, class_weights=None, class_indexes=[1, 2, 3, 4], label_smoothing=0.1):
    def loss(y_true, y_pred):
        # Remove redundant channel dimension if present
        y_true = tf.squeeze(y_true, axis=-1)

        # Convert y_true to one-hot encoding
        num_classes = tf.shape(y_pred)[-1]
        y_true_one_hot = tf.one_hot(tf.cast(y_true, tf.int32), depth=num_classes)

        # Ensure label smoothing and num_classes are float types
        y_true_one_hot = tf.cast(y_true_one_hot, tf.float32)
        num_classes = tf.cast(num_classes, tf.float32)

        # Apply label smoothing
        if label_smoothing > 0:
            y_true_one_hot = y_true_one_hot * (1 - label_smoothing) + label_smoothing / num_classes

        # Clip predictions to avoid log(0)
        y_pred = tf.clip_by_value(y_pred, tf.keras.backend.epsilon(), 1 - tf.keras.backend.epsilon())

        # Filter out unwanted classes
        y_true_one_hot = tf.gather(y_true_one_hot, class_indexes, axis=-1)
        y_pred = tf.gather(y_pred, class_indexes, axis=-1)

        # Compute cross-entropy loss
        cross_entropy = -y_true_one_hot * tf.math.log(y_pred)

        # Compute modulating factor with dynamic gamma
        if gamma_dict:
            gamma_tensor = tf.constant(
                [gamma_dict.get(cls, 2.0) for cls in class_indexes],
                dtype=tf.float32
            )
            pixel_gamma = tf.gather(gamma_tensor, tf.argmax(y_true_one_hot, axis=-1))
            pixel_gamma = tf.expand_dims(pixel_gamma, axis=-1)  # Expand to [?, 64, 128, 1]
            weights = alpha * tf.math.pow(1 - y_pred, pixel_gamma)  # Broadcast to [?, 64, 128, 4]
        else:
            weights = alpha * tf.math.pow(1 - y_pred, 2.0)

        # Apply class-specific weights
        if class_weights:
            class_weight_tensor = tf.constant(
                [class_weights.get(cls, 1.0) for cls in class_indexes],
                dtype=tf.float32
            )
            pixel_weights = tf.gather(class_weight_tensor, tf.argmax(y_true_one_hot, axis=-1))
            pixel_weights = tf.expand_dims(pixel_weights, axis=-1)
            pixel_weights = tf.tile(pixel_weights, [1, 1, 1, len(class_indexes)])
            weights = weights * pixel_weights

        # Compute focal loss
        focal_loss = tf.reduce_sum(weights * cross_entropy, axis=-1)

        # Normalize loss by total foreground pixels
        normalizer = tf.reduce_sum(y_true_one_hot, axis=[0, 1, 2])
        class_normalized_loss = tf.reduce_sum(focal_loss) / (tf.reduce_sum(normalizer) + tf.keras.backend.epsilon())

        return class_normalized_loss
    return loss



In [29]:
# Combined loss 

def combined_loss(
    alpha=0.25,
    gamma_dict=None,
    class_weights=None,
    class_indexes=[1, 2, 3, 4],
    label_smoothing=0.1,
    dice_weight=0.5,
    focal_weight=0.5,
    dice_smooth=1e-6
):
    """
    Combined loss function of Focal Loss and Class-Balanced Dice Loss.
    """
    def focal_loss(y_true, y_pred):
        # Remove redundant channel dimension if present
        y_true = tf.squeeze(y_true, axis=-1)

        # Convert y_true to one-hot encoding
        num_classes = tf.shape(y_pred)[-1]
        y_true_one_hot = tf.one_hot(tf.cast(y_true, tf.int32), depth=num_classes)

        # Ensure label smoothing and num_classes are float types
        y_true_one_hot = tf.cast(y_true_one_hot, tf.float32)
        num_classes = tf.cast(num_classes, tf.float32)

        # Apply label smoothing
        if label_smoothing > 0:
            y_true_one_hot = y_true_one_hot * (1 - label_smoothing) + label_smoothing / num_classes

        # Clip predictions to avoid log(0)
        y_pred = tf.clip_by_value(y_pred, tf.keras.backend.epsilon(), 1 - tf.keras.backend.epsilon())

        # Filter out unwanted classes
        y_true_one_hot = tf.gather(y_true_one_hot, class_indexes, axis=-1)
        y_pred = tf.gather(y_pred, class_indexes, axis=-1)

        # Compute cross-entropy loss
        cross_entropy = -y_true_one_hot * tf.math.log(y_pred)

        # Compute modulating factor with dynamic gamma
        if gamma_dict:
            gamma_tensor = tf.constant(
                [gamma_dict.get(cls, 2.0) for cls in class_indexes],
                dtype=tf.float32
            )
            pixel_gamma = tf.gather(gamma_tensor, tf.argmax(y_true_one_hot, axis=-1))
            pixel_gamma = tf.expand_dims(pixel_gamma, axis=-1)  # Expand to [?, 64, 128, 1]
            weights = alpha * tf.math.pow(1 - y_pred, pixel_gamma)  # Broadcast to [?, 64, 128, 4]
        else:
            weights = alpha * tf.math.pow(1 - y_pred, 2.0)

        # Apply class-specific weights
        if class_weights:
            class_weight_tensor = tf.constant(
                [class_weights.get(cls, 1.0) for cls in class_indexes],
                dtype=tf.float32
            )
            pixel_weights = tf.gather(class_weight_tensor, tf.argmax(y_true_one_hot, axis=-1))
            pixel_weights = tf.expand_dims(pixel_weights, axis=-1)
            pixel_weights = tf.tile(pixel_weights, [1, 1, 1, len(class_indexes)])
            weights = weights * pixel_weights

        # Compute focal loss
        focal_loss = tf.reduce_sum(weights * cross_entropy, axis=-1)

        # Normalize loss by total foreground pixels
        normalizer = tf.reduce_sum(y_true_one_hot, axis=[0, 1, 2])
        class_normalized_loss = tf.reduce_sum(focal_loss) / (tf.reduce_sum(normalizer) + tf.keras.backend.epsilon())

        return class_normalized_loss

    def class_balanced_dice_loss(y_true, y_pred):
        # Remove redundant channel dimension if present
        y_true = tf.squeeze(y_true, axis=-1)

        # Convert y_true to one-hot encoding
        num_classes = tf.shape(y_pred)[-1]
        y_true_one_hot = tf.one_hot(tf.cast(y_true, tf.int32), depth=num_classes)

        # Ensure y_true_one_hot and y_pred are float types
        y_true_one_hot = tf.cast(y_true_one_hot, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)

        # Filter out unwanted classes
        y_true_one_hot = tf.gather(y_true_one_hot, class_indexes, axis=-1)
        y_pred = tf.gather(y_pred, class_indexes, axis=-1)

        # Compute Dice numerator and denominator
        intersection = tf.reduce_sum(y_true_one_hot * y_pred, axis=[0, 1, 2])
        denominator = tf.reduce_sum(y_true_one_hot + y_pred, axis=[0, 1, 2])

        dice = (2.0 * intersection + dice_smooth) / (denominator + dice_smooth)

        # Apply class weights
        if class_weights:
            class_weight_tensor = tf.constant(
                [class_weights.get(cls, 1.0) for cls in class_indexes],
                dtype=tf.float32
            )
            class_weighted_dice = class_weight_tensor * dice
            dice_loss_value = 1.0 - tf.reduce_mean(class_weighted_dice)
        else:
            dice_loss_value = 1.0 - tf.reduce_mean(dice)

        return dice_loss_value

    def loss(y_true, y_pred):
        # Compute Focal Loss
        focal = focal_loss(y_true, y_pred)

        # Compute Class-Balanced Dice Loss
        dice = class_balanced_dice_loss(y_true, y_pred)

        # Combine losses with weights
        combined = focal_weight * focal + dice_weight * dice
        return combined

    return loss


In [35]:
# Define custom Mean Intersection Over Union metric
class MeanIntersectionOverUnion(tf.keras.metrics.MeanIoU):
    def __init__(self, num_classes, labels_to_exclude=None, name="mean_iou", dtype=None):
        super(MeanIntersectionOverUnion, self).__init__(num_classes=num_classes, name=name, dtype=dtype)
        if labels_to_exclude is None:
            labels_to_exclude = [0]  # Default to excluding label 0
        self.labels_to_exclude = labels_to_exclude

    def update_state(self, y_true, y_pred, sample_weight=None):
        # Convert predictions to class labels
        y_pred = tf.math.argmax(y_pred, axis=-1)
        #y_true = tf.math.argmax(y_true,axis=-1) 

        # Flatten the tensors
        y_true = tf.reshape(y_true, [-1])
        y_pred = tf.reshape(y_pred, [-1])

        # Apply mask to exclude specified labels
        for label in self.labels_to_exclude:
            mask = tf.not_equal(y_true, label)
            y_true = tf.boolean_mask(y_true, mask)
            y_pred = tf.boolean_mask(y_pred, mask)

        # Update the state
        return super().update_state(y_true, y_pred, sample_weight)


In [36]:
from tensorflow.keras.callbacks import EarlyStopping,ReduceLROnPlateau,ModelCheckpoint

# Setup callbacks
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_accuracy',
    mode='max',
    patience=PATIENCE,
    restore_best_weights=True
)
reduce_lr = ReduceLROnPlateau(
    monitor='val_accuracy',
    factor=0.5,
    patience=15,
    min_lr=1e-7,
    verbose=1
)

checkpoint = ModelCheckpoint(
    filepath='best_unet.keras',  
    monitor='val_mean_iou',       
    mode='max',
    save_best_only=True,      
    verbose=1                 
)
import numpy as np
from tensorflow.keras.callbacks import LearningRateScheduler

def cosine_annealing_with_warmup(epoch, lr, warmup_epochs=10, max_lr=1e-3, min_lr=1e-6, total_epochs=100):
    if epoch < warmup_epochs:
        # Warm-up 
        return min_lr + (max_lr - min_lr) * (epoch / warmup_epochs)
    else:
        # Cosine Annealing 
        cosine_decay = 0.5 * (1 + np.cos(np.pi * (epoch - warmup_epochs) / (total_epochs - warmup_epochs)))
        return min_lr + (max_lr - min_lr) * cosine_decay


cosine_warmup_scheduler = LearningRateScheduler(
    lambda epoch, lr: cosine_annealing_with_warmup(
        epoch, 
        lr, 
        warmup_epochs=10,   # epochs for warm up
        max_lr=1e-3,        # maxmium learning rate
        min_lr=1e-6,        # minimum learning rate
        total_epochs=EPOCHS   # total training epochs
    ),
    verbose=1
)


In [None]:
"""

# Train the model
history = model.fit(
    original_train_dataset,
    epochs=EPOCHS,
    validation_data=val_dataset,
    callbacks=[early_stopping,checkpoint,cosine_warmup_scheduler],
    verbose=1
).history

# Calculate and print the final validation accuracy
final_val_meanIoU = round(max(history['val_mean_iou'])* 100, 2)
print(f'Final validation Mean Intersection Over Union: {final_val_meanIoU}%')

# Save the trained model to a file with the accuracy included in the filename
model_filename = 'UNet_'+str(final_val_meanIoU)+'.keras'
model.save(model_filename)

"""

In [37]:
from collections import Counter

# Set learning rate for the optimiser
LEARNING_RATE = 1e-3
# Set early stopping patience threshold
PATIENCE = 30
# Set maximum number of training epochs
EPOCHS_PHASE1 = 40 
EPOCHS_PHASE2 = 360  
TOTAL_EPOCHS = EPOCHS_PHASE1 + EPOCHS_PHASE2

model = get_unet_model()

def calculate_class_weights(y_train):
    flattened_labels = np.concatenate([label.flatten() for label in y_train])
    class_counts = Counter(flattened_labels)
    total_pixels = sum(class_counts.values())
    class_weights = {cls: total_pixels / count for cls, count in class_counts.items()}
    max_weight = max(class_weights.values())
    normalized_class_weights = {cls: weight / max_weight for cls, weight in class_weights.items()}
    return normalized_class_weights

class_weights = calculate_class_weights(y_train)

gamma_dict = {
    1: 2.0,
    2: 5.0,
    3: 2.0,
    4: 10.0
}

# Compile and train the model (Multi-stage training)

# Step 1: train only the decoder part
print("Phase 1: Training decoder only...")
for layer in model.layers:
    if 'down_block' in layer.name or 'bottleneck' in layer.name:
        layer.trainable = False

model.compile(
     loss=focal_loss(
        alpha=0.25,
        gamma_dict=gamma_dict,
        class_weights=class_weights,
        class_indexes=[1, 2, 3, 4],
        label_smoothing=0.1
    ),
    optimizer=tf.keras.optimizers.AdamW(learning_rate=LEARNING_RATE),
    metrics=["accuracy", MeanIntersectionOverUnion(num_classes=5, labels_to_exclude=[0], name="mean_iou")]
)

history_phase1 = model.fit(
    original_train_dataset,
    epochs=EPOCHS_PHASE1,
    validation_data=val_dataset,
    callbacks=[early_stopping, checkpoint, cosine_warmup_scheduler],
    verbose=0
).history

# Step 2: defrozen all and train the entire model
print("Phase 2: Training entire model...")
for layer in model.layers:
    layer.trainable = True

model.compile(
     loss=focal_loss(
        alpha=0.25,
        gamma_dict=gamma_dict,
        class_weights=class_weights,
        class_indexes=[1, 2, 3, 4],
        label_smoothing=0.1
    ),
    optimizer=tf.keras.optimizers.AdamW(learning_rate=LEARNING_RATE / 10),  
    metrics=["accuracy", MeanIntersectionOverUnion(num_classes=5, labels_to_exclude=[0], name="mean_iou")]
)

history_phase2 = model.fit(
    original_train_dataset,
    epochs=EPOCHS_PHASE2,
    initial_epoch=EPOCHS_PHASE1,  
    validation_data=val_dataset,
    callbacks=[early_stopping, checkpoint, cosine_warmup_scheduler],
    verbose=0
).history

history = {key: history_phase1[key] + history_phase2[key] for key in history_phase1}

# Final validation metrics
final_val_meanIoU = round(max(history['val_mean_iou']) * 100, 2)
print(f'Final validation Mean Intersection Over Union: {final_val_meanIoU}%')

# Save the trained model
model_filename = f'UNet_{final_val_meanIoU}.keras'
model.save(model_filename)


Phase 1: Training decoder only...

Epoch 1: LearningRateScheduler setting learning rate to 1e-06.

Epoch 1: val_mean_iou improved from -inf to 0.00767, saving model to best_unet.keras

Epoch 2: LearningRateScheduler setting learning rate to 0.00010090000000000001.

Epoch 2: val_mean_iou improved from 0.00767 to 0.19937, saving model to best_unet.keras

Epoch 3: LearningRateScheduler setting learning rate to 0.00020080000000000003.

Epoch 3: val_mean_iou did not improve from 0.19937

Epoch 4: LearningRateScheduler setting learning rate to 0.00030070000000000004.

Epoch 4: val_mean_iou did not improve from 0.19937

Epoch 5: LearningRateScheduler setting learning rate to 0.0004006000000000001.

Epoch 5: val_mean_iou did not improve from 0.19937

Epoch 6: LearningRateScheduler setting learning rate to 0.0005005000000000001.

Epoch 6: val_mean_iou did not improve from 0.19937

Epoch 7: LearningRateScheduler setting learning rate to 0.0006004000000000001.

Epoch 7: val_mean_iou did not impro

In [40]:
import numpy as np
from sklearn.metrics import confusion_matrix

# Function to calculate IoU for each class
def calculate_classwise_iou(y_true, y_pred, num_classes):
    ious = []
    for class_id in range(num_classes):
        # Create a mask for each class
        true_mask = (y_true == class_id)
        pred_mask = (y_pred == class_id)
        
        # Calculate intersection and union
        intersection = np.logical_and(true_mask, pred_mask).sum()
        union = np.logical_or(true_mask, pred_mask).sum()
        
        # Calculate IoU and handle division by zero
        iou = intersection / union if union > 0 else 0.0
        ious.append(iou)
    return ious

# Predict and calculate classwise IoU
def evaluate_classwise_iou(model, val_dataset, num_classes):
    all_true = []
    all_pred = []
    
    for images, labels in val_dataset:
        # Predict the segmentation masks
        predictions = model.predict(images)
        predictions = np.argmax(predictions, axis=-1)  # Get the predicted class indices
        labels = np.squeeze(labels)  # Remove redundant dimensions
        
        # Collect predictions and true labels
        all_true.append(labels)
        all_pred.append(predictions)
    
    # Stack all batches
    all_true = np.concatenate(all_true, axis=0)
    all_pred = np.concatenate(all_pred, axis=0)
    
    # Calculate IoU for each class
    classwise_ious = calculate_classwise_iou(all_true, all_pred, num_classes)
    return classwise_ious

# Example usage after training
num_classes = 5  # Change this to match your number of classes
classwise_ious = evaluate_classwise_iou(model, val_dataset, num_classes)

# Print per-class IoU
for i, iou in enumerate(classwise_ious):
    print(f"Class {i} IoU: {iou:.2%}")

# Calculate mean IoU
mean_iou = np.mean(classwise_ious)
print(f"Mean IoU: {mean_iou:.2%}")


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms

In [60]:
from tensorflow.keras.models import load_model

model.load_weights('/kaggle/working/UNet_67.62.keras')


In [44]:
import tensorflow as tf

def convert_to_rgb(images):
    images_tensor = tf.convert_to_tensor(images, dtype=tf.float32)
    
    images_tensor = tf.expand_dims(images_tensor, axis=-1)
    
    images_rgb = tf.image.grayscale_to_rgb(images_tensor)
    
    return images_rgb.numpy()


In [48]:
X_test = convert_to_rgb(X_test)

X_test = X_test.astype("float32")/255.0
print(f"Fixed X_test shape: {X_test.shape}")
preds = model.predict(X_test)
preds = np.argmax(preds, axis=-1)
print(f"Predictions shape: {preds.shape}")
import pandas as pd
def y_to_df(y) -> pd.DataFrame:
    """Converts segmentation predictions into a DataFrame format for Kaggle."""
    n_samples = len(y)
    y_flat = y.reshape(n_samples, -1)
    df = pd.DataFrame(y_flat)
    df["id"] = np.arange(n_samples)
    cols = ["id"] + [col for col in df.columns if col != "id"]
    return df[cols]

Fixed X_test shape: (10022, 64, 128, 3)
[1m314/314[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 22ms/step
Predictions shape: (10022, 64, 128)


In [49]:
# Create and download the csv submission file
timestep_str = model_filename.replace("model_", "").replace(".keras", "")
submission_filename = f"submission_{timestep_str}.csv"
submission_df = y_to_df(preds)
submission_df.to_csv(submission_filename, index=False)