In [None]:
!pip install keras-unet-collection

In [None]:
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import numpy as np
from tensorflow.keras import backend as K
import cv2
from glob import glob
from sklearn.utils import shuffle
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Recall, Precision

In [None]:
Height = 320
Width = 320

""" Creating a directory """
def make_dir(file_path):
    if not os.path.exists(file_path):
        os.makedirs(file_path)

def shuffle_data(image, mask):
    image, mask = shuffle(image, mask, random_state=42)
    return image, mask
def data_loading(data_path, split=0.25):
    images = sorted(glob(os.path.join(data_path, "ISIC2018_Task1-2_Training_Input", "*.jpg")))
    masks = sorted(glob(os.path.join(data_path, "ISIC2018_Task1_Training_GroundTruth", "*.png")))

    split_ratio = int(len(images) * split)

    train_x, valid_x = train_test_split(images, test_size=split_ratio, random_state=42)
    train_y, valid_y = train_test_split(masks, test_size=split_ratio, random_state=42)

    train_x, test_x = train_test_split(train_x, test_size=split_ratio, random_state=42)
    train_y, test_y = train_test_split(train_y, test_size=split_ratio, random_state=42)

    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

def load_image(img_path):
    img_path = img_path.decode()
    img = cv2.imread(img_path, cv2.IMREAD_COLOR)
    img = cv2.resize(img, (Width, Height))
    img = img/255.0
    img = img.astype(np.float32)
    return img

def load_mask(mask_path):
    mask_path = mask_path.decode()
    mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
    mask = cv2.resize(mask, (Width, Height))
    mask = mask/255.0
    mask = mask.astype(np.float32)
    mask = np.expand_dims(mask, axis=-1)
    return mask

def tf_parsing(image, mask):
    def _parse(image, mask):
        image = load_image(image)
        mask = load_mask(mask)
        return image, mask

    image, mask = tf.numpy_function(_parse, [image, mask], [tf.float32, tf.float32])
    image.set_shape([Height, Width, 3])
    mask.set_shape([Height, Width, 1])
    return image, mask

def TF_dataset(images, masks, batch_size=16):
    dataset = tf.data.Dataset.from_tensor_slices((images, masks))
    dataset = dataset.map(tf_parsing)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(10)
    return dataset

In [None]:
np.random.seed(42)
tf.random.set_seed(42)
make_dir("files")
batch_size = 4
lr = 1e-4
num_epochs = 60
savemodel_path = os.path.join("files", "model_u2net.h5")
data_path = os.path.join("files", "data.csv")

dataset_path = "../input"
train_path = os.path.join(dataset_path, "isic2018-challenge-task1-data-segmentation")

(Xtrain, Ytrain), (Xvalid, Yvalid), (Xtest, Ytest) = data_loading(train_path)
Xtrain, Ytrain = shuffle_data(Xtrain, Ytrain)

print(f"Train: {len(Xtrain)} - {len(Ytrain)}")
print(f"Valid: {len(Xvalid)} - {len(Yvalid)}")
print(f"Test: {len(Xtest)} - {len(Ytest)}")

train_dataset = TF_dataset(Xtrain, Ytrain, batch_size=batch_size)
valid_dataset = TF_dataset(Xvalid, Yvalid, batch_size=batch_size)

In [None]:
from keras_unet_collection import models

In [None]:
model = models.unet_2d((Height, Width, 3), [64, 128, 256, 512, 1024], n_labels=1,
                       stack_num_down=2, stack_num_up=2,
                    activation='ReLU', output_activation='Sigmoid',
                       batch_norm=True, pool='max', unpool='nearest', name='unet')

In [None]:
model.summary()

In [None]:
def iou_score(y_true, y_pred):
    def iou(y_true, y_pred):
        intersection = (y_true * y_pred).sum()
        union = y_true.sum() + y_pred.sum() - intersection
        iou_value = (intersection + 1e-15) / (union + 1e-15)
        iou_value = iou_value.astype(np.float32)
        return iou_value
    return tf.numpy_function(iou, [y_true, y_pred], tf.float32)

smooth = 1e-15
def dice_coeff(y_true, y_pred):
    y_true = tf.keras.layers.Flatten()(y_true)
    y_pred = tf.keras.layers.Flatten()(y_pred)
    intersection = tf.reduce_sum(y_true * y_pred)
    return (2. * intersection + smooth) / (tf.reduce_sum(y_true) + tf.reduce_sum(y_pred) + smooth)

def dice_loss(y_true, y_pred):
    return 1.0 - dice_coef(y_true, y_pred)

In [None]:
model.compile(loss= tf.keras.losses.BinaryCrossentropy(), optimizer=Adam(lr), metrics=[dice_coeff, iou_score, Recall(), Precision()])

callbacks = [
        ModelCheckpoint(savemodel_path, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-7, verbose=1),
        CSVLogger(data_path),
        EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=False),
]
model.fit(
        train_dataset,
        epochs=num_epochs,
        validation_data=valid_dataset,
        callbacks=callbacks
    )

In [None]:
from IPython.display import FileLink
FileLink(r'./files/model_unet.h5')