In [None]:
# import libraries that might be needed
import numpy as np

from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping

from keras.models import Model
from keras.layers import Input, Dense, Dropout, Flatten, Add , UpSampling2D
from keras.layers import Conv2D, MaxPooling2D ,BatchNormalization, Activation

from keras.backend import concatenate

In [None]:
class Res_or_Conv2D():
    """
    A class to represent a convolution or a residual block
    ...

    Attributes
    ----------
    filters : int -- number of filters for the convolutions
    kernel_size : int or tuple of ints -- kernel size for the convolutions 
    use_residual : bool -- indicates wheather to use convolution or residual block
    ----------
    Methods
    -------
    __call__ : method to apply a convolution or a residual block on a certain input
               input_x is a volume input of size 4 (batch_size , height , width , n_channels)
    """
    def __init__(self , filters , kernel_size, use_residual = False):
        self.filters = filters
        self.kernel_size  = kernel_size
        self.use_residual = use_residual
    def __call__(self , input_x):
        if self.use_residual : 
            # residual block = Conv + Conv --> + Input 
            # to add the input to the output of the convolution the two must have the same number of channels == filters
            # so we must perform a convolution of the same number of channels == filter and 1x1 kernel on the input
            # so we can be able to add them up.
            conv1 = Conv2D(self.filters ,self.kernel_size, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal' )(input_x)
            conv2 = Conv2D(self.filters ,self.kernel_size, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal' )(conv1)
            projected_x = Conv2D(self.filters ,1, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal' )(input_x)
            out = Add()([conv2, projected_x])
        else : 
            out = Conv2D(self.filters ,self.kernel_size, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal' )(input_x)
        return out

# Part 3/ - Question 1 -

In [None]:
"""
In the function in github, we replace Conv2D with the new class created Res_or_Conv2D
and add more parameters to the function
"""
def unet_modified(pretrained_weights = None,input_size = (256,256,3) ,filters = 64 , kernel_size = (3,3), use_residual = False):
    inputs = Input(input_size)
    conv1 = Res_or_Conv2D(filters, kernel_size, use_residual = use_residual)(inputs)
    conv1 = Res_or_Conv2D(filters, kernel_size, use_residual = use_residual)(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Res_or_Conv2D(2 * filters, kernel_size, use_residual = use_residual)(pool1)
    conv2 = Res_or_Conv2D(2 * filters, kernel_size, use_residual = use_residual)(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = Res_or_Conv2D(4 * filters, kernel_size, use_residual = use_residual)(pool2)
    conv3 = Res_or_Conv2D(4 * filters, kernel_size, use_residual = use_residual)(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

    conv4 = Res_or_Conv2D(8 * filters, kernel_size, use_residual = use_residual)(pool3)
    conv4 = Res_or_Conv2D(8 * filters, kernel_size, use_residual = use_residual)(conv4)
    drop4 = Dropout(0.5)(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

    #########################
    # Bottleneck
    conv5 = Res_or_Conv2D(16 * filters, kernel_size, use_residual = use_residual)(pool4)
    conv5 = Res_or_Conv2D(16 * filters, kernel_size, use_residual = use_residual)(conv5)
    drop5 = Dropout(0.5)(conv5)
    #########################

    # Up + Conv is going to double up the shape of its input
    # The convolution block in this line could be left out of the parameters controled in the function
    # because it's only to transform the upsampled input since the upsampling layer is not trainable
    # Up + Conv == Convtransposed
    up6 = Res_or_Conv2D(8 * filters, 2, use_residual = use_residual)(UpSampling2D(size = (2,2))(drop5))
    merge6 = concatenate([drop4,up6], axis = 3) # we assume that the data_format is NHWC
    conv6 = Res_or_Conv2D(8 * filters, kernel_size ,use_residual = use_residual)(merge6)
    conv6 = Res_or_Conv2D(8 * filters, kernel_size ,use_residual = use_residual)(conv6)

    up7 = Res_or_Conv2D(4 * filters, 2, use_residual = use_residual)(UpSampling2D(size = (2,2))(conv6))
    merge7 = concatenate([conv3,up7], axis = 3)
    conv7 = Res_or_Conv2D(4 * filters, kernel_size, use_residual = use_residual)(merge7)
    conv7 = Res_or_Conv2D(4 * filters, kernel_size, use_residual = use_residual)(conv7)

    up8 = Res_or_Conv2D(2 * filters, 2, use_residual = use_residual)(UpSampling2D(size = (2,2))(conv7))
    merge8 = concatenate([conv2,up8], axis = 3)
    conv8 = Res_or_Conv2D(2 * filters, kernel_size, use_residual = use_residual)(merge8)
    conv8 = Res_or_Conv2D(2 * filters, kernel_size, use_residual = use_residual)(conv8)

    up9 = Res_or_Conv2D(filters, 2, use_residual = use_residual)(UpSampling2D(size = (2,2))(conv8))
    merge9 = concatenate([conv1,up9], axis = 3)
    conv9 = Res_or_Conv2D(filters, kernel_size, use_residual = use_residual)(merge9)
    conv9 = Res_or_Conv2D(filters, kernel_size, use_residual = use_residual)(conv9)
    ########################

    conv9 = Res_or_Conv2D(2, kernel_size, use_residual = use_residual)(conv9)
    # If we put a residual block in the last convolution we would obtain values between [0,2] instead of [0,1]
    # so we must divide by 2 or leave a convolution block.
    conv10 = Conv2D(1, 1, activation = 'sigmoid')(conv9)

    model = Model(inputs = inputs, outputs = conv10)

    model.compile(optimizer = Adam(lr = 1e-4), loss = 'binary_crossentropy', metrics = ['accuracy'])
    
    if (pretrained_weights):
        model.load_weights(pretrained_weights)

    return model

In [None]:
u_net = unet_modified( use_residual= True)

In [None]:
u_net.summary()

Model: "model_8"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_7 (InputLayer)            [(None, 256, 256, 3) 0                                            
__________________________________________________________________________________________________
conv2d_340 (Conv2D)             (None, 256, 256, 64) 1792        input_7[0][0]                    
__________________________________________________________________________________________________
conv2d_341 (Conv2D)             (None, 256, 256, 64) 36928       conv2d_340[0][0]                 
__________________________________________________________________________________________________
conv2d_342 (Conv2D)             (None, 256, 256, 64) 256         input_7[0][0]                    
____________________________________________________________________________________________

# Question 2/ 
# - a - Create a keras.model for the encoder only
# - b - Create the model where two encoders take two different images and their output is going to be concatenated and fed to a decoder.

For this question, I think it's much better to create an independent decoder instead of extracting it from the u-net keras.model that we assumed exists, because we need layer names and how many convolution blocks or residual blocks exist. and even if we had acces to layer names it could be messy to loop through layer names and construct a decoder because there is skip connections passed from the encoder , which means they should be identified as well and extracted. <br>
The encoder-decoder is without skip connections but in the function below a paramaters was added if we want the encoder to pass skip connection. <br>
This parameters is ignored by default and the decoder passes only its last output.

And 2 decoder architectures were created for each case if the skip connections were transfered or not.

At the end a final model that combines the two and constructs the model described in the question.

When the skip connections added the architecture is going to be like two U-nets sharing the same decoder


In [None]:
def encoder_half(input_size = (256,256,3) ,filters = 64 , kernel_size = (3,3), use_residual = False , transfer_skip_conx = False):
    """
    This function creates a keras.Model object of only the encoder
    The encoder in the u-net architecture ends at the bottleneck before any up-sampling
    parameters : 
        - input_size : input shape of the encoder
        - filters : number of filters to use in the convolution blocks or residual blocks.
        - kernel_size : kernel sizes for the convolution or residual blocks
        - use_residual : boolean to indicate wheather to use convolution or residual blocks
        - transfer_skip_conx : bool to indicate wheather to output feature maps gathered from the previous 
                               blocks with the output or to output the output only.
                               if True skip connections are outputted
                               if False only the output is the output the last layer
    return : 
        - Keras.Model of the encoder only.
    """
    inputs = Input(input_size)
    conv1 = Res_or_Conv2D(filters, kernel_size, use_residual = use_residual)(inputs)
    conv1 = Res_or_Conv2D(filters, kernel_size, use_residual = use_residual)(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Res_or_Conv2D(2 * filters, kernel_size, use_residual = use_residual)(pool1)
    conv2 = Res_or_Conv2D(2 * filters, kernel_size, use_residual = use_residual)(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = Res_or_Conv2D(4 * filters, kernel_size, use_residual = use_residual)(pool2)
    conv3 = Res_or_Conv2D(4 * filters, kernel_size, use_residual = use_residual)(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

    conv4 = Res_or_Conv2D(8 * filters, kernel_size, use_residual = use_residual)(pool3)
    conv4 = Res_or_Conv2D(8 * filters, kernel_size, use_residual = use_residual)(conv4)
    drop4 = Dropout(0.5)(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

    #########################
    # The Bottleneck is included in the encoder (no Upsampling has started yet)
    conv5 = Res_or_Conv2D(16 * filters, kernel_size, use_residual = use_residual)(pool4)
    conv5 = Res_or_Conv2D(16 * filters, kernel_size, use_residual = use_residual)(conv5)
    output = Dropout(0.5)(conv5)
    #########################

    # Check if skip connections must be transfered or not
    if transfer_skip_conx : 
        encoder = Model(inputs = inputs, outputs = [drop4 , conv3 , conv2 , conv1 ,output])
    else :
        encoder = Model(inputs = inputs, outputs = output)
    return encoder


def decoder_half(inputs , filters = 64 , kernel_size = (3,3), use_residual = False):
    """
    This function applies layers of a decoder to the output of the encoder when the encoder
    does not transfer skip connections in it's output.
    parameters : 
        - inputs : input keras.Tensor the output of the encoder when it does not transfer skip connections.
        - filters : number of filters to use in the convolution blocks or residual blocks.
        - kernel_size : kernel sizes for the convolution or residual blocks
        - use_residual : boolean to indicate wheather to use convolution or residual blocks
    return : 
        - The output of the decoder whitout the skip connections
    """
    up_conv6 = Res_or_Conv2D(8 * filters, 2, use_residual = use_residual)(UpSampling2D(size = (2,2))(inputs))
    conv6 = Res_or_Conv2D(8 * filters, kernel_size ,use_residual = use_residual)(up_conv6)
    conv6 = Res_or_Conv2D(8 * filters, kernel_size ,use_residual = use_residual)(conv6)

    up_conv7 = Res_or_Conv2D(4 * filters, 2, use_residual = use_residual)(UpSampling2D(size = (2,2))(conv6))
    conv7 = Res_or_Conv2D(4 * filters, kernel_size, use_residual = use_residual)(up_conv7)
    conv7 = Res_or_Conv2D(4 * filters, kernel_size, use_residual = use_residual)(conv7)

    up_conv8 = Res_or_Conv2D(2 * filters, 2, use_residual = use_residual)(UpSampling2D(size = (2,2))(conv7))
    conv8 = Res_or_Conv2D(2 * filters, kernel_size, use_residual = use_residual)(up_conv8)
    conv8 = Res_or_Conv2D(2 * filters, kernel_size, use_residual = use_residual)(conv8)

    up_conv9 = Res_or_Conv2D(filters, 2, use_residual = use_residual)(UpSampling2D(size = (2,2))(conv8))
    conv9 = Res_or_Conv2D(filters, kernel_size, use_residual = use_residual)(up_conv9)
    conv9 = Res_or_Conv2D(filters, kernel_size, use_residual = use_residual)(conv9)

    conv9 = Res_or_Conv2D(2, kernel_size, use_residual = use_residual)(conv9)
    # If we put a residual block in the last convolution we would obtain values between [0,2] instead of [0,1]
    # so we must divide by 2 or leave a convolution block.
    output = Conv2D(1, 1, activation = 'sigmoid')(conv9)

    return output

def decoder_half_multiple(all_inputs , filters=64 , kernel_size=(3,3) , use_residual = False ):
    """
    This function applies layers of a decoder to the output of the encoder when the encoder
    transfers skip connections in it's output.
    parameters : 
        - all_inputs : list of keras.Tensor that contain the feature maps to use to concatenate in the layers   
                        of the decoder , and it contains also the last output of the decoder (bottleneck)
                        all_inputs = [feature_maps1 ,.., output of bottleneck]
                        The last entry of the list all_inputs is the output of the bottleneck of the encoder
        - filters : int -- number of filters to use in the convolution blocks or residual blocks.
        - kernel_size : int or tuple -- kernel sizes for the convolution or residual blocks
        - use_residual : boolean -- to indicate wheather to use convolution or residual blocks
    return : 
        - The output of the decoder whith the skip connections
    """
    up_conv6 = Res_or_Conv2D(8 * filters, 2, use_residual = use_residual)(UpSampling2D(size = (2,2))(all_inputs[4]))
    merge6 = concatenate([all_inputs[0] , up_conv6] , axis = 3)
    conv6 = Res_or_Conv2D(8 * filters, kernel_size ,use_residual = use_residual)(merge6)
    conv6 = Res_or_Conv2D(8 * filters, kernel_size ,use_residual = use_residual)(conv6)

    up_conv7 = Res_or_Conv2D(4 * filters, 2, use_residual = use_residual)(UpSampling2D(size = (2,2))(conv6))
    merge7 = concatenate([all_inputs[1] , up_conv7] , axis = 3)
    conv7 = Res_or_Conv2D(4 * filters, kernel_size, use_residual = use_residual)(merge7)
    conv7 = Res_or_Conv2D(4 * filters, kernel_size, use_residual = use_residual)(conv7)

    up_conv8 = Res_or_Conv2D(2 * filters, 2, use_residual = use_residual)(UpSampling2D(size = (2,2))(conv7))
    merge8 = concatenate([all_inputs[2] , up_conv8] , axis = 3)
    conv8 = Res_or_Conv2D(2 * filters, kernel_size, use_residual = use_residual)(merge8)
    conv8 = Res_or_Conv2D(2 * filters, kernel_size, use_residual = use_residual)(conv8)

    up_conv9 = Res_or_Conv2D(filters, 2, use_residual = use_residual)(UpSampling2D(size = (2,2))(conv8))
    merge9 = concatenate([all_inputs[3] , up_conv9] , axis = 3)
    conv9 = Res_or_Conv2D(filters, kernel_size, use_residual = use_residual)(merge9)
    conv9 = Res_or_Conv2D(filters, kernel_size, use_residual = use_residual)(conv9)
    ########################

    conv9 = Res_or_Conv2D(2, kernel_size, use_residual = use_residual)(conv9)
    # If we put a residual block in the last convolution we would obtain values between [0,2] instead of [0,1]
    # so we must divide by 2 or leave a convolution block.
    output = Conv2D(1, 1, activation = 'sigmoid')(conv9)

    return output 

In [None]:
class final_model():
    """
    This class ig going to allow us to create the model where 2 encoders take
    2 images and we take the output of the encoders and concatenate them before feeding them to a decoder.
    The encoders's output depend on skip connections being transfered or not.

    Attributes
    ----------
    filters : int -- number of filters for the convolutions
    kernel_size : int or tuple -- kernel size for the convolutions 
    use_residual : bool -- indicates wheather to use convolution or residual block
    transfer_skip_conx : bool --- indicates wheather to output feature maps gathered from the previous 
                                  blocks with the output or to output the output only.
                                  if True skip connections are outputted
                                  if False only the output is the output the last layer
    Methods
    -------
    __call__ : method to construct the keras.Model and output it.
              parameters : input_shape_1 and input_shape_2 is the inputs of the model i.e the two images 
                           the inputs must be the same in height and width, number of channels is not necessary.
    """
    def __init__(self ,filters = 64 , kernel_size = (3,3) , use_residual = False , transfer_skip_conx = False):
        self.filters = filters
        self.kernel_size  = kernel_size
        self.use_residual = use_residual
        self.transfer_skip_conx = transfer_skip_conx
    
    def __call__(self , input_shape_1 = (256,256,3) , input_shape_2 = (256,256,3)):
        # We define the inputs for the encoders
        inputs_1 = Input(input_shape_1 , name='input_1')
        inputs_2 = Input(input_shape_2 , name='input_2')
        
        # We declare two encoders with the option of using residual block and transfer skip connections
        encoder_1 = encoder_half(input_shape_1 , use_residual = self.use_residual , transfer_skip_conx= self.transfer_skip_conx)
        encoder_2 = encoder_half(input_shape_2 , use_residual = self.use_residual , transfer_skip_conx= self.transfer_skip_conx)

        # the encoder takes it's input and outputs a result
        output_1 = encoder_1(inputs_1)
        output_2 = encoder_2(inputs_2)
        
        # here we check if we have skip connections transfered from the encoders 
        # if True we must concatenate all of them
        # if False we only have the output of the bottleneck.
        if self.transfer_skip_conx:
            # we assuma that data_format is NHWC
            output_concat = [concatenate([output_1[i] , output_2[i]],axis = 3) for i in range(len(output_1))]
            output = decoder_half_multiple(output_concat , use_residual = self.use_residual)
        else :
            output_concat = concatenate([output_1 , output_2],axis = 3)
            output = decoder_half(output_concat, use_residual = self.use_residual)

        return Model([inputs_1 , inputs_2] ,output)

In [None]:
encode_2_decod = final_model(transfer_skip_conx=False , use_residual=False)()

In [None]:
# 2 encoders with encoder without skip connections and with residual blocks
encode_2_decod.summary()

Model: "model_17"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 256, 256, 3) 0                                            
__________________________________________________________________________________________________
input_2 (InputLayer)            [(None, 256, 256, 3) 0                                            
__________________________________________________________________________________________________
model_15 (Functional)           (None, 16, 16, 1024) 18843200    input_1[0][0]                    
__________________________________________________________________________________________________
model_16 (Functional)           (None, 16, 16, 1024) 18843200    input_2[0][0]                    
___________________________________________________________________________________________

In [None]:
encode_2_decod = final_model(transfer_skip_conx=False , use_residual=True)()

In [None]:
for layer in encode_2_decod.layers:
  print(layer.name)

input_1
input_2
model_21
model_22
tf.concat_42
up_sampling2d_36
conv2d_672
conv2d_673
conv2d_674
add_188
conv2d_675
conv2d_676
conv2d_677
add_189
conv2d_678
conv2d_679
conv2d_680
add_190
up_sampling2d_37
conv2d_681
conv2d_682
conv2d_683
add_191
conv2d_684
conv2d_685
conv2d_686
add_192
conv2d_687
conv2d_688
conv2d_689
add_193
up_sampling2d_38
conv2d_690
conv2d_691
conv2d_692
add_194
conv2d_693
conv2d_694
conv2d_695
add_195
conv2d_696
conv2d_697
conv2d_698
add_196
up_sampling2d_39
conv2d_699
conv2d_700
conv2d_701
add_197
conv2d_702
conv2d_703
conv2d_704
add_198
conv2d_705
conv2d_706
conv2d_707
add_199
conv2d_708
conv2d_709
conv2d_710
add_200
conv2d_711
