In [None]:
# Mount Google Drive so that the /content/drive/... paths used by later cells resolve.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
################################Resunet++############
import os
import numpy as np
import cv2

import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.models import Model

def squeeze_excite_block(inputs, ratio=8):
    """Re-weight the channels of `inputs` with a squeeze-and-excitation gate.

    The feature map is globally average-pooled, passed through a two-layer
    bottleneck (``channels // ratio`` ReLU units, then ``channels`` sigmoid
    units, both without bias) and the resulting per-channel weights are
    multiplied back onto `inputs`.

    Args:
        inputs: Keras tensor whose last axis is the channel axis.
        ratio: Reduction factor of the bottleneck Dense layer.

    Returns:
        Tensor produced by multiplying `inputs` with the (1, 1, channels) gate.
    """
    n_channels = inputs.shape[-1]

    # Squeeze: one value per channel, reshaped so it broadcasts over H and W.
    gate = GlobalAveragePooling2D()(inputs)
    gate = Reshape((1, 1, n_channels))(gate)

    # Excite: bottleneck followed by a sigmoid gate per channel.
    gate = Dense(n_channels // ratio, activation='relu', kernel_initializer='he_normal', use_bias=False)(gate)
    gate = Dense(n_channels, activation='sigmoid', kernel_initializer='he_normal', use_bias=False)(gate)

    return Multiply()([inputs, gate])

def stem_block(x, n_filter, strides):
    """First residual unit of the network: conv-BN-ReLU-conv plus a 1x1 shortcut.

    Args:
        x: Input Keras tensor.
        n_filter: Number of filters of every convolution in the block.
        strides: Stride of the first 3x3 convolution and of the shortcut.

    Returns:
        Sum of main path and shortcut, passed through `squeeze_excite_block`.
    """
    # Main path.
    main = Conv2D(n_filter, (3, 3), padding="same", strides=strides)(x)
    main = BatchNormalization()(main)
    main = Activation("relu")(main)
    main = Conv2D(n_filter, (3, 3), padding="same")(main)

    # Projection shortcut taken from the untouched block input.
    shortcut = Conv2D(n_filter, (1, 1), padding="same", strides=strides)(x)
    shortcut = BatchNormalization()(shortcut)

    merged = Add()([main, shortcut])
    return squeeze_excite_block(merged)


def resnet_block(x, n_filter, strides=1):
    """Pre-activation residual block followed by squeeze-and-excitation.

    Two BN -> ReLU -> 3x3 conv stages form the main path (only the first one
    uses `strides`); a strided 1x1 conv + BN forms the shortcut.

    Args:
        x: Input Keras tensor.
        n_filter: Number of filters of every convolution in the block.
        strides: Stride of the first 3x3 convolution and of the shortcut.

    Returns:
        Sum of main path and shortcut, passed through `squeeze_excite_block`.
    """
    block_input = x

    # Main path: the second stage always runs at stride 1.
    for conv_stride in (strides, 1):
        x = BatchNormalization()(x)
        x = Activation("relu")(x)
        x = Conv2D(n_filter, (3, 3), padding="same", strides=conv_stride)(x)

    # Projection shortcut so the spatial size and channel count match the main path.
    shortcut = Conv2D(n_filter, (1, 1), padding="same", strides=strides)(block_input)
    shortcut = BatchNormalization()(shortcut)

    merged = Add()([x, shortcut])
    return squeeze_excite_block(merged)

def aspp_block(x, num_filters, rate_scale=1):
    """Atrous spatial pyramid pooling: parallel dilated 3x3 convs summed together.

    Three branches use dilation rates 6, 12 and 18 (each multiplied by
    `rate_scale`), a fourth branch is an undilated 3x3 conv. Every branch is
    batch-normalised, the four results are added and fused by a 1x1 conv.

    Args:
        x: Input Keras tensor.
        num_filters: Number of filters of every convolution in the block.
        rate_scale: Multiplier applied to the base dilation rates.

    Returns:
        Output tensor of the final 1x1 convolution.
    """
    branches = []
    for base_rate in (6, 12, 18):
        rate = base_rate * rate_scale
        dilated = Conv2D(num_filters, (3, 3), dilation_rate=(rate, rate), padding="same")(x)
        branches.append(BatchNormalization()(dilated))

    # Undilated branch keeps the local context.
    plain = Conv2D(num_filters, (3, 3), padding="same")(x)
    branches.append(BatchNormalization()(plain))

    fused = Add()(branches)
    return Conv2D(num_filters, (1, 1), padding="same")(fused)

def attetion_block(g, x):
    """Attention gate that modulates decoder features with encoder features.

    Args:
        g: Output of the parallel encoder block. It is max-pooled 2x2 before
            being added to `x`, so it has to be twice the spatial size of `x`.
        x: Output of the previous decoder block.

    Returns:
        `x` multiplied element-wise by the attention map; the map is produced
        with as many channels as `x` has.
    """
    n_channels = x.shape[-1]

    # Encoder branch: BN -> ReLU -> conv, then halve the resolution to match x.
    enc = BatchNormalization()(g)
    enc = Activation("relu")(enc)
    enc = Conv2D(n_channels, (3, 3), padding="same")(enc)
    enc = MaxPooling2D(pool_size=(2, 2), strides=(2, 2))(enc)

    # Decoder branch: BN -> ReLU -> conv at the decoder resolution.
    dec = BatchNormalization()(x)
    dec = Activation("relu")(dec)
    dec = Conv2D(n_channels, (3, 3), padding="same")(dec)

    # Combine both branches and turn the sum into the attention map.
    attn = Add()([enc, dec])
    attn = BatchNormalization()(attn)
    attn = Activation("relu")(attn)
    attn = Conv2D(n_channels, (3, 3), padding="same")(attn)

    return Multiply()([attn, x])

class ResUnetPlusPlus:
    """Builder for the ResUNet++ segmentation network defined in this notebook.

    The network takes square 3-channel images and produces a single-channel
    sigmoid map of the same spatial size.

    Args:
        input_size: Height and width of the input images. Must be a positive
            multiple of 8 (or None to leave the spatial size unspecified).
    """

    def __init__(self, input_size=256):
        self.input_size = input_size

    def build_model(self):
        """Assemble and return the (uncompiled) Keras model.

        Returns:
            A `Model` mapping (input_size, input_size, 3) images to
            (input_size, input_size, 1) sigmoid outputs.

        Raises:
            ValueError: if `input_size` is not a positive multiple of 8.
        """
        # The encoder halves the resolution three times and every attention
        # block max-pools the skip tensor by 2 before adding it to the decoder
        # tensor. Those shapes only line up when the input size is divisible
        # by 8; otherwise Keras fails later with an obscure merge-shape error.
        size = self.input_size
        if size is not None and (size <= 0 or size % 8 != 0):
            raise ValueError(
                "input_size must be a positive multiple of 8, got {!r}".format(size)
            )

        n_filters = [16, 32, 64, 128, 256]
        inputs = Input((self.input_size, self.input_size, 3))

        c0 = inputs
        c1 = stem_block(c0, n_filters[0], strides=1)

        ## Encoder: each stage halves the spatial resolution
        c2 = resnet_block(c1, n_filters[1], strides=2)
        c3 = resnet_block(c2, n_filters[2], strides=2)
        c4 = resnet_block(c3, n_filters[3], strides=2)

        ## Bridge
        b1 = aspp_block(c4, n_filters[4])

        ## Decoder: attention gate -> 2x upsample -> concat skip -> residual block
        d1 = attetion_block(c3, b1)
        d1 = UpSampling2D((2, 2))(d1)
        d1 = Concatenate()([d1, c3])
        d1 = resnet_block(d1, n_filters[3])

        d2 = attetion_block(c2, d1)
        d2 = UpSampling2D((2, 2))(d2)
        d2 = Concatenate()([d2, c2])
        d2 = resnet_block(d2, n_filters[2])

        d3 = attetion_block(c1, d2)
        d3 = UpSampling2D((2, 2))(d3)
        d3 = Concatenate()([d3, c1])
        d3 = resnet_block(d3, n_filters[1])

        ## Output head: ASPP followed by a 1x1 conv and a sigmoid
        outputs = aspp_block(d3, n_filters[0])
        outputs = Conv2D(1, (1, 1), padding="same")(outputs)
        outputs = Activation("sigmoid")(outputs)

        ## Model
        model = Model(inputs, outputs)
        return model
# Build the network once for 256x256 inputs and print its layer table.
arch = ResUnetPlusPlus(input_size=256)
model = arch.build_model()
model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 256, 256, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d (Conv2D)                (None, 256, 256, 16  448         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 batch_normalization (BatchNorm  (None, 256, 256, 16  64         ['conv2d[0][0]']                 
 alization)                     )                                                             

In [None]:
import os
import numpy as np
import cv2
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, TensorBoard, EarlyStopping
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Recall, Precision
from glob import glob
from tensorflow.keras.losses import binary_crossentropy

#from your_model_file import build_model

def read_image(path):
    """Load a colour image, resize it to 256x256 and scale it to [0, 1].

    Args:
        path: Path of the image file.

    Returns:
        Float array of shape (256, 256, 3) in OpenCV's channel order.

    Raises:
        FileNotFoundError: if OpenCV cannot load `path` (missing or unreadable).
    """
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    # cv2.imread signals failure by returning None instead of raising; without
    # this check the problem only shows up as a cryptic cv2.resize error.
    if x is None:
        raise FileNotFoundError("Could not read image (missing or unreadable): {}".format(path))
    x = cv2.resize(x, (256, 256))
    x = x / 255.0
    return x

def read_mask(path):
    """Load a mask as grayscale, resize it to 256x256 and scale it to [0, 1].

    Args:
        path: Path of the mask file.

    Returns:
        Float array of shape (256, 256, 1).

    Raises:
        FileNotFoundError: if OpenCV cannot load `path` (missing or unreadable).
    """
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    # cv2.imread signals failure by returning None instead of raising.
    if x is None:
        raise FileNotFoundError("Could not read mask (missing or unreadable): {}".format(path))
    x = cv2.resize(x, (256, 256))
    x = x / 255.0
    # Add the trailing channel axis expected by the model output.
    x = np.expand_dims(x, axis=-1)
    return x

def preprocess_data1(x, y):
    """Load one training sample from disk.

    Args:
        x: Path of the image file (read with `read_image`).
        y: Path of the mask file (read with `read_mask`).

    Returns:
        Tuple ``(image, mask)``, both cast to float32. Resizing and scaling
        to [0, 1] are already done by the two reader functions.
    """
    image = read_image(x).astype(np.float32)
    mask = read_mask(y).astype(np.float32)
    return image, mask


def load_data1(train_path, valid_path):
    """Load every image/mask pair of the training and validation splits.

    Each split directory must contain an ``images`` and a ``labels`` folder
    with ``*.png`` files; files are paired by sorted order.

    Args:
        train_path: Directory of the training split.
        valid_path: Directory of the validation split.

    Returns:
        Tuple ``(train_data, valid_data)``; each is a list of
        ``(image, mask)`` tuples as returned by `preprocess_data1`.

    Raises:
        ValueError: if a split has a different number of images and labels.
    """
    def _paired_paths(split_path):
        # Collect and pair the files of one split, refusing mismatched counts.
        image_paths = sorted(glob(os.path.join(split_path, "images", "*.png")))
        label_paths = sorted(glob(os.path.join(split_path, "labels", "*.png")))
        # zip() would silently drop the surplus files and could pair an image
        # with the wrong mask, so fail loudly instead.
        if len(image_paths) != len(label_paths):
            raise ValueError(
                "{}: found {} images but {} labels".format(
                    split_path, len(image_paths), len(label_paths))
            )
        return list(zip(image_paths, label_paths))

    # Validate both splits before doing any (slow) image decoding.
    train_pairs = _paired_paths(train_path)
    valid_pairs = _paired_paths(valid_path)

    train_data = [preprocess_data1(x, y) for x, y in train_pairs]
    valid_data = [preprocess_data1(x, y) for x, y in valid_pairs]
    return train_data, valid_data

if __name__ == "__main__":
    # Split directories handed to load_data1 and the file ModelCheckpoint writes to.
    train_path = "/content/drive/MyDrive/Colab Notebooks/DFU/Foot Ulcer Segmentation Challenge/train"
    valid_path = "/content/drive/MyDrive/Colab Notebooks/DFU/Foot Ulcer Segmentation Challenge/validation"
    model_path = "/content/drive/MyDrive/Colab Notebooks/DFU/Foot Ulcer Segmentation Challenge/model.h5"



    # ---------------- Metrics and losses ----------------
    # Added to numerator and denominator of the Dice / IoU ratios so they stay
    # defined (and evaluate to 1) when both masks are empty.
    smooth = 1e-15
    def dice_coef(y_true, y_pred):
        """Soft Dice coefficient over the whole batch.

        Both tensors are flattened and ``(2*sum(t*p) + smooth) /
        (sum(t) + sum(p) + smooth)`` is returned, using the enclosing
        ``smooth`` constant.
        """
        truth = tf.keras.layers.Flatten()(y_true)
        pred = tf.keras.layers.Flatten()(y_pred)
        overlap = tf.reduce_sum(truth * pred)
        total = tf.reduce_sum(truth) + tf.reduce_sum(pred)
        return (2. * overlap + smooth) / (total + smooth)

    def dice_loss(y_true, y_pred):
        """Dice loss: one minus `dice_coef` of the same arguments."""
        coefficient = dice_coef(y_true, y_pred)
        return 1.0 - coefficient

    def iou(y_true, y_pred):
        """Soft intersection-over-union of the batch, evaluated in NumPy.

        The computation runs through `tf.numpy_function` and yields a float32
        value; predictions are used as-is, without thresholding.
        """
        def _soft_iou(truth, pred):
            overlap = (truth * pred).sum()
            combined = truth.sum() + pred.sum() - overlap
            ratio = (overlap + smooth) / (combined + smooth)
            # numpy_function requires the result dtype to match the declared tf.float32.
            return ratio.astype(np.float32)
        return tf.numpy_function(_soft_iou, [y_true, y_pred], tf.float32)
    def bce_dice_loss(y_true, y_pred):
        """Sum of Keras binary cross-entropy and `dice_loss` for the same arguments."""
        bce_term = binary_crossentropy(y_true, y_pred)
        dice_term = dice_loss(y_true, y_pred)
        return bce_term + dice_term

    def focal_loss(y_true, y_pred):
        """Binary focal loss (alpha=0.25, gamma=2) averaged over all elements.

        `y_pred` holds probabilities; they are clipped away from 0 and 1,
        converted back to logits and fed into a numerically stable
        focal-loss expression.
        """
        alpha = 0.25
        gamma = 2

        eps = tf.keras.backend.epsilon()
        probs = tf.clip_by_value(y_pred, eps, 1 - eps)
        logits = tf.math.log(probs / (1 - probs))

        # Focal modulating factors for the positive and negative class.
        pos_weight = alpha * (1 - probs) ** gamma * y_true
        neg_weight = (1 - alpha) * probs ** gamma * (1 - y_true)

        # log(1 + exp(-|z|)) + relu(-z) is a stable form of log(1 + exp(-z)).
        stable_softplus = tf.math.log1p(tf.exp(-tf.abs(logits))) + tf.nn.relu(-logits)
        per_element = stable_softplus * (pos_weight + neg_weight) + logits * neg_weight

        return tf.reduce_mean(per_element)
##############
    # ---------------- Training configuration ----------------
    batch_size = 10
    epochs = 20
    lr = 1e-4
    # Patience for LR reduction / early stopping as a whole number of epochs
    # (epochs * .3 alone is the float 6.0).
    patience = max(1, int(epochs * 0.3))

    arch = ResUnetPlusPlus(input_size=256)
    model1 = arch.build_model()
    metrics = [
        iou, dice_coef, focal_loss, dice_loss, bce_dice_loss,
        tf.keras.metrics.Recall(),
        tf.keras.metrics.Precision()
    ]

    # ---------------- Data ----------------
    train_data, valid_data = load_data1(train_path, valid_path)
    # Fail with a clear message instead of the opaque unpacking error that
    # zip(*[]) would raise when a split directory contains no PNG files.
    if not train_data or not valid_data:
        raise ValueError(
            "No samples found: {} training and {} validation pairs loaded from "
            "{!r} / {!r}".format(len(train_data), len(valid_data), train_path, valid_path)
        )
    train_x, train_y = zip(*train_data)
    valid_x, valid_y = zip(*valid_data)

    # Every sample is a (256, 256, 3) image with a (256, 256, 1) mask, both float32.
    sample_signature = (
        tf.TensorSpec(shape=(256, 256, 3), dtype=tf.float32),
        tf.TensorSpec(shape=(256, 256, 1), dtype=tf.float32),
    )

    train_dataset = tf.data.Dataset.from_generator(lambda: train_data,
                                                   output_signature=sample_signature)
    # The files are loaded in sorted order; reshuffle them on every pass so the
    # batches differ between epochs, then batch and repeat indefinitely
    # (steps_per_epoch bounds each epoch).
    train_dataset = train_dataset.shuffle(len(train_data)).batch(batch_size).repeat()

    valid_dataset = tf.data.Dataset.from_generator(lambda: valid_data,
                                                   output_signature=sample_signature)
    valid_dataset = valid_dataset.batch(batch_size)

    # ---------------- Model / callbacks ----------------
    model1.compile(loss=dice_loss, optimizer=Adam(lr), metrics=metrics)

    callbacks = [
        # NOTE(review): the checkpoint is overwritten after every epoch and
        # early stopping does not restore the best weights, so model_path
        # holds the last epoch, not necessarily the best one.
        ModelCheckpoint(model_path),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=patience),
        TensorBoard(),
        EarlyStopping(monitor='val_loss', patience=patience, restore_best_weights=False)
    ]

    # Ceiling division: include the final partial batch and never end up with
    # zero steps when a split is smaller than one batch.
    train_steps = (len(train_x) + batch_size - 1) // batch_size
    valid_steps = (len(valid_x) + batch_size - 1) // batch_size



In [None]:
    # Train the compiled model. train_dataset repeats indefinitely, so
    # steps_per_epoch decides where an epoch ends; validation runs for
    # validation_steps batches after every epoch.
    model1.fit(train_dataset,
        validation_data=valid_dataset,
        epochs=epochs,
        steps_per_epoch=train_steps,
        validation_steps=valid_steps,
        callbacks=callbacks)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x7f74206fb040>

In [None]:
###########
import cv2
import numpy as np
from google.colab.patches import cv2_imshow

def preprocess_test_image(image_path):
    """Load a test image the same way the training images are prepared.

    Args:
        image_path: Path of the image file.

    Returns:
        float32 array of shape (256, 256, 3) scaled to [0, 1].

    Raises:
        FileNotFoundError: if OpenCV cannot load `image_path`.
    """
    image = cv2.imread(image_path, cv2.IMREAD_COLOR)
    # cv2.imread returns None on failure rather than raising.
    if image is None:
        raise FileNotFoundError("Could not read image (missing or unreadable): {}".format(image_path))
    image = cv2.resize(image, (256, 256))
    image = image.astype(np.float32) / 255.0
    return image

def postprocess_mask(mask):
    """Turn a predicted probability map into a displayable binary mask.

    Args:
        mask: Model output for one image (probabilities in [0, 1]).

    Returns:
        uint8 array of shape (256, 256): 255 where the probability exceeds
        0.5, otherwise 0.
    """
    resized = np.squeeze(cv2.resize(mask, (256, 256)))
    foreground = resized > 0.5
    return foreground.astype(np.uint8) * 255

def predict_test_images(model, test_image_paths):
    """Run `model` on every image path and collect the post-processed masks.

    Args:
        model: Object with a Keras-style ``predict`` method.
        test_image_paths: Iterable of image file paths.

    Returns:
        List with one `postprocess_mask` result per path, in input order.
    """
    predicted_masks = []
    for image_path in test_image_paths:
        # The model expects a batch axis; predict one image at a time.
        batch = np.expand_dims(preprocess_test_image(image_path), axis=0)
        raw_mask = model.predict(batch)[0]
        predicted_masks.append(postprocess_mask(raw_mask))
    return predicted_masks

# Specify the path to your test images
# Folder holding the test split; its "images" sub-folder is scanned for PNG files.
test_path = "/content/drive/MyDrive/Colab Notebooks/DFU/Foot Ulcer Segmentation Challenge/test"

test_image_paths = sorted(glob(os.path.join(test_path, "images", "*.png")))

# Restore the trained model; the custom losses/metrics it was compiled with
# have to be supplied by name for deserialisation.
model_path = "/content/drive/MyDrive/Colab Notebooks/DFU/Foot Ulcer Segmentation Challenge/model.h5"
model = tf.keras.models.load_model(
    model_path,
    custom_objects={
        'dice_loss': dice_loss,
        'focal_loss': focal_loss,
        'bce_dice_loss': bce_dice_loss,
        'dice_coef': dice_coef,
        'iou': iou,
    },
)

# One predicted mask per test image, in the same order as test_image_paths.
predicted_masks = predict_test_images(model, test_image_paths)

# Show every original image followed by its predicted mask.
for image_path, mask in zip(test_image_paths, predicted_masks):
    image = cv2.imread(image_path)
    print("Test Image:")
    cv2_imshow(image)
    print("Predicted Mask:")
    cv2_imshow(mask)


In [None]:
##########validation folder#########
import cv2
import numpy as np
from glob import glob
import os
from google.colab.patches import cv2_imshow
import tensorflow as tf

def preprocess_test_image(image_path):
    """Load an image, resize it to 256x256 and scale it to [0, 1].

    Args:
        image_path: Path of the image file.

    Returns:
        float32 array of shape (256, 256, 3).

    Raises:
        FileNotFoundError: if OpenCV cannot load `image_path`.
    """
    image = cv2.imread(image_path, cv2.IMREAD_COLOR)
    # cv2.imread returns None on failure rather than raising.
    if image is None:
        raise FileNotFoundError("Could not read image (missing or unreadable): {}".format(image_path))
    image = cv2.resize(image, (256, 256))
    image = image.astype(np.float32) / 255.0
    return image

def preprocess_test_mask(mask_path):
    """Load a ground-truth mask, resize it to 256x256 and scale it to [0, 1].

    Args:
        mask_path: Path of the mask file.

    Returns:
        float32 array of shape (256, 256, 1).

    Raises:
        FileNotFoundError: if OpenCV cannot load `mask_path`.
    """
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    # cv2.imread returns None on failure rather than raising.
    if mask is None:
        raise FileNotFoundError("Could not read mask (missing or unreadable): {}".format(mask_path))
    mask = cv2.resize(mask, (256, 256))
    mask = mask.astype(np.float32) / 255.0
    mask = np.expand_dims(mask, axis=-1)
    return mask

def postprocess_mask(mask):
    """Turn a predicted probability map into a displayable binary mask.

    Args:
        mask: Model output for one image (probabilities in [0, 1]).

    Returns:
        uint8 array of shape (256, 256): 255 where the probability exceeds
        0.5, otherwise 0.
    """
    mask = cv2.resize(mask, (256, 256))
    mask = np.squeeze(mask)
    mask = (mask > 0.5).astype(np.uint8) * 255
    # The former np.concatenate([1 - mask, mask], axis=-1) was a leftover of a
    # two-channel one-hot encoding. Applied to a 0/255 uint8 mask, `1 - mask`
    # wraps around and the result became a 256x512 image whose left half was
    # meaningless, so only the mask itself is returned.
    return mask

def predict_test_images(model, test_image_paths, test_mask_paths):
    """Run `model` on every image that has a matching mask path.

    Args:
        model: Object with a Keras-style ``predict`` method.
        test_image_paths: Sequence of image file paths.
        test_mask_paths: Sequence of mask file paths. Only its length matters
            here: images are paired with it via ``zip``, so images beyond the
            number of mask paths are skipped. The mask files are not read.

    Returns:
        List of `postprocess_mask` results, one per processed image.
    """
    predicted_masks = []
    # The ground-truth mask was previously loaded and decoded for every image
    # but never used; that wasted read is dropped while keeping the pairing.
    for image_path, _mask_path in zip(test_image_paths, test_mask_paths):
        image = preprocess_test_image(image_path)
        predicted_mask = model.predict(np.expand_dims(image, axis=0))[0]
        predicted_mask = postprocess_mask(predicted_mask)
        predicted_masks.append(predicted_mask)
    return predicted_masks

# Specify the path to your test folder containing images and masks
# Folder containing "images" and "labels" sub-folders with matching PNG files.
# NOTE(review): this folder and model_path below live directly under MyDrive,
# whereas the training cell saves model.h5 under "Colab Notebooks/DFU/..." —
# confirm a copy of the data and model really exists at these locations.
test_folder = "/content/drive/MyDrive/Foot Ulcer Segmentation Challenge/validation"

test_image_paths = sorted(glob(os.path.join(test_folder, "images", "*.png")))
test_mask_paths = sorted(glob(os.path.join(test_folder, "labels", "*.png")))

# Restore the trained model together with the custom losses/metrics it was
# compiled with.
model_path = "/content/drive/MyDrive/Foot Ulcer Segmentation Challenge/model.h5"
model = tf.keras.models.load_model(
    model_path,
    custom_objects={
        'dice_loss': dice_loss,
        'focal_loss': focal_loss,
        'bce_dice_loss': bce_dice_loss,
        'dice_coef': dice_coef,
        'iou': iou,
    },
)

# One predicted mask per image/label pair.
predicted_masks = predict_test_images(model, test_image_paths, test_mask_paths)

# Show each image next to its ground-truth mask and the prediction.
for i, image_path in enumerate(test_image_paths):
    mask_path = test_mask_paths[i]
    predicted_mask = predicted_masks[i]

    test_image = cv2.imread(image_path)
    test_mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

    for caption, picture in (
        ("Test Image:", test_image),
        ("Corresponding Mask:", test_mask),
        ("Predicted Mask:", predicted_mask),
    ):
        print(caption)
        cv2_imshow(picture)
