In [None]:
!pip install tf-explain

In [None]:
# Common
import os 
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from IPython.display import clear_output as cls

# Data 
from tqdm import tqdm
import tensorflow.data as tfd

# Data Visualization
import matplotlib.pyplot as plt

# Model Building
from tensorflow.keras import layers
from tensorflow.keras import callbacks
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras.optimizers.schedules import ExponentialDecay

# Model visualization
from tensorflow.keras.utils import plot_model
from tf_explain.core.grad_cam import GradCAM

# Extra
from typing import List, Tuple, Union

In [None]:
# Image and Mask Dimensions
IMAGE_HEIGHT = 256
IMAGE_WIDTH = 256
N_IMAGE_CHANNELS = 3
N_MASK_CHANNELS = 1

# Image and Mask Size, in (height, width, channels) order: this is the layout produced by
# tf.image.resize(size=(IMAGE_HEIGHT, IMAGE_WIDTH)) in load_image_and_mask.
IMAGE_SIZE = (IMAGE_HEIGHT, IMAGE_WIDTH, N_IMAGE_CHANNELS)
MASK_SIZE = (IMAGE_HEIGHT, IMAGE_WIDTH, N_MASK_CHANNELS)

# Zoom factor: the central 1/ZOOM_FACTOR of every image is kept and resized back to full size,
# so larger values zoom in further and 1.0 disables the zoom.
# Values below 1.0 are not supported: the crop fraction 1/ZOOM_FACTOR would exceed 1,
# which tf.image.central_crop rejects.
ZOOM_FACTOR = 4.5

# Batch Size and Learning Rate
BATCH_SIZE = 32
BASE_LR = 1e-3

# Model Name (used as the prefix of the saved model file)
MODEL_NAME = 'SIH_T2'

# Model Training
EPOCHS = 100

# Data Paths (placeholders: replace with the dataset root directory and the metadata CSV file)
ROOT_DIR = 'ADD YOUR PATG'
METADATA_CSV_PATH = 'ADD YOUR META DATA PATAH'

# Model Architecture: number of filters in the first encoder block
FILTERS = 32

In [None]:
# Reproducibility: a single seed shared by TensorFlow and NumPy
SEED = 42

tf.random.set_seed(SEED)
np.random.seed(SEED)

In [None]:
def load_image_and_mask(image_path: str, mask_path: str) -> Tuple[tf.Tensor, tf.Tensor]:
    
    '''
    Load an image and its mask from disk, zoom into their centre and return both as float32 tensors.
    
    Both files are read and decoded, converted to float32 with tf.image.convert_image_dtype (which scales
    integer pixel data to the [0, 1] range), cropped to the central 1/ZOOM_FACTOR of the frame, and resized
    back to (IMAGE_HEIGHT, IMAGE_WIDTH). The values are finally clipped to [0, 1].
    
    Arguments : 
        image_path : The path to the image to be loaded. 
        mask_path  : The path to the mask to be loaded.
    
    Returns :
        image : float32 tensor of shape (IMAGE_HEIGHT, IMAGE_WIDTH, N_IMAGE_CHANNELS) with values in [0, 1]. 
        mask  : float32 tensor of shape (IMAGE_HEIGHT, IMAGE_WIDTH, N_MASK_CHANNELS) with values in [0, 1].
    
    Raises :
        ValueError : If ZOOM_FACTOR is below 1, because tf.image.central_crop only accepts a fraction in (0, 1].
    
    '''
    
    # Read the raw file contents
    image = tf.io.read_file(filename = image_path)
    mask  = tf.io.read_file(filename = mask_path)
    
    # Decode the files into uint8 tensors with the requested number of channels
    image = tf.image.decode_jpeg(contents = image, channels = N_IMAGE_CHANNELS)
    mask  = tf.image.decode_jpeg(contents = mask,  channels = N_MASK_CHANNELS)
    
    # Convert to float32; integer pixel values are rescaled to [0, 1]
    image = tf.image.convert_image_dtype(image = image, dtype = tf.float32)
    mask  = tf.image.convert_image_dtype(image = mask, dtype = tf.float32)
    
    # Zoom in: keep only the central 1/ZOOM_FACTOR of the image and the mask
    central_fraction = 1 / ZOOM_FACTOR
    image = tf.image.central_crop(image, central_fraction=central_fraction)
    mask = tf.image.central_crop(mask, central_fraction=central_fraction)
    
    # Resize the crops back to the model input size. The default (bilinear) interpolation is used,
    # so mask edges become soft values rather than strictly 0/1.
    image = tf.image.resize(image, size=(IMAGE_HEIGHT, IMAGE_WIDTH))
    mask = tf.image.resize(mask, size=(IMAGE_HEIGHT, IMAGE_WIDTH))

    # Clip to the valid range. Both tensors are already float32 at this point,
    # so no further cast is needed.
    image = tf.clip_by_value(image, clip_value_min = 0.0, clip_value_max = 1.0)
    mask  = tf.clip_by_value(mask, clip_value_min = 0.0, clip_value_max = 1.0)
    
    return image, mask

In [None]:
# Load the metadata CSV located at METADATA_CSV_PATH into a DataFrame
metadata = pd.read_csv(METADATA_CSV_PATH)

# Quick look at the first rows
metadata.head()

In [None]:
# Separate training and testing metadata using the 'split' column.
# .copy() turns each subset into an independent DataFrame, so adding columns to it later
# does not raise pandas' SettingWithCopyWarning.
test_metadata = metadata[metadata['split']=="test"].copy()
metadata = metadata[metadata['split']=="train"].copy()

In [None]:
# Build the full image and mask paths by prefixing ROOT_DIR to the relative paths stored in the CSV.
# DataFrame.assign returns a new frame instead of writing into a filtered slice,
# which avoids pandas' SettingWithCopyWarning.
metadata = metadata.assign(
    image=[os.path.join(ROOT_DIR, str(filename)) for filename in metadata['sat_image_path']],
    mask=[os.path.join(ROOT_DIR, str(filename)) for filename in metadata['mask_path']],
)

In [None]:
# Preview one training sample: the zoomed image on the left, its mask on the right.
# image_id is looked up as an index label of `metadata`.
image_id = 478
sample_image, sample_mask = load_image_and_mask(
    metadata.loc[image_id, 'image'],
    metadata.loc[image_id, 'mask'],
)

fig, (image_ax, mask_ax) = plt.subplots(1, 2, figsize=(10, 8))

image_ax.imshow(sample_image)
image_ax.axis('off')

mask_ax.imshow(sample_mask, cmap='gray')
mask_ax.axis('off')

plt.show()

In [None]:
# Quick check: display the first rows of the training metadata
metadata.head()

In [None]:
def load_dataset(
    image_paths: list, mask_paths: list, split_ratio: float=0.2, 
    batch_size: int=BATCH_SIZE, shuffle: bool=True, 
    buffer_size: int=1000, n_repeat: int=1
) -> Union[Tuple[tfd.Dataset, tfd.Dataset], tfd.Dataset]:
    '''
    Load every image/mask pair into memory and wrap it in one or two batched TensorFlow datasets.
    
    All pairs are loaded with load_image_and_mask into two float32 numpy arrays, from which a
    tf.data.Dataset is created.
    
    If split_ratio is None, the dataset is repeated n_repeat times, optionally shuffled (reshuffled on
    every iteration), batched with drop_remainder=True and prefetched.
    
    If split_ratio is not None, the samples are split into two disjoint parts BEFORE any repetition:
    
        * When shuffle is True the data is shuffled once with a fixed order (reshuffle_each_iteration=False).
          A shuffle that changes on every iteration would make take()/skip() select different samples
          each epoch, leaking samples between the two parts.
        * Each part is then repeated n_repeat times, batched with drop_remainder=True and prefetched.
          Because repetition happens after the split, a sample never appears in both parts.
    
    Args:
        image_paths: A list of strings, containing the file paths of the input images.
        
        mask_paths: A list of strings, containing the file paths of the corresponding mask images.
                    Must have the same length as image_paths.
        
        split_ratio: A float value between 0 and 1, representing the ratio of data placed in the second
                    returned dataset. If split_ratio is set to None, then no data will be split off.
                    
        batch_size: An integer, representing the batch size for the input data.
        
        shuffle: A boolean value indicating whether the data should be shuffled or not.
        
        buffer_size: An integer, representing the buffer size for shuffling the data.
        
        n_repeat: An integer, representing the total number of repetitions of the data. With a split,
                    each returned dataset contains every one of its samples n_repeat times, so splitting
                    a returned dataset again will spread duplicates over the new parts.
    
    Returns:
        If split_ratio is not None, a tuple of two Tensorflow datasets: the first holds the remaining
        (1 - split_ratio) share of the samples, the second the split_ratio share.
        
        If split_ratio is None, a single Tensorflow dataset containing the input data, batched and
        pre-fetched.
    
    Raises:
        ValueError: If image_paths and mask_paths do not have the same length.
    
    '''
    
    # Every image needs exactly one mask; zip() would otherwise truncate silently and leave
    # uninitialised rows in the arrays below.
    n_samples = len(image_paths)
    if len(mask_paths) != n_samples:
        raise ValueError(
            f"image_paths and mask_paths must have the same length, got {n_samples} and {len(mask_paths)}."
        )
    
    # Create space for storing the data, in the (height, width, channels) layout
    # returned by load_image_and_mask.
    images = np.empty(shape=(n_samples, IMAGE_HEIGHT, IMAGE_WIDTH, N_IMAGE_CHANNELS), dtype=np.float32)
    masks  = np.empty(shape=(n_samples, IMAGE_HEIGHT, IMAGE_WIDTH, N_MASK_CHANNELS),  dtype=np.float32)
    
    # Load each pair; `total` lets tqdm show real progress for the zip iterator.
    path_pairs = tqdm(zip(image_paths, mask_paths), total=n_samples, desc='Loading')
    for index, (image_path, mask_path) in enumerate(path_pairs):
        image, mask = load_image_and_mask(image_path = image_path, mask_path = mask_path)
        images[index] = image
        masks[index]  = mask
    
    # Create a Tensorflow data.
    data_set = tfd.Dataset.from_tensor_slices((images, masks))
    
    # No split: repeat, shuffle (new order on every iteration) and batch the whole data.
    if split_ratio is None:
        data_set = data_set.repeat(n_repeat)
        if shuffle:
            data_set = data_set.shuffle(buffer_size)
        return data_set.batch(batch_size, drop_remainder=True).prefetch(tfd.AUTOTUNE)
    
    # Split: the order must be identical on every iteration, otherwise take()/skip()
    # would pick different samples each time the datasets are iterated.
    if shuffle:
        data_set = data_set.shuffle(buffer_size, reshuffle_each_iteration=False)
    
    # Sizes of the two parts; deriving the first from the second uses every sample exactly once.
    data_2_len = int(split_ratio * n_samples)
    data_1_len = n_samples - data_2_len
    
    # Divide the data into 2 disjoint parts.
    data_1 = data_set.take(data_1_len)
    data_2 = data_set.skip(data_1_len).take(data_2_len)
    
    # Repeat only after splitting, then convert into batches.
    data_1 = data_1.repeat(n_repeat).batch(batch_size, drop_remainder=True).prefetch(tfd.AUTOTUNE)
    data_2 = data_2.repeat(n_repeat).batch(batch_size, drop_remainder=True).prefetch(tfd.AUTOTUNE)
    
    return data_1, data_2

In [None]:
# Load all training pairs into memory and hold out 10% of them as the test data
full_train_ds, test_ds = load_dataset(
    metadata['image'],
    metadata['mask'],
    n_repeat=3,
    shuffle=True,
    split_ratio=0.1,
)

In [None]:
# cardinality() counts batches, so it is multiplied by BATCH_SIZE to report samples
separator = "*"*100
padding = ' '*30

print(separator)
print(f"{padding}Training Data Size : {full_train_ds.cardinality().numpy() * BATCH_SIZE}")
print(f"{padding}Testing Data Size  : {test_ds.cardinality().numpy() * BATCH_SIZE}")
print(separator)

In [None]:
# Share of the batches in full_train_ds that is reserved for validation
train_val_split = 0.1

# cardinality() is the number of batches, so the split is done in whole batches
full_train_size = full_train_ds.cardinality().numpy()
valid_size = int(full_train_size * train_val_split)
train_size = full_train_size - valid_size

# The first train_size batches are used for training, the following valid_size for validation.
# NOTE(review): take()/skip() only give disjoint splits if full_train_ds yields its batches in the
# same order on every iteration and contains no repeated samples - confirm against load_dataset.
train_ds = full_train_ds.take(train_size)
valid_ds = full_train_ds.skip(train_size).take(valid_size)

In [None]:
# Report the size of every split in samples (number of batches * BATCH_SIZE)
separator = "*"*100
padding = ' '*30

print(separator)
print(f"{padding}Training Data Size   : {train_ds.cardinality().numpy() * BATCH_SIZE}")
print(f"{padding}Validation Data Size : {valid_ds.cardinality().numpy() * BATCH_SIZE}")
print(f"{padding}Testing Data Size    : {test_ds.cardinality().numpy() * BATCH_SIZE}")
print(separator)

# **Data Visualization**

In [None]:
def show_images_and_masks(data : tfd.Dataset, n_images: int=10, FIGSIZE: tuple=(25, 5), model: tf.keras.Model=None):
    '''
    Plot images of the first batch of a dataset next to their masks, one figure per image.
    
    Every figure shows the image, its mask and an overlay of both. When a model is given, its
    predicted mask (thresholded at 0.5) and the overlay of that prediction are shown as well.
    
    Args:
        data: A batched dataset yielding (images, masks) tuples; only its first batch is used.
        
        n_images: The number of images to plot. It is capped at the number of images in the batch.
        
        FIGSIZE: The size of each figure.
        
        model: An optional model used to predict the masks of the plotted images.
    '''
    
    # Three panels without a model, two more for the prediction and its overlay
    n_cols = 3 if model is None else 5
    
    # Collect the data
    images, masks = next(iter(data))
    
    # Never index past the end of the batch
    n_images = min(n_images, int(images.shape[0]))
    if n_images <= 0:
        return
    
    # Predict all plotted images in one call instead of once per image, then binarize
    pred_masks = None
    if model is not None:
        pred_masks = model.predict(images[:n_images], verbose=0) >= 0.5
    
    # Iterate over the data
    for n in range(n_images):
        
        # Plotting configuration
        plt.figure(figsize=FIGSIZE)
        
        # Plot the image
        plt.subplot(1, n_cols, 1)
        plt.title("Original Image")
        plt.imshow(images[n])
        plt.axis('off')
        
        # Plot the Mask
        plt.subplot(1, n_cols, 2)
        plt.title("Original Mask")
        plt.imshow(masks[n], cmap='gray')
        plt.axis('off')
        
        # Plot image and mask overlay
        plt.subplot(1, n_cols, 3)
        plt.title('Image and Mask overlay')
        plt.imshow(masks[n], alpha=0.8, cmap='binary_r')
        plt.imshow(images[n], alpha=0.5)
        plt.axis('off')
        
        # Model predictions
        if pred_masks is not None:
            pred_mask = pred_masks[n]
            
            plt.subplot(1, n_cols, 4)
            plt.title('Predicted Mask')
            plt.imshow(pred_mask, cmap='gray')
            plt.axis('off')
            
            plt.subplot(1, n_cols, 5)
            plt.title('Predicted Mask Overlay')
            plt.imshow(pred_mask, alpha=0.8, cmap='binary_r')
            plt.imshow(images[n], alpha=0.5)
            plt.axis('off')
    
        # Show final plot
        plt.show()

show_images_and_masks(data=train_ds)

In [None]:
class EncoderBlock(layers.Layer):
    '''
    Convolutional block: BatchNormalization -> Conv2D -> Dropout -> Conv2D, optionally followed
    by a 2x2 max pooling.
    
    With max_pool=True the call returns (pooled features, un-pooled features); otherwise it
    returns only the un-pooled features.
    '''
    
    def __init__(self, filters: int, max_pool: bool=True, rate=0.2, **kwargs) -> None:
        super().__init__(**kwargs)
        
        # Hyper-parameters, kept so that get_config() can report them
        self.filters = filters
        self.max_pool = max_pool
        self.rate = rate
        
        # Both convolutions share the same settings
        conv_settings = dict(
            filters=filters,
            kernel_size=3,
            strides=1,
            padding='same',
            activation='relu',
            kernel_initializer='he_normal',
        )
        
        # Sub-layers, created here and applied in call()
        self.max_pooling = layers.MaxPool2D(pool_size=(2,2), strides=(2,2))
        self.conv1 = layers.Conv2D(**conv_settings)
        self.conv2 = layers.Conv2D(**conv_settings)
        self.drop = layers.Dropout(rate)
        self.bn = layers.BatchNormalization()
        
    def call(self, X, **kwargs):
        
        # Normalize, convolve, regularize, convolve
        features = self.conv2(self.drop(self.conv1(self.bn(X))))
        
        # Without pooling only the features are returned
        if not self.max_pool:
            return features
        
        # With pooling: down-sampled features first, full-resolution features second
        return self.max_pooling(features), features
    
    def get_config(self):
        return {
            **super().get_config(),
            'filters': self.filters,
            'max_pool': self.max_pool,
            'rate': self.rate,
        }

    def __repr__(self):
        return f"{self.__class__.__name__}(F={self.filters}, Pooling={self.max_pool})"

In [None]:
class DecoderBlock(layers.Layer):
    '''
    Decoder block: up-samples the incoming features, concatenates them with the skip
    connection and refines the result with an EncoderBlock without pooling.
    
    Arguments :
        filters : Number of filters of the transposed convolution and of the refining block.
        rate    : Dropout rate passed to the refining EncoderBlock.
    
    The layer is called with a list/tuple [X, skip_X], where skip_X must have twice the
    spatial size of X so that both can be concatenated after the up-sampling.
    '''
    
    def __init__(self, filters: int, rate: float = 0.2, **kwargs):
        super().__init__(**kwargs)
        
        self.filters = filters
        self.rate = rate
        
        # Initialize the model layers
        self.convT = layers.Conv2DTranspose(
            filters = filters,
            kernel_size = 3,
            strides = 2,
            padding = 'same',
            activation = 'relu',
            kernel_initializer = 'he_normal'
        )
        self.bn = layers.BatchNormalization()
        self.net = EncoderBlock(filters = filters, rate = rate, max_pool = False)
        
        # Created once here instead of building a new layer on every call
        self.concat = layers.Concatenate(axis=-1)
        
    def call(self, inputs, **kwargs):
        
        # Get both the inputs
        X, skip_X = inputs
        
        # Normalize and up-sample the incoming features (stride-2 transposed convolution)
        X = self.bn(X)
        X = self.convT(X)
        
        # Concatenate the up-sampled features with the skip connection along the channels
        X = self.concat([X, skip_X])
        X = self.net(X)
        
        return X

    def get_config(self):
        config = super().get_config()
        config.update({
            'filters': self.filters,
            'rate': self.rate,
        })
        return config

    def __repr__(self):
        return f"{self.__class__.__name__}(F={self.filters}, rate={self.rate})"


In [None]:
# Input Layer: one image of shape IMAGE_SIZE
input_layer = layers.Input(shape=(IMAGE_SIZE), name="InputLayer")

# The encoder network: each block returns (pooled output, pre-pooling features).
# The pooled output feeds the next level, the pre-pooling features are kept for the skip connections.
# The number of filters doubles at every level.
pool1, encoder1 = EncoderBlock(FILTERS,   max_pool=True, rate=0.1, name="EncoderLayer1")(input_layer)
pool2, encoder2 = EncoderBlock(FILTERS*2, max_pool=True, rate=0.1, name="EncoderLayer2")(pool1)
pool3, encoder3 = EncoderBlock(FILTERS*4, max_pool=True, rate=0.2, name="EncoderLayer3")(pool2)
pool4, encoder4 = EncoderBlock(FILTERS*8, max_pool=True, rate=0.2, name="EncoderLayer4")(pool3)

# The bottleneck: same block without pooling, applied to the deepest pooled features
encoding = EncoderBlock(FILTERS*16, max_pool=False, rate=0.3, name="EncodingSpace")(pool4)

# The decoder network: each block up-samples its first input and merges it with the
# encoder features of the matching level (deepest level first).
decoder4 = DecoderBlock(FILTERS*8, rate=0.2, name="DecoderLayer1")([encoding, encoder4])
decoder3 = DecoderBlock(FILTERS*4, rate=0.2, name="DecoderLayer2")([decoder4, encoder3])
decoder2 = DecoderBlock(FILTERS*2, rate=0.1, name="DecoderLayer3")([decoder3, encoder2])
decoder1 = DecoderBlock(FILTERS,  rate=0.1, name="DecoderLayer4")([decoder2, encoder1])
        
# Final output layer: 1x1 convolution with sigmoid, giving one value in [0, 1] per pixel.
final_conv = layers.Conv2D(
    filters = 1, 
    kernel_size = 1, 
    strides=1, 
    padding='same', 
    activation='sigmoid', 
    name="OutputMap"
)(decoder1)

# Unet Model: maps the input image to the single-channel output map
unet_model = keras.Model(
    inputs = input_layer,
    outputs = final_conv,
    name = "UNetModel"
)

In [None]:
# Model Summary: print the layers of the U-Net
unet_model.summary()

In [None]:
# Draw the model graph (with tensor shapes) and write it to a PNG file
plot_model(
    unet_model,
    to_file="UnetModel-SIH-ZoomedData.png",
    show_shapes=True,
    dpi=96,
)

## **UNet - Model Training**
---


In [None]:
# Adam with a constant learning rate.
# An exponential learning rate decay was tried and is currently disabled; to enable it, pass
# ExponentialDecay(BASE_LR, decay_steps=500, decay_rate=0.96, staircase=True) as learning_rate.
optimizer = optimizers.Adam(learning_rate=BASE_LR)

# Compile Model: pixel-wise binary cross-entropy on the sigmoid output
unet_model.compile(
    optimizer=optimizer,
    loss='binary_crossentropy',
)

In [None]:
# Model Training
# train_ds and valid_ds are already batched tf.data datasets, so batch_size is not passed:
# Keras documents that batch_size must not be specified for dataset inputs.
unet_model_history = unet_model.fit(
    train_ds,
    epochs = EPOCHS,
    validation_data = valid_ds,
)

In [None]:
# Compare ground-truth and predicted masks on a batch of the training data
show_images_and_masks(data=train_ds, model=unet_model)

In [None]:
# Compare ground-truth and predicted masks on a batch of the held-out test data
show_images_and_masks(data=test_ds, model=unet_model)

In [None]:
# Same call once more on the test data
# NOTE(review): this only shows different images if test_ds yields a different first batch
# on each iteration - confirm against how test_ds is shuffled.
show_images_and_masks(data=test_ds, model=unet_model)

In [None]:
# Save the trained model in the native Keras format (.keras), prefixed with MODEL_NAME
model_file = f"{MODEL_NAME}-SIH-ZoomedData.keras"
unet_model.save(model_file)