In [1]:
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Conv2DTranspose, BatchNormalization, Dropout, Lambda




################################################################
def multi_unet_model(n_classes=4, IMG_HEIGHT=256, IMG_WIDTH=256, IMG_CHANNELS=1):
#Build the model
    inputs = Input((IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS))
    #s = Lambda(lambda x: x / 255)(inputs)   #No need for this if we normalize our inputs beforehand
    s = inputs

    #Contraction path
    c1 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(s)
    c1 = Dropout(0.1)(c1)
    c1 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c1)
    p1 = MaxPooling2D((2, 2))(c1)
    
    c2 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p1)
    c2 = Dropout(0.1)(c2)
    c2 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c2)
    p2 = MaxPooling2D((2, 2))(c2)
     
    c3 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p2)
    c3 = Dropout(0.2)(c3)
    c3 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c3)
    p3 = MaxPooling2D((2, 2))(c3)
     
    c4 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p3)
    c4 = Dropout(0.2)(c4)
    c4 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c4)
    p4 = MaxPooling2D(pool_size=(2, 2))(c4)
     
    c5 = Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p4)
    c5 = Dropout(0.3)(c5)
    c5 = Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c5)
    
    #Expansive path 
    u6 = Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(c5)
    u6 = concatenate([u6, c4])
    c6 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u6)
    c6 = Dropout(0.2)(c6)
    c6 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c6)
     
    u7 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(c6)
    u7 = concatenate([u7, c3])
    c7 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u7)
    c7 = Dropout(0.2)(c7)
    c7 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c7)
     
    u8 = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(c7)
    u8 = concatenate([u8, c2])
    c8 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u8)
    c8 = Dropout(0.1)(c8)
    c8 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c8)
     
    u9 = Conv2DTranspose(16, (2, 2), strides=(2, 2), padding='same')(c8)
    u9 = concatenate([u9, c1], axis=3)
    c9 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u9)
    c9 = Dropout(0.1)(c9)
    c9 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c9)
     
    outputs = Conv2D(n_classes, (1, 1), activation='softmax')(c9)
     
    model = Model(inputs=[inputs], outputs=[outputs])
    return model

In [2]:
import cv2
import glob
import os
import numpy as np
from matplotlib import pyplot as plt
from keras.utils import normalize
# Resizing images, if needed
SIZE_X = 128
SIZE_Y = 128
n_classes = 4  # Number of classes for segmentation

# Capture training image info as a list
train_images = []

image_paths = glob.glob("data/patches_128_useful/patch_img/*.jpg")
image_paths.sort()
print("Image paths:")
for img_path in image_paths:
    print(img_path)
    img = cv2.imread(img_path, 0)
    img = cv2.resize(img, (SIZE_Y, SIZE_X))
    train_images.append(img)

# Convert list to array for machine learning processing
train_images = np.array(train_images)
print("Train images shape:", train_images.shape)

Image paths:
data/patches_128_useful/patch_img\image_100_1_1.jpg
data/patches_128_useful/patch_img\image_100_1_2.jpg
data/patches_128_useful/patch_img\image_100_1_3.jpg
data/patches_128_useful/patch_img\image_100_1_4.jpg
data/patches_128_useful/patch_img\image_100_2_1.jpg
data/patches_128_useful/patch_img\image_100_2_2.jpg
data/patches_128_useful/patch_img\image_100_2_3.jpg
data/patches_128_useful/patch_img\image_100_2_4.jpg
data/patches_128_useful/patch_img\image_100_3_1.jpg
data/patches_128_useful/patch_img\image_100_3_2.jpg
data/patches_128_useful/patch_img\image_100_3_3.jpg
data/patches_128_useful/patch_img\image_100_3_4.jpg
data/patches_128_useful/patch_img\image_100_3_5.jpg
data/patches_128_useful/patch_img\image_100_4_1.jpg
data/patches_128_useful/patch_img\image_100_4_2.jpg
data/patches_128_useful/patch_img\image_100_4_3.jpg
data/patches_128_useful/patch_img\image_100_4_4.jpg
data/patches_128_useful/patch_img\image_101_1_1.jpg
data/patches_128_useful/patch_img\image_101_1_2.jpg

In [3]:
# Capture mask/label info as a list
train_masks = []

mask_paths = glob.glob("data/patches_128_useful/patch_mask/*.tif")
mask_paths.sort()
print("Mask paths:")
for mask_path in mask_paths:
    print(mask_path)
    mask = cv2.imread(mask_path, 0)
    mask = cv2.resize(mask, (SIZE_Y, SIZE_X), interpolation=cv2.INTER_NEAREST)  # Otherwise ground truth changes due to interpolation
    train_masks.append(mask)

# Convert list to array for machine learning processing
train_masks = np.array(train_masks)
print("Train masks shape:", train_masks.shape)

Mask paths:
data/patches_128_useful/patch_mask\mask_100_1_1.tif
data/patches_128_useful/patch_mask\mask_100_1_2.tif
data/patches_128_useful/patch_mask\mask_100_1_3.tif
data/patches_128_useful/patch_mask\mask_100_1_4.tif
data/patches_128_useful/patch_mask\mask_100_2_1.tif
data/patches_128_useful/patch_mask\mask_100_2_2.tif
data/patches_128_useful/patch_mask\mask_100_2_3.tif
data/patches_128_useful/patch_mask\mask_100_2_4.tif
data/patches_128_useful/patch_mask\mask_100_3_1.tif
data/patches_128_useful/patch_mask\mask_100_3_2.tif
data/patches_128_useful/patch_mask\mask_100_3_3.tif
data/patches_128_useful/patch_mask\mask_100_3_4.tif
data/patches_128_useful/patch_mask\mask_100_3_5.tif
data/patches_128_useful/patch_mask\mask_100_4_1.tif
data/patches_128_useful/patch_mask\mask_100_4_2.tif
data/patches_128_useful/patch_mask\mask_100_4_3.tif
data/patches_128_useful/patch_mask\mask_100_4_4.tif
data/patches_128_useful/patch_mask\mask_101_1_1.tif
data/patches_128_useful/patch_mask\mask_101_1_2.tif


In [4]:
###############################################
#Encode labels... but multi dim array so need to flatten, encode and reshape
from sklearn.preprocessing import LabelEncoder
labelencoder = LabelEncoder()
n, h, w = train_masks.shape
train_masks_reshaped = train_masks.reshape(-1,1)
train_masks_reshaped_encoded = labelencoder.fit_transform(train_masks_reshaped)
train_masks_encoded_original_shape = train_masks_reshaped_encoded.reshape(n, h, w)

np.unique(train_masks_encoded_original_shape)

#################################################
train_images = np.expand_dims(train_images, axis=3)
train_images = normalize(train_images, axis=1)

train_masks_input = np.expand_dims(train_masks_encoded_original_shape, axis=3)

  y = column_or_1d(y, warn=True)


In [5]:
#Create a subset of data for quick testing
#Picking 10% for testing and remaining for training
from sklearn.model_selection import train_test_split
# X1, X_test, y1, y_test = train_test_split(train_images, train_masks_input, test_size = 0.10, random_state = 0)
# #Further split training data t a smaller subset for quick testing of models
# X_train, X_do_not_use, y_train, y_do_not_use = train_test_split(X1, y1, test_size = 0.1, random_state = 0)


#Full Data
X_train, X_test, y_train, y_test = train_test_split(train_images, train_masks_input, test_size = 0.10, random_state = 0)
print("Class values in the dataset are ... ", np.unique(y_train))  # 0 is the background/few unlabeled 


Class values in the dataset are ...  [0 1 2 3]


In [6]:

from keras.utils import to_categorical
train_masks_cat = to_categorical(y_train, num_classes=n_classes)
y_train_cat = train_masks_cat.reshape((y_train.shape[0], y_train.shape[1], y_train.shape[2], n_classes))

test_masks_cat = to_categorical(y_test, num_classes=n_classes)
y_test_cat = test_masks_cat.reshape((y_test.shape[0], y_test.shape[1], y_test.shape[2], n_classes))


In [7]:
# Assuming you have one-hot encoded masks
# train_masks_cat and test_masks_cat are your one-hot encoded masks

num_classes = n_classes  # Set the number of classes to 4

# For training data
class_pixel_counts_train = np.sum(train_masks_cat, axis=(0, 1, 2))
print("Class-wise pixel counts for training data:")
for class_index in range(num_classes):
    pixel_count = class_pixel_counts_train[class_index]
    print(f"Class {class_index}: {pixel_count} pixels")

# For testing data
class_pixel_counts_test = np.sum(test_masks_cat, axis=(0, 1, 2))
print("Class-wise pixel counts for testing data:")
for class_index in range(num_classes):
    pixel_count = class_pixel_counts_test[class_index]
    print(f"Class {class_index}: {pixel_count} pixels")


Class-wise pixel counts for training data:
Class 0: 16777216.0 pixels
Class 1: 6610873.0 pixels
Class 2: 10398736.0 pixels
Class 3: 832332.0 pixels
Class-wise pixel counts for testing data:
Class 0: 3853517.0 pixels
Class 1: 742354.0 pixels
Class 2: 1172084.0 pixels
Class 3: 97517.0 pixels


In [8]:
###############################################################
from sklearn.utils import class_weight
class_weights = class_weight.compute_class_weight('balanced',
                                                 classes = np.unique(train_masks_reshaped_encoded),
                                                 y = train_masks_reshaped_encoded)
print("Class weights are...:", class_weights)
class_weights = {l:c for l,c in zip(np.unique(train_masks_reshaped_encoded), class_weights)}

IMG_HEIGHT = X_train.shape[1]
IMG_WIDTH  = X_train.shape[2]
IMG_CHANNELS = X_train.shape[3]

Class weights are...: [ 0.37835547  1.98972669  1.2644663  15.73471822]


In [9]:
# Add print statements to check tensor shapes
print("Input shapes:")
print("X_train shape:", X_train.shape)
print("y_train_cat shape:", y_train_cat.shape)
print("X_test shape:", X_test.shape)
print("y_test_cat shape:", y_test_cat.shape)

Input shapes:
X_train shape: (3214, 128, 128, 1)
y_train_cat shape: (3214, 128, 128, 4)
X_test shape: (358, 128, 128, 1)
y_test_cat shape: (358, 128, 128, 4)


In [10]:
from keras.callbacks import ModelCheckpoint, EarlyStopping, CSVLogger
BACKBONE = 'MultiUnet'
#ModelCheckpoint callback saves a model at some interval. 
filepath=f'SegmentationUnet/{BACKBONE}/CheckPoints/weights-improvement_{BACKBONE}.hdf5' #File name includes epoch and validation accuracy.
#Use Mode = max for accuracy and min for loss. 
checkpoint = ModelCheckpoint(filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')

#https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/EarlyStopping
early_stop = EarlyStopping(monitor='val_loss', patience=10, verbose=1)
#This callback will stop the training when there is no improvement in
# the validation loss for three consecutive epochs.

#CSVLogger logs epoch, acc, loss, val_acc, val_loss
log_csv = CSVLogger(f'SegmentationUnet/{BACKBONE}/training-history-{BACKBONE}.csv', separator=',', append=False)

callbacks_list = [checkpoint, early_stop, log_csv]

In [None]:
def get_model():
    return multi_unet_model(n_classes=n_classes, IMG_HEIGHT=IMG_HEIGHT, IMG_WIDTH=IMG_WIDTH, IMG_CHANNELS=IMG_CHANNELS)

model = get_model()
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()

#If starting with pre-trained weights. 
# model.load_weights('test.hdf5')

history = model.fit(X_train, y_train_cat, 
                    batch_size = 2, 
                    verbose=1, 
                    epochs=300, 
                    validation_data=(X_test, y_test_cat), 
                    class_weight=class_weights,
                    shuffle=False,
                    callbacks = callbacks_list)

model.save('SavedModels/testMultiUnet_300epochs.hdf5')

############################################################
#Evaluate the model
	# evaluate model
_, acc = model.evaluate(X_test, y_test_cat)
print("Accuracy is = ", (acc * 100.0), "%")

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 128, 128, 1)]        0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 128, 128, 16)         160       ['input_1[0][0]']             
                                                                                                  
 dropout (Dropout)           (None, 128, 128, 16)         0         ['conv2d[0][0]']              
                                                                                                  
 conv2d_1 (Conv2D)           (None, 128, 128, 16)         2320      ['dropout[0][0]']             
                                                                                              

  saving_api.save_model(


Epoch 2: val_loss improved from 0.65607 to 0.56284, saving model to SegmentationUnet/MultiUnet/CheckPoints\weights-improvement_MultiUnet.hdf5
Epoch 3/300
Epoch 3: val_loss improved from 0.56284 to 0.54083, saving model to SegmentationUnet/MultiUnet/CheckPoints\weights-improvement_MultiUnet.hdf5
Epoch 4/300
Epoch 4: val_loss improved from 0.54083 to 0.53312, saving model to SegmentationUnet/MultiUnet/CheckPoints\weights-improvement_MultiUnet.hdf5
Epoch 5/300
Epoch 5: val_loss improved from 0.53312 to 0.50611, saving model to SegmentationUnet/MultiUnet/CheckPoints\weights-improvement_MultiUnet.hdf5
Epoch 6/300
Epoch 6: val_loss improved from 0.50611 to 0.49929, saving model to SegmentationUnet/MultiUnet/CheckPoints\weights-improvement_MultiUnet.hdf5
Epoch 7/300
Epoch 7: val_loss improved from 0.49929 to 0.49571, saving model to SegmentationUnet/MultiUnet/CheckPoints\weights-improvement_MultiUnet.hdf5
Epoch 8/300
Epoch 8: val_loss improved from 0.49571 to 0.45907, saving model to Segmenta

In [None]:
###
#plot the training and validation accuracy and loss at each epoch
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(loss) + 1)
plt.plot(epochs, loss, 'y', label='Training loss')
plt.plot(epochs, val_loss, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

history_dict = history.history
print(history_dict.keys())
# acc = history.history['acc']
# val_acc = history.history['val_acc']
acc = history_dict['accuracy']
val_acc = history_dict['val_accuracy']

plt.plot(epochs, acc, 'y', label='Training Accuracy')
plt.plot(epochs, val_acc, 'r', label='Validation Accuracy')
plt.title('Training and validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()


In [None]:
from keras.models import load_model
model = load_model('SavedModels/testMultiUnet_300epochs.hdf5', compile=False)
model.load_weights('SavedModels/testMultiUnet_300epochs.hdf5')  

In [None]:
#IOU
y_pred=model.predict(X_test)
y_pred_argmax=np.argmax(y_pred, axis=3)

##################################################

#Using built in keras function
from keras.metrics import MeanIoU
n_classes = 4
IOU_keras = MeanIoU(num_classes=n_classes)  
IOU_keras.update_state(y_test[:,:,:,0], y_pred_argmax)
print("Mean IoU =", IOU_keras.result().numpy())


#To calculate I0U for each class...
values = np.array(IOU_keras.get_weights()).reshape(n_classes, n_classes)
print(values)
class1_IoU = values[0,0]/(values[0,0] + values[0,1] + values[0,2] + values[0,3] + values[1,0]+ values[2,0]+ values[3,0])
class2_IoU = values[1,1]/(values[1,1] + values[1,0] + values[1,2] + values[1,3] + values[0,1]+ values[2,1]+ values[3,1])
class3_IoU = values[2,2]/(values[2,2] + values[2,0] + values[2,1] + values[2,3] + values[0,2]+ values[1,2]+ values[3,2])
class4_IoU = values[3,3]/(values[3,3] + values[3,0] + values[3,1] + values[3,2] + values[0,3]+ values[1,3]+ values[2,3])

print("IoU for class1 is: ", class1_IoU)
print("IoU for class2 is: ", class2_IoU)
print("IoU for class3 is: ", class3_IoU)
print("IoU for class4 is: ", class4_IoU)


In [None]:
plt.imshow(train_images[0, :,:,0], cmap='gray')

In [None]:
plt.imshow(train_masks[0], cmap= 'gray')

In [None]:
#######################################################################
#Predict on a few images
for i in range(10):
    model = load_model('SavedModels/testMultiUnet_300epochs.hdf5', compile=False)
    model.load_weights('SavedModels/testMultiUnet_300epochs.hdf5')   
    import random
    test_img_number = random.randint(0, len(X_test)-1)
    test_img = X_test[test_img_number]
    ground_truth=y_test[test_img_number]
    test_img_norm=test_img[:,:,0][:,:,None]
    test_img_input=np.expand_dims(test_img_norm, 0)
    prediction = (model.predict(test_img_input))
    predicted_img=np.argmax(prediction, axis=3)[0,:,:]
    
    
    plt.figure(figsize=(12, 8))
    plt.subplot(231)
    plt.title('Testing Image')
    plt.imshow(test_img[:,:,0], cmap='gray')
    plt.subplot(232)
    plt.title('Testing Label')
    plt.imshow(ground_truth[:,:,0], cmap='jet')
    plt.subplot(233)
    plt.title('Prediction on test image')
    plt.imshow(predicted_img, cmap='jet')
    plt.show()

In [None]:
#####################################################################

# #Predict on large image

# #Apply a trained model on large image

from patchify import patchify, unpatchify

large_image = cv2.imread('LargeImages/Original/1118.jpg', 0)
#This will split the image into small images of shape [3,3]
patches = patchify(large_image, (128, 128), step=128)  #Step=256 for 256 patches means no overlap

predicted_patches = []
for i in range(patches.shape[0]):
    for j in range(patches.shape[1]):
        print(i,j)
        
        single_patch = patches[i,j,:,:]       
        single_patch_norm = np.expand_dims(normalize(np.array(single_patch), axis=1),2)
        single_patch_input=np.expand_dims(single_patch_norm, 0)
        single_patch_prediction = (model.predict(single_patch_input))
        single_patch_predicted_img=np.argmax(single_patch_prediction, axis=3)[0,:,:]

        predicted_patches.append(single_patch_predicted_img)

predicted_patches = np.array(predicted_patches)

predicted_patches_reshaped = np.reshape(predicted_patches, (patches.shape[0], patches.shape[1], 128,128) )

reconstructed_image = unpatchify(predicted_patches_reshaped, large_image.shape)
plt.imshow(reconstructed_image, cmap='gray')
#plt.imsave('data/results/segm.jpg', reconstructed_image, cmap='gray')

plt.hist(reconstructed_image.flatten())  #Threshold everything above 0

# final_prediction = (reconstructed_image > 0.01).astype(np.uint8)
# plt.imshow(final_prediction)

plt.figure(figsize=(8, 8))
plt.subplot(221)
plt.title('Large Image')
plt.imshow(large_image, cmap='gray')
plt.subplot(222)
plt.title('Prediction of large Image')
plt.imshow(reconstructed_image, cmap='jet')
plt.show()