In [1]:
from google.colab import drive
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.image import load_img
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.models import load_model
from tensorflow.keras import layers, losses
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt

In [None]:
# This code was used in Colab
drive.mount('/content/drive')

!cp "/content/drive/My Drive/airbus_train.zip" "airbus_train.zip"
!cp "/content/drive/My Drive/airbus_test.zip" "airbus_test.zip"
!cp "/content/drive/My Drive/train_ship_segmentations_v2.csv" "train_ship_segmentations_v2.csv"

! mkdir airbus
! unzip airbus_train.zip -d airbus

! mkdir airbus_test
! unzip airbus_test.zip -d airbus_test

In [3]:
# Load the segmentation annotations CSV into a DataFrame; its rows are grouped per ImageId further down
df = pd.read_csv(filepath_or_buffer='train_ship_segmentations_v2.csv')

In [4]:
# Notebook-wide configuration: paths, file lists and training hyper-parameters
TRAIN_PATH = '/content/airbus'
TEST_PATH = '/content/airbus_test'
CHECKPOINT_FILEPATH = '/content/unet.h5'

# os.listdir returns entries in arbitrary order. The validation/test split below is
# positional (first 70% of the test files vs. the rest), so sort the names to make
# the split - and the order the generators read files in - reproducible across runs.
train_files_name = sorted(f for f in os.listdir(TRAIN_PATH) if f.endswith('.jpg'))
test_files_name = sorted(f for f in os.listdir(TEST_PATH) if f.endswith('.jpg'))

BATCH_SIZE = 128
IMAGE_SIZE = (192, 192)        # size the images/masks are resized to and the model input size
ORIG_IMAGE_SIZE = (768, 768)   # size the RLE masks are decoded at before resizing
BUFFER_SIZE = 500              # shuffle buffer size for the training pipeline

NUM_EPOCHS = 100
TRAIN_LENGTH = len(train_files_name)
STEPS_PER_EPOCH = TRAIN_LENGTH // BATCH_SIZE

# Only 1 / VAL_SUBSPLITS of the validation batches are evaluated per epoch
VAL_SUBSPLITS = 5
VALIDATION_LENGTH = int(len(test_files_name) * 0.7)
VALIDATION_STEPS = VALIDATION_LENGTH // BATCH_SIZE // VAL_SUBSPLITS

# Files of the test directory that are not used for validation
TEST_LENGTH = len(test_files_name) - VALIDATION_LENGTH

# Quantity monitored by the training callbacks
METRIC = 'val_loss'

In [5]:
def rle_decode(mask_rle, shape):
    """
    Decode a run-length encoded (RLE) mask and return the corresponding binary mask.

    The RLE string is a whitespace-separated sequence of ``start length`` pairs.
    Starts are 1-based positions into the flattened mask, counted column by column
    (top to bottom within a column, then left to right).

    :param mask_rle: Run-length encoded string representing the mask. Anything that
                     is not a string (e.g. NaN) and the empty string decode to an
                     all-zero mask.
    :param shape: Tuple representing the shape of the target binary mask (height, width).

    :return: Binary mask as a NumPy uint8 array of shape (height, width, 1).
    :raises ValueError: If the string does not consist of complete start/length pairs.
    """
    height, width = shape[0], shape[1]

    # Flat buffer that the runs are painted into
    flat = np.zeros(height * width, dtype=np.uint8)

    if isinstance(mask_rle, str):
        tokens = mask_rle.split()

        # A dangling token would otherwise be silently broadcast or dropped by NumPy
        if len(tokens) % 2:
            raise ValueError("RLE string must contain start/length pairs, got an odd number of values")

        # Even positions are starts (converted to zero-based), odd positions are lengths
        starts = np.asarray(tokens[0::2], dtype=int) - 1
        lengths = np.asarray(tokens[1::2], dtype=int)

        for lo, hi in zip(starts, starts + lengths):
            flat[lo:hi] = 1

    # The runs are column-major: fill a (width, height) array row by row and transpose
    # it. For square masks this equals reshape(shape).T; for non-square masks it is the
    # only variant that yields the documented (height, width) result.
    mask = flat.reshape((width, height)).T

    # Add an extra dimension to represent the channel
    return np.expand_dims(mask, axis=-1)

# Replace missing RLE entries with empty strings so they can take part in the string join below
df['EncodedPixels'] = df['EncodedPixels'].fillna(value='')

# One space-joined RLE string per ImageId, built from all of that image's rows
all_masks = df.groupby('ImageId')['EncodedPixels'].agg(' '.join)

In [6]:
def resize(img, mask):
    """
    Bring an image and its mask to IMAGE_SIZE.

    Both are resized with nearest-neighbour sampling.

    :param img: Input image.
    :param mask: Input mask.
    :return: Tuple of the resized image and the resized mask.
    """
    resized_img, resized_mask = (
        tf.image.resize(tensor, IMAGE_SIZE, method="nearest")
        for tensor in (img, mask)
    )
    return resized_img, resized_mask

def flip(img, mask):
    """
    Randomly mirror an image/mask pair while keeping the two aligned.

    A left-right flip and an up-down flip are drawn independently, each applied
    with probability 0.5. The same draw is used for the image and for its mask.

    The flips are plain axis reversals done in NumPy; the result is returned as
    NumPy arrays, so no TensorFlow ops (or temporary batch dimension) are needed.

    :param img: Input image laid out as (height, width, channels).
    :param mask: Input mask laid out as (height, width, channels).
    :return: Flipped image and mask as new NumPy arrays.
    """
    img = np.asarray(img)
    mask = np.asarray(mask)

    # Left-right flip: reverse the width axis
    if np.random.uniform() > 0.5:
        img = img[:, ::-1]
        mask = mask[:, ::-1]

    # Up-down flip: reverse the height axis
    if np.random.uniform() > 0.5:
        img = img[::-1]
        mask = mask[::-1]

    # Materialise the (possibly reversed) views as fresh C-contiguous arrays
    return np.array(img, order='C'), np.array(mask, order='C')

def normalization(img):
    """
    Scale an image by 1/255.

    For pixel values in [0, 255] the result lies in [0, 1].

    :param img: Input image.
    :return: The image divided by 255.0.
    """
    return img / 255.0

def load_image_train(image, mask):
    """
    Run the training preprocessing chain on one image/mask pair.

    Steps, in order: cast both to float32, resize both, normalize the image,
    then randomly flip the pair.

    :param image: Input image.
    :param mask: Input mask.
    :return: Processed and augmented image and mask.
    """
    # Cast first so that every later step works on float32 data
    image, mask = (tf.cast(tensor, tf.float32) for tensor in (image, mask))

    image, mask = resize(image, mask)

    # Only the image is rescaled; the mask is passed to flip as-is
    return flip(normalization(image), mask)

In [7]:
# Generator function to load images and masks for training or testing
def dataset_generator(image_file_names, path):
    """
    Generates preprocessed image and mask pairs for a given set of image file names.

    :param image_file_names: List of image file names.
    :param path: Path to the directory containing the images.
    :return: Yield preprocessed image and mask pairs.
    """

    for image_name in image_file_names:
        image_path = os.path.join(path, image_name)

        # Load image using target size specified by IMAGE_SIZE
        image = load_img(image_path, target_size=IMAGE_SIZE)

        # Decode mask using the run-length encoding (RLE) and original image size
        mask = rle_decode(all_masks.loc[image_name], ORIG_IMAGE_SIZE)

        # Apply preprocessing and augmentation to image and mask
        image, mask = load_image_train(image, mask)

        # Yield the processed image and mask for each iteration
        yield image, mask

# Generator function for the training dataset
def train_generator():
    """
    Generator function for the training dataset.

    :return: Yield preprocessed image and mask pairs for training.
    """

    return dataset_generator(train_files_name, TRAIN_PATH)

# Generator function for the test dataset
def test_generator():
    """
    Generator function for the test dataset.

    :return: Yield preprocessed image and mask pairs for testing.
    """

    return dataset_generator(test_files_name, TEST_PATH)


In [8]:
def dice_coeff(y_true, y_pred, smooth=1e-6):
    """
    Dice coefficient of a predicted mask against the true mask.

    Computed as (2 * sum(true * pred) + smooth) / (sum(true) + sum(pred) + smooth),
    with the sums taken over every element of the inputs.

    :param y_true: True binary mask.
    :param y_pred: Predicted binary mask.
    :param smooth: Smoothing factor to avoid division by zero.
    :return: Dice coefficient.
    """
    truth = tf.cast(y_true, tf.float32)
    prediction = tf.cast(y_pred, tf.float32)

    overlap = tf.reduce_sum(truth * prediction)
    total = tf.reduce_sum(truth) + tf.reduce_sum(prediction)

    return (2. * overlap + smooth) / (total + smooth)

def dice_loss(y_true, y_pred):
    """
    Dice loss, defined as one minus the dice coefficient.

    :param y_true: True binary mask.
    :param y_pred: Predicted binary mask.
    :return: Dice loss.
    """
    coefficient = dice_coeff(y_true, y_pred)
    return 1 - coefficient

def bce_dice_loss(y_true, y_pred):
    """
    Sum of binary crossentropy (BCE) and Dice loss.

    :param y_true: True binary mask.
    :param y_pred: Predicted binary mask.
    :return: Combined loss.
    """
    bce_term = losses.binary_crossentropy(y_true, y_pred)
    dice_term = dice_loss(y_true, y_pred)

    # Both terms are added with equal weight
    return bce_term + dice_term


In [9]:
def double_conv_block(x, n_filters):
    """
    Apply two identical 3x3 "same"-padded ReLU convolutions in sequence.

    :param x: Input tensor.
    :param n_filters: Number of filters of each convolution.
    :return: Output tensor after two convolution operations.
    """
    for _ in range(2):
        x = layers.Conv2D(
            filters=n_filters,
            kernel_size=3,
            padding="same",
            activation="relu",
            kernel_initializer='random_normal',
        )(x)
    return x

def downsample_block(x, n_filters):
    """
    Encoder step: double convolution followed by 2x2 max pooling and dropout (rate 0.3).

    :param x: Input tensor.
    :param n_filters: Number of filters.
    :return: Tuple of (features after the double convolution, pooled tensor after dropout).
    """
    features = double_conv_block(x, n_filters)
    pooled = layers.MaxPool2D(pool_size=2)(features)
    return features, layers.Dropout(rate=0.3)(pooled)

def upsample_block(x, conv_features, n_filters):
    """
    Decoder step: stride-2 transposed convolution, concatenation with the skip
    features, dropout (rate 0.3) and a double convolution.

    :param x: Input tensor.
    :param conv_features: Features from the corresponding downsample block.
    :param n_filters: Number of filters.
    :return: Output tensor after upsampling.
    """
    upsampled = layers.Conv2DTranspose(
        filters=n_filters, kernel_size=3, strides=2, padding="same"
    )(x)
    merged = layers.concatenate([upsampled, conv_features])
    merged = layers.Dropout(rate=0.3)(merged)
    return double_conv_block(merged, n_filters)

def build_unet_model():
    """
    Assemble the U-Net used for segmentation.

    Three downsample blocks (16, 32, 64 filters), a 128-filter bottleneck, three
    upsample blocks (64, 32, 16 filters) fed with the matching skip features, and
    a 1x1 sigmoid convolution producing a single output channel.

    :return: U-Net model.
    """
    inputs = layers.Input(shape=(*IMAGE_SIZE, 3))

    # Encoder: remember each block's features for the skip connections
    skip_features = []
    x = inputs
    for n_filters in (16, 32, 64):
        features, x = downsample_block(x, n_filters)
        skip_features.append(features)

    # Bottleneck layer
    x = double_conv_block(x, 128)

    # Decoder: consume the skip features from deepest to shallowest
    for n_filters, features in zip((64, 32, 16), reversed(skip_features)):
        x = upsample_block(x, features, n_filters)

    # Output layer
    outputs = layers.Conv2D(1, 1, padding="same", activation="sigmoid")(x)

    return tf.keras.Model(inputs, outputs, name="U-Net")

In [10]:
# Wrap a generator function in a tf.data.Dataset of (image, mask) pairs
def _dataset_from(generator_fn):
    return tf.data.Dataset.from_generator(
        generator_fn,
        output_signature=(
            tf.TensorSpec(shape=(*IMAGE_SIZE, 3), dtype=tf.float32),  # 3-channel images
            tf.TensorSpec(shape=(*IMAGE_SIZE, 1), dtype=tf.float32),  # single-channel masks
        )
    )

train_dataset = _dataset_from(train_generator)
test_dataset = _dataset_from(test_generator)

# Training pipeline: cache -> shuffle -> batch -> repeat -> prefetch
# NOTE(review): cache() sits after the generator, so whatever the generator produced
# on the first pass (including its random flips) is what later passes replay - confirm
# this is intended.
train_batches = (
    train_dataset
    .cache()
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .repeat()
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)

# The first VALIDATION_LENGTH test files form the validation set, the following TEST_LENGTH the test set
validation_batches = test_dataset.take(VALIDATION_LENGTH).batch(BATCH_SIZE)
test_batches = test_dataset.skip(VALIDATION_LENGTH).take(TEST_LENGTH).batch(BATCH_SIZE)

In [None]:
# Create an instance of the U-Net model
unet_model = build_unet_model()

# Compile the model with Adam optimizer, BCE + Dice loss, and Dice coefficient as a metric
unet_model.compile(optimizer='adam',
                   loss=bce_dice_loss,
                   metrics=[dice_coeff])

# Save the model to CHECKPOINT_FILEPATH whenever the monitored metric reaches a new minimum
checkpoint = ModelCheckpoint(
    filepath=CHECKPOINT_FILEPATH,
    monitor=METRIC,
    mode='min',
    save_best_only=True
)

# Stop training after 5 epochs without improvement. Monitor the same quantity as the
# checkpoint (METRIC) instead of a hard-coded training 'loss', so that "best model"
# and "stop" are decided on one consistent criterion.
earlystop = EarlyStopping(monitor=METRIC, mode='min', patience=5)

# List of callbacks to be used during training
callbacks_list = [checkpoint, earlystop]

# Train the model using the training and validation datasets.
# train_batches repeats indefinitely, so steps_per_epoch defines the epoch length;
# validation_steps limits how many validation batches are evaluated per epoch.
model_history = unet_model.fit(train_batches,
                               validation_data=validation_batches,
                               epochs=NUM_EPOCHS,
                               steps_per_epoch=STEPS_PER_EPOCH,
                               validation_steps=VALIDATION_STEPS,
                               callbacks=callbacks_list)

# NOTE(review): this evaluates the weights left after the final epoch, not the best
# checkpoint written to CHECKPOINT_FILEPATH - confirm that is intended.
print(unet_model.evaluate(test_batches))


In [None]:
#Optional
# Draw one figure per tracked quantity, each comparing the training curve with the
# validation curve over the epochs: first the dice coefficient, then the loss.
for train_key, val_key, figure_title, y_label in (
    ('dice_coeff', 'val_dice_coeff', 'Model Dice Coefficient', 'Dice Coefficient'),
    ('loss', 'val_loss', 'Model Loss', 'Loss'),
):
    fig, ax = plt.subplots()
    ax.plot(model_history.history[train_key], label='Train')
    ax.plot(model_history.history[val_key], label='Validation')
    ax.set_title(figure_title)
    ax.set_ylabel(y_label)
    ax.set_xlabel('Epoch')
    ax.legend(loc='upper left')
    plt.show()