# LiverNet

In [None]:
## Importing necessary libraries

from keras.layers import *
from keras.callbacks import *
from keras.optimizers import *
from keras.models import load_model, Model
import cv2
import os
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from keras import backend as K

from tensorflow.keras import regularizers
from keras.utils import conv_utils

In [None]:
# Dataset locations: one directory per split (train / validation / test).
train_path, val_path, test_path = (
    './data/Train/',
    './data/Validation/',
    './data/Test/',
)

# Fixed parameters: batch size and the (height, width, channels) image shape.
batch_size = 4
img_height = img_width = 224
input_shape = (img_height, img_width) + (3,)

In [None]:
## Data generators for the training and validation splits.
# NOTE(review): featurewise_center / featurewise_std_normalization are enabled,
# but no `.fit(...)` call on either ImageDataGenerator is visible in this
# notebook -- confirm whether dataset statistics are supposed to be computed
# before these generators are used.
_shared_preprocessing = dict(
    rescale=1. / 255,
    featurewise_center=True,
    featurewise_std_normalization=True,
)

# Options common to both directory iterators.
_shared_flow_options = dict(
    target_size=(img_height, img_width),
    batch_size=batch_size,
    shuffle=True,
    class_mode='categorical',
)

# Only the training generator enables horizontal/vertical flip augmentation.
train_datagen = ImageDataGenerator(
    horizontal_flip=True,
    vertical_flip=True,
    **_shared_preprocessing
)
valid_datagen = ImageDataGenerator(**_shared_preprocessing)

train_generator = train_datagen.flow_from_directory(train_path, **_shared_flow_options)
validation_generator = valid_datagen.flow_from_directory(val_path, **_shared_flow_options)

# Show the class-name -> class-index mapping discovered in the training directory.
print(train_generator.class_indices)

In [None]:
def aspp(x,input_shape,out_stride):
    """
        ASPP (atrous spatial pyramid pooling) block.

        Six parallel branches are computed from ``x`` and concatenated:
        a 1x1 convolution, four dilated 3x3 depthwise convolutions
        (dilation rates 2, 3, 4 and 6) each followed by a 1x1 convolution,
        and an average-pooled branch that is upsampled back again.
        Every convolution is followed by batch normalisation and ReLU.

        Arguments:

            x: input feature map to the ASPP block
            input_shape: spatial shape of the feature map; only
                ``input_shape[0]`` is read and it is used for both the
                pooling height and width
            out_stride: divisor applied to ``input_shape[0]`` to get the
                pooling window and the upsampling factor

        Returns:

            concatenation of the pooled branch, the 1x1 branch and the four
            dilated branches, in that order
    """

    def _pointwise_bn_relu(tensor):
        # 1x1 convolution to 256 channels, then batch-norm and ReLU.
        tensor = Conv2D(256, (1, 1), padding="same", use_bias=False)(tensor)
        tensor = BatchNormalization()(tensor)
        return Activation("relu")(tensor)

    def _dilated_branch(rate):
        # Dilated 3x3 depthwise convolution followed by the 1x1 projection.
        branch = DepthwiseConv2D((3, 3), dilation_rate=(rate, rate),
                                 padding="same", use_bias=False)(x)
        branch = BatchNormalization()(branch)
        branch = Activation("relu")(branch)
        return _pointwise_bn_relu(branch)

    # Layers are instantiated in the same order as before:
    # 1x1 branch, dilated branches (2, 3, 4, 6), then the pooled branch.
    pointwise = _pointwise_bn_relu(x)
    dilated = [_dilated_branch(rate) for rate in (2, 3, 4, 6)]

    window = int(input_shape[0] / out_stride)
    pooled = AveragePooling2D(pool_size=(window, window))(x)
    pooled = _pointwise_bn_relu(pooled)
    pooled = BilinearUpsampling((window, window))(pooled)

    return Concatenate()([pooled, pointwise] + dilated)

In [None]:
def cbam_block(cbam_feature, ratio=8):
    """
        CBAM block: channel attention followed by spatial attention.

        Arguments:

            cbam_feature: input feature map to the CBAM block
            ratio: channel reduction ratio forwarded to the channel
                attention module

        Returns:

            the feature map after both attention modules have been applied
    """

    channel_refined = channel_attention(cbam_feature, ratio)
    return spatial_attention(channel_refined)

def channel_attention(input_feature, ratio=8):
    """
        Channel attention module

        The input is reduced with global average pooling and with global max
        pooling; both descriptors go through the same two Dense layers
        (weights are shared), are summed, squashed with a sigmoid and then
        multiplied with the input feature map.

        Arguments:

            input_feature: input feature map to channel attention module
            ratio: channel reduction ratio; the hidden Dense layer has
                ``channel // ratio`` units

        Returns:

            the product of the channel attention map and input feature map
    """

    channel_axis = 1 if K.image_data_format() == "channels_first" else -1
    # K.int_shape is the public static-shape accessor. The private
    # `_keras_shape` attribute used previously is not present on tf.keras /
    # newer Keras tensors.
    channel = K.int_shape(input_feature)[channel_axis]

    shared_layer_one = Dense(channel//ratio,
                             activation='relu',
                             kernel_initializer='he_normal',
                             use_bias=True,
                             bias_initializer='zeros')
    shared_layer_two = Dense(channel,
                             kernel_initializer='he_normal',
                             use_bias=True,
                             bias_initializer='zeros')

    def _shared_mlp(pooled):
        # Reshape the pooled vector to (1, 1, channel) and run it through the
        # two shared Dense layers, checking the static shape at every step.
        pooled = Reshape((1,1,channel))(pooled)
        assert K.int_shape(pooled)[1:] == (1,1,channel)
        pooled = shared_layer_one(pooled)
        assert K.int_shape(pooled)[1:] == (1,1,channel//ratio)
        pooled = shared_layer_two(pooled)
        assert K.int_shape(pooled)[1:] == (1,1,channel)
        return pooled

    avg_pool = _shared_mlp(GlobalAveragePooling2D()(input_feature))
    max_pool = _shared_mlp(GlobalMaxPooling2D()(input_feature))

    cbam_feature = Add()([avg_pool,max_pool])
    cbam_feature = Activation('sigmoid')(cbam_feature)

    # The attention map was built as (1, 1, channel); move the channel axis
    # to the front when the backend uses channels_first.
    if K.image_data_format() == "channels_first":
        cbam_feature = Permute((3, 1, 2))(cbam_feature)

    return multiply([input_feature, cbam_feature])

def spatial_attention(input_feature):
    """
        Spatial attention module

        The mean and the max over axis 3 are concatenated into a two-channel
        map, passed through a 7x7 convolution with a sigmoid activation, and
        the resulting single-channel map is multiplied with the input.

        Arguments:

            input_feature: input feature map

        Returns:

            product of spatial attention map and input feature map
    """

    kernel_size = 7

    # The pooling below reduces axis 3, so a channels_first input is permuted
    # first to bring its channel axis to the last position.
    if K.image_data_format() == "channels_first":
        cbam_feature = Permute((2,3,1))(input_feature)
    else:
        cbam_feature = input_feature

    # K.int_shape is used for the shape checks instead of the private
    # `_keras_shape` attribute, which tf.keras / newer Keras tensors lack.
    avg_pool = Lambda(lambda x: K.mean(x, axis=3, keepdims=True))(cbam_feature)
    assert K.int_shape(avg_pool)[-1] == 1
    max_pool = Lambda(lambda x: K.max(x, axis=3, keepdims=True))(cbam_feature)
    assert K.int_shape(max_pool)[-1] == 1
    concat = Concatenate(axis=3)([avg_pool, max_pool])
    assert K.int_shape(concat)[-1] == 2
    cbam_feature = Conv2D(filters = 1,
                    kernel_size=kernel_size,
                    strides=1,
                    padding='same',
                    activation='sigmoid',
                    kernel_initializer='he_normal',
                    use_bias=False)(concat)
    assert K.int_shape(cbam_feature)[-1] == 1

    # Undo the permutation so the map lines up with a channels_first input.
    if K.image_data_format() == "channels_first":
        cbam_feature = Permute((3, 1, 2))(cbam_feature)

    return multiply([input_feature, cbam_feature])



def residual_block(y, nb_channels, _strides=(1, 1), _project_shortcut=False):
    """
        Residual block (resnet block)

        Two 3x3 convolutions, each followed by batch normalisation, are
        added to a shortcut of the input and passed through LeakyReLU.

        Arguments:

            y: input feature map
            nb_channels: number of filters of every convolution in the block
            _strides: strides of the first 3x3 convolution and of the
                projection shortcut
            _project_shortcut: when True, the shortcut goes through a 1x1
                convolution + batch normalisation even if ``_strides`` is
                (1, 1)

        Returns:

            output feature map after processing

    """

    shortcut = y

    # down-sampling is performed by the strides of the first convolution
    y = Conv2D(nb_channels, kernel_size=(3, 3), strides=_strides, padding='same')(y)
    y = BatchNormalization()(y)
    y = LeakyReLU()(y)

    y = Conv2D(nb_channels, kernel_size=(3, 3), strides=(1, 1), padding='same')(y)
    y = BatchNormalization()(y)

    # The identity shortcut is used directly unless a projection is requested
    # or the block is strided; in those cases a 1x1 convolution with the same
    # strides matches the shortcut to the main branch.
    # NOTE(review): with an identity shortcut the caller presumably has to
    # pass an input that already has nb_channels channels -- confirm.
    if _project_shortcut or _strides != (1, 1):
        shortcut = Conv2D(nb_channels, kernel_size=(1, 1), strides=_strides, padding='same')(shortcut)
        shortcut = BatchNormalization()(shortcut)

    y = add([shortcut, y])
    y = LeakyReLU()(y)

    return y


class BilinearUpsampling(Layer):

    """
        Bilinear Upsampling Class

        Resizes a 4-D input by integer factors along axes 1 and 2 using
        ``tf.image.resize``.
        NOTE(review): ``data_format`` is stored and serialized but ``call``
        and ``compute_output_shape`` always treat axes 1 and 2 as height and
        width -- confirm channels_first inputs are never used.
    """

    def __init__(self, upsampling=(2, 2), data_format=None, **kwargs):

        """
            Constructor of Bilinear-Upsampling

            Arguments:

                upsampling: int or pair of ints, the (height, width) factors
                data_format: passed to ``K.normalize_data_format``
                **kwargs: forwarded to the base ``Layer`` constructor; a
                    legacy ``size`` entry is accepted as an alias of
                    ``upsampling``
        """

        # Configs produced by the previous get_config stored the factors
        # under 'size'; accept that key so such configs can still be loaded.
        if 'size' in kwargs:
            upsampling = kwargs.pop('size')

        super(BilinearUpsampling, self).__init__(**kwargs)
        self.data_format = K.normalize_data_format(data_format)
        self.upsampling = conv_utils.normalize_tuple(upsampling, 2, 'size')
        self.input_spec = InputSpec(ndim=4)

    def compute_output_shape(self, input_shape):
        """Scale axes 1 and 2 by the upsampling factors; unknown dims stay None."""
        height = None
        if input_shape[1] is not None:
            height = self.upsampling[0] * input_shape[1]
        width = None
        if input_shape[2] is not None:
            width = self.upsampling[1] * input_shape[2]
        return (input_shape[0],
                height,
                width,
                input_shape[3])

    def call(self, inputs):
        """Resize ``inputs`` to the scaled static height and width."""
        # Relies on the default interpolation of tf.image.resize
        # (bilinear according to the TensorFlow documentation).
        return tf.image.resize(inputs, (int(inputs.shape[1]*self.upsampling[0]),
                                                   int(inputs.shape[2]*self.upsampling[1])))

    def get_config(self):
        """Return the layer config, keyed by the constructor argument names."""
        # The key must be 'upsampling' (not 'size') so that
        # from_config -> cls(**config) matches the __init__ signature.
        config = {'upsampling': self.upsampling,
                  'data_format': self.data_format}
        base_config = super(BilinearUpsampling, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))


def create_model():
    """
        Method to create the final model

        Three convolutional stages (the last two with a CBAM block and a
        residual block) each end in 2x2 max pooling. The three pooled maps
        go through ASPP, are bilinearly upsampled to a common size,
        concatenated, globally average-pooled and classified by two
        256-unit dense layers and a 4-way softmax.

        Returns:

            the (uncompiled) Keras ``Model`` taking 224x224x3 inputs
    """

    dropRate = 0.3

    def _conv_bn_relu(tensor, filters):
        # 3x3 convolution -> batch normalisation -> ReLU.
        tensor = Conv2D(filters, (3, 3), activation=None, padding='same')(tensor)
        tensor = BatchNormalization()(tensor)
        return Activation('relu')(tensor)

    def _attention_stage(tensor, filters):
        # conv/BN/ReLU, CBAM attention, residual block, then 2x2 max pooling.
        tensor = _conv_bn_relu(tensor, filters)
        tensor = cbam_block(tensor)
        tensor = residual_block(tensor, filters)
        return MaxPooling2D((2, 2))(tensor)

    def _dense_bn_relu(tensor):
        # 256-unit dense layer -> batch normalisation -> ReLU.
        tensor = Dense(256, activation=None)(tensor)
        tensor = BatchNormalization()(tensor)
        return Activation('relu')(tensor)

    init = Input((224, 224, 3))

    # Stage 1: two plain convolutions, pooled to 112x112.
    stem = _conv_bn_relu(init, 32)
    stem = _conv_bn_relu(stem, 32)
    x1 = MaxPooling2D((2, 2))(stem)

    # Stages 2 and 3: pooled to 56x56 and 28x28.
    x2 = _attention_stage(x1, 64)
    x3 = _attention_stage(x2, 128)

    # ASPP on every pooled map, then upsampling by 2 / 4 / 8 to 224x224.
    pyramid = [aspp(feature, (side, side), 1)
               for feature, side in ((x1, 112), (x2, 56), (x3, 28))]
    upsampled = [UpSampling2D(size=(factor, factor), interpolation='bilinear')(feature)
                 for feature, factor in zip(pyramid, (2, 4, 8))]

    hypercolumn = Concatenate()(upsampled)
    gap = GlobalAveragePooling2D()(hypercolumn)

    # Classification head.
    head = _dense_bn_relu(gap)
    head = Dropout(dropRate)(head)
    head = _dense_bn_relu(head)
    y = Dense(4, activation='softmax')(head)

    return Model(init, y)



model = create_model()

# Set an L2(0.003) regularizer attribute on every layer that exposes one.
# NOTE(review): these attributes are assigned after the model has been built;
# Keras presumably registers regularization losses when the weights are
# created, so this may have no effect on the training loss -- confirm.
for layer in model.layers:
    for attr in ('kernel_regularizer', 'bias_regularizer'):
        if hasattr(layer, attr):
            setattr(layer, attr, regularizers.l2(0.003))

# NOTE(review): `adam` (lr = 1e-5) and `learning_rate` are created here but
# compile() below receives the string 'adam', so neither is used in the
# visible code -- confirm which learning rate is intended.
adam = tf.keras.optimizers.Adam(lr = 0.00001)

# NOTE(review): TFOptimizer is not referenced anywhere in the visible code.
from keras.optimizers import TFOptimizer
learning_rate = K.variable(0.001)

model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=["acc"],
)
model.summary()

In [None]:
## Callbacks and training run

# Multiply the learning rate by 0.1 when val_loss has not improved for 3 epochs.
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.1,
    patience=3,
    verbose=1,
    cooldown=1,
)

# NOTE(review): early_stopping is constructed but is not included in the
# `callbacks` list passed to fit_generator below -- confirm whether that is
# intentional.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=0.001,
    patience=7,
    verbose=1,
    mode='min',
)

# One pass over each generator per epoch, for 40 epochs.
history = model.fit_generator(
    train_generator,
    steps_per_epoch=len(train_generator),
    epochs=40,
    validation_data=validation_generator,
    validation_steps=len(validation_generator),
    callbacks=[reduce_lr],
)

In [None]:
## Training and validation loss per epoch

for history_key, legend_label in (('loss', 'train_loss'), ('val_loss', 'val_loss')):
    plt.plot(history.history[history_key], label=legend_label)
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()


In [None]:
## Training and validation accuracy per epoch

for history_key, legend_label in (('acc', 'train_acc'), ('val_acc', 'val_acc')):
    plt.plot(history.history[history_key], label=legend_label)
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
plt.show()


In [None]:
# Test images are only multiplied by 1/255; no augmentation is configured.
test_datagen = ImageDataGenerator(rescale=1. / 255)

# batch_size=1 with shuffle=False: images are yielded one at a time,
# presumably in the same order as `test_generator.labels` -- confirm.
_test_flow_options = dict(
    target_size=(img_height, img_width),
    batch_size=1,
    shuffle=False,
    class_mode='categorical',
)
test_generator = test_datagen.flow_from_directory(test_path, **_test_flow_options)

## printing ground truth labels
print(test_generator.labels)

In [None]:
# Number of batches needed to go through the test generator once.
test_step = test_generator.n // test_generator.batch_size

# Restart the generator before predicting.
test_generator.reset()
pred = model.predict_generator(test_generator, steps=test_step, verbose=1)

# Index of the highest-scoring class for every sample.
pred_class_indices = pred.argmax(axis=1)

## printing predicted labels
print(pred_class_indices)

In [None]:
## Classwise performance analysis

# The metric functions are imported here because this cell runs before the
# cell that imports them from sklearn; without this import a fresh
# "Restart & Run All" fails with NameError on accuracy_score.
from sklearn.metrics import (accuracy_score, f1_score, jaccard_score,
                             precision_score, recall_score)

classes = [0,1,2,3]


for cl in classes:

    print("class: ",cl)

    # One-vs-rest binarisation: 1 where the label / prediction equals `cl`.
    a1 = np.uint8(test_generator.labels == cl)
    a2 = np.uint8(pred_class_indices == cl)

    print('Accuracy {}'.format(accuracy_score(y_true=a1, y_pred=a2)))
    print('F1 {}'.format(f1_score(y_true=a1, y_pred=a2)))
    print('precision {}'.format(precision_score(y_true=a1, y_pred=a2)))
    print('recall {}'.format(recall_score(y_true=a1, y_pred=a2)))

    print('jaccard {}'.format(jaccard_score(y_true=a1, y_pred=a2)))
    print("_______________________________")

In [None]:
## Overall performance analysis

from sklearn.metrics import accuracy_score,roc_curve, confusion_matrix, roc_auc_score, auc, f1_score,jaccard_score,classification_report
from sklearn.metrics import precision_score,recall_score,jaccard_score

# Ground-truth class indices, in the order stored on the test generator.
_y_true = test_generator.labels

print('Accuracy {}'.format(accuracy_score(y_true=_y_true, y_pred=pred_class_indices)))

# Macro-averaged scores, printed in the same order as before.
for _fmt, _metric in (('F1 {}', f1_score),
                      ('precision {}', precision_score),
                      ('recall {}', recall_score),
                      ('jaccard {}', jaccard_score)):
    print(_fmt.format(_metric(y_true=_y_true, y_pred=pred_class_indices, average="macro")))

# Full confusion matrix and per-class report.
for _fmt, _report in (('confusion_matrix\n {}', confusion_matrix),
                      ('classification_report\n {}', classification_report)):
    print(_fmt.format(_report(y_true=_y_true, y_pred=pred_class_indices)))
print('\n\n')