## Import necessary packages

In [None]:
import os
import random
import numpy as np
import glob
import matplotlib.pyplot as plt
plt.style.use("ggplot")
%matplotlib inline

from sklearn.model_selection import KFold, train_test_split
import cv2

import tensorflow as tf

from keras.backend import clear_session
from keras import backend as K
from keras.models import Model, load_model
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.applications import VGG16 
from keras.layers import Input, BatchNormalization, Activation, Dropout
from keras.layers.pooling import MaxPooling2D
from keras.layers.merge import concatenate
from keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau, CSVLogger
from keras.optimizers import Adam
from keras.preprocessing.image import ImageDataGenerator, img_to_array

#unet collection
from keras_unet_collection import models
#import tensorflow as tf
from datetime import datetime 
from PIL import Image

## General part

In [None]:
#define directory where images and masks are located on local disk
image_directory = 'D:/UniBas/Bachelorarbeit/Img_masks/DeepACSA_images_RF/insert_images'
mask_directory = 'D:/UniBas/Bachelorarbeit/Img_masks/DeepACSA_images_RF/insert_masks/'

#define the properties and empty list for resized images and masks
SIZE = 256
image_dataset = []
mask_dataset = []

#define custom function
def IoU(y_true, y_pred, smooth=1):
    intersection = K.sum(K.abs(y_true * y_pred), axis=-1)
    union = K.sum(y_true,-1) + K.sum(y_pred,-1) - intersection
    iou = (intersection + smooth) / ( union + smooth)
    return iou

#enumerate and resize images/masks
images = os.listdir(image_directory)
for i, image_name in enumerate(images):    #enumerate method adds a counter and returns the enumerate object
    if (image_name.split('.')[1] == 'tif'):
        #print(image_directory+image_name)
        image = cv2.imread(image_directory+image_name, 1)
        image = Image.fromarray(image)
        image = image.resize((SIZE, SIZE))
        image_dataset.append(np.array(image))

masks = os.listdir(mask_directory)
for i, image_name in enumerate(masks):
    if (image_name.split('.')[1] == 'tif'):
        image = cv2.imread(mask_directory+image_name, 0)
        image = Image.fromarray(image)
        image = image.resize((SIZE, SIZE))
        mask_dataset.append(np.array(image))

#define some hyperparameters
num_labels = 1  #Binary classificaion
batch_size = 2  #keep it smaller than 3
epochs = 60
num_folds = 5   #define the number of folds (usually 5-10 folds)

#normalize images
image_dataset = np.array(image_dataset)/255
#do not normalize masks, just rescale to 0 to 1. Add RGB-Chanel (3) to mask.
mask_dataset = np.expand_dims((np.array(mask_dataset)),3) /255

#define the K-fold Cross Validator
kfold = KFold(n_splits=num_folds, shuffle=False)

#define per-fold score containers 
acc_per_fold = []
loss_per_fold = []
IoU_per_fold = []

fold_no = 1

## DL-Kfold with unet_2plus

In [None]:
for train, test in kfold.split(image_dataset, mask_dataset):

  callbacks = [
    EarlyStopping(patience=8, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=10, min_lr=0.00001, verbose=1),
    ModelCheckpoint(f'K-foldno{fold_no}-Unet-2d-plus-V3-VL-256.h5', verbose=1, save_best_only=True, save_weights_only=False), # Give the model a name (the .h5 part)
    CSVLogger(f'K-foldno{fold_no}-Unet-2d-plus-V3-VL-256.csv', separator=',', append=False)]

  #define the model architecture
  #unet_plus_2d requires a Backbone
  model = models.unet_plus_2d((256, 256, 3), filter_num=[64, 128, 256, 512, 1024], 
                           n_labels=num_labels, 
                           stack_num_down=2, stack_num_up=2, 
                           activation='ReLU', 
                           output_activation='Sigmoid', 
                           batch_norm=False, pool=False, unpool=False, 
                           backbone='VGG16', weights='imagenet', 
                           freeze_backbone=True, freeze_batch_norm=True, 
                           name='unet_plus')

  #compile the model
  model.compile(loss='binary_crossentropy', optimizer=Adam(lr = 1e-3), 
              metrics=['accuracy', IoU])

  #generate a print
  print('------------------------------------------------------------------------')
  print(f'Training for fold {fold_no} ...')

  #fit model to data
  Unet_plus_history = model.fit(image_dataset[train], mask_dataset[train], 
                    verbose=1,
                    batch_size = batch_size,
                    validation_data=(image_dataset[test], mask_dataset[test]), 
                    shuffle=False,
                    epochs=epochs,
                    callbacks=callbacks)
  
  #append evaluation values for every fold to a list
  acc_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[1])
  loss_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[0])
  IoU_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[2])

  #increase fold number
  fold_no += 1

## DL-Kfold with unet_3plus

In [None]:
for train, test in kfold.split(image_dataset, mask_dataset):

  callbacks = [
    EarlyStopping(patience=8, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=10, min_lr=0.00001, verbose=1),
    ModelCheckpoint(f'B1GC-Kfoldno{fold_no}-unet3plus.h5', verbose=1, save_best_only=True, save_weights_only=False), # Give the model a name (the .h5 part)
    CSVLogger(f'B1GC-Kfoldno{fold_no}-unet3plus.csv', separator=',', append=False)]

  # Define the model architecture
  # unet_plus_2d require depth >= 2
  model = models.unet_3plus_2d((256, 256, 3), n_labels=num_labels, filter_num_down=[64, 128, 256, 512, 1024], 
                             filter_num_skip='auto', filter_num_aggregate='auto', 
                             stack_num_down=2, stack_num_up=2, activation='ReLU', output_activation='Sigmoid',
                             batch_norm=True, pool=True, unpool=False, deep_supervision=False, name='unet3plus')
  # Compile the model
  model.compile(loss='binary_crossentropy', optimizer=Adam(lr = 1e-3), 
              metrics=['accuracy', IoU])

  # Generate a print
  print('------------------------------------------------------------------------')
  print(f'Training for fold {fold_no} ...')

  # Fit data to model
  Unet_plus_history = model.fit(image_dataset[train], mask_dataset[train], 
                    verbose=1,
                    batch_size = batch_size,
                    validation_data=(image_dataset[test], mask_dataset[test]), 
                    shuffle=False,
                    epochs=epochs,
                    callbacks=callbacks)
  
  #append evaluation values for every fold to a list
  acc_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[1])
  loss_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[0])
  IoU_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[2])

  # Increase fold number
  fold_no += 1

## DL-Kfold with Trans_unet

In [None]:
for train, test in kfold.split(image_dataset, mask_dataset):
  callbacks = [
    EarlyStopping(patience=8, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=10, min_lr=0.00001, verbose=1),
    ModelCheckpoint(f'B1RF-Kfoldno{fold_no}-transunet.h5', verbose=1, save_best_only=True, save_weights_only=False), # Give the model a name (the .h5 part)
    CSVLogger(f'B1RF-Kfoldno{fold_no}-transunet.csv', separator=',', append=False)]
  
  #define the model architecture
  model = models.transunet_2d((256, 256, 3), filter_num=[64, 128, 256, 512, 1024], 
                          n_labels=num_labels, stack_num_down=2, stack_num_up=2, 
                          embed_dim=768, num_mlp=3072, num_heads=12, num_transformer=12, 
                          activation='ReLU', mlp_activation='GELU', output_activation='Sigmoid', #output activation from Softmax to Sigmoid
                          batch_norm=True, pool=True, unpool=False, name='transunet')
                          #batchnorm to true, unpool to false

  #compile the model
  model.compile(loss='binary_crossentropy', optimizer=Adam(lr = 1e-3), 
              metrics=['accuracy', IoU])

  #generate a print
  print('------------------------------------------------------------------------')
  print(f'Training for fold {fold_no} ...')

  #fit model to data
  transunet_history = model.fit(image_dataset[train], mask_dataset[train], 
                    verbose=1,
                    batch_size = batch_size,
                    validation_data=(image_dataset[test], mask_dataset[test]), 
                    shuffle=False,
                    epochs=epochs,
                    callbacks=callbacks)

  #append evaluation values for every fold to a list
  acc_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[1])
  loss_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[0])
  IoU_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[2])

  #increase fold number
  fold_no += 1

## DL-Kfold with Swin_unet

In [None]:
for train, test in kfold.split(image_dataset, mask_dataset):
  callbacks = [
    EarlyStopping(patience=8, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=10, min_lr=0.00001, verbose=1),
    ModelCheckpoint(f'K-foldno{fold_no}-swinunet-V1-VL-256.h5', verbose=1, save_best_only=True, save_weights_only=False), # Give the model a name (the .h5 part)
    CSVLogger(f'K-foldno{fold_no}-swinunet-V1-VL-256.csv', separator=',', append=False)]
  
  #define the model architecture
  #this model requires depth >= 2
  model = models.swin_unet_2d((256, 256, 3), filter_num_begin=64, n_labels=num_labels, depth=4, stack_num_down=2, stack_num_up=2, 
                            patch_size=(2, 2), num_heads=[4, 8, 8, 8], window_size=[4, 2, 2, 2], num_mlp=512, 
                            output_activation='Softmax', shift_window=False, name='swin_unet') #Guess: Shift_window = False

  #compile the model
  model.compile(loss='binary_crossentropy', optimizer=Adam(lr = 1e-3), 
              metrics=['accuracy', IoU])

  #generate a print
  print('------------------------------------------------------------------------')
  print(f'Training for fold {fold_no} ...')

  #fit model on data
  swin_unet_history = model.fit(image_dataset[train], mask_dataset[train], 
                    verbose=1,
                    batch_size = batch_size,
                    validation_data=(image_dataset[test], mask_dataset[test]), 
                    shuffle=False,
                    epochs=epochs,
                    callbacks=callbacks)

  #append evaluation values for every fold to a list
  acc_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[1])
  loss_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[0])
  IoU_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[2])

  #increase fold number
  fold_no += 1

## DL-Kfold with ResUnet

In [None]:
for train, test in kfold.split(image_dataset, mask_dataset):
  callbacks = [
    EarlyStopping(patience=8, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=10, min_lr=0.00001, verbose=1),
    ModelCheckpoint(f'K-foldno{fold_no}-resunet-VL-256.h5', verbose=1, save_best_only=True, save_weights_only=False), # Give the model a name (the .h5 part)
    CSVLogger(f'K-foldno{fold_no}-resunet-VL-256.csv', separator=',', append=False)]

  # Define the model architecture
  # resunet requires depth >= 2
  resunet = models.resunet_a_2d((256, 256, 3), [64, 128, 256, 512], 
                            dilation_num=[1, 3, 15, 31], 
                            n_labels=2, aspp_num_down=256, aspp_num_up=128, 
                            activation='ReLU', output_activation='Softmax', 
                            batch_norm=True, pool="max", unpool='nearest', name='resunet')

  # Compile the model
  resunet.compile(loss='binary_crossentropy', optimizer=Adam(lr = 1e-3), 
              metrics=['accuracy', IoU])

  # Generate a print
  print('------------------------------------------------------------------------')
  print(f'Training for fold {fold_no} ...')

  # Fit data to model
  resunet_history = resunet.fit(image_dataset[train], mask_dataset[train], 
                    verbose=1,
                    batch_size = batch_size,
                    validation_data=(image_dataset[test], mask_dataset[test]), 
                    shuffle=False,
                    epochs=epochs,
                    callbacks=callbacks)

  #append evaluation values for every fold to a list
  acc_per_fold.append(resunet.evaluate(image_dataset[test], mask_dataset[test])[1])
  loss_per_fold.append(resunet.evaluate(image_dataset[test], mask_dataset[test])[0])
  IoU_per_fold.append(resunet.evaluate(image_dataset[test], mask_dataset[test])[2])

  # Increase fold number
  fold_no += 1

## DL-Kfold with r2_unet

In [None]:
for train, test in kfold.split(image_dataset, mask_dataset):
  callbacks = [
    EarlyStopping(patience=8, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=10, min_lr=0.00001, verbose=1),
    ModelCheckpoint(f'K-foldno{fold_no}-model_r2-VL-256.h5', verbose=1, save_best_only=True, save_weights_only=False), # Give the model a name (the .h5 part)
    CSVLogger(f'K-foldno{fold_no}-model_r2-VL-256.csv', separator=',', append=False)]                                  # Give the CSV file a name (.csv)

  #define the model architecture
  #r2_unet_2d requires depth >= 2
  model = models.r2_unet_2d((256, 256, 3), [64, 128, 256, 512], n_labels=num_labels,
                          stack_num_down=2, stack_num_up=1, recur_num=2,
                          activation='ReLU', output_activation='Softmax', 
                          batch_norm=True, pool='max', unpool='nearest', name='r2unet')

  #compile the model
  model.compile(loss='binary_crossentropy', optimizer=Adam(lr = 1e-3), 
              metrics=['accuracy', IoU])

  #generate a print
  print('------------------------------------------------------------------------')
  print(f'Training for fold {fold_no} ...')

  #fit model on data
  resunet_history = model.fit(image_dataset[train], mask_dataset[train], 
                    verbose=1,
                    batch_size = batch_size,
                    validation_data=(image_dataset[test], mask_dataset[test]), 
                    shuffle=False,
                    epochs=epochs,
                    callbacks=callbacks)

  #append evaluation values for every fold to a list
  acc_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[1])
  loss_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[0])
  IoU_per_fold.append(model.evaluate(image_dataset[test], mask_dataset[test])[2])

  #increase fold number
  fold_no += 1

## Provide average scores for K-fold cross validation

In [None]:
# == Provide average scores ==
print('------------------------------------------------------------------------')
print('Score per fold')
for i in range(0, len(acc_per_fold)):
  print('------------------------------------------------------------------------')
  print(f'> Fold {i+1} - Loss: {loss_per_fold[i]} - Accuracy: {acc_per_fold[i]} - IoU: {IoU_per_fold[i]}%')
print('------------------------------------------------------------------------')
print('Average scores for all folds:')
print(f'> Accuracy: {np.mean(acc_per_fold)} (+- {np.std(acc_per_fold)})')
print(f'> Loss: {np.mean(loss_per_fold)}')
print(f'> IoU: {np.mean(IoU_per_fold)}')
print('------------------------------------------------------------------------')