Building the model using tensorflow and keras as backend. [ Tensor flow version - 2.9.0 was used]

In [1]:
#importing libraries
import tensorflow as tf
import keras
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model
from tensorflow.keras.applications import *
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras import backend as K
import cv2
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Activation
from tensorflow.keras.losses import binary_crossentropy
from keras.metrics import Recall,Precision
from sklearn.model_selection import train_test_split
from tensorflow.keras.optimizers import Adam, Nadam

In [2]:
#Building the model


def squeeze_excite_block(inputs, ratio=8):
    init = inputs
    channel_axis = -1
    filters = init.shape[channel_axis]
    se_shape = (1, 1, filters)

    se = GlobalAveragePooling2D()(init)
    se = Reshape(se_shape)(se)
    se = Dense(filters // ratio, activation='relu', kernel_initializer='he_normal', use_bias=False)(se)
    se = Dense(filters, activation='sigmoid', kernel_initializer='he_normal', use_bias=False)(se)

    x = Multiply()([init, se])
    return x


def conv_block(inputs, filters):
    x = inputs
    

    x = Conv2D(filters, (3, 3), padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = Conv2D(filters, (3, 3), padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    x = squeeze_excite_block(x)

    return x

def ASPP(x, filter):
    shape = x.shape

    y1 = AveragePooling2D(pool_size=(shape[1], shape[2]))(x)
    y1 = Conv2D(filter, 1, padding="same")(y1)
    y1 = BatchNormalization()(y1)
    y1 = Activation("relu")(y1)
    y1 = UpSampling2D((shape[1], shape[2]), interpolation='bilinear')(y1)

    y2 = Conv2D(filter, 1, dilation_rate=1, padding="same", use_bias=False)(x)
    y2 = BatchNormalization()(y2)
    y2 = Activation("relu")(y2)

    y3 = Conv2D(filter, 3, dilation_rate=6, padding="same", use_bias=False)(x)
    y3 = BatchNormalization()(y3)
    y3 = Activation("relu")(y3)

    y4 = Conv2D(filter, 3, dilation_rate=12, padding="same", use_bias=False)(x)
    y4 = BatchNormalization()(y4)
    y4 = Activation("relu")(y4)

    y5 = Conv2D(filter, 3, dilation_rate=18, padding="same", use_bias=False)(x)
    y5 = BatchNormalization()(y5)
    y5 = Activation("relu")(y5)

    y = Concatenate()([y1, y2, y3, y4, y5])

    y = Conv2D(filter, 1, dilation_rate=1, padding="same", use_bias=False)(y)
    y = BatchNormalization()(y)
    y = Activation("relu")(y)

    return y

def attention_gate(x, g):
    # Squeeze and excitation block
    filters = x.shape[-1]

    # Global average pooling
    g_ap = tf.reduce_mean(g, axis=[1, 2], keepdims=True)

    # Fully connected layer
    g_ap = Conv2D(filters // 8, (1, 1), activation='sigmoid', kernel_initializer='he_normal', padding='same')(g_ap)
    g_ap = Conv2D(filters, (1, 1), activation='sigmoid', kernel_initializer='he_normal', padding='same')(g_ap)

    # Element-wise multiplication
    x = tf.keras.layers.Multiply()([x, g_ap])
    return x



def combine_outputs_with_attention(outputs1, outputs2, outputs3):
    # Concatenate the outputs along the channel axis
    concatenated_outputs = Concatenate(axis=-1)([outputs1, outputs2, outputs3])

    # Apply a convolutional layer to reduce the channel dimension
    reduced_channels = Conv2D(1, (1, 1), activation='sigmoid', kernel_initializer='he_normal', padding='same')(concatenated_outputs)

    # Element-wise multiplication with the original concatenated outputs
    combined_output = tf.keras.layers.Multiply()([concatenated_outputs, reduced_channels])

    return combined_output

def encoder1(inputs):
    base_model= tf.keras.applications.EfficientNetV2L(include_top=False,weights="imagenet",input_tensor=inputs)
    skip_connections = [
        base_model.get_layer('stem_conv').output,
        base_model.get_layer('block2g_expand_conv').output,
        base_model.get_layer('block4a_expand_conv').output,
        base_model.get_layer('block5s_expand_conv').output,
    ]
    
    output = base_model.get_layer('block7g_expand_conv').output
    return output, skip_connections

def decoder1(inputs, skip_connections):
    num_filters = [256, 128, 64,32]
    skip_connections.reverse()
    x = inputs

    for i, f in enumerate(num_filters):
        x = UpSampling2D((2, 2), interpolation='bilinear')(x)
        x = Concatenate()([x, skip_connections[i]])
        x = conv_block(x, f)

    return x

def encoder2(inputs):
    model_2= tf.keras.applications.DenseNet201(include_top=False,weights="imagenet",input_tensor=inputs)
    skip_connections = [
        model_2.get_layer('conv1/conv').output,
        model_2.get_layer('conv2_block6_concat').output,
        model_2.get_layer('conv3_block12_concat').output,
        model_2.get_layer('conv4_block48_concat').output,
    ]
    
    output = model_2.get_layer('conv5_block32_concat').output
    return output, skip_connections



def encoder3(inputs):
    num_filters = [32,64, 128, 256]
    skip_connections = []
    x = inputs
    x = MaxPooling2D(pool_size=(2, 2))(x)
    
    for i, f in enumerate(num_filters):
        x = conv_block(x, f)
        skip_connections.append(x)
        x = MaxPool2D((2, 2))(x)

    return x, skip_connections






def decoder3(inputs, skip_1, skip_2,skip_3):
    num_filters = [256, 128, 64, 32]
    skip_1.reverse()
    x = inputs

    for i, f in enumerate(num_filters):
        x = UpSampling2D((2, 2), interpolation='bilinear')(x)
        
        # Apply attention gate
        skip_1_att = attention_gate(skip_1[i], x)
        skip_2_att = attention_gate(skip_2[i], x)
        skip_3_att = attention_gate(skip_3[i], x)
        
        # Concatenate attention gated skip connections
        x = Concatenate()([x, skip_1_att, skip_2_att,skip_3_att])
        
        # Apply convolution block
        x = conv_block(x, f)

    return x


def output_block1(inputs):
    # Final conv layer
    x = Conv2D(1, (1, 1), padding="same")(inputs)
    x = tf.keras.layers.UpSampling2D(size=(2, 2), interpolation='nearest')(x)  
    outputs = tf.keras.layers.Conv2D(1, 1, activation='sigmoid')(x)
    return outputs




# shape=(128,128,3)
def build_model(shape):
    inputs = Input(shape)
    
    #efficient net v2
    z, skip_1_z = encoder1(inputs)
    
    z = ASPP(z,64)
    z = decoder1(z, skip_1_z)
    
    output1=output_block1(z)
        
    
    #dense201    
   
    y,skip_1_y = encoder2(inputs)
    y=ASPP(y,64)
    y=decoder1(y,skip_1_y)
    output2=output_block1(y)
    
              

    x = inputs * output1 * output2
    
      
    x, skip_2 = encoder3(x)
    x = ASPP(x, 64)
    x = decoder3(x, skip_2,skip_1_y,skip_1_z)
    outputs2 = output_block1(x)
    
    combined_output = combine_outputs_with_attention(output1, output2, outputs2)
          
    
    model = Model(inputs, combined_output)
    return model

model = build_model((128, 128, 3))
model.summary()



Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 128, 128, 3  0           []                               
                                )]                                                                
                                                                                                  
 rescaling (Rescaling)          (None, 128, 128, 3)  0           ['input_1[0][0]']                
                                                                                                  
 stem_conv (Conv2D)             (None, 64, 64, 32)   864         ['rescaling[0][0]']              
                                                                                                  
 stem_bn (BatchNormalization)   (None, 64, 64, 32)   128         ['stem_conv[0][0]']          

IOPub message rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_msg_rate_limit`.

Current values:
ServerApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [None]:
# training parameters
r=1e-4
model.compile(loss='BinaryCrossentropy',optimizer=Adam(lr))

callbacks=[tf.keras.callbacks.EarlyStopping(patience=10,monitor='val_loss'),
           tf.keras.callbacks.TensorBoard(log_dir='logs')]

results=model.fit(X_train,Y_train,validation_split=0.2,batch_size=4,epochs=50,callbacks=callbacks)
model.save("Dualpath_ff_net_model_trained") # specify the path for saving the model parameters.

In [None]:
#loading the model

from tensorflow.keras.models import load_model

loaded_model = tf.keras.models.load_model("Dualpath_ff_net_model_trained") # Replace with actual path where the trained model is saved.





In [None]:
#inferencig the prediction
predict_test=loaded_model.predict(X_subset,verbose=1) #replace X_subset with a image tensor of shape of minimum (1,128,128,3)
                                                        # where 1 represents the number of images. 
                                                        

In [None]:
#visualizing the prediction
plt.imshow(predict_test[0])