# UNet++ & Unet for microwell-DL

## Import Libraries

In [None]:
import pandas as pd
import numpy as np
import os
from os.path import join
import random
import tensorflow as tf
import cv2
from tqdm import tqdm
from time import time
import datetime
from tensorflow import keras
from keras import layers
from keras.layers import Conv2D, MaxPooling2D, UpSampling2D, Concatenate
from keras.layers import Input, Add, Conv2DTranspose
from keras.models import Sequential, Model
from keras.applications import VGG16
from keras.optimizers import SGD, Adam
from keras.losses import SparseCategoricalCrossentropy, MeanSquaredError, BinaryCrossentropy
from keras.utils import plot_model
from keras import callbacks
from keras.callbacks import Callback
from keras.callbacks import ModelCheckpoint


from  matplotlib import pyplot as plt
import matplotlib.image as mpimg
%matplotlib inline

# from IPython.display import clear_output
# from IPython.display import HTML
# from base64 import b64encode

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

### Optional(Colab): mount google drive to colab workspace

In [None]:
# Mount Google Drive at /content/drive (only available inside Google Colab)
from google.colab import drive
drive.mount('/content/drive')

# Set your own directory: relative paths used later (e.g. "./dataset") resolve against it
os.chdir('/content/drive/MyDrive/*********/')

### Test tensorflow GPU usability

In [None]:
# Print the GPU devices TensorFlow can see; an empty list means none were found
print(tf.config.list_physical_devices('GPU'))

# Dataloader

### Initialize Constants

In [None]:
# Initialize Constants
BATCH_SIZE = 15  # passed to Dataset.batch() and used to derive the steps per epoch
BUFFER_SIZE = 1000  # buffer size passed to Dataset.shuffle()
IMG_SIZE = 128  # images and masks are resized to IMG_SIZE x IMG_SIZE
N_CHANNELS = 3  # presumably the number of image channels — TODO confirm where it is used
N_CLASSES = 1  # presumably the number of output classes — TODO confirm where it is used
SEED = 123  # seed passed to Dataset.shuffle()

## Set Epochs **IMPORTANT**
EPOCHS = 300  # number of training epochs; assigned again in the hyperparameter cell

#### function: load image from source directory and return a dictionary

In [None]:
# Function to load an image and its mask from disk and return them as a dictionary
def parse_image(img_path: str) -> dict:
    """Load one PNG image together with its binary segmentation mask.

    The mask path is derived from ``img_path`` by replacing "images" with
    "masks" and then "well" with "mask". ``tf.strings.regex_replace``
    replaces every occurrence by default, so these substrings must not
    appear elsewhere in the path.

    Args:
        img_path: Path of the image file (a scalar string tensor when the
            function is used through ``Dataset.map``).

    Returns:
        A dict with:
            'image': the decoded 3-channel image.
            'segmentation_mask': a uint8 tensor with a trailing channel axis,
                1 where the mask pixel is pure white (255, 255, 255), else 0.
    """
    image = tf.io.read_file(img_path)
    image = tf.image.decode_png(image, channels=3)

    mask_path = tf.strings.regex_replace(img_path, "images", "masks")
    mask_path = tf.strings.regex_replace(mask_path, "well", "mask")
    mask = tf.io.read_file(mask_path)
    mask = tf.image.decode_png(mask, channels=3)

    # Convert the colour mask to a binary mask: a pixel counts as foreground
    # only when all three channels equal 255.
    bac_label = tf.constant([255, 255, 255], dtype=tf.uint8)
    mask = tf.reduce_all(tf.equal(mask, bac_label), axis=-1)
    mask = tf.cast(mask, tf.uint8)
    mask = tf.expand_dims(mask, axis=-1)

    return {'image': image, 'segmentation_mask': mask}




#### function: Image Normalization

In [None]:
# Tensorflow function that divides image values by 255; the mask is untouched
@tf.function
def normalize(input_image: tf.Tensor, input_mask: tf.Tensor) -> tuple:
    """Cast the image to float32 and divide it by 255.

    Args:
        input_image: Image tensor (values presumably in 0..255, which maps
            them to [0, 1] — TODO confirm).
        input_mask: Mask tensor, returned unchanged.

    Returns:
        Tuple ``(scaled_image, input_mask)``.
    """
    as_float = tf.cast(input_image, tf.float32)
    scaled_image = as_float / 255.0
    return scaled_image, input_mask

#### function: Image Augmentation

In [None]:

# Tensorflow function that resizes, randomly flips and normalizes a training sample
@tf.function
def load_image_train(datapoint: dict) -> tuple:
    """Prepare one training sample.

    The image and mask are resized to IMG_SIZE x IMG_SIZE, the resized mask
    is rounded back to whole values, both are flipped left-right together
    with probability 0.5, and the image is passed through ``normalize``.

    Args:
        datapoint: Dict with the keys 'image' and 'segmentation_mask'.

    Returns:
        Tuple ``(image, mask)``.
    """
    target_size = (IMG_SIZE, IMG_SIZE)
    image = tf.image.resize(datapoint['image'], target_size)
    mask = tf.math.round(tf.image.resize(datapoint['segmentation_mask'], target_size))

    # One random draw decides the flip so image and mask stay aligned.
    do_flip = tf.random.uniform(()) > 0.5
    if do_flip:
        image = tf.image.flip_left_right(image)
        mask = tf.image.flip_left_right(mask)

    return normalize(image, mask)

# Tensorflow function that resizes and normalizes a validation sample (no augmentation)
@tf.function
def load_image_val(datapoint: dict) -> tuple:
    """Prepare one validation sample.

    Args:
        datapoint: Dict with the keys 'image' and 'segmentation_mask'.

    Returns:
        Tuple ``(image, mask)`` resized to IMG_SIZE x IMG_SIZE, with the mask
        rounded and the image passed through ``normalize``.
    """
    target_size = (IMG_SIZE, IMG_SIZE)
    image = tf.image.resize(datapoint['image'], target_size)
    mask = tf.math.round(tf.image.resize(datapoint['segmentation_mask'], target_size))
    return normalize(image, mask)

# Tensorflow function that resizes and normalizes a testing sample (no augmentation)
@tf.function
def load_image_test(datapoint: dict) -> tuple:
    """Prepare one testing sample.

    Args:
        datapoint: Dict with the keys 'image' and 'segmentation_mask'.

    Returns:
        Tuple ``(image, mask)`` resized to IMG_SIZE x IMG_SIZE, with the mask
        rounded and the image passed through ``normalize``.
    """
    target_size = (IMG_SIZE, IMG_SIZE)
    image = tf.image.resize(datapoint['image'], target_size)
    mask = tf.math.round(tf.image.resize(datapoint['segmentation_mask'], target_size))
    return normalize(image, mask)

#### function: display and save dataset sample

In [None]:
def display_sample(display_list):
    """Show the given arrays side by side and save the figure.

    The panels are titled, in order, 'Input Image', 'True Mask' and
    'Predicted Mask', so at most three entries are supported. The finished
    figure is written once to ``debug_test/display_sample_test.png``.

    Args:
        display_list: Sequence of image-like arrays accepted by
            ``tf.keras.preprocessing.image.array_to_img``.
    """
    title = ['Input Image', 'True Mask', 'Predicted Mask']

    plt.figure(figsize=(18, 18))
    for i, item in enumerate(display_list):
        plt.subplot(1, len(display_list), i + 1)
        plt.title(title[i])
        plt.imshow(tf.keras.preprocessing.image.array_to_img(item))
        plt.axis('off')

    # Save and show once, after every panel is drawn; doing it inside the
    # loop saved/showed a partially drawn figure for each panel.
    os.makedirs("debug_test", exist_ok=True)
    plt.savefig("debug_test/" + 'display_sample_test.png',
                dpi=300,
                transparent=True,
                bbox_inches='tight'
                )
    plt.show()

    return

### Preprocessing functions

In [None]:
def train_dataset_preprocess(train_dataset):
    """Build the training input pipeline.

    Steps, in order: map ``load_image_train`` in parallel, shuffle with
    BUFFER_SIZE and SEED, repeat indefinitely, batch by BATCH_SIZE, prefetch.

    Args:
        train_dataset: Dataset of dicts as produced by ``parse_image``.

    Returns:
        The batched, repeating training dataset.
    """
    # -- Train Dataset --#
    return (
        train_dataset
        .map(load_image_train, num_parallel_calls=tf.data.AUTOTUNE)
        .shuffle(buffer_size=BUFFER_SIZE, seed=SEED)
        .repeat()
        .batch(BATCH_SIZE)
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    )


def val_dataset_preprocess(val_dataset):
    """Build the validation input pipeline.

    Steps, in order: map ``load_image_val``, repeat indefinitely, batch by
    BATCH_SIZE, prefetch. Because the dataset repeats, the consumer has to
    bound it (e.g. with ``validation_steps``).

    Args:
        val_dataset: Dataset of dicts as produced by ``parse_image``.

    Returns:
        The batched, repeating validation dataset.
    """
    # -- Validation Dataset --#
    # Use the dedicated validation loader rather than the test loader.
    val_dataset = val_dataset.map(load_image_val)
    val_dataset = val_dataset.repeat()
    val_dataset = val_dataset.batch(BATCH_SIZE)
    val_dataset = val_dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
    return val_dataset



def test_dataset_preprocess(test_dataset):
    """Build the testing input pipeline.

    Steps, in order: map ``load_image_test``, batch by BATCH_SIZE, prefetch.
    The dataset is not repeated, so one pass covers every test sample once.

    Args:
        test_dataset: Dataset of dicts as produced by ``parse_image``.

    Returns:
        The batched testing dataset.
    """
    #-- Testing Dataset --#
    return (
        test_dataset
        .map(load_image_test)
        .batch(BATCH_SIZE)
        .prefetch(buffer_size=tf.data.AUTOTUNE)
    )

#### function: Get dataset size of train, val, test set

In [None]:
def all_dataset_size(dataset_dir: str =  "./dataset"):
    """Compute the train/validation/test split sizes for a dataset directory.

    Only ``*.png`` files inside ``<dataset_dir>/images`` are counted, which
    matches the ``*.png`` pattern get_dataset_all() uses to list the images;
    stray files in the folder therefore no longer inflate the sizes.

    Args:
        dataset_dir: Directory that contains the "images" sub-directory.

    Returns:
        Dict with the keys "TRAINSET_SIZE" (70 % of the images, rounded),
        "VALIDSET_SIZE" (10 %, truncated) and "TESTSET_SIZE" (the remainder).
    """
    image_dir = join(dataset_dir, "images")

    # List the directory once and keep only the files the dataset will load.
    n_images = sum(1 for name in os.listdir(image_dir) if name.endswith(".png"))

    TRAINSET_SIZE = int(round(n_images * 0.7))
    print(f"Number of Training Examples: {TRAINSET_SIZE}")
    VALIDSET_SIZE = int(n_images * 0.1)
    print(f"Number of Validation Examples: {VALIDSET_SIZE}")
    TESTSET_SIZE = int(n_images - TRAINSET_SIZE - VALIDSET_SIZE)
    print(f"Number of Testing Examples: {TESTSET_SIZE}")

    dataset_size = {"TRAINSET_SIZE": TRAINSET_SIZE,
                    "VALIDSET_SIZE": VALIDSET_SIZE,
                    "TESTSET_SIZE": TESTSET_SIZE}
    return dataset_size

### function: get all datasets with images:masks pair and augmentations
prepare train/val/test dataset with preprocessing from given path

In [None]:
def get_dataset_all(dataset_dir: str =  "./dataset"):
    """Create the preprocessed train/validation/test datasets.

    All ``*.png`` files in ``<dataset_dir>/images`` are listed in a fixed
    order, shuffled once with SEED (``reshuffle_each_iteration=False`` keeps
    the split identical on every pass), parsed with ``parse_image`` and then
    split by position: the first TRAINSET_SIZE samples for training, the next
    VALIDSET_SIZE for validation, and the rest for testing.

    Args:
        dataset_dir: Directory that contains the "images" sub-directory.

    Returns:
        Dict with the keys "train", "val" and "test" holding the datasets.
    """
    image_dir = join(dataset_dir, "images")
    train_data_dir = image_dir

    # Reuse the shared size helper (it also prints the three split sizes)
    # instead of repeating the same arithmetic here.
    sizes = all_dataset_size(dataset_dir)
    TRAINSET_SIZE = sizes["TRAINSET_SIZE"]
    VALIDSET_SIZE = sizes["VALIDSET_SIZE"]

    ### Generate dataset variables
    all_dataset = tf.data.Dataset.list_files(train_data_dir + "/*.png",  shuffle = False)
    all_dataset = all_dataset.shuffle(BUFFER_SIZE, seed=SEED, reshuffle_each_iteration=False)
    all_dataset = all_dataset.map(parse_image)

    n_train_val = TRAINSET_SIZE + VALIDSET_SIZE
    train_val_dataset = all_dataset.take(n_train_val)
    train_dataset = train_val_dataset.take(TRAINSET_SIZE)
    val_dataset = train_val_dataset.skip(TRAINSET_SIZE)
    test_dataset = all_dataset.skip(n_train_val)

    train_dataset = train_dataset_preprocess(train_dataset)
    val_dataset = val_dataset_preprocess(val_dataset)
    test_dataset = test_dataset_preprocess(test_dataset)

    dataset = {"train": train_dataset, "val": val_dataset, "test": test_dataset}
    return dataset

## Main Dataloader cell:

In [None]:
# Print the GPU devices TensorFlow can see
print(tf.config.list_physical_devices('GPU'))


# Build the train/val/test datasets from ./dataset
dataset = get_dataset_all("./dataset")


# Take one training batch and keep it as the sample shown below; the loop
# variables `image` and `mask` stay defined at module level afterwards.
for image, mask in dataset['train'].take(1):
    sample_image, sample_mask = image, mask
# Display the first image of that batch next to its mask
display_sample([sample_image[0], sample_mask[0]])


# Models: UNet++ & U-Net

## UNet++ Architecture


In [None]:
def double_conv_block(x, n_filters):
    """Apply two consecutive 3x3 convolutions with ReLU activation.

    Both layers use ``n_filters`` filters, "same" padding and the
    "he_normal" kernel initializer.
    """
    for _ in range(2):
        x = layers.Conv2D(
            n_filters,
            3,
            padding="same",
            activation="relu",
            kernel_initializer="he_normal",
        )(x)
    return x

def downsample_block(x, n_filters):
    """Run a double convolution, then 2x2 max-pooling and dropout (rate 0.5).

    Returns:
        Tuple ``(features, pooled)``: the convolution output before pooling
        and the pooled, dropout-regularized tensor.
    """
    features = double_conv_block(x, n_filters)
    pooled = layers.MaxPool2D(2)(features)
    pooled = layers.Dropout(0.5)(pooled)
    return features, pooled


def upsample_block_part1(x, n_filters):
    """Upsample ``x`` with a stride-2, 3x3 transposed convolution ("same" padding)."""
    transposed_conv = layers.Conv2DTranspose(
        filters=n_filters, kernel_size=3, strides=2, padding="same"
    )
    return transposed_conv(x)

def upsample_block_part2(x, n_filters):
    """Apply dropout (rate 0.5) followed by a double convolution."""
    regularized = layers.Dropout(0.5)(x)
    return double_conv_block(regularized, n_filters)


def build_unetPP_model(input_shape=None):
    """Build the UNet++-style segmentation model.

    The network has a four-level encoder with a 1024-filter bottleneck and a
    nested decoder whose nodes concatenate every earlier feature map of the
    same level. The output is a single-channel sigmoid map.

    Args:
        input_shape: Optional ``(height, width, channels)`` of the input.
            Defaults to ``(IMG_SIZE, IMG_SIZE, N_CHANNELS)`` so the model
            follows the size used by the data pipeline. Height and width
            must survive four 2x poolings, i.e. be divisible by 16.

    Returns:
        An uncompiled ``tf.keras.Model`` named "UNetPP".
    """
    if input_shape is None:
        input_shape = (IMG_SIZE, IMG_SIZE, N_CHANNELS)

    def nested_node(below, skips, n_filters):
        # Upsample the lower-resolution input, join it with the same-level
        # skip tensors, then apply dropout and a double convolution.
        up = upsample_block_part1(below, n_filters)
        merged = layers.concatenate([up] + skips)
        return upsample_block_part2(merged, n_filters)

    # inputs
    inputs = layers.Input(shape=input_shape)

    # encoder: contracting path - downsample
    f00, p00 = downsample_block(inputs, 64)
    f10, p10 = downsample_block(p00, 128)
    f20, p20 = downsample_block(p10, 256)
    f30, p30 = downsample_block(p20, 512)
    bottleneck = double_conv_block(p30, 1024)

    # decoders: expanding path - upsample
    # The nodes are created in the original order so automatic layer
    # naming/ordering (and previously saved weights) stay compatible.
    # NOTE(review): u01/u11/u21 upsample the pooled output of their own level
    # (p00/p10/p20) rather than the features of the level below
    # (f10/f20/f30), which is how I read the UNet++ paper — confirm intent.
    u01 = nested_node(p00, [f00], 64)
    u11 = nested_node(p10, [f10], 128)
    u21 = nested_node(p20, [f20], 256)
    u31 = nested_node(bottleneck, [f30], 512)

    u02 = nested_node(u11, [f00, u01], 64)
    u12 = nested_node(u21, [f10, u11], 128)
    u22 = nested_node(u31, [f20, u21], 256)

    u03 = nested_node(u12, [f00, u01, u02], 64)
    u13 = nested_node(u22, [f10, u11, u12], 128)

    u04 = nested_node(u13, [f00, u01, u02, u03], 64)

    # outputs
    outputs = layers.Conv2D(1, 1, padding="same", activation = "sigmoid")(u04)
    # unet model with Keras Functional API
    unetPP_model = tf.keras.Model(inputs, outputs, name="UNetPP")
    return unetPP_model





## U-Net Architecture

In [None]:

def double_conv_block(x, n_filters):
    """Apply two consecutive 3x3 convolutions with ReLU activation.

    Both layers use ``n_filters`` filters, "same" padding and the
    "he_normal" kernel initializer.
    """
    for _ in range(2):
        x = layers.Conv2D(
            n_filters,
            3,
            padding="same",
            activation="relu",
            kernel_initializer="he_normal",
        )(x)
    return x

def downsample_block(x, n_filters):
    """Run a double convolution, then 2x2 max-pooling and dropout (rate 0.5).

    Returns:
        Tuple ``(features, pooled)``: the convolution output before pooling
        and the pooled, dropout-regularized tensor.
    """
    features = double_conv_block(x, n_filters)
    pooled = layers.MaxPool2D(2)(features)
    pooled = layers.Dropout(0.5)(pooled)
    return features, pooled


def upsample_block(x, conv_features, n_filters):
    """Upsample ``x``, merge it with a skip connection and refine the result.

    Steps, in order: stride-2 3x3 transposed convolution, concatenation with
    ``conv_features``, dropout (rate 0.5), double convolution.
    """
    upsampled = layers.Conv2DTranspose(
        filters=n_filters, kernel_size=3, strides=2, padding="same"
    )(x)
    merged = layers.concatenate([upsampled, conv_features])
    regularized = layers.Dropout(0.5)(merged)
    return double_conv_block(regularized, n_filters)


def build_unet_model(input_shape=None):
    """Build the U-Net segmentation model.

    Four downsampling blocks (64-512 filters), a 1024-filter bottleneck and
    four upsampling blocks with skip connections, ending in a single-channel
    sigmoid output.

    Args:
        input_shape: Optional ``(height, width, channels)`` of the input.
            Defaults to ``(IMG_SIZE, IMG_SIZE, N_CHANNELS)`` so the model
            follows the size used by the data pipeline. Height and width
            must survive four 2x poolings, i.e. be divisible by 16.

    Returns:
        An uncompiled ``tf.keras.Model`` named "U-Net".
    """
    if input_shape is None:
        input_shape = (IMG_SIZE, IMG_SIZE, N_CHANNELS)

    # inputs
    inputs = layers.Input(shape=input_shape)

    # encoder: contracting path - downsample
    f1, p1 = downsample_block(inputs, 64)
    f2, p2 = downsample_block(p1, 128)
    f3, p3 = downsample_block(p2, 256)
    f4, p4 = downsample_block(p3, 512)

    # bottleneck
    bottleneck = double_conv_block(p4, 1024)

    # decoder: expanding path - upsample, each step fed by the matching
    # encoder features
    u6 = upsample_block(bottleneck, f4, 512)
    u7 = upsample_block(u6, f3, 256)
    u8 = upsample_block(u7, f2, 128)
    u9 = upsample_block(u8, f1, 64)

    # outputs
    outputs = layers.Conv2D(1, 1, padding="same", activation = "sigmoid")(u9)
    # unet model with Keras Functional API
    unet_model = tf.keras.Model(inputs, outputs, name="U-Net")
    return unet_model

# Model Training

## Select the model type: UNet++ or U-Net

In [None]:
## Choose which DL model to train and use for single-bacterium well identification
## Two options:
## Option 1: 'UNetPP'
## Option 2: 'UNet'
## Any other value makes the model-building cell raise an exception.

model_name = 'UNetPP' 

### Set result folder & build model

In [None]:
# Reject unknown model names before anything is created
if model_name not in ('UNetPP', 'UNet'):
    raise Exception("Sorry, you do not choose the available DL model type")

# Results go to '<model_name>_result/'; unetPP_folder holds the same path
result_folder = f'{model_name}_result/'
unetPP_folder = result_folder

# Build the selected architecture and print its layer summary
if model_name == 'UNetPP':
    model = build_unetPP_model()
else:
    model = build_unet_model()
model.summary()

## Model configuration

In [None]:
## Optimization: Adam with learning rate 1e-4 and gradient value clipping at 0.5
opt = keras.optimizers.Adam(learning_rate=1e-4, clipvalue=0.5)
## metric: BinaryIoU of class 1, with predictions thresholded at 0.5
## name=None lets Keras pick the metric name; CustomHistory reads it from the
## logs as 'binary_io_u' — presumably the auto-generated name, TODO confirm
## (it may change if this cell is run more than once in a session).
b_iou = tf.keras.metrics.BinaryIoU(
    target_class_ids=[1], threshold=0.5, name=None, dtype=None
)
## model compile: binary cross-entropy loss, tracking the IoU metric
model.compile(loss=BinaryCrossentropy(), optimizer=opt,  metrics=[b_iou])

### Custom Callback: define class

In [None]:
class CustomHistory(Callback):
    """Keras callback that records per-epoch loss and IoU values in lists.

    After training, the four lists hold one entry per finished epoch. An
    entry is ``None`` when the corresponding key is missing from the logs.
    """

    def __init__(self):
        # Let the base Callback set up its own attributes first.
        super().__init__()
        self.losses = []  # training loss per epoch
        self.binary_io_u_values = []  # training 'binary_io_u' per epoch
        self.val_losses = []  # validation loss per epoch
        self.val_binary_io_u_values = []  # validation 'binary_io_u' per epoch

    def on_epoch_end(self, epoch, logs=None):
        """Append this epoch's metrics; tolerates ``logs`` being None."""
        logs = logs or {}
        self.losses.append(logs.get('loss'))
        self.binary_io_u_values.append(logs.get('binary_io_u'))
        self.val_losses.append(logs.get('val_loss'))
        self.val_binary_io_u_values.append(logs.get('val_binary_io_u'))


### Set up the callbacks used during training
1. Append the loss and IoU (learning progress) to lists
2. Save a model checkpoint when val_loss improves
3. Update the TensorBoard log file

In [None]:
# Collects loss / IoU per epoch for the learning-curve cells
custom_history = CustomHistory()

# Save the weights only when val_loss reaches a new minimum.
# NOTE(review): the path ends in '.keras' while save_weights_only=True; newer
# Keras releases may require a '.weights.h5' suffix for weights-only
# checkpoints — confirm against the installed version.
checkpoint = ModelCheckpoint(
    result_folder + 'best_' + model_name +'_model.keras', 
    save_weights_only=True, 
    monitor='val_loss', 
    verbose=1, 
    save_best_only=True, 
    mode='min')

# Callbacks passed to model.fit(); TensorBoard logs go to '<result_folder>log/'.
# NOTE(review): histogram_freq = -1 is an unusual value — confirm it is intended.
callbacks_list = [
    custom_history, 
    checkpoint,
    callbacks.TensorBoard(result_folder + 'log/', histogram_freq = -1)  
    ]

### Set hyperparameters



In [None]:
## Set Epochs **IMPORTANT**
EPOCHS = 300

## Set Variables: split sizes for the dataset in ./dataset
dataset_size = all_dataset_size(dataset_dir = "./dataset")
TRAINSET_SIZE = dataset_size['TRAINSET_SIZE']
VALIDSET_SIZE = dataset_size['VALIDSET_SIZE']
TESTSET_SIZE = dataset_size['TESTSET_SIZE']

# Whole batches per epoch; a partial last batch is not counted.
# NOTE(review): the integer division yields 0 when a split holds fewer than
# BATCH_SIZE samples — confirm the dataset is large enough.
STEPS_PER_EPOCH = TRAINSET_SIZE // BATCH_SIZE
VALIDATION_STEPS = VALIDSET_SIZE // BATCH_SIZE
# print(TRAINSET_SIZE)
# print(VALIDSET_SIZE)
# print(BATCH_SIZE)
# print(STEPS_PER_EPOCH)
# print(VALIDATION_STEPS)

## Training Process

In [None]:
# Wall-clock start, used to report the total training time
initial_time = time()


# Model Training: the train and val datasets repeat forever, so the step
# counts define how many batches make up one epoch / one validation pass.
model.fit(dataset['train'], epochs=EPOCHS,
          steps_per_epoch=STEPS_PER_EPOCH,
          validation_data=dataset['val'],
          validation_steps=VALIDATION_STEPS,
          callbacks=callbacks_list)


# Report the model that was actually trained; the message used to say
# "Unet++" even when model_name was 'UNet'.
print(f'Finished {model_name} Training')
print('Training time', time() - initial_time)
# Save the final-epoch weights. The file name is fixed to "epoch300"
# because the testing section loads exactly this name.
model.save_weights(result_folder + f'epoch300_{model_name}_model.keras')

## (Optional) Show predictions on sample images

In [None]:
#### Repeated for safety and reliability #################################################
# Function to show a list of images side by side
def display_sample(display_list):
    """Show the given arrays in one row of titled panels.

    Panels are titled, in order, 'Input Image', 'True Mask', 'Predict Map'
    and 'Predict Mask', so at most four entries are supported.
    """
    title = ['Input Image', 'True Mask', 'Predict Map', 'Predict Mask']
    n_panels = len(display_list)

    plt.figure(figsize=(18, 18))
    for position, item in enumerate(display_list):
        plt.subplot(1, n_panels, position + 1)
        plt.title(title[position])
        plt.imshow(tf.keras.preprocessing.image.array_to_img(item))
        plt.axis('off')

    plt.show()

# Function to create a mask out of network prediction
def create_mask(pred_mask: tf.Tensor) -> tf.Tensor:
    """Round a prediction to whole values and append a trailing axis.

    Intended shape change, per the original comment:
    [IMG_SIZE, IMG_SIZE] -> [IMG_SIZE, IMG_SIZE, 1]. The axis is appended
    whatever the rank of the input is.
    """
    rounded = tf.math.round(pred_mask)
    return tf.expand_dims(rounded, axis=-1)



# Function to show predictions
def show_validation(dataset=None, num=1):
    """Display and save predictions of the global ``model``.

    With a dataset, the first image of each of the first ``num`` batches is
    predicted, displayed, and saved to ``<result_folder>outputs/`` as
    ``validate_<k>_*.png`` with k = 1..num. Without a dataset, the global
    ``sample_image`` / ``sample_mask`` batch is used and the files are named
    after ``num``.

    Args:
        dataset: Optional batched dataset yielding ``(image, mask)`` pairs.
        num: Number of batches to take, or the file index when ``dataset``
            is None.
    """
    to_img = tf.keras.preprocessing.image.array_to_img
    out_dir = result_folder + 'outputs/'
    # The save calls fail if the folder is missing, so create it up front.
    os.makedirs(out_dir, exist_ok=True)

    if dataset is not None:
        # Predict and show image from input dataset
        for idx, (image, mask) in enumerate(dataset.take(num), start=1):
            # Work on the first sample of the batch: array_to_img needs a
            # single (H, W, C) array, not a whole batch.
            pred_map = model.predict(image)[0]
            # create_mask expects a 2-D map and appends the channel axis.
            pred_mask = create_mask(pred_map[..., 0])
            display_sample([image[0], mask[0], pred_map, pred_mask])

            # Number the files per batch so iterations do not overwrite
            # each other.
            to_img(image[0]).save(out_dir + f'validate_{idx}_well.png')
            to_img(mask[0]).save(out_dir + f'validate_{idx}_true_mask.png')
            to_img(pred_map).save(out_dir + f'validate_{idx}_predict_map.png')
            to_img(pred_mask).save(out_dir + f'validate_{idx}_predict_mask.png')

    else:
        # Predict and show the sample image
        inference = model.predict(sample_image)
        display_sample([sample_image[0], sample_mask[0],
                        inference[0]])

        to_img(sample_image[0]).save(out_dir + f'validate_{num}_well.png')
        to_img(sample_mask[0]).save(out_dir + f'validate_{num}_true_mask.png')
        to_img(inference[0]).save(out_dir + f'validate_{num}_predict_mask.png')


## Show image, true mask & predicted mask for 20 batches of the test dataset
show_validation(dataset['test'], 20)

## Learning Curve

### Save Losses & IoU records in csv format

In [None]:
# Per-epoch records collected by the CustomHistory callback
training_loss = custom_history.losses
val_losses = custom_history.val_losses
binary_io_u_values = custom_history.binary_io_u_values
val_binary_io_u_values = custom_history.val_binary_io_u_values

# Epochs are numbered from 1
epochs_list = list(range(1, len(training_loss) + 1))

# One DataFrame per curve (the plotting cell reads these)
df_tloss = pd.DataFrame({"Epochs": epochs_list, "train_loss": training_loss})
df_vloss = pd.DataFrame({"Epochs": epochs_list, "validate_loss": val_losses})
df_tbiou = pd.DataFrame({"Epochs": epochs_list, "train_bIOU": binary_io_u_values})
df_vbiou = pd.DataFrame({"Epochs": epochs_list, "val_bIOU": val_binary_io_u_values})

# Save each curve as CSV in the result folder
for frame, csv_name in (
    (df_tloss, 'training_loss.csv'),
    (df_vloss, 'validate_loss.csv'),
    (df_tbiou, 'training_bIOU.csv'),
    (df_vbiou, 'validate_bIOU.csv'),
):
    frame.to_csv(unetPP_folder + csv_name, index=False)

### Plot Learning curve figure

In [None]:

# Global matplotlib settings; they also affect every later figure
plt.rcParams['font.size'] = 8
plt.rcParams['figure.dpi'] = 300


# Left axis (red): training and validation loss per epoch
fig, ax1 = plt.subplots()
color = 'tab:red'
ax1.set_xlabel('Epochs', fontsize=12)
ax1.set_ylabel('Loss', color=color, fontsize=12)
ax1.plot(df_tloss["Epochs"], df_tloss["train_loss"], '-', label='Training Loss', color=color)
ax1.plot(df_vloss["Epochs"], df_vloss["validate_loss"], '-', alpha = 0.5, label='Validation Loss', color=color)
ax1.tick_params(axis='y', labelcolor=color)
# NOTE(review): fixed limits; loss values outside [-0.01, 0.13] are not visible
ax1.set_ylim([-0.01, 0.13])
ax1.set_box_aspect(0.75)



# Right axis (blue): training and validation IoU per epoch
ax2 = ax1.twinx()  # instantiate a second axes that shares the same x-axis
color = 'tab:blue'
ax2.set_ylabel('IoU', color=color, fontsize=12)  # we already handled the x-label with ax1
ax2.plot(df_tbiou["Epochs"], df_tbiou["train_bIOU"], '-', label='Training IoU', color=color)
ax2.plot(df_vbiou["Epochs"], df_vbiou["val_bIOU"], '-', alpha = 0.5, label='Validation IoU', color=color)
ax2.tick_params(axis='y', labelcolor=color)

# NOTE(review): fixed limits; IoU values above 0.9 are not visible
ax2.set_ylim([-0.1, 0.9])
# Merge the legend entries of both axes into one legend (loc=9: upper center)
lines, labels = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax2.legend(lines + lines2, labels + labels2, loc=9)


plt.show()

# Save the figure into the result folder
fig.savefig(result_folder + 'learning_curve.png')

# Model Testing 

###  Load checkpoint model weights at epoch300

In [None]:
## model type: rebuild the same architecture that was trained
if model_name == 'UNetPP':
    model = build_unetPP_model()
    # result_folder = 'UNetPP_result/'
elif model_name == 'UNet':
    model = build_unet_model()
    # result_folder = 'UNet_result/'
else:
    raise Exception("Sorry, you do not choose the available DL model type")


## compile model with the same optimizer, loss and metric settings as for training
opt = keras.optimizers.Adam(learning_rate=1e-4, clipvalue=0.5)
m_iou = tf.keras.metrics.BinaryIoU(
    target_class_ids=[1], threshold=0.5, name=None, dtype=None
)
model.compile(loss=BinaryCrossentropy(), optimizer=opt,  metrics=[m_iou])

## load the weights saved at the end of training (the "epoch300" file, not
## the 'best_' file written by the ModelCheckpoint callback)
model.load_weights(result_folder + f'epoch300_{model_name}_model.keras')
model.summary()

### Calculate Time of test-set prediction

In [None]:
# Wall-clock start for timing inference over the whole test set
initial_time = time()

# Predict every test batch; only the elapsed time is of interest here.
# The loop variables image, mask and pred_mask remain defined afterwards.
for image, mask in dataset['test']:
    pred_mask = model.predict(image)

print(f'Finished {model_name} Testing')
print('Testing time', time() - initial_time)

# Single_Bac Microwell identification

## Microwell bacteria-count inference

### utils functions

In [None]:
# Function to blend a mask image over the initial image
def weighted_img(img, initial_img, α=1., β=0.5, γ=0.):
    """Return ``initial_img * α + img * β + γ`` computed by ``cv2.addWeighted``."""
    blended = cv2.addWeighted(initial_img, α, img, β, γ)
    return blended

# Function to overlay a mask on its image
def process_image_mask(image, mask):
    """Blend a rounded mask over an image.

    The mask is rounded, placed in the first channel of a three-channel
    float32 array whose other two channels are zero, and blended over the
    float32 image with ``weighted_img`` (default weights).

    Args:
        image: Image array/tensor (presumably (H, W, 3) — TODO confirm).
        mask: Mask array/tensor (presumably (H, W, 1) — TODO confirm).

    Returns:
        The blended image as returned by ``weighted_img``.
    """
    binary_mask = tf.math.round(mask)

    # Mask in the first channel, zeros in the remaining two
    empty_channel = np.zeros_like(binary_mask)
    overlay = np.asarray(
        np.dstack((binary_mask, empty_channel, empty_channel)), np.float32
    )

    base_image = np.asarray(image, np.float32)

    return weighted_img(overlay, base_image)

In [None]:
### Function to calculate bacteria number inside microwell
def bacteria_count(mask):
    """Count the connected foreground regions (outer contours) in a mask.

    The mask is rounded to whole values, reduced to a single 8-bit channel
    and handed to ``cv2.findContours`` directly. The earlier route through a
    PIL image and ``cv2.COLOR_RGB2BGR`` needs a 3-channel input, which a
    single-channel mask does not provide, and its min-max rescaling turned a
    completely filled mask into an empty one.

    Args:
        mask: Array/tensor of shape (H, W), (H, W, 1) or (H, W, C) holding a
            binary mask or per-pixel probabilities.

    Returns:
        Number of external contours found, as an int.
    """
    binary = np.rint(np.asarray(mask, dtype=np.float32))
    if binary.ndim == 3:
        # A pixel is foreground when any channel is set.
        binary = binary.max(axis=-1)
    # findContours expects an 8-bit single-channel image; non-zero = foreground.
    binary = (binary > 0).astype(np.uint8) * 255

    cnts = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    # The return value has 2 or 3 items depending on the OpenCV version;
    # pick the contour list in both cases.
    cnts = cnts[0] if len(cnts) == 2 else cnts[1]
    bacNum = len(cnts)
    return bacNum

In [None]:
# Function to save the images as a plot
def save_predict_sample(display_list, index, result_folder):
    """Plot, show and save one prediction sample.

    Writes four files into ``<result_folder>outputs/``: ``<index>.png`` (the
    combined figure) plus ``<index>_well.png``, ``<index>_true.png`` and
    ``<index>_predict.png`` for the individual panels.

    Args:
        display_list: ``[image, true_mask_overlay, predicted_mask_overlay]``;
            all three entries are required.
        index: Value used as the file-name prefix.
        result_folder: Result directory path, ending with a path separator.
    """
    to_img = tf.keras.preprocessing.image.array_to_img
    out_dir = result_folder + 'outputs/'
    # Create the folder so savefig()/save() do not fail when it is missing.
    os.makedirs(out_dir, exist_ok=True)

    plt.figure(figsize=(18, 18))

    title = ['Input Image', 'True Mask', 'Predicted Mask']

    for i, item in enumerate(display_list):
        plt.subplot(1, len(display_list), i + 1)
        plt.title(title[i])
        plt.imshow(to_img(item))
        plt.axis('off')

    plt.savefig(out_dir + f'{index}.png')
    plt.show()

    image_array = display_list[0]
    true_mask_array = display_list[1]
    pred_mask_array = display_list[2]

    # Save this sample's own image; the code used to read a global `image`
    # left over from another cell.
    img = to_img(image_array)
    img.save(out_dir + f'{index}_well.png')
    true_mask_img = to_img(true_mask_array)
    true_mask_img.save(out_dir + f'{index}_true.png')
    pred_mask_img = to_img(pred_mask_array)
    pred_mask_img.save(out_dir + f'{index}_predict.png')
     

### Get Testset images bacteria count prediction

In [None]:
# Function to save predictions
def get_predictions(dataset, result_folder):
    """Predict every sample of a dataset and count bacteria in true/predicted masks.

    For each sample the true and predicted counts are printed, an overlay
    figure is saved through ``save_predict_sample``, and at the end all
    counts are written to ``<result_folder>Number_Result.csv``. Uses the
    global ``model``.

    Args:
        dataset: Batched dataset yielding ``(batch_image, batch_mask)``.
        result_folder: Result directory path, ending with a path separator.

    Returns:
        Tuple ``(True_Counts, Pred_Counts)`` of per-sample count lists.
    """
    True_Counts = []
    Pred_Counts = []
    index = 0
    for batch_image, batch_mask in dataset:
        # One predict() call per batch instead of one per image.
        batch_pred = model.predict(batch_image)

        for image, mask, pred_mask in zip(batch_image, batch_mask, batch_pred):
            print(f"Processing image : {index}")

            true_bacNum = bacteria_count(mask)
            True_Counts.append(true_bacNum)
            print("True Bac-Num:    ", true_bacNum, "\n")

            pred_bacNum = bacteria_count(pred_mask)
            Pred_Counts.append(pred_bacNum)
            print("Predict Bac-Num: ", pred_bacNum, "\n")

            display_list = [image,
                            process_image_mask(image, mask),
                            process_image_mask(image, pred_mask)
                            ]

            save_predict_sample(display_list,
                                index,
                                result_folder)

            index += 1

    num_result = pd.DataFrame({'True_Number': True_Counts,
                               'Predict_Number': Pred_Counts})
    num_result.to_csv(
                result_folder + 'Number_Result.csv',
                index= False ,
                header = True
                )

    return True_Counts, Pred_Counts
    

In [None]:
# get_predictions() requires the result folder as its second argument
True_Counts, Pred_Counts = get_predictions(dataset['test'], result_folder)

### Bacteria-counts accuracy

In [None]:
def acc_calculator(predict, label):
    """Return the percentage of positions where ``predict`` equals ``label``.

    Args:
        predict: Sequence of predicted values.
        label: Sequence of true values, same length as ``predict``.

    Returns:
        Accuracy in percent rounded to two decimals; 0.0 for empty input.

    Raises:
        ValueError: If the two sequences differ in length.
    """
    # Use the arguments' own length, not the global Pred_Counts.
    total = len(predict)
    if total != len(label):
        raise ValueError("predict and label must have the same length")
    if total == 0:
        return 0.0

    correct = sum(1 for p, t in zip(predict, label) if p == t)
    return round(100 * correct / total, 2)

In [None]:
# Percentage of test samples whose predicted count equals the true count
num_acc = acc_calculator(Pred_Counts, True_Counts)
print("Counts Accuracy: ", num_acc)

# Store the single accuracy value as a one-row CSV in the result folder
num_acc_df = pd.DataFrame({'Number_Accuracy': [num_acc]})
num_acc_df.to_csv(result_folder + 'Number_Accuracy.csv', index= False , header = True)

### Bacteria-counts confusion matrix

In [None]:
## Generate Confusion Matrix
def get_confusion_matrix(True_Counts, Pred_Counts, model_name, result_folder):
    """Compute and save the raw and row-normalized count confusion matrices.

    Writes ``Number_Conf.csv`` (raw) and ``Number_Conf_Nor.csv`` (normalized
    over the true labels) into ``result_folder``.

    Args:
        True_Counts: True bacteria counts per sample.
        Pred_Counts: Predicted bacteria counts per sample.
        model_name: Unused; kept for call compatibility.
        result_folder: Result directory path, ending with a path separator.

    Returns:
        Tuple ``(conf, nor_conf)``.
    """
    # normalize=None gives raw counts; the string 'false' is not one of the
    # values the parameter accepts.
    conf = confusion_matrix(True_Counts, Pred_Counts, normalize=None)
    nor_conf = confusion_matrix(True_Counts, Pred_Counts, normalize='true')

    conf_df = pd.DataFrame(conf)
    conf_df.to_csv(result_folder + 'Number_Conf.csv',
                  index= False ,
                  header = False)

    nor_conf_df = pd.DataFrame(nor_conf)
    nor_conf_df.to_csv(result_folder + 'Number_Conf_Nor.csv',
                       index= False ,
                       header = False)

    return conf, nor_conf

In [None]:
def plot_confusion_matrix(conf, nor_conf, model_name, result_folder):
    """Plot and save the raw and the normalized count confusion matrices.

    Both matrices are rounded to two decimals, drawn with
    ``ConfusionMatrixDisplay`` and saved as ``Number_Conf.png`` and
    ``Number_Conf_Nor.png`` in ``result_folder``. ``model_name`` is unused.
    """
    plt.rcParams['font.size'] = 8
    plt.rcParams['figure.dpi'] = 300

    def _draw_and_save(matrix, file_name):
        # Round, draw in grey-scale without a colour bar, label and save.
        rounded = np.around(matrix, 2)
        display = ConfusionMatrixDisplay(confusion_matrix=rounded)
        display.plot(cmap ='gist_yarg', colorbar=False)
        plt.xlabel('Predicted Number', fontsize=12)
        plt.ylabel('True Number', fontsize=12)
        plt.savefig(result_folder + file_name,
                    dpi=300,
                    transparent=True,
                    bbox_inches='tight')

    # Number confusion matrix
    _draw_and_save(conf, 'Number_Conf.png')

    # Normalized confusion matrix
    _draw_and_save(nor_conf, 'Number_Conf_Nor.png')

    

In [None]:
# Compute the count confusion matrices and save them as CSV
conf, nor_conf = get_confusion_matrix(
    True_Counts, Pred_Counts, model_name, result_folder)

# Plot both matrices and save the figures
plot_confusion_matrix(conf, nor_conf, model_name, result_folder)


## Microwell Single-bacterium inference

### convert bacteria-count to binary single-bacterium class

In [None]:
def convert_singlebac(True_Counts, Pred_Counts, result_folder):
    """Turn bacteria counts into single-bacterium flags and save them.

    A count of exactly 1 stays 1; every other count becomes 0. The two flag
    arrays are written to ``<result_folder>Single_Result.csv``.

    Args:
        True_Counts: True bacteria counts per sample.
        Pred_Counts: Predicted bacteria counts per sample.
        result_folder: Result directory path, ending with a path separator.

    Returns:
        Tuple ``(true_single, pred_single)`` of numpy arrays.
    """
    def _single_flags(counts):
        # Copy into a new array, then zero every entry that is not 1.
        flags = np.array(counts)
        np.putmask(flags, flags != 1, 0)
        return flags

    true_single = _single_flags(True_Counts)
    pred_single = _single_flags(Pred_Counts)

    single_result = pd.DataFrame(
        {'True_Single': true_single, 'Pred_Single': pred_single})
    single_result.to_csv(result_folder + 'Single_Result.csv',
                         index=False,
                         header=True)

    return true_single, pred_single

In [None]:
# Reduce the counts to single-bacterium flags (1 = exactly one, 0 = otherwise)
true_single, pred_single = convert_singlebac(
                                    True_Counts, 
                                    Pred_Counts, 
                                    result_folder
                                    )

### Single-Bacterium accuracy

In [None]:
def acc_calculator(predict, label):
    """Return the percentage of positions where ``predict`` equals ``label``.

    Args:
        predict: Sequence of predicted values.
        label: Sequence of true values, same length as ``predict``.

    Returns:
        Accuracy in percent rounded to two decimals; 0.0 for empty input.

    Raises:
        ValueError: If the two sequences differ in length.
    """
    # Use the arguments' own length, not the global Pred_Counts.
    total = len(predict)
    if total != len(label):
        raise ValueError("predict and label must have the same length")
    if total == 0:
        return 0.0

    correct = sum(1 for p, t in zip(predict, label) if p == t)
    return round(100 * correct / total, 2)

In [None]:
# Percentage of test samples whose single-bacterium flag is predicted correctly
acc_single = acc_calculator(pred_single, true_single)
print('Single_Accuracy: ', acc_single)

# Store the single accuracy value as a one-row CSV in the result folder
acc_single_df = pd.DataFrame({'Single_Accuracy': [acc_single]})
acc_single_df.to_csv(result_folder + 'Single_Accuracy.csv', 
                     index= False , 
                     header = True)

### Single-Bacterium confusion matrix

In [None]:
## Generate Single-Bacterium Confusion Matrix
def get_singlebac_confusion_matrix(true_single, pred_single, model_name, result_folder):
    """Compute and save the raw and row-normalized single-bacterium confusion matrices.

    Writes ``Single_Conf.csv`` (raw) and ``Single_Conf_Nor.csv`` (normalized
    over the true labels) into ``result_folder``.

    Args:
        true_single: True flags (1 = single bacterium, 0 = otherwise).
        pred_single: Predicted flags, same encoding.
        model_name: Unused; kept for call compatibility.
        result_folder: Result directory path, ending with a path separator.

    Returns:
        Tuple ``(conf, nor_conf)``, each always 2x2.
    """
    # Fix the label set so the matrices stay 2x2 even when one class is
    # absent; the plotting function supplies exactly two display labels.
    class_labels = [0, 1]

    # normalize=None gives raw counts; the string 'false' is not one of the
    # values the parameter accepts.
    conf = confusion_matrix(true_single, pred_single,
                            labels=class_labels, normalize=None)
    nor_conf = confusion_matrix(true_single, pred_single,
                                labels=class_labels, normalize='true')

    conf_df = pd.DataFrame(conf)
    conf_df.to_csv(result_folder + 'Single_Conf.csv',
                  index= False ,
                  header = False)

    nor_conf_df = pd.DataFrame(nor_conf)
    nor_conf_df.to_csv(result_folder + 'Single_Conf_Nor.csv',
                       index= False ,
                       header = False)

    return conf, nor_conf

In [None]:
def plot_singlebac_confusion_matrix(conf, nor_conf, model_name, result_folder):
    """Plot and save the raw and the normalized single-bacterium confusion matrices.

    The raw matrix is drawn as given and saved as ``Single_Conf.png``; the
    normalized one is rounded to two decimals and saved as
    ``Single_Conf_Nor.png``. Both use the labels 'Non-Single' and
    'Single-Bacterium'. ``model_name`` is unused.
    """
    plt.rcParams['font.size'] = 8
    plt.rcParams['figure.dpi'] = 300
    singleBac_labels = np.array(['Non-Single', 'Single-Bacterium'])

    def _draw_and_save(matrix, file_name):
        # Draw in grey-scale without a colour bar, style the ticks and save.
        display = ConfusionMatrixDisplay(confusion_matrix=matrix,
                                         display_labels = singleBac_labels)
        display.plot(cmap ='gist_yarg', colorbar=False)
        plt.xticks(fontsize = 10)
        plt.yticks(rotation=90, ha='right', fontsize = 10,
                   rotation_mode='default', va="center")
        plt.xlabel('Prediction', fontsize=12)
        plt.ylabel('True', fontsize=12)
        plt.savefig(result_folder + file_name,
                    dpi=300, transparent=True, bbox_inches='tight')

    ## Plot Single-Bac confusion matrix
    _draw_and_save(conf, 'Single_Conf.png')

    ## Plot Normalized Single-Bac confusion matrix
    _draw_and_save(np.around(nor_conf, 2), 'Single_Conf_Nor.png')



In [None]:
# Compute the single-bacterium confusion matrices and save them as CSV
conf, nor_conf = get_singlebac_confusion_matrix(
    true_single, pred_single, model_name, result_folder)

# Plot both matrices and save the figures
plot_singlebac_confusion_matrix(conf, nor_conf, model_name, result_folder)
