In [None]:
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import numpy as np
import cv2
from glob import glob
from sklearn.utils import shuffle
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping, TensorBoard
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Recall, Precision


In [None]:
#metrics
from tqdm import tqdm
import imageio
from albumentations import HorizontalFlip, VerticalFlip, ElasticTransform, Transpose, RandomRotate90, GridDistortion, OpticalDistortion, CoarseDropout
from google.colab.patches import cv2_imshow

def iou(y_true, y_pred):
    """Soft intersection-over-union between target and prediction tensors.

    The computation runs in NumPy through ``tf.numpy_function``. Every element
    of both tensors is summed together (no per-sample averaging) and the
    prediction is used as-is, without thresholding.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor, elementwise compatible with ``y_true``.

    Returns:
        A ``tf.float32`` tensor holding ``intersection / union``; a 1e-15 term
        is added to numerator and denominator to avoid division by zero.
    """
    def _numpy_iou(truth, pred):
        overlap = (truth * pred).sum()
        combined = truth.sum() + pred.sum() - overlap
        ratio = (overlap + 1e-15) / (combined + 1e-15)
        return ratio.astype(np.float32)

    return tf.numpy_function(_numpy_iou, [y_true, y_pred], tf.float32)

# Small constant added to numerator and denominator of the Dice ratio so the
# division is defined when both tensors sum to zero.
smooth = 1e-15


def dice_coef(y_true, y_pred):
    """Dice coefficient computed over all elements of the batch.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor, elementwise compatible with ``y_true``.

    Returns:
        Scalar tensor ``(2 * sum(y_true * y_pred) + smooth) /
        (sum(y_true) + sum(y_pred) + smooth)``.
    """
    flat_true = tf.keras.layers.Flatten()(y_true)
    flat_pred = tf.keras.layers.Flatten()(y_pred)
    overlap = tf.reduce_sum(flat_true * flat_pred)
    total = tf.reduce_sum(flat_true) + tf.reduce_sum(flat_pred)
    return (2. * overlap + smooth) / (total + smooth)

def dice_loss(y_true, y_pred):
    """Dice loss: one minus the Dice coefficient of ``y_true`` and ``y_pred``."""
    score = dice_coef(y_true, y_pred)
    return 1.0 - score

In [None]:
#Q2.	Add a multi-channel attention mechanism at the short connection of the model
## The attention module is applied to the skip features before they are concatenated with the output of the upsampling (transposed-convolution) layer.
import tensorflow as tf
import numpy as np,sys
from tensorflow.keras.layers import Conv2D, BatchNormalization, Multiply, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input, GlobalAveragePooling2D, GlobalMaxPooling2D, Reshape, Dense
from tensorflow.keras.models import Model


def channel_attention_module(x, ratio=8):
  """Re-weight the channels of a feature map with a channel-attention gate.

  Global average pooling and global max pooling each produce one descriptor
  per channel. Both descriptors pass through the same two Dense layers
  (a ReLU bottleneck of ``channel // ratio`` units, then ``channel`` units),
  are summed, squashed with a sigmoid and multiplied back onto ``x``.

  Args:
    x: 4-D channels-last feature map whose channel count is statically known.
    ratio: Reduction factor for the bottleneck Dense layer.

  Returns:
    A tensor with the same shape as ``x``, scaled per channel.

  Raises:
    ValueError: If the channel dimension of ``x`` is not statically known.
  """
  channel = x.shape[-1]
  if channel is None:
    raise ValueError("channel_attention_module needs a known channel dimension")

  # Never let the bottleneck collapse to zero units when channel < ratio.
  hidden_units = max(channel // ratio, 1)

  ## Shared layers (the same weights process both pooled descriptors)
  l1 = Dense(hidden_units, activation="relu")
  l2 = Dense(channel)

  ## Global Average Pooling branch
  x1 = GlobalAveragePooling2D()(x)
  x1 = l2(l1(x1))

  ## Global Max Pooling branch
  x2 = GlobalMaxPooling2D()(x)
  x2 = l2(l1(x2))

  ## Add both and apply sigmoid
  Z = x1 + x2
  Z = Activation("sigmoid")(Z)

  # Reshape to (1, 1, channel) so the scaling broadcasts over the spatial
  # axes explicitly instead of depending on how Multiply aligns tensors of
  # different rank.
  Z = Reshape((1, 1, channel))(Z)
  x = Multiply()([x, Z])

  return x


def conv_block(inputs, num_filters):
    """Apply two consecutive Conv(3x3, same) -> BatchNorm -> ReLU stages.

    Args:
        inputs: Input feature tensor.
        num_filters: Number of filters used by both convolutions.

    Returns:
        The feature tensor produced by the second stage.
    """
    features = inputs
    for _ in range(2):
        features = Conv2D(num_filters, 3, padding="same")(features)
        features = BatchNormalization()(features)
        features = Activation("relu")(features)
    return features

def encoder_block(inputs, num_filters):
    """Run one encoder stage: a conv block followed by 2x2 max pooling.

    Returns:
        ``(skip, pooled)`` where ``skip`` is the conv-block output kept for the
        decoder and ``pooled`` is that output after ``MaxPool2D((2, 2))``.
    """
    skip = conv_block(inputs, num_filters)
    pooled = MaxPool2D((2, 2))(skip)
    return skip, pooled

def decoder_block(inputs, skip_features, num_filters):
    """Run one decoder stage with channel attention on the skip connection.

    ``inputs`` is upsampled with a stride-2 transposed convolution, the skip
    features are passed through ``channel_attention_module``, both are
    concatenated and the result is refined by ``conv_block``.

    Args:
        inputs: Feature tensor coming from the previous (coarser) stage.
        skip_features: Encoder feature tensor for this resolution.
        num_filters: Filter count for the transposed conv and the conv block.

    Returns:
        The refined feature tensor.
    """
    upsampled = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(inputs)
    attended_skip = channel_attention_module(skip_features)
    merged = Concatenate()([upsampled, attended_skip])
    return conv_block(merged, num_filters)

def build_unet(input_shape):
    """Build the U-Net with channel attention on every skip connection.

    Args:
        input_shape: Shape of one input sample (without the batch axis).

    Returns:
        A Keras ``Model`` named ``"MCAM_UNET"`` whose output is a one-channel
        sigmoid map produced by a 1x1 convolution.
    """
    encoder_filters = (64, 128, 256, 512)
    decoder_filters = (512, 256, 128, 64)

    # Input layer
    inputs = Input(input_shape)

    # Encoder: remember each stage's pre-pooling features for the decoder.
    skips = []
    x = inputs
    for filters in encoder_filters:
        skip, x = encoder_block(x, filters)
        skips.append(skip)

    # Bottleneck
    x = conv_block(x, 1024)

    # Decoder: consume the skip features from deepest to shallowest.
    for filters, skip in zip(decoder_filters, reversed(skips)):
        x = decoder_block(x, skip, filters)

    outputs = Conv2D(1, 1, padding="same", activation="sigmoid")(x)

    return Model(inputs, outputs, name="MCAM_UNET")

# Smoke test: build the network for a 512x512 single-channel input and print
# its layer summary.
# NOTE(review): the training cell builds the model with (H, W, 3), so the
# parameter counts printed here are for a 1-channel input, not the trained model.
if __name__ == "__main__":
    input_shape = (512, 512, 1)
    model = build_unet(input_shape)
    model.summary()

Model: "MCAM_UNET"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 512, 512, 1  0           []                               
                                )]                                                                
                                                                                                  
 conv2d (Conv2D)                (None, 512, 512, 64  640         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 batch_normalization (BatchNorm  (None, 512, 512, 64  256        ['conv2d[0][0]']                 
 alization)                     )                                                         

In [None]:
# Image height and width in pixels; tf_parse pins the dataset element shapes
# to (H, W, 3) for images and (H, W, 1) for masks.
H = 512
W = 512

def create_dir(path):
    """Create directory ``path`` (and any missing parents) if it is absent.

    ``exist_ok=True`` makes the call idempotent without a separate
    exists-check, so there is no window in which another process can create
    the directory between the check and the creation.

    Args:
        path: Directory path to create.
    """
    os.makedirs(path, exist_ok=True)

def load_data(path):
    """Collect the sorted ``*.jpg`` paths under ``path/image`` and ``path/mask``.

    Args:
        path: Directory containing ``image`` and ``mask`` sub-directories.

    Returns:
        ``(images, masks)``: two lists of file paths, each sorted
        lexicographically.
    """
    image_pattern = os.path.join(path, "image", "*.jpg")
    mask_pattern = os.path.join(path, "mask", "*.jpg")

    images = glob(image_pattern)
    images.sort()
    masks = glob(mask_pattern)
    masks.sort()
    return images, masks

def shuffling(x, y):
    """Shuffle ``x`` and ``y`` with the same permutation (fixed seed 42).

    Returns:
        ``(x, y)`` after the joint shuffle.
    """
    shuffled_x, shuffled_y = shuffle(x, y, random_state=42)
    return shuffled_x, shuffled_y

def read_image(path):
    """Read a colour image from disk and scale it to ``[0, 1]`` float32.

    Args:
        path: File path as ``bytes`` (as delivered by ``tf.numpy_function``)
            or ``str``.

    Returns:
        ``np.float32`` array with the pixel values divided by 255. The image
        is not resized here; ``tf_parse`` expects it to be (H, W, 3) already.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file.
    """
    if isinstance(path, bytes):
        path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    # cv2.imread signals failure by returning None instead of raising.
    if x is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    x = x/255.0
    x = x.astype(np.float32)
    return x

def read_mask(path):
    """Read a grayscale mask from disk as a ``[0, 1]`` float32 array.

    Args:
        path: File path as ``bytes`` (as delivered by ``tf.numpy_function``)
            or ``str``.

    Returns:
        ``np.float32`` array with a trailing channel axis of size 1 and the
        pixel values divided by 255. Values are not binarised. The mask is not
        resized here; ``tf_parse`` expects it to be (H, W, 1) already.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file.
    """
    if isinstance(path, bytes):
        path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    # cv2.imread signals failure by returning None instead of raising.
    if x is None:
        raise FileNotFoundError(f"Could not read mask: {path}")
    x = x/255.0
    x = x.astype(np.float32)
    x = np.expand_dims(x, axis=-1)  # add the channel axis
    return x

def tf_parse(x, y):
    """Load one (image, mask) pair inside a ``tf.data`` pipeline.

    The Python readers are wrapped with ``tf.numpy_function``; because that
    loses static shape information, the shapes are re-attached afterwards.

    Args:
        x: String tensor holding the image path.
        y: String tensor holding the mask path.

    Returns:
        ``(image, mask)`` float32 tensors with static shapes ``[H, W, 3]`` and
        ``[H, W, 1]``.
    """
    def _load_pair(image_path, mask_path):
        image = read_image(image_path)
        mask = read_mask(mask_path)
        return image, mask

    image, mask = tf.numpy_function(_load_pair, [x, y], [tf.float32, tf.float32])
    image.set_shape([H, W, 3])
    mask.set_shape([H, W, 1])
    return image, mask

def tf_dataset(X, Y, batch_size=2):
    """Build a batched, prefetching ``tf.data.Dataset`` of (image, mask) pairs.

    Args:
        X: Sequence of image paths.
        Y: Sequence of mask paths, aligned with ``X``.
        batch_size: Number of pairs per batch.

    Returns:
        Dataset that maps each path pair through ``tf_parse``, batches the
        results and prefetches 4 batches.
    """
    return (
        tf.data.Dataset.from_tensor_slices((X, Y))
        .map(tf_parse)
        .batch(batch_size)
        .prefetch(4)
    )

# Training cell: seeds the RNGs, builds the train/validation datasets, compiles
# the attention U-Net with Dice loss and fits it with checkpoint/CSV/TensorBoard
# callbacks.
if __name__ == "__main__":
    # Seed the NumPy and TensorFlow random number generators.
    np.random.seed(42)
    tf.random.set_seed(42)

    # Hyperparameters
    batch_size = 4
    lr = 0.001
    num_epochs = 150

    # Output locations. The checkpoint and the CSV log go to `output_dir`, so
    # that directory has to exist before the callbacks try to write there.
    output_dir = "/content/drive/MyDrive/files"
    model_path = os.path.join(output_dir, "channelmodel.h5")
    csv_path = os.path.join(output_dir, "channeldata.csv")
    create_dir("files")      # local directory the original cell created; kept for compatibility
    create_dir(output_dir)

    # Dataset
    dataset_path = "/content/drive/MyDrive/finaldata"
    train_path = os.path.join(dataset_path, "train")
    valid_path = os.path.join(dataset_path, "val")

    train_x, train_y = load_data(train_path)
    train_x, train_y = shuffling(train_x, train_y)
    valid_x, valid_y = load_data(valid_path)

    print(f"Train: {len(train_x)} - {len(train_y)}")
    print(f"Valid: {len(valid_x)} - {len(valid_y)}")

    # Fail early with a readable message instead of deep inside tf.data/Keras.
    if not train_x or not valid_x:
        raise ValueError(f"No '*.jpg' images found under {train_path!r} or {valid_path!r}")
    if len(train_x) != len(train_y) or len(valid_x) != len(valid_y):
        raise ValueError("Number of images and masks differ; every image needs a matching mask")

    train_dataset = tf_dataset(train_x, train_y, batch_size=batch_size)
    valid_dataset = tf_dataset(valid_x, valid_y, batch_size=batch_size)

    # Ceiling division: a final partial batch still counts as one step.
    train_steps = -(-len(train_x) // batch_size)
    valid_setps = -(-len(valid_x) // batch_size)

    # Model
    model = build_unet((H, W, 3))
    model.compile(loss=dice_loss, optimizer=Adam(lr), metrics=[dice_coef, iou, Recall(), Precision()])
    # model.summary()

    callbacks = [
        ModelCheckpoint(model_path, verbose=1, save_best_only=True),
        # NOTE(review): lr * factor = 1e-5 is below min_lr, so the learning
        # rate can only ever drop once, to 1e-4 - confirm this is intended.
        ReduceLROnPlateau(monitor="val_loss", factor = 0.01, patience=5, min_lr= 0.0001, verbose=1),
        CSVLogger(csv_path),
        TensorBoard(),
        # NOTE(review): patience equals num_epochs, so early stopping cannot
        # trigger within this run - confirm this is intended.
        EarlyStopping(monitor="val_loss", patience=150, restore_best_weights=False)
    ]
    history =  model.fit(
        train_dataset,
        epochs=num_epochs,
        validation_data=valid_dataset,
        steps_per_epoch=train_steps,
        validation_steps=valid_setps,
        callbacks=callbacks
    )

Train: 160 - 160
Valid: 20 - 20
Epoch 1/150
Epoch 1: val_loss improved from inf to 0.80331, saving model to /content/drive/MyDrive/files/channelmodel.h5
Epoch 2/150
Epoch 2: val_loss improved from 0.80331 to 0.66746, saving model to /content/drive/MyDrive/files/channelmodel.h5
Epoch 3/150
Epoch 3: val_loss improved from 0.66746 to 0.63798, saving model to /content/drive/MyDrive/files/channelmodel.h5
Epoch 4/150
Epoch 4: val_loss improved from 0.63798 to 0.52581, saving model to /content/drive/MyDrive/files/channelmodel.h5
Epoch 5/150
Epoch 5: val_loss improved from 0.52581 to 0.52226, saving model to /content/drive/MyDrive/files/channelmodel.h5
Epoch 6/150
Epoch 6: val_loss improved from 0.52226 to 0.46041, saving model to /content/drive/MyDrive/files/channelmodel.h5
Epoch 7/150
Epoch 7: val_loss did not improve from 0.46041
Epoch 8/150
Epoch 8: val_loss did not improve from 0.46041
Epoch 9/150
Epoch 9: val_loss improved from 0.46041 to 0.36485, saving model to /content/drive/MyDrive/f

In [None]:
#Evaluation
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import numpy as np
import pandas as pd
import cv2
from glob import glob
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.utils import CustomObjectScope
from sklearn.metrics import accuracy_score, f1_score, jaccard_score, precision_score, recall_score
#from metrics import dice_loss, dice_coef, iou

# Nominal image height and width in pixels for this evaluation cell.
H = 512
W = 512

def create_dir(path):
    """Create directory ``path`` (and any missing parents) if it is absent.

    ``exist_ok=True`` makes the call idempotent without a separate
    exists-check, so there is no window in which another process can create
    the directory between the check and the creation.

    Args:
        path: Directory path to create.
    """
    os.makedirs(path, exist_ok=True)

def read_image(path):
    """Read a colour image and return both the raw and the normalised version.

    Args:
        path: Path of the image file.

    Returns:
        ``(ori_x, x)`` where ``ori_x`` is the array exactly as returned by
        ``cv2.imread`` and ``x`` is that array divided by 255 as float32.
        No resizing is applied.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file.
    """
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    # cv2.imread signals failure by returning None instead of raising.
    if x is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    ori_x = x
    x = x/255.0
    x = x.astype(np.float32)
    return ori_x, x

def read_mask(path):
    """Read a grayscale mask and return both the raw and the binarised version.

    Args:
        path: Path of the mask file.

    Returns:
        ``(ori_x, x)`` where ``ori_x`` is the array exactly as returned by
        ``cv2.imread`` and ``x`` is an int32 array of 0/1 labels.
        No resizing is applied.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file.
    """
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    # cv2.imread signals failure by returning None instead of raising.
    if x is None:
        raise FileNotFoundError(f"Could not read mask: {path}")
    ori_x = x
    x = x/255.0
    # Threshold at 0.5 rather than casting directly: a plain int cast truncates,
    # so only pixels that are exactly 255 would become 1 and any slightly
    # darker foreground pixel (e.g. from JPEG compression) would be counted
    # as background.
    x = (x > 0.5).astype(np.int32)
    return ori_x, x

def load_data(path):
    """Collect the sorted ``*.jpg`` paths of the validation images and masks.

    Args:
        path: Dataset root containing ``val/image`` and ``val/mask``.

    Returns:
        ``(images, masks)``: two lists of file paths, each sorted
        lexicographically.
    """
    image_pattern = os.path.join(path, "val/image", "*.jpg")
    mask_pattern = os.path.join(path, "val/mask", "*.jpg")

    images = glob(image_pattern)
    images.sort()
    masks = glob(mask_pattern)
    masks.sort()
    return images, masks

def save_results(ori_x, ori_y, y_pred, save_image_path):
    """Write image, ground-truth mask and prediction side by side to one file.

    The three panels are separated by 10-pixel-wide white strips.

    Args:
        ori_x: Colour image array of shape (height, width, 3).
        ori_y: 2-D ground-truth mask; it is replicated to three channels.
        y_pred: 2-D predicted mask with 0/1 values; it is replicated to three
            channels and multiplied by 255.
        save_image_path: Destination file path passed to ``cv2.imwrite``.
    """
    import warnings

    # Take the separator height from the image itself instead of a global
    # constant so images of any height can be stacked.
    line = np.ones((ori_x.shape[0], 10, 3)) * 255

    ori_y = np.expand_dims(ori_y, axis=-1)
    ori_y = np.concatenate([ori_y, ori_y, ori_y], axis=-1)

    y_pred = np.expand_dims(y_pred, axis=-1)
    y_pred = np.concatenate([y_pred, y_pred, y_pred], axis=-1) * 255

    concat_images = np.concatenate([ori_x, line, ori_y, line, y_pred], axis=1)

    # cv2.imwrite reports failure through its return value (for example when
    # the target directory does not exist); surface it instead of dropping it.
    if not cv2.imwrite(save_image_path, concat_images):
        warnings.warn(f"Could not write result image: {save_image_path}")

# Evaluation cell: loads the trained model, predicts every validation image,
# saves a side-by-side comparison image and reports per-image and mean metrics.
if __name__ == "__main__":
    # Output locations. Both directories are created up front because the
    # comparison images and the score CSV are written there.
    results_dir = "/content/drive/MyDrive/channelresults"
    score_csv_path = "/content/drive/MyDrive/resultfiles/channelscore.csv"
    create_dir("results")    # local directory the original cell created; kept for compatibility
    create_dir(results_dir)
    create_dir(os.path.dirname(score_csv_path))

    # Load the model; the custom loss/metrics must be resolvable by name.
    with CustomObjectScope({'iou': iou, 'dice_coef': dice_coef, 'dice_loss': dice_loss}):
        model = tf.keras.models.load_model("/content/drive/MyDrive/files/channelmodel.h5")

    # Load the dataset
    dataset_path = os.path.join("/content/drive/MyDrive/finaldata")
    test_x, test_y = load_data(dataset_path)

    # Without this guard an empty list ends in an IndexError on the mean
    # scores, and zip() would silently drop unpaired files.
    if not test_x:
        raise ValueError(f"No '*.jpg' images found under {dataset_path!r}")
    if len(test_x) != len(test_y):
        raise ValueError("Number of images and masks differ; every image needs a matching mask")

    # Make the predictions and calculate the metric values
    SCORE = []
    for x, y in tqdm(zip(test_x, test_y), total=len(test_x)):
        # File name without directory and extension; splitext keeps any dots
        # inside the name, so "a.b.jpg" yields "a.b" rather than "a".
        name = os.path.splitext(os.path.basename(x))[0]

        # Read the image and mask
        ori_x, x = read_image(x)
        ori_y, y = read_mask(y)

        # Prediction (verbose=0 keeps Keras' own progress bar out of the tqdm output)
        y_pred = model.predict(np.expand_dims(x, axis=0), verbose=0)[0]
        y_pred = y_pred > 0.5   # threshold the probabilities to 0/1
        y_pred = y_pred.astype(np.int32)
        y_pred = np.squeeze(y_pred, axis=-1)

        # Save the comparison image
        save_image_path = os.path.join(results_dir, f"{name}.jpg")
        save_results(ori_x, ori_y, y_pred, save_image_path)

        # Flatten the arrays
        y = y.flatten()
        y_pred = y_pred.flatten()

        # Calculate the metrics
        acc_value = accuracy_score(y, y_pred)
        f1_value = f1_score(y, y_pred, labels=[0, 1], average="binary")
        jac_value = jaccard_score(y, y_pred, labels=[0, 1], average="binary")
        recall_value = recall_score(y, y_pred, labels=[0, 1], average="binary")
        precision_value = precision_score(y, y_pred, labels=[0, 1], average="binary")
        SCORE.append([name, acc_value, f1_value, jac_value, recall_value, precision_value])

    # Mean of every metric column (the leading name column is dropped)
    score = [s[1:] for s in SCORE]
    score = np.mean(score, axis=0)

    print(f"Accuracy: {score[0]:0.5f}")
    print(f"F1: {score[1]:0.5f}")
    print(f"Jaccard: {score[2]:0.5f}")
    print(f"Recall: {score[3]:0.5f}")
    print(f"Precision: {score[4]:0.5f}")

    # Save the per-image scores
    df = pd.DataFrame(SCORE, columns=["Image", "Acc", "F1", "Jaccard", "Recall", "Precision"])
    df.to_csv(score_csv_path)

     

  0%|          | 0/20 [00:00<?, ?it/s]



  5%|▌         | 1/20 [00:12<03:49, 12.09s/it]



 10%|█         | 2/20 [00:13<01:45,  5.83s/it]



 15%|█▌        | 3/20 [00:15<01:05,  3.84s/it]



 20%|██        | 4/20 [00:16<00:46,  2.90s/it]



 25%|██▌       | 5/20 [00:17<00:35,  2.37s/it]



 30%|███       | 6/20 [00:19<00:28,  2.02s/it]



 35%|███▌      | 7/20 [00:20<00:23,  1.77s/it]



 40%|████      | 8/20 [00:21<00:19,  1.61s/it]



 45%|████▌     | 9/20 [00:23<00:16,  1.52s/it]



 50%|█████     | 10/20 [00:24<00:14,  1.45s/it]



 55%|█████▌    | 11/20 [00:25<00:12,  1.44s/it]



 60%|██████    | 12/20 [00:26<00:10,  1.35s/it]



 65%|██████▌   | 13/20 [00:28<00:09,  1.34s/it]



 70%|███████   | 14/20 [00:29<00:07,  1.29s/it]



 75%|███████▌  | 15/20 [00:30<00:06,  1.27s/it]



 80%|████████  | 16/20 [00:31<00:04,  1.23s/it]



 85%|████████▌ | 17/20 [00:32<00:03,  1.22s/it]



 90%|█████████ | 18/20 [00:34<00:02,  1.22s/it]



 95%|█████████▌| 19/20 [00:35<00:01,  1.21s/it]



100%|██████████| 20/20 [00:36<00:00,  1.83s/it]

Accuracy: 0.92647
F1: 0.45209
Jaccard: 0.29287
Recall: 0.90697
Precision: 0.30365



