# Bacteria cell segmentation: practical application of a U-Net

With this example, you will see a very simple application of a U-Net for semantic segmentation. The data are 2D images of bacteria acquired with brightfield imaging.

Data were kindly provided by Caroline Clerté from the Centre de Biologie Structurale, Montpellier (France).

This notebook was inspired by the EMBL notebooks from the Kreshuk lab (https://github.com/kreshuklab/teaching-dl-course-2020/).

## I - Data importation

The data are 2D, 16-bit images saved in TIFF format on Google Drive. All images are paired:
- one raw image
- one labelled image with three pixel values: 0 = background, 1 = cell, 2 = cell contour
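
The network defined later in this notebook ends with a single sigmoid output, which matches a binary (0/1) target. If the masks do contain the contour label 2, one possible way to reconcile the two is to collapse the labels before training. The sketch below is an assumption and not a step of the original workflow; `to_binary_mask` and its `contour_as_cell` switch are hypothetical names chosen for illustration.

```python
import numpy as np

def to_binary_mask(mask, contour_as_cell=False):
    """Collapse a 3-label mask (0 = background, 1 = cell, 2 = contour) to 0/1."""
    mask = np.asarray(mask)
    if contour_as_cell:
        # contour pixels are counted as part of the cell
        return (mask > 0).astype(np.float32)
    # contour pixels are sent to background, which keeps touching cells apart
    return (mask == 1).astype(np.float32)

# toy 3 x 4 mask, for illustration only
toy_mask = np.array([[0, 2, 2, 0],
                     [2, 1, 1, 2],
                     [0, 2, 2, 0]])
print(np.unique(toy_mask))          # labels present in the mask
print(to_binary_mask(toy_mask))     # only the cell interior is kept
```

Sending the contour to background tends to separate neighbouring cells, while merging it with the cell gives larger objects; which option is preferable depends on the downstream analysis.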

The first step is to load all the Python packages used in the notebook:

In [None]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

import os
import sys
import random
import numpy as np
import matplotlib.pyplot as plt

from glob import glob
from tqdm import tqdm
from skimage.io import imread, imshow, imread_collection, concatenate_images, imsave

from skimage.measure import label, regionprops

# All Keras objects are imported from tensorflow.keras: the submodules
# keras.layers.core, keras.layers.convolutional and keras.layers.pooling
# may not be importable with recent Keras releases.
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import (Input, Conv2D, Conv2DTranspose, MaxPooling2D,
                                     Concatenate, BatchNormalization, Activation,
                                     Dense, Dropout, Lambda,
                                     RandomFlip, RandomContrast)
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import normalize

from sklearn.utils.class_weight import compute_class_weight
from sklearn.preprocessing import LabelEncoder

In [None]:
from google.colab import drive
drive.mount('/content/drive')

All the images are 128 x 128 pixels and are already sorted in two separate folders:
- one for the training data (111 images)
- one for the test data (19 images)

No separate validation set was available for this dataset, so part of the training data is held out for validation during training.

Note that **the path to the different folders will need to be updated** accordingly.

In [None]:
IMG_WIDTH = 128
IMG_HEIGHT = 128
IMG_CHANNELS = 1
TRAIN_PATH = '/content/drive/MyDrive/Deep_learning_formation_MRI/Doc_JB_2022/Notebooks for workshop /Data/Bacteria_segmentation/128x128_dataset/training_data'
TEST_PATH = '/content/drive/MyDrive/Deep_learning_formation_MRI/Doc_JB_2022/Notebooks for workshop /Data/Bacteria_segmentation/128x128_dataset/testing_data'

The following function **get_data** loads the images from disk and stores them in arrays of the right shape (according to the parameters defined above).

In [None]:
def get_data(data_path, mask_path=None):

  # look for all the tif files in the selected folder
  imgs = sorted(glob(data_path + os.sep + '*.tif'))
  X = np.zeros((len(imgs), IMG_HEIGHT, IMG_WIDTH))
  
  # load the raw data and save them in array X
  print('Loading the raw images ... ')
  for n, element in tqdm(enumerate(imgs), total = len(imgs)):
    img = imread(element)
    X[n,:,:] = img
    
  # perform the same for the labelled images and save the labelled images in Y
  if mask_path is not None:
    masks = sorted(glob(mask_path + os.sep + '*.tif'))
    Y = np.zeros((len(masks), IMG_HEIGHT, IMG_WIDTH))
    print('Loading the masks ... ')
    for n, element in tqdm(enumerate(masks), total = len(masks)):
      mask = imread(element)
      Y[n,:,:] = mask
      
    return X,Y
  
  else:

    return X
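
`get_data` sorts the raw files and the mask files independently, so the pairing relies on both folders listing their files in the same order. A quick check can catch a missing or misnamed file before training. The sketch below assumes that a raw image and its mask share the same file name, which is not stated in this notebook; `check_pairs` and the file names are hypothetical.

```python
import os

def check_pairs(raw_files, mask_files):
    """Return the sorted file names after checking that each raw image has a mask."""
    raw_names = sorted(os.path.basename(f) for f in raw_files)
    mask_names = sorted(os.path.basename(f) for f in mask_files)
    if raw_names != mask_names:
        # names present in only one of the two folders
        unmatched = sorted(set(raw_names) ^ set(mask_names))
        raise ValueError(f'Unpaired files: {unmatched}')
    return raw_names

# hypothetical file names, for illustration only
raw_files = ['raw/img_002.tif', 'raw/img_001.tif']
mask_files = ['mask/img_001.tif', 'mask/img_002.tif']
paired = check_pairs(raw_files, mask_files)
print(paired)
```

In the notebook, the two lists returned by `glob` for the `raw` and `mask` folders could be passed to such a check before the arrays are filled.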

The training and testing sets are defined below. By convention, the raw data are called X and the labelled images Y.

In [None]:
train_data_path = TRAIN_PATH + os.sep + 'raw' 
train_mask_path = TRAIN_PATH + os.sep + 'mask'

test_data_path = TEST_PATH + os.sep + 'raw'
test_mask_path = TEST_PATH + os.sep + 'mask'

# load the training and testing set
x_train, Y_train = get_data(train_data_path, train_mask_path)
x_test, Y_test = get_data(test_data_path, test_mask_path)

Since the images were selected from different experiments, the imaging conditions were not always identical. In order to homogenize the data and help the network generalize, each raw image is normalized to zero mean and unit standard deviation.

In [None]:
# normalize the images
X_train = np.zeros(x_train.shape)
X_test = np.zeros(x_test.shape)

for n in tqdm(range(len(X_train))):
  x = x_train[n, :, :]
  X_train[n, :, :] = (x - np.mean(x)) / np.std(x)

for n in tqdm(range(len(X_test))):
  x = x_test[n, :, :]
  X_test[n, :, :] = (x - np.mean(x)) / np.std(x)

# X_train = normalize(X_train)
# X_test = normalize(X_test)
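
The per-image normalization above can also be written without an explicit loop. The sketch below is one possible vectorized variant; the small `eps` term is an addition (not part of the original cell) that avoids a division by zero for a constant image, and `zscore_per_image` is a hypothetical name.

```python
import numpy as np

def zscore_per_image(stack, eps=1e-8):
    """Normalize each image of an (N, H, W) stack to zero mean and unit std."""
    stack = np.asarray(stack, dtype=np.float64)
    mean = stack.mean(axis=(1, 2), keepdims=True)
    std = stack.std(axis=(1, 2), keepdims=True)
    # eps is an added safeguard: a constant image has std = 0
    return (stack - mean) / (std + eps)

# random 16-bit-like images standing in for x_train
rng = np.random.default_rng(0)
demo = rng.integers(0, 2**16, size=(3, 128, 128)).astype(np.float64)
demo[2] = 1000.0                     # constant image, std = 0
demo_norm = zscore_per_image(demo)
print(demo_norm.mean(axis=(1, 2)), demo_norm.std(axis=(1, 2)))
```

Each image is normalized with its own statistics, as in the loop above, so a bright and a dim acquisition end up on the same scale.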

## II - Data visualization

Display a randomly selected training image next to its labelled mask.

In [None]:
ix = random.randint(0, len(X_train)-1)
print(X_train.shape)
im = X_train[ix,:,:]
plt.figure(figsize=(10,10))
plt.subplot(1,2,1)
plt.imshow(im)
plt.subplot(1,2,2)
plt.imshow(Y_train[ix,:,:])

print(f'The max pixel value for the normalized image is {np.max(im)} and min is {np.min(im)}')

As illustrated above, there is a strong class imbalance between background (0) and cells (1). Keep this in mind when interpreting the training results: a network that assigns the background label to every pixel would still reach an average accuracy above 87%!

In [None]:
# Analyze the class imbalance
unique, counts = np.unique(Y_test, return_counts=True)
print(f'The background represents {counts[0]/np.sum(counts)*100}% of the total number of pixels')
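
To make the argument concrete, the sketch below computes the fraction of each label and the accuracy of a predictor that answers "background" everywhere. The label image is a toy example built to contain 87% background; it is not taken from the dataset.

```python
import numpy as np

# toy label image: 100 pixels, 87 background (0), 10 cell (1), 3 contour (2)
toy_labels = np.array([0] * 87 + [1] * 10 + [2] * 3).reshape(10, 10)

values, counts = np.unique(toy_labels, return_counts=True)
fractions = dict(zip(values.tolist(), (counts / counts.sum()).tolist()))

# accuracy of a predictor that answers "background" everywhere
all_background = np.zeros_like(toy_labels)
baseline_accuracy = np.mean(all_background == toy_labels)

print(fractions)
print(f'baseline accuracy = {baseline_accuracy:.2f}')
```

The Dice coefficient used later in this notebook would score that same predictor at 0, since its overlap with the cell class is empty.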

## III - Building a U-Net from scratch

As with the convolutional network previously used for image classification, the U-Net is built by stacking convolutional blocks.

In [None]:
# Each block of the U-Net architecture consists of two convolution layers,
# each followed by a ReLU activation and a batch normalization.
# Both layers are wrapped in a function to keep the code clean.
# -----------------------------------------------------------------

def conv2d_block(input_tensor, n_filters, kernel_size=3):

    # first layer
    x = Conv2D(filters=n_filters, kernel_size=(kernel_size, kernel_size),padding="same")(input_tensor)
    x = Activation("relu")(x)
    x = BatchNormalization()(x)

    # second layer
    x = Conv2D(filters=n_filters, kernel_size=(kernel_size, kernel_size), padding="same")(x)
    x = Activation("relu")(x)
    x = BatchNormalization()(x)
    
    return x

In [None]:
# The U-Net architecture consists of a contracting path and an expansive path,
# which respectively shrink and expand the feature maps. The output image has
# the same size as the input image.
# -----------------------------

def get_unet(input_img, n_filters):

    # contracting path
    # ----------------
    c1 = conv2d_block(input_img, n_filters=n_filters*4, kernel_size=3) #The first block of U-net
    p1 = MaxPooling2D((2, 2)) (c1)

    c2 = conv2d_block(p1, n_filters=n_filters*8, kernel_size=3)
    p2 = MaxPooling2D((2, 2)) (c2)

    c3 = conv2d_block(p2, n_filters=n_filters*16, kernel_size=3)
    p3 = MaxPooling2D((2, 2)) (c3)

    c4 = conv2d_block(p3, n_filters=n_filters*32, kernel_size=3)
    p4 = MaxPooling2D(pool_size=(2, 2)) (c4)

    c5 = conv2d_block(p4, n_filters=n_filters*64, kernel_size=3)
    #p5 = MaxPooling2D(pool_size=(2, 2)) (c5)
    
    # expansive path
    # --------------
    u7 = Conv2DTranspose(n_filters*64, (3, 3), strides=(2, 2), padding='same') (c5) #upsampling included
    u7 = Concatenate()([u7, c4])
    c7 = conv2d_block(u7, n_filters=n_filters*64, kernel_size=3)
    #c7 = Dropout(0.4) (c7)

    u8 = Conv2DTranspose(n_filters*32, (3, 3), strides=(2, 2), padding='same') (c7)
    u8 = Concatenate()([u8, c3])
    c8 = conv2d_block(u8, n_filters=n_filters*32, kernel_size=3)
    #c8 = Dropout(0.35) (c8)

    u9 = Conv2DTranspose(n_filters*16, (3, 3), strides=(2, 2), padding='same') (c8)
    u9 = Concatenate()([u9, c2])
    c9 = conv2d_block(u9, n_filters=n_filters*16, kernel_size=3)
    #c9 = Dropout(0.3) (c9)

    u10 = Conv2DTranspose(n_filters*8, (3, 3), strides=(2, 2), padding='same') (c9)
    u10 = Concatenate()([u10, c1])
    c10 = conv2d_block(u10, n_filters=n_filters*8, kernel_size=3)
    #c10 = Dropout(0.25) (c10)
    
    outputs = Conv2D(1, (1, 1), activation='sigmoid') (c10)
    model = Model(inputs=[input_img], outputs=[outputs])
    return model
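
It can help to follow the size of the feature maps through `get_unet` before reading `model.summary()`. The sketch below is plain Python and does not build any Keras layer; it only replays the arithmetic of the function above (2 x 2 pooling halves the size, a stride-2 transposed convolution doubles it, concatenation adds channels). `trace_unet_shapes` is a hypothetical helper.

```python
def trace_unet_shapes(size=128, n_filters=4):
    """List (name, height/width, channels) for each block of get_unet."""
    shapes = []
    skips = []
    # contracting path: c1..c4 are each followed by a 2x2 max pooling
    for i, mult in enumerate([4, 8, 16, 32], start=1):
        shapes.append((f'c{i}', size, n_filters * mult))
        skips.append(n_filters * mult)
        size //= 2
    shapes.append(('c5', size, n_filters * 64))      # bottleneck, no pooling
    # expansive path: transposed convolution, then concatenation with the skip
    for i, mult in zip([7, 8, 9, 10], [64, 32, 16, 8]):
        size *= 2
        concat_channels = n_filters * mult + skips.pop()
        shapes.append((f'u{i}', size, concat_channels))
        shapes.append((f'c{i}', size, n_filters * mult))
    shapes.append(('outputs', size, 1))
    return shapes

for name, hw, ch in trace_unet_shapes():
    print(f'{name:8s} {hw:3d} x {hw:3d} x {ch}')
```

Four poolings divide the size by 16, so the input height and width should be multiples of 16 for the skip connections to line up; 128 satisfies this.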

Create the Unet:

In [None]:
input_img = Input((X_train.shape[1], X_train.shape[2],1), name='img')
model = get_unet(input_img, n_filters=4)

model.summary()

Implementation of the Dice coefficient, used both as the training loss and as the monitored metric.

In [None]:
# The Dice coefficient takes values in [0, 1], where 0 is the worst score and
# 1 is the best. For two sets represented as vectors a and b, it is computed
# as 2 * |a ∩ b| / (|a| + |b|).
# ------------------------------------

def dice_coefficient(y_true, y_pred):
    eps = 1e-6
    # cast to float32 so that labels and predictions share the same dtype
    y_true_f = K.cast(K.flatten(y_true), 'float32')
    y_pred_f = K.cast(K.flatten(y_pred), 'float32')
    intersection = K.sum(y_true_f * y_pred_f)
    out = (2. * intersection) / (K.sum(y_true_f) + \
                                 K.sum(y_pred_f) + eps)
    return out

def loss_dice_coefficient(y_true, y_pred):
    # the loss is minimal (0) when the Dice coefficient is maximal (1)
    return 1 - dice_coefficient(y_true, y_pred)
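
The same formula can be checked by hand on a tiny example. The sketch below is a NumPy transcription written for illustration (`dice_numpy` is a hypothetical name); it is not used for training.

```python
import numpy as np

def dice_numpy(y_true, y_pred, eps=1e-6):
    """Dice coefficient of two arrays, flattened as in the Keras version."""
    y_true = np.asarray(y_true, dtype=np.float64).ravel()
    y_pred = np.asarray(y_pred, dtype=np.float64).ravel()
    intersection = np.sum(y_true * y_pred)
    return 2.0 * intersection / (np.sum(y_true) + np.sum(y_pred) + eps)

y_true = np.array([[1, 1], [0, 0]])                              # 2 cell pixels
perfect = dice_numpy(y_true, y_true)                             # 2*2 / (2+2)
half = dice_numpy(y_true, np.array([[1, 0], [0, 0]]))            # 2*1 / (2+1)
soft = dice_numpy(y_true, np.array([[0.9, 0.6], [0.2, 0.1]]))    # 2*1.5 / (2+1.8)
print(f'{perfect:.3f} {half:.3f} {soft:.3f}')
```

The last case shows that the coefficient also accepts probabilities, which is what makes it usable as a differentiable loss on the sigmoid output.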

Compile the network with the selected optimizer and loss function and launch the training.

In [None]:
model.compile(optimizer='adam', 
              loss=loss_dice_coefficient,
              metrics=[dice_coefficient])

# model.compile(optimizer='adam', 
#               loss="binary_crossentropy",
#               metrics=["accuracy"])

In [None]:
results = model.fit(X_train, Y_train, 
                    epochs=250, 
                    validation_split=0.2,
                    batch_size=32,
                    shuffle=True,
                    )
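
According to the Keras documentation, `validation_split` takes the validation data from the last samples of the arrays, before shuffling. With sorted file names, the validation images may therefore all come from the same experiments. The sketch below estimates the resulting set sizes for the 111 training images; the floor-based rounding is an assumption about how Keras splits the data, and `split_sizes` is a hypothetical helper.

```python
import math

def split_sizes(n_samples, validation_split, batch_size):
    """Estimate train/validation sizes and the number of batches per epoch."""
    # assumption: the training part is rounded down
    n_train = int(math.floor(n_samples * (1.0 - validation_split)))
    n_val = n_samples - n_train
    steps_per_epoch = math.ceil(n_train / batch_size)
    return n_train, n_val, steps_per_epoch

n_train, n_val, steps = split_sizes(111, 0.2, 32)
print(n_train, n_val, steps)
```

Shuffling the arrays once (with the same permutation for X and Y) before calling `fit` is one way to get a validation set that mixes all experiments.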

## IV - Evaluate the performance of the network

Plot the loss and metrics curves. 

In [None]:
history_dict = results.history

# acc_values = history_dict['accuracy']
# val_acc_values = history_dict['val_accuracy']
acc_values = history_dict['dice_coefficient']
val_acc_values = history_dict['val_dice_coefficient']

n = len(acc_values)
epochs = range(1, n+1)

plt.plot(epochs, acc_values, 'bo', label='Training Dice')
plt.plot(epochs, val_acc_values, 'b', label='Validation Dice')
plt.title('Training and validation Dice coefficient')
plt.xlabel('Epochs')
plt.ylabel('Dice coefficient')
plt.legend()

plt.show()

In [None]:
loss_values = history_dict['loss']
val_loss_values = history_dict['val_loss']

plt.plot(epochs, loss_values, '-bo', label='Training loss')
plt.plot(epochs, val_loss_values, 'b', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.show()

Compute the average Dice coefficient over the testing set.

In [None]:
model.evaluate(X_test,Y_test)

In [None]:
preds_test = model.predict(X_test, verbose=1)
# we apply a threshold on predicted mask (probability mask) to convert it to a binary mask.
preds_test_t = (preds_test > 0.5).astype(np.uint8) 
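
`model.evaluate` reports the Dice coefficient on the probability maps. A score per image on the thresholded masks can complement it, for instance to spot test images on which the network fails. The sketch below is one possible NumPy implementation; `per_image_dice` is a hypothetical helper, and the toy arrays stand in for `Y_test` and `preds_test`.

```python
import numpy as np

def per_image_dice(y_true, probs, threshold=0.5, eps=1e-6):
    """Dice of each thresholded prediction; inputs are (N, H, W) or (N, H, W, 1)."""
    y_true = np.asarray(y_true, dtype=np.float64).reshape(len(y_true), -1)
    binary = (np.asarray(probs) > threshold).reshape(len(probs), -1).astype(np.float64)
    intersection = np.sum(y_true * binary, axis=1)
    return 2.0 * intersection / (y_true.sum(axis=1) + binary.sum(axis=1) + eps)

# toy stand-ins for Y_test and preds_test: two 2 x 2 images
toy_truth = np.array([[[1, 1], [0, 0]],
                      [[0, 1], [0, 0]]])
toy_probs = np.array([[[0.9, 0.8], [0.1, 0.2]],
                      [[0.7, 0.4], [0.1, 0.1]]])[..., np.newaxis]
scores = per_image_dice(toy_truth, toy_probs)
print(scores)      # first image is segmented correctly, second one is missed
```

Sorting the test images by this score gives a quick list of the cases worth displaying in the figure below.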

In [None]:
ix = random.randint(0, len(X_test)-1)

fig = plt.figure(figsize=(10, 10))
plt.subplot(221)
plt.imshow(X_test[ix,:,:])
plt.title("input image")

plt.subplot(222)
plt.imshow(np.squeeze(Y_test[ix, :, :]))
plt.title("ground truth")

plt.subplot(223)
plt.imshow(np.squeeze(preds_test[ix,:,:,0]))
plt.title("Probability map of the predicted mask")

plt.subplot(224)
plt.imshow(np.squeeze(preds_test_t[ix, :, :]))
plt.title("Predicted mask after thresholding")

plt.show()