In [None]:
import os
import sys
import random
import warnings
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import cv2
from tqdm import tqdm_notebook, tnrange
from itertools import chain
from skimage.io import imread, imshow, concatenate_images
from skimage.transform import resize
from skimage.morphology import label
from keras.models import Model, load_model
from keras.layers import Input, Lambda, Conv2D, Conv2DTranspose, MaxPooling2D, concatenate 
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras import backend as K
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img

In [None]:
im_width = 512
im_height = 512
im_chan = 1

path_train = '/kaggle/input/small-dataset/Train/'
path_test = '/kaggle/input/small-dataset/Test/'

train_ids = next(os.walk(path_train+"images"))[2]
test_ids = next(os.walk(path_test+"images"))[2]

In [None]:
X_train = np.zeros((len(train_ids), im_height, im_width, im_chan), dtype=np.uint8)
Y_train = np.zeros((len(train_ids), im_height, im_width, 1), dtype=bool)

print('Getting and resizing train images and masks ... ')

sys.stdout.flush()
for n, id_ in tqdm_notebook(enumerate(train_ids), total=len(train_ids)):
    path = path_train
    img = load_img(path + '/images/' + id_)
    
    x = img_to_array(img)[:,:,1]
    x = np.expand_dims(x, axis=-1)
    X_train[n] = x
    
    
    mask = img_to_array(load_img(path + '/masks/' + id_))[:,:,1]
    mask = np.expand_dims(mask, axis=-1)
    Y_train[n] = mask
    

print('Done!')

In [None]:
ix = random.randint(0, len(train_ids))
plt.figure(figsize=(10, 5))

plt.subplot(1, 3, 1)
plt.imshow(np.dstack((X_train[ix],X_train[ix],X_train[ix])))
plt.title("Image")

plt.subplot(1, 3, 2)
tmp = np.squeeze(Y_train[ix]).astype(np.float32)
plt.imshow(np.dstack((tmp,tmp,tmp)))
plt.title("Ground truth")

plt.show()

In [None]:
from tensorflow.keras import metrics
from tensorflow.keras.utils import register_keras_serializable

@register_keras_serializable()
class MeanIoUMetric(metrics.Metric):
    
    def __init__(self, num_classes, name='mean_iou', **kwargs):
        super(MeanIoUMetric, self).__init__(name=name, **kwargs)
        self.num_classes = num_classes  
        self.iou_metric = metrics.MeanIoU(num_classes=num_classes) 

    def update_state(self, y_true, y_pred, sample_weight=None):
        y_pred_ = tf.cast(y_pred > 0.5, tf.int32)
        self.iou_metric.update_state(y_true, y_pred_)

    def result(self):
        return self.iou_metric.result()

    def reset_state(self):
        self.iou_metric.reset_state()
        
    @classmethod
    def from_config(cls, config):
        return cls(**config)  

    def get_config(self):
        config = super().get_config()
        config.update({"num_classes": self.num_classes})  
        return config
    
    
mean_iou_metric = MeanIoUMetric(num_classes=2)

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2DTranspose
from tensorflow.keras.layers import InputSpec 
import tensorflow.keras.backend as K 


@register_keras_serializable()
class Conv2DTranspose(Conv2D):
    
    def __init__(
        self,
        filters,
        kernel_size,
        strides=(1, 1),
        padding='valid',
        output_shape=None,
        data_format=None,
        activation=None,
        use_bias=True,
        kernel_initializer='glorot_uniform',
        bias_initializer='zeros',
        kernel_regularizer=None,
        bias_regularizer=None,
        activity_regularizer=None,
        kernel_constraint=None,
        bias_constraint=None,
        **kwargs
    ):
        super(Conv2DTranspose, self).__init__(
            filters,
            kernel_size,
            strides=strides,
            padding=padding,
            data_format=data_format,
            activation=activation,
            use_bias=use_bias,
            kernel_initializer=kernel_initializer,
            bias_initializer=bias_initializer,
            kernel_regularizer=kernel_regularizer,
            bias_regularizer=bias_regularizer,
            activity_regularizer=activity_regularizer,
            kernel_constraint=kernel_constraint,
            bias_constraint=bias_constraint,
            **kwargs
        )
        self.input_spec = InputSpec(ndim=4)
        if output_shape is not None:
            try:
                self._output_shape = tuple(output_shape)
            except TypeError:
                raise ValueError('`output_shape` argument must be a ' +
                                 'tuple. Received: ' + str(output_shape))
            if len(self._output_shape) != 3:
                raise ValueError('`output_shape` argument should have ' +
                                 'rank ' + str(3) + '; Received:', str(output_shape))
        else:
            self._output_shape = output_shape

    def build(self, input_shape):
        if len(input_shape) != 4:
            raise ValueError(
                'Inputs should have rank ' + str(4) +
                '; Received input shape:', str(input_shape)
            )
        if self.data_format == 'channels_first':
            channel_axis = 1
        else:
            channel_axis = -1
        if input_shape[channel_axis] is None:
            raise ValueError(
                'The channel dimension of the inputs '
                'should be defined. Found `None`.'
            )
        input_dim = input_shape[channel_axis]
        kernel_shape = self.kernel_size + (self.filters, input_dim)

        self.kernel = self.add_weight(
            shape=kernel_shape,
            initializer=self.kernel_initializer,
            name='kernel',
            regularizer=self.kernel_regularizer,
            constraint=self.kernel_constraint
        )
        if self.use_bias:
            self.bias = self.add_weight(
                shape=(self.filters, ),
                initializer=self.bias_initializer,
                name='bias',
                regularizer=self.bias_regularizer,
                constraint=self.bias_constraint
            )
        else:
            self.bias = None
       
        self.input_spec = InputSpec(ndim=4, axes={channel_axis: input_dim})
        self.built = True

    def call(self, inputs):
        input_shape = K.shape(inputs)
        batch_size = input_shape[0]
        if self.data_format == 'channels_first':
            h_axis, w_axis = 2, 3
        else:
            h_axis, w_axis = 1, 2

        height, width = input_shape[h_axis], input_shape[w_axis]
        kernel_h, kernel_w = self.kernel_size
        stride_h, stride_w = self.strides

      
        if self._output_shape is None:
            out_height = deconv_length(height, stride_h, kernel_h, self.padding)
            out_width = deconv_length(width, stride_w, kernel_w, self.padding)
            if self.data_format == 'channels_first':
                output_shape = (
                    batch_size, self.filters, out_height, out_width
                )
            else:
                output_shape = (
                    batch_size, out_height, out_width, self.filters
                )
        else:
            output_shape = (batch_size,) + self._output_shape

        outputs = K.conv2d_transpose(
            inputs,
            self.kernel,
            output_shape,
            self.strides,
            padding=self.padding,
            data_format=self.data_format
        )

        if self.bias:
            outputs = K.bias_add(
                outputs, self.bias, data_format=self.data_format
            )

        if self.activation is not None:
            return self.activation(outputs)
        return outputs

    def compute_output_shape(self, input_shape):
        output_shape = list(input_shape)
        if self.data_format == 'channels_first':
            c_axis, h_axis, w_axis = 1, 2, 3
        else:
            c_axis, h_axis, w_axis = 3, 1, 2

        kernel_h, kernel_w = self.kernel_size
        stride_h, stride_w = self.strides

        if self._output_shape is None:
            output_shape[c_axis] = self.filters
            output_shape[h_axis] = deconv_length(
                output_shape[h_axis], stride_h, kernel_h, self.padding
            )
            output_shape[w_axis] = deconv_length(
                output_shape[w_axis], stride_w, kernel_w, self.padding
            )
        else:
            output_shape[1:] = self._output_shape

        return tuple(output_shape)

    def get_config(self):
        config = super(Conv2DTranspose, self).get_config()
        config.pop('dilation_rate')
        config['output_shape'] = self._output_shape
        return config

In [None]:
from tensorflow.keras.layers import Conv2D, MaxPooling2D, BatchNormalization, Activation, Add, Input, Softmax
from tensorflow.keras.models import Model
from tensorflow.keras.backend import int_shape, is_keras_tensor

In [None]:
class LinkNet():

    def __init__(
        self,
        num_classes,
        input_tensor=None,
        input_shape=None,
        initial_block_filters=64,
        bias=False,
        name='linknet',
        
    ):
        self.num_classes = num_classes
        self.initial_block_filters = initial_block_filters
        self.bias = bias
        self.output_shape = input_shape[:-1] + (num_classes, )

         
        if input_tensor is None:
            self.input = Input(shape=input_shape, name='input_img')
        elif is_keras_tensor(input_tensor):
            self.input = input_tensor
        else:
           
            self.input = Input(
                tensor=input_tensor, shape=input_shape, name='input_img'
            )

        self.name = name

    def get_model(
        self,
        pretrained_encoder=True,
        weights_path='./checkpoints/linknet_encoder_weights.h5'
    ):

        
        encoder_model = self.get_encoder()
        if pretrained_encoder:
            encoder_model.load_weights(weights_path)
        encoder_out = encoder_model(self.input)

       
        decoder_model = self.get_decoder(encoder_out)
        decoder_out = decoder_model(encoder_out[:-1])

        return Model(inputs=self.input, outputs=decoder_out, name=self.name)

    def get_encoder(self, name='encoder'):

       
        initial1 = Conv2D(
            self.initial_block_filters,
            kernel_size=7,
            strides=2,
            padding='same',
            use_bias=self.bias,
            name=name + '_0_conv2d_1'
        )(self.input)
        initial1 = BatchNormalization(name=name + '_0_bn_1')(initial1)
        initial1 = Activation('relu', name=name + '_0_relu_1')(initial1)
        initial2 = MaxPooling2D(pool_size=2, name=name + '_0_maxpool_1')(initial1)  

       
        encoder1 = self.encoder_block(
            initial2,
            self.initial_block_filters,
            strides=1,
            bias=self.bias,
            name=name + '_1'
        )
        encoder2 = self.encoder_block(
            encoder1,
            self.initial_block_filters * 2,
            strides=(2, 1),
            bias=self.bias,
            name=name + '_2'
        )
        encoder3 = self.encoder_block(
            encoder2,
            self.initial_block_filters * 4,
            strides=(2, 1),
            bias=self.bias,
            name=name + '_3'
        )
        encoder4 = self.encoder_block(
            encoder3,
            self.initial_block_filters * 8,
            strides=(2, 1),
            bias=self.bias,
            name=name + '_4'
        )

        return Model(
            inputs=self.input,
            outputs=[
                encoder4, encoder3, encoder2, encoder1, initial2, initial1
            ],
            name=name
        )

    def encoder_block(
        self,
        input,
        out_filters,
        kernel_size=3,
        strides=1,
        padding='same',
        bias=False,
        name=''
    ):

        assert isinstance(strides, (int, tuple, list)), (
            "expected int, tuple, or list for strides"
        )  
        if (isinstance(strides, (tuple, list))):
            if len(strides) == 2:
                stride_1, stride_2 = strides
            else:
                raise ValueError("expected a list or tuple on length 2")
        else:
            stride_1 = strides
            stride_2 = strides

        x = self.encoder_basic_block(
            input,
            out_filters,
            kernel_size=kernel_size,
            strides=stride_1,
            padding=padding,
            bias=bias,
            name=name + '_1'
        )

        x = self.encoder_basic_block(
            x,
            out_filters,
            kernel_size=kernel_size,
            strides=stride_2,
            padding=padding,
            bias=bias,
            name=name + '_2'
        )

        return x

    def encoder_basic_block(
        self,
        input,
        out_filters,
        kernel_size=3,
        strides=1,
        padding='same',
        bias=False,
        name=''
    ):

        residual = input

        x = Conv2D(
            out_filters,
            kernel_size=kernel_size,
            strides=strides,
            padding=padding,
            use_bias=bias,
            name=name + '_main_conv2d_1'
        )(input)
        x = BatchNormalization(name=name + '_main_bn_1')(x)
        x = Activation('relu', name=name + '_main_relu_1')(x)

        x = Conv2D(
            out_filters,
            kernel_size=kernel_size,
            strides=1,
            padding=padding,
            use_bias=bias,
            name=name + '_main_conv2d_2'
        )(x)
        x = BatchNormalization(name=name + '_main_bn_2')(x)

        if strides > 1:
            residual = Conv2D(
                out_filters,
                kernel_size=1,
                strides=strides,
                padding=padding,
                use_bias=bias,
                name=name + '_res_conv2d_1'
            )(residual)
            residual = BatchNormalization(name=name + '_res_bn_1')(residual)

        x = Add(name=name + '_add')([x, residual])
        x = Activation('relu', name=name + '_relu_1')(x)

        return x

    def get_decoder(self, inputs, name='decoder'):

       
        encoder4 = Input(shape=int_shape(inputs[0])[1:], name='encoder4')
        encoder3 = Input(shape=int_shape(inputs[1])[1:], name='encoder3')
        encoder2 = Input(shape=int_shape(inputs[2])[1:], name='encoder2')
        encoder1 = Input(shape=int_shape(inputs[3])[1:], name='encoder1')
        initial2 = Input(shape=int_shape(inputs[4])[1:], name='initial2')
        initial1 = inputs[5]

        
        decoder4 = self.decoder_block(
            encoder4,
            self.initial_block_filters * 4,
            strides=2,
            output_shape=int_shape(encoder3)[1:],
            bias=self.bias,
            name=name + '_4'
        )
        decoder4 = Add(name=name + '_shortcut_e3_d4')([encoder3, decoder4])

        decoder3 = self.decoder_block(
            decoder4,
            self.initial_block_filters * 2,
            strides=2,
            output_shape=int_shape(encoder2)[1:],
            bias=self.bias,
            name=name + '_3'
        )
        decoder3 = Add(name=name + '_shortcut_e2_d3')([encoder2, decoder3])

        decoder2 = self.decoder_block(
            decoder3,
            self.initial_block_filters,
            strides=2,
            output_shape=int_shape(encoder1)[1:],
            bias=self.bias,
            name=name + '_2'
        )
        decoder2 = Add(name=name + '_shortcut_e1_d2')([encoder1, decoder2])

        decoder1 = self.decoder_block(
            decoder2,
            self.initial_block_filters,
            strides=1,
            output_shape=int_shape(initial2)[1:],
            bias=self.bias,
            name=name + '_1'
        )
        decoder1 = Add(name=name + '_shortcut_init_d1')([initial2, decoder1])

        shape = (
            int_shape(initial1)[1],
            int_shape(initial1)[2],
            self.initial_block_filters // 2,
        )
        final = Conv2DTranspose(
            self.initial_block_filters // 2,
            kernel_size=3,
            strides=2,
            padding='same',
            output_shape=shape,
            use_bias=self.bias,
            name=name + '_0_transposed2d_1'
        )(decoder1)
        final = BatchNormalization(name=name + '_0_bn_1')(final)
        final = Activation('relu', name=name + '_0_relu_1')(final)

        final = Conv2D(
            self.initial_block_filters // 2,
            kernel_size=3,
            padding='same',
            use_bias=self.bias,
            name=name + '_0_conv2d_1'
        )(final)
        final = BatchNormalization(name=name + '_0_bn_2')(final)
        final = Activation('relu', name=name + '_0_relu_2')(final)

        logits = Conv2DTranspose(
            self.num_classes,
            kernel_size=2,
            strides=2,
            padding='same',
            output_shape=self.output_shape,
            use_bias=self.bias,
            name=name + '_0_transposed2d_2'
        )(final)

        
        prediction = tf.keras.layers.Activation('sigmoid', name=name + '_0_softmax')(logits) 

        return Model(
            inputs=[
                encoder4, encoder3, encoder2, encoder1, initial2
            ],
            outputs=prediction,
            name=name
        )

    def decoder_block(
        self,
        input,
        out_filters,
        kernel_size=3,
        strides=2,
        projection_ratio=4,
        padding='same',
        output_shape=None,
        bias=False,
        name=''
    ):
        
        internal_filters = int_shape(input)[-1] // projection_ratio
        x = Conv2D(
            internal_filters,
            kernel_size=1,
            strides=1,
            padding=padding,
            use_bias=bias,
            name=name + '_conv2d_1'
        )(input)
        x = BatchNormalization(name=name + '_bn_1')(x)
        x = Activation('relu', name=name + '_relu_1')(x)


        shape = output_shape[:-1] + (internal_filters, )
        x = Conv2DTranspose(
            internal_filters,
            kernel_size=kernel_size,
            strides=strides,
            padding=padding,
            output_shape=shape,
            use_bias=bias,
            name=name + '_transposed2d_1'
        )(x)
        x = BatchNormalization(name=name + '_bn_2')(x)
        x = Activation('relu', name=name + '_relu_2')(x)

        x = Conv2D(
            out_filters,
            kernel_size=1,
            strides=1,
            padding=padding,
            use_bias=bias,
            name=name + '_conv2d_2'
        )(x)
        x = BatchNormalization(name=name + '_bn_3')(x)
        x = Activation('relu', name=name + '_relu_3')(x)

        return x

In [None]:
input_shape = (im_height, im_width, 1)  
model = LinkNet(num_classes=1, input_shape=input_shape)
model = model.get_model(
            pretrained_encoder=False
        )
model.compile(optimizer='adam', 
                  loss='binary_crossentropy',  
                  metrics=[mean_iou_metric])

model.summary()

In [None]:
earlystopper = EarlyStopping(patience=10, verbose=2)
checkpointer = ModelCheckpoint('LinkNet.keras', verbose=1, save_best_only=True)
results = model.fit(X_train, Y_train, validation_split=0.2, batch_size=16, epochs=150, 
                    callbacks=[earlystopper, checkpointer])

In [None]:
import matplotlib.pyplot as plt


fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 6)) 

# Loss Plot
ax1.plot(results.history['loss'], label='Training Loss', color='blue', linewidth=2)
ax1.set_xlabel('Epochs', fontsize=14)
ax1.set_ylabel('Loss', fontsize=14)
ax1.set_title('Training and Validation Loss', fontsize=16)
ax1.set_ylim(0, 1)
ax1.grid(True)
ax1.legend(fontsize=12, loc='upper right')

# Mean IoU Plot
ax2.plot(results.history['mean_iou'], label='Training Mean IoU', color='green', linewidth=2)
ax2.plot(results.history['val_mean_iou'], label='Validation Mean IoU', color='purple', linewidth=2, linestyle='--')
ax2.set_xlabel('Epochs', fontsize=14)
ax2.set_ylabel('Mean IoU', fontsize=14)
ax2.set_title('Training and Validation Mean IoU', fontsize=16)
ax2.set_ylim(0, 1)
ax2.grid(True)
ax2.legend(fontsize=12, loc='lower right')

plt.tight_layout() 
plt.savefig('Linknet.png', dpi=600, bbox_inches='tight')
plt.show()

In [None]:
X_test = np.zeros((len(test_ids), im_height, im_width, im_chan), dtype=np.uint8)
Y_test = np.zeros((len(test_ids), im_height, im_width, 1), dtype=bool) 



print('Getting and resizing test images and masks ... ')
sys.stdout.flush()

for n, id_ in tqdm_notebook(enumerate(test_ids), total=len(test_ids)):
    path = path_test
    img = load_img(path + '/images/' + id_)
    x = img_to_array(img)[:,:,1]
    x = np.expand_dims(x, axis=-1)
    X_test[n] = x

    mask = img_to_array(load_img(path + '/masks/' + id_))[:,:,1]
    mask = np.expand_dims(mask, axis=-1)
    Y_test[n] = mask

print('Done!')

In [None]:
model = tf.keras.models.load_model(
    '/kaggle/working/LinkNet.keras',
    custom_objects={'MeanIoUMetric': MeanIoUMetric} ,safe_mode=False
)


preds_test = model.predict(X_test, verbose=1)
preds_test_t = (preds_test > 0.5).astype(np.uint8)

In [None]:
#Perform a sanity check on some random Test Samples

import matplotlib.pyplot as plt
import numpy as np
import random

 
plt.style.use('seaborn-whitegrid') 

fig, axes = plt.subplots(3, 3, figsize=(15, 12))  
fig.subplots_adjust(hspace=0.3, wspace=-0.5)

for i in range(3):
    if i ==0 :
        axes[i, 0].set_title("Image", fontsize=16) 
        axes[i, 1].set_title("Ground Truth", fontsize=16)
        axes[i, 2].set_title("Predicted", fontsize=16)
        
        
    ix = random.randint(0, len(preds_test_t) - 1) 

    # Image
    axes[i, 0].imshow(np.dstack((X_test[ix], X_test[ix], X_test[ix])))
    
    axes[i, 0].axis('off') 

    
    # Ground Truth
    im_gt = axes[i, 1].imshow(Y_test[ix], cmap='magma')
    
    axes[i, 1].axis('off')
    

    
    # Prediction
    im_pred = axes[i, 2].imshow(preds_test_t[ix], cmap='magma')
    
    axes[i, 2].axis('off')
    

plt.savefig("prediction_visualization.png", dpi=600) 
plt.show()