In [None]:
import os
import cv2
import tensorflow
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from keras.layers import Input, Conv2D, MaxPooling2D, Conv2DTranspose, concatenate, BatchNormalization, Activation, add
from keras.models import Model, model_from_json
from tensorflow.keras.optimizers import Adam
from keras.layers import ELU, LeakyReLU
from keras.utils import plot_model
from keras import backend as K
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
img_files = next(os.walk('/content/drive/MyDrive/Skin/skin/ISIC-2017_Training_Data/ISIC-2017_Training_Data'))[2]
msk_files = next(os.walk('/content/drive/MyDrive/Skin/skin/ISIC-2017_Training_Part1_GroundTruth/ISIC-2017_Training_Part1_GroundTruth'))[2]

img_files.sort()
msk_files.sort()

print(len(img_files))
print(len(msk_files))




X = []
Y = []

for img_fl in tqdm(img_files):
    if(img_fl.split('.')[-1]=='jpg'):


        img = cv2.imread('/content/drive/MyDrive/Skin/skin/ISIC-2017_Training_Data/ISIC-2017_Training_Data/{}'.format(img_fl), cv2.IMREAD_COLOR)
        resized_img = cv2.resize(img,(128,128), interpolation = cv2.INTER_CUBIC)

        X.append(resized_img)

        msk = cv2.imread('/content/drive/MyDrive/Skin/skin/ISIC-2017_Training_Part1_GroundTruth/ISIC-2017_Training_Part1_GroundTruth/{}'.format(img_fl.split('.')[0]+'_segmentation.png'), cv2.IMREAD_GRAYSCALE)
        resized_msk = cv2.resize(msk,(128,128), interpolation = cv2.INTER_CUBIC)

        Y.append(resized_msk)


2000
2000


 96%|█████████▌| 1924/2000 [27:58<02:19,  1.83s/it]

In [None]:
print(len(X))
print(len(Y))

X = np.array(X)
Y = np.array(Y)

X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

Y_train = Y_train.reshape((Y_train.shape[0],Y_train.shape[1],Y_train.shape[2],1))
Y_test = Y_test.reshape((Y_test.shape[0],Y_test.shape[1],Y_test.shape[2],1))

X_train = X_train / 255
X_test = X_test / 255
Y_train = Y_train / 255
Y_test = Y_test / 255

Y_train = np.round(Y_train,0)
Y_test = np.round(Y_test,0)

print(X_train.shape)
print(Y_train.shape)
print(X_test.shape)
print(Y_test.shape)

In [None]:
X_train1, X_test_do, Y_train1, Y_test_d0 = train_test_split(X_train, Y_train, test_size=0.2, random_state=42)

In [None]:
print(X_train1.shape)
print(Y_train1.shape)
print(X_test_do.shape)
print(Y_test_d0.shape)

In [None]:
import tensorflow as tf
from sklearn.utils.extmath import cartesian
import math
def dice_coef(y_true, y_pred):
    smooth = 0.0
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    return (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)

def jacard(y_true, y_pred):

    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum ( y_true_f * y_pred_f)
    union = K.sum ( y_true_f + y_pred_f - y_true_f * y_pred_f)

    return intersection/union

def confusion(y_true, y_pred):
    smooth=1
    y_pred_pos = K.clip(y_pred, 0, 1)
    y_pred_neg = 1 - y_pred_pos
    y_pos = K.clip(y_true, 0, 1)
    y_neg = 1 - y_pos
    tp = K.sum(y_pos * y_pred_pos)
    fp = K.sum(y_neg * y_pred_pos)
    fn = K.sum(y_pos * y_pred_neg)
    prec = (tp + smooth)/(tp+fp+smooth)
    recall = (tp+smooth)/(tp+fn+smooth)
    return prec, recall

def tp(y_true, y_pred):
    smooth = 1
    y_pred_pos = K.round(K.clip(y_pred, 0, 1))
    y_pos = K.round(K.clip(y_true, 0, 1))
    tp = (K.sum(y_pos * y_pred_pos) + smooth)/ (K.sum(y_pos) + smooth)
    return tp

def tn(y_true, y_pred):
    smooth = 1
    y_pred_pos = K.round(K.clip(y_pred, 0, 1))
    y_pred_neg = 1 - y_pred_pos
    y_pos = K.round(K.clip(y_true, 0, 1))
    y_neg = 1 - y_pos
    tn = (K.sum(y_neg * y_pred_neg) + smooth) / (K.sum(y_neg) + smooth )
    return tn
def cdist(A, B):
    """
    Computes the pairwise Euclidean distance matrix between two tensorflow matrices A & B, similiar to scikit-learn cdist.
    For example:
    A = [[1, 2],
         [3, 4]]
    B = [[1, 2],
         [3, 4]]
    should return:
        [[0, 2.82],
         [2.82, 0]]
    :param A: m_a x n matrix
    :param B: m_b x n matrix
    :return: euclidean distance matrix (m_a x m_b)
    """
    # squared norms of each row in A and B
    na = tf.reduce_sum(tf.square(A), 1)
    nb = tf.reduce_sum(tf.square(B), 1)

    # na as a row and nb as a co"lumn vectors
    na = tf.reshape(na, [-1, 1])
    nb = tf.reshape(nb, [1, -1])

    # return pairwise euclidead difference matrix
    D = tf.sqrt(tf.maximum(na - 2 * tf.matmul(A, B, False, True) + nb, 0.0))
    return D


def weighted_hausdorff_distance(w, h, alpha):
    all_img_locations = tf.convert_to_tensor(cartesian([np.arange(w), np.arange(h)]), dtype=tf.float32)
    max_dist = math.sqrt(w ** 2 + h ** 2)

    def hausdorff_loss(y_true, y_pred):
        def loss(y_true, y_pred):
            eps = 1e-6
            y_true = K.reshape(y_true, [w, h])
            gt_points = K.cast(tf.where(y_true > 0.5), dtype=tf.float32)
            num_gt_points = tf.shape(gt_points)[0]
            y_pred = K.flatten(y_pred)
            p = y_pred
            p_replicated = tf.squeeze(K.repeat(tf.expand_dims(p, axis=-1), num_gt_points))
            d_matrix = cdist(all_img_locations, gt_points)
            num_est_pts = tf.reduce_sum(p)
            term_1 = (1 / (num_est_pts + eps)) * K.sum(p * K.min(d_matrix, 1))

            d_div_p = K.min((d_matrix + eps) / (p_replicated ** alpha + (eps / max_dist)), 0)
            d_div_p = K.clip(d_div_p, 0, max_dist)
            term_2 = K.mean(d_div_p, axis=0)

            return term_1 + term_2

        batched_losses = tf.map_fn(lambda x:
                                   loss(x[0], x[1]),
                                   (y_true, y_pred),
                                   dtype=tf.float32)
        return K.mean(tf.stack(batched_losses))

    return hausdorff_loss

In [None]:
from keras_unet_collection import models, losses
model = models.r2_unet_2d((None, None, 3), [16, 32, 64, 128], n_labels=1,
                          stack_num_down=2, stack_num_up=1, recur_num=2,
                          activation='ReLU', output_activation='Sigmoid',
                          batch_norm=True, pool='max', unpool='nearest', name='r2unet')

In [None]:
!pip install segmentation_models

In [None]:
!pip install keras_unet_collection

In [None]:
def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

In [None]:
from keras_unet_collection import losses

def hybrid_loss(y_true, y_pred):

    loss_focal = losses.focal_tversky(y_true, y_pred, alpha=0.5, gamma=4/3)
    loss_iou = losses.iou_seg(y_true, y_pred)

    # (x)
    loss_dice = losses.dice(y_true, y_pred)

    return loss_focal+loss_iou +loss_dice

In [None]:
from tensorflow.keras.optimizers import Adam
import tensorflow
import segmentation_models as sm
model.compile(loss = hybrid_loss, optimizer=Adam(lr = 1e-3), metrics=['accuracy',dice_coef,jacard, confusion,tp,tn,recall_m,precision_m])
#print(model_Unet.summary())
#sm.metrics.IOUScore(threshold=0.5), sm.metrics.FScore(threshold=0.5)


In [None]:
steps_per_epoch = len(X_train1)//16
val_steps_per_epoch = len(X_test)//16

In [None]:
from keras.callbacks import ModelCheckpoint, EarlyStopping, CSVLogger

#ModelCheckpoint callback saves a model at some interval.
filepath="/content/drive/MyDrive/Skin/paper/weights/weights-improvement-{epoch:02d}-{val_accuracy:.2f}.hdf5" #File name includes epoch and validation accuracy.
#Use Mode = max for accuracy and min for loss.
checkpoint = ModelCheckpoint(filepath, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')

#https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/EarlyStopping
#early_stop = EarlyStopping(monitor='val_loss', patience=50, verbose=1)
#This callback will stop the training when there is no improvement in
# the validation loss for three consecutive epochs.

#CSVLogger logs epoch, acc, loss, val_acc, val_loss
log_csv = CSVLogger('/content/drive/MyDrive/Skin/paper/weights/Res_SKIN_logs.csv', separator=',', append=False)

callbacks_list = [checkpoint, log_csv]


In [None]:
def bn_act(x, act=True):
    x = keras.layers.BatchNormalization()(x)
    if act == True:
        x = keras.layers.Activation("relu")(x)
    return x

def conv_block(x, filters, kernel_size=(3, 3), padding="same", strides=1):
    conv = bn_act(x)
    conv = keras.layers.Conv2D(filters, kernel_size, padding=padding, strides=strides)(conv)
    return conv

def stem(x, filters, kernel_size=(3, 3), padding="same", strides=1):
    conv = keras.layers.Conv2D(filters, kernel_size, padding=padding, strides=strides)(x)
    conv = conv_block(conv, filters, kernel_size=kernel_size, padding=padding, strides=strides)

    shortcut = keras.layers.Conv2D(filters, kernel_size=(1, 1), padding=padding, strides=strides)(x)
    shortcut = bn_act(shortcut, act=False)

    output = keras.layers.Add()([conv, shortcut])
    return output

def residual_block(x, filters, kernel_size=(3, 3), padding="same", strides=1):
    res = conv_block(x, filters, kernel_size=kernel_size, padding=padding, strides=strides)
    res = conv_block(res, filters, kernel_size=kernel_size, padding=padding, strides=1)

    shortcut = keras.layers.Conv2D(filters, kernel_size=(1, 1), padding=padding, strides=strides)(x)
    shortcut = bn_act(shortcut, act=False)

    output = keras.layers.Add()([shortcut, res])
    return output

def upsample_concat_block(x, xskip):
    u = keras.layers.UpSampling2D((2, 2))(x)
    c = keras.layers.Concatenate()([u, xskip])
    return c

def ResUNet():
    f = [32, 64, 128, 256,512,1024]
    inputs = keras.layers.Input((128,128, 3))

    ## Encoder
    e0 = inputs
    e1 = stem(e0, f[0])
    e2 = residual_block(e1, f[1], strides=2)
    e3 = residual_block(e2, f[2], strides=2)
    e4 = residual_block(e3, f[3], strides=2)
    e5 = residual_block(e4, f[4], strides=2)
    e6 = residual_block(e5, f[5], strides=2)


    ## Bridge
    b0 = conv_block(e6, f[5], strides=1)
    b1 = conv_block(b0, f[5], strides=1)

    ## Decoder
    u1 = upsample_concat_block(b1, e5)
    d1 = residual_block(u1, f[5])

    u2 = upsample_concat_block(d1, e4)
    d2 = residual_block(u2, f[4])

    u3 = upsample_concat_block(d2, e3)
    d3 = residual_block(u3, f[3])

    u4 = upsample_concat_block(d3, e2)
    d4 = residual_block(u4, f[2])

    u5 = upsample_concat_block(d4, e1)
    d5 = residual_block(u5, f[1])



    outputs = keras.layers.Conv2D(1, (1, 1), padding="same", activation="sigmoid")(d5)
    model = keras.models.Model(inputs, outputs)
    return model

# Model integration
model = models.Model(inputs, conv_final, name="ResUNet")



In [None]:
from datetime import datetime
batch_size=16
start1 = datetime.now()

Unet_history = model.fit(X_train1, Y_train1,
                    verbose=1,epochs=100,
                    steps_per_epoch=steps_per_epoch,
                    validation_data=(X_test, Y_test),
                    validation_steps=val_steps_per_epoch
                   )

stop1 = datetime.now()
#Execution time of the model
execution_time_Unet = stop1-start1
print("MADR execution time is: ", execution_time_Unet)
model.save('/content/drive/MyDrive/Skin/weight/consolidate/Res_SKIN_model.h5')

In [None]:
#model = model()
model.load_weights('/content/drive/MyDrive/Skin/weight/consolidate/Res_SKIN_model.h5') #Trained for 50 epochs and then additional 100

In [None]:
import random
from tensorflow.keras.utils import normalize
test_img_number = 122
test_img = X_test_do[test_img_number]
ground_truth=Y_test_d0[test_img_number]
plt.rcdefaults()
test_img_input=np.expand_dims(test_img, 0)
prediction = (model.predict(test_img_input)[0,:,:,0] > 0.5).astype(np.uint8)



In [None]:
seg=prediction
gt=ground_truth[:,:,0]
dice = (np.sum(seg[gt==1])*2.0 / (np.sum(seg) + np.sum(gt)))*100
dice=round(dice,4)
plt.title(f"Dice score of Att-Unet is {dice}",fontsize=12)
plt.imshow(prediction, cmap='gray')