In [1]:
#utils.py

import os
from PIL import Image,ImageFilter
import numpy as np
import random
import imgaug.augmenters as iaa
import tensorflow as tf
from keras.losses import binary_crossentropy

def _to_model_tensor(pixels, dim):
    """Turn a 2-D pixel array into an (H, W, 1) tensor cropped/padded to ``dim``."""
    tensor = tf.expand_dims(tf.convert_to_tensor(pixels), -1)
    return tf.image.resize_with_crop_or_pad(tensor, *dim)


def preprocess_images(image, annotation, dim, resize_only=False):
    """Prepare one image/annotation pair for the segmentation model.

    Both inputs are converted to 8-bit grayscale, optionally augmented, then
    centre-cropped or zero-padded to ``dim``.

    Args:
        image: PIL image holding the input picture.
        annotation: PIL image holding the matching mask.
        dim: ``(height, width)`` forwarded to ``tf.image.resize_with_crop_or_pad``.
        resize_only: when true, skip blur and augmentation and only crop/pad.

    Returns:
        ``(image_tensor, mask_tensor)``, both of shape ``(*dim, 1)``. The image
        keeps its grey levels (it is not rescaled here); the mask is ``int32``
        with values 0 or 1.

    Notes:
        * The random rotation is sampled once and applied to BOTH image and
          mask. Calling a stochastic augmenter twice draws two different
          rotations, which misaligns the mask with the image.
        * Blur, noise and brightness changes are applied to the image only;
          the mask receives nothing but the geometric transform.
        * The mask is binarised at the midpoint (> 127) instead of by floor
          division, so interpolated edge pixels are not all discarded.
    """
    image_gray = image.convert('L')
    ann_gray = annotation.convert('L')

    if resize_only:
        image_pixels = np.array(image_gray)
        ann_pixels = np.array(ann_gray)
    else:
        # Photometric steps: image only.
        image_pixels = np.array(image_gray.filter(ImageFilter.BLUR))
        ann_pixels = np.array(ann_gray)

        pre_geometry = iaa.Sequential([
            iaa.GaussianBlur(sigma=(0, 3.0)),
            iaa.AdditiveGaussianNoise(scale=(0, 0.05 * 255)),
        ])
        post_geometry = iaa.Multiply((0.8, 1.2))
        # Freeze the sampled rotation so image and mask get the same one.
        rotation = iaa.Affine(rotate=(-45, 45)).to_deterministic()

        image_pixels = pre_geometry.augment_image(image_pixels)
        image_pixels = rotation.augment_image(image_pixels)
        image_pixels = post_geometry.augment_image(image_pixels)
        ann_pixels = rotation.augment_image(ann_pixels)

    image_augmented = _to_model_tensor(image_pixels, dim)
    ann_augmented = _to_model_tensor(ann_pixels, dim)

    # Binarise: foreground wherever the grey level is above the midpoint.
    ann_augmented = tf.cast(ann_augmented > 127, tf.int32)

    return image_augmented, ann_augmented


def dice_coeff(y_true, y_pred):
    """Smoothed Dice coefficient between two tensors.

    Both arguments are cast to float32 and flattened, so the score is computed
    over every element at once (the whole batch contributes to one number).
    A smoothing constant of 1 is added to numerator and denominator.
    """
    smooth = 1.
    truth_flat = tf.reshape(tf.cast(y_true, dtype=tf.float32), [-1])
    pred_flat = tf.reshape(tf.cast(y_pred, dtype=tf.float32), [-1])

    overlap = tf.reduce_sum(truth_flat * pred_flat)
    denominator = tf.reduce_sum(truth_flat) + tf.reduce_sum(pred_flat) + smooth
    return (2. * overlap + smooth) / denominator

def dice_loss(y_true, y_pred):
    """Dice loss: one minus the smoothed Dice coefficient."""
    return 1 - dice_coeff(y_true, y_pred)

def bce_dice_loss(y_true, y_pred):
    """Sum of binary cross-entropy and Dice loss for the same prediction."""
    bce_term = binary_crossentropy(y_true, y_pred)
    dice_term = dice_loss(y_true, y_pred)
    return bce_term + dice_term

ModuleNotFoundError: No module named 'imgaug'

In [None]:
#customdatagenerator.py

import numpy as np
import keras
#from utils import preprocess_images
from PIL import Image
import os



class CustomDataGenerator(keras.utils.Sequence):
    """Keras ``Sequence`` that yields batches of (image, mask) arrays.

    Expected layout under ``path``::

        Image/<name>.<ext>
        Annotation/<name>_Annotation.png

    Every file listed in ``Image/`` is treated as a sample ID and must have a
    matching annotation file.

    Args:
        path: dataset root containing the ``Image`` and ``Annotation`` folders
            (a trailing separator is optional).
        batch_size: number of samples per batch.
        dim: ``(height, width)`` every sample is cropped/padded to.
        n_channels: channel count of the produced arrays (1 for grayscale).
        shuffle: reshuffle the sample order at the end of every epoch.
        augmentation: apply the random augmentations of ``preprocess_images``;
            when false, samples are only cropped/padded.
    """

    def __init__(self, path, batch_size=32, dim=(800,540), n_channels=1, shuffle=True, augmentation=True):
        """Store the configuration and index the files found in ``Image/``."""
        super().__init__()
        self.dim = dim
        self.batch_size = batch_size
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.augmentation = augmentation
        self.path = path
        # os.path.join tolerates a root given with or without a trailing
        # separator; the empty last component keeps the folders ending in one.
        self.img_folder = os.path.join(self.path, 'Image', '')
        self.mask_folder = os.path.join(self.path, 'Annotation', '')
        self.list_IDs = os.listdir(self.img_folder)
        self.on_epoch_end()

    def __len__(self):
        """Number of full batches per epoch (a trailing partial batch is dropped)."""
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        """Build batch number ``index`` and return it as ``(X, y)``."""
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        list_IDs_temp = [self.list_IDs[k] for k in indexes]
        X, y = self.__data_generation(list_IDs_temp)
        return X, y

    def on_epoch_end(self):
        """Reset the sample order, shuffling it when ``shuffle`` is enabled."""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        """Load and preprocess the given sample IDs.

        Returns:
            ``X`` of shape ``(n_samples, *dim, n_channels)`` with grey levels
            scaled to [0, 1], and ``y`` of the same shape holding the integer
            mask produced by ``preprocess_images``.
        """
        n_samples = len(list_IDs_temp)
        X = np.empty((n_samples, *self.dim, self.n_channels))
        y = np.empty((n_samples, *self.dim, self.n_channels), dtype=int)

        for i, ID in enumerate(list_IDs_temp):
            filename, _ = os.path.splitext(ID)
            img_file = os.path.join(self.img_folder, ID)
            mask_file = os.path.join(self.mask_folder, filename + '_Annotation.png')

            # Context managers release the file handles once the pixels have
            # been read by preprocess_images.
            with Image.open(img_file) as img, Image.open(mask_file) as label:
                img_tensor, label_tensor = preprocess_images(
                    img, label, self.dim, resize_only=not self.augmentation)

            # Scale to [0, 1] so training inputs match the /255 scaling used by
            # adjustData and by the inference function in this notebook.
            X[i,] = np.asarray(img_tensor, dtype=np.float32) / 255.0
            y[i,] = label_tensor

        return X, y

In [None]:
#data.py

from __future__ import print_function
from keras.preprocessing.image import ImageDataGenerator
import numpy as np
import os
import glob
import skimage.io as io
import skimage.transform as trans

#...
# Colour table of a multi-class example, kept for reference only: the
# triple-quoted string below is a bare expression and defines no names.
"""
Sky = [128,128,128]
Building = [128,0,0]
Pole = [192,192,128]
Road = [128,64,128]
Pavement = [60,40,222]
Tree = [128,128,0]
SignSymbol = [192,128,128]
Fence = [64,64,128]
Car = [64,0,128]
Pedestrian = [64,64,0]
Bicyclist = [0,128,192]
Unlabelled = [0,0,0]
"""
# RGB colours of the two classes listed in COLOR_DICT below.
AF = [255,255,255]
Rest = [0,0,0]

#...
#COLOR_DICT = np.array([Sky, Building, Pole, Road, Pavement,Tree, SignSymbol, Fence, Car, Pedestrian, Bicyclist, Unlabelled])
# Row i is the colour labelVisualize paints for class index i
# (index 0 -> AF, index 1 -> Rest); saveResult passes this table to it.
COLOR_DICT = np.array([AF, Rest])

#...already using ImageDataGenerator - https://www.tensorflow.org/api_docs/python/tf/keras/preprocessing/image/ImageDataGenerator
# M/S aug: rotate, add noise, crop, flip(?) // IDG flips/rotates. does not crop (?) or add noise
# clarify: preprocessing should also be done here / is part of the dataloader, right? The point was to avoid having to upload separately
# just import prepr function and run through that first right?
# are annotation/label/mask/ground truth the same thing?
# so: load data -> run prepr function -> run aug function [trainGenerator?] -> etc. ?
# what's geneTrainNpy() doing?

def adjustData(img,mask,flag_multi_class,num_class):
    """Scale an image/mask pair and encode the mask for training.

    Args:
        img: image array (a batch or a single sample).
        mask: mask array whose last axis is the channel axis; either batched
            ``(batch, H, W, C)`` or a single sample ``(H, W, C)``.
        flag_multi_class: when true, one-hot encode the first mask channel
            into ``num_class`` classes.
        num_class: number of classes used for the one-hot encoding.

    Returns:
        ``(img, mask)``.

        * Multi-class: ``img`` is divided by 255 and the mask becomes one-hot
          with the spatial axes flattened: ``(batch, H*W, num_class)`` for a
          batched mask, ``(H*W, num_class)`` for a single sample.
        * Binary, and ``img`` has values above 1: both arrays are divided by
          255 and the mask is thresholded at 0.5 into {0, 1}.
        * Otherwise both arrays are returned unchanged.
    """
    if flag_multi_class:
        img = img / 255
        # Keep only the first channel; it holds the class index per pixel.
        mask = mask[:,:,:,0] if mask.ndim == 4 else mask[:,:,0]
        new_mask = np.zeros(mask.shape + (num_class,))
        for i in range(num_class):
            new_mask[mask == i,i] = 1
        # Flatten the spatial axes. The branch must depend on the array rank:
        # a single (un-batched) sample has no fourth axis to index.
        if new_mask.ndim == 4:
            new_mask = np.reshape(new_mask, (new_mask.shape[0], -1, num_class))
        else:
            new_mask = np.reshape(new_mask, (-1, num_class))
        mask = new_mask
    elif np.max(img) > 1:
        img = img / 255
        mask = mask / 255
        mask[mask > 0.5] = 1
        mask[mask <= 0.5] = 0
    return (img,mask)

#change prefixes?
def trainGenerator(batch_size,train_path,image_folder,mask_folder,aug_dict,image_color_mode = "grayscale",
                    mask_color_mode = "grayscale",image_save_prefix  = "image",mask_save_prefix  = "mask",
                    flag_multi_class = False,num_class = 2,save_to_dir = None,target_size = (256,256),seed = 1):
    '''
    Yield (image, mask) batches read from two sub-folders of train_path.

    Two ImageDataGenerator instances are built from the same aug_dict and both
    directory flows receive the same seed, which is how the image and mask
    streams are meant to stay in step. Each pair goes through adjustData
    before being yielded.
    Set save_to_dir = "your path" to have the generators write out what they produce.
    '''
    image_datagen = ImageDataGenerator(**aug_dict)
    mask_datagen = ImageDataGenerator(**aug_dict)

    # Settings common to the image flow and the mask flow.
    shared_flow_args = dict(
        class_mode = None,
        target_size = target_size,
        batch_size = batch_size,
        save_to_dir = save_to_dir,
        seed = seed,
    )
    image_generator = image_datagen.flow_from_directory(
        train_path,
        classes = [image_folder],
        color_mode = image_color_mode,
        save_prefix  = image_save_prefix,
        **shared_flow_args)
    mask_generator = mask_datagen.flow_from_directory(
        train_path,
        classes = [mask_folder],
        color_mode = mask_color_mode,
        save_prefix  = mask_save_prefix,
        **shared_flow_args)

    for image_batch, mask_batch in zip(image_generator, mask_generator):
        image_batch, mask_batch = adjustData(image_batch, mask_batch, flag_multi_class, num_class)
        yield (image_batch, mask_batch)

def testGenerator(test_path,num_image = 30,target_size = (256,256),flag_multi_class = False,as_gray = True):
    """Yield up to ``num_image`` test images from ``test_path``, one per step.

    Each file is read with ``skimage.io.imread``, divided by 255, resized to
    ``target_size`` and given a leading batch axis of size 1. Unless
    ``flag_multi_class`` is set, a trailing channel axis of size 1 is added too.

    The directory is listed once, in the order ``os.listdir`` returns. If it
    holds fewer than ``num_image`` files the generator simply ends early
    instead of raising ``IndexError``.
    """
    files = os.listdir(test_path)
    for name in files[:num_image]:
        img = io.imread(os.path.join(test_path, name),as_gray = as_gray)
        img = img / 255
        img = trans.resize(img,target_size)
        img = np.reshape(img,img.shape+(1,)) if (not flag_multi_class) else img
        img = np.reshape(img,(1,)+img.shape)
        yield img


def geneTrainNpy(image_path,mask_path,flag_multi_class = False,num_class = 2,image_prefix = "image",mask_prefix = "mask",image_as_gray = True,mask_as_gray = True):
    """Load every ``<image_prefix>*.png`` in ``image_path`` with its mask into arrays.

    The mask for an image is looked up in ``mask_path`` under the image's file
    name with ``image_prefix`` replaced by ``mask_prefix``. Each pair is passed
    through ``adjustData`` before being collected.

    Returns:
        ``(image_arr, mask_arr)``: two numpy arrays stacking all samples.
    """
    image_name_arr = glob.glob(os.path.join(image_path,"%s*.png"%image_prefix))
    image_arr = []
    mask_arr = []
    for item in image_name_arr:
        img = io.imread(item,as_gray = image_as_gray)
        img = np.reshape(img,img.shape + (1,)) if image_as_gray else img
        # Swap the prefix in the file name only. Replacing it across the whole
        # path would also rewrite directory names that happen to contain it.
        mask_name = os.path.basename(item).replace(image_prefix,mask_prefix)
        mask = io.imread(os.path.join(mask_path,mask_name),as_gray = mask_as_gray)
        mask = np.reshape(mask,mask.shape + (1,)) if mask_as_gray else mask
        img,mask = adjustData(img,mask,flag_multi_class,num_class)
        image_arr.append(img)
        mask_arr.append(mask)
    image_arr = np.array(image_arr)
    mask_arr = np.array(mask_arr)
    return image_arr,mask_arr


def labelVisualize(num_class,color_dict,img):
    """Render a class-index map as an RGB image with values in [0, 1].

    A 3-D input is reduced to its first channel. Every pixel equal to class
    index ``i`` (for ``i < num_class``) is painted with ``color_dict[i]``;
    all other pixels stay black. The result is divided by 255.
    """
    labels = img
    if len(img.shape) == 3:
        labels = img[:,:,0]

    canvas = np.zeros(labels.shape + (3,))
    for class_id in range(num_class):
        canvas[labels == class_id,:] = color_dict[class_id]
    return canvas / 255



def saveResult(save_path,inp_path,npyfile,flag_multi_class = False,num_class = 2):
    """Write each prediction in ``npyfile`` to ``save_path`` as an image.

    The i-th prediction is saved under the i-th name returned by
    ``os.listdir(inp_path)``. In multi-class mode it is first coloured with
    ``labelVisualize`` and ``COLOR_DICT``; otherwise its first channel is
    saved as-is.
    """
    filenames = os.listdir(inp_path)
    for idx, prediction in enumerate(npyfile):
        if flag_multi_class:
            rendered = labelVisualize(num_class,COLOR_DICT,prediction)
        else:
            rendered = prediction[:,:,0]
        target_file = os.path.join(save_path,filenames[idx])
        io.imsave(target_file,rendered)


In [None]:
#model.py

import numpy as np
import os
import skimage.io as io
import skimage.transform as trans
import numpy as np
from keras.models import *
from keras.layers import *
from keras.optimizers import *
from keras.callbacks import ModelCheckpoint, LearningRateScheduler
from keras import backend as keras
#from utils import dice_coeff, bce_dice_loss


def unet(pretrained_weights = None,input_size = (256,256,1)):
    """Build and compile the U-Net used for binary segmentation.

    The network has four 2x2 max-pooling stages on the way down, a 1024-filter
    bottleneck, and four 2x up-sampling stages on the way up. Each up-sampled
    map is concatenated with the same-resolution feature map from the way
    down. Dropout (0.5) is applied after the fourth stage and the bottleneck.
    The output is a single-channel 1x1 convolution with a sigmoid.

    Args:
        pretrained_weights: optional path passed to ``model.load_weights``.
        input_size: ``(height, width, channels)`` of the input tensor. Height
            and width are halved four times, so they should be divisible by 16
            for the skip concatenations to line up.

    Returns:
        The compiled Keras ``Model`` (Adam at 1e-4, ``bce_dice_loss``, with
        binary accuracy and ``dice_coeff`` as metrics).
    """
    inputs = Input(input_size)

    # Contracting path
    conv1 = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(inputs)
    conv1 = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
    conv2 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(pool1)
    conv2 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    conv3 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(pool2)
    conv3 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)
    conv4 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(pool3)
    conv4 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv4)
    drop4 = Dropout(0.5)(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

    # Bottleneck
    conv5 = Conv2D(1024, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(pool4)
    conv5 = Conv2D(1024, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv5)
    drop5 = Dropout(0.5)(conv5)

    # Expanding path: up-sample, then concatenate with the matching skip map
    up6 = Conv2D(512, 2, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(UpSampling2D(size = (2,2))(drop5))
    merge6 = concatenate([drop4,up6], axis = 3)
    conv6 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(merge6)
    conv6 = Conv2D(512, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv6)

    up7 = Conv2D(256, 2, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(UpSampling2D(size = (2,2))(conv6))
    merge7 = concatenate([conv3,up7], axis = 3)
    conv7 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(merge7)
    conv7 = Conv2D(256, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv7)

    up8 = Conv2D(128, 2, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(UpSampling2D(size = (2,2))(conv7))
    merge8 = concatenate([conv2,up8], axis = 3)
    conv8 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(merge8)
    conv8 = Conv2D(128, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv8)

    up9 = Conv2D(64, 2, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(UpSampling2D(size = (2,2))(conv8))
    merge9 = concatenate([conv1,up9], axis = 3)
    conv9 = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(merge9)
    conv9 = Conv2D(64, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv9)
    conv9 = Conv2D(2, 3, activation = 'relu', padding = 'same', kernel_initializer = 'he_normal')(conv9)

    # Per-pixel foreground probability
    conv10 = Conv2D(1, 1, activation = 'sigmoid')(conv9)

    model = Model(inputs = inputs, outputs = conv10)

    # `learning_rate` is the supported keyword; the old `lr` alias is
    # deprecated and rejected by current Keras optimizers.
    model.compile(optimizer = Adam(learning_rate = 1e-4), loss = bce_dice_loss, metrics = [tf.keras.metrics.binary_accuracy,dice_coeff])

    #model.summary()

    if pretrained_weights:
        model.load_weights(pretrained_weights)

    return model

In [None]:
# Mount Google Drive at /content/drive; the dataset, checkpoint and model
# paths used in later cells all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Training split on Drive. The trailing expression is the cell output: it
# shows whether the folder is reachable from this runtime.
train_path = '/content/drive/MyDrive/2023 AI4Good Lab Trainee Drive/4. Project Teams/T1 Project/Data/Training-Splitted/train/'
os.path.exists(train_path)

In [None]:
# Validation split on Drive, checked the same way.
val_path = '/content/drive/MyDrive/2023 AI4Good Lab Trainee Drive/4. Project Teams/T1 Project/Data/Training-Splitted/val/'
os.path.exists(val_path)

In [None]:
#main.py

#from model import *
#from data import *
#from customdatagenerator import *


#os.environ["CUDA_VISIBLE_DEVICES"] = "0"

# these augmentation params not needed if "augment" function is hard-coded to do random transformations
# data_gen_args = dict(rotation_range=0.2,
                    # width_shift_range=0.05,
                    # height_shift_range=0.05,
                    # shear_range=0.05,
                    # zoom_range=0.05,
                    # horizontal_flip=True,
                    # fill_mode='nearest')

train_path = '/content/drive/MyDrive/2023 AI4Good Lab Trainee Drive/4. Project Teams/T1 Project/Data/Training-Splitted/train/'

val_path = '/content/drive/MyDrive/2023 AI4Good Lab Trainee Drive/4. Project Teams/T1 Project/Data/Training-Splitted/val/'
batch_size = 5
dim = (640,640) # (height, width) every sample is cropped/padded to; must match the model input
n_channels = 1

# Training batches are shuffled and augmented. Validation batches are neither,
# so the validation score is comparable from epoch to epoch.
train_gen = CustomDataGenerator(path = train_path, batch_size = batch_size, dim = dim, n_channels = n_channels, shuffle = True, augmentation = True)
val_gen = CustomDataGenerator(path = val_path, batch_size = batch_size, dim = dim, n_channels = n_channels, shuffle = False, augmentation = False)

model = unet(input_size=(*dim,1))
# Keep the weights of the epoch with the lowest validation loss.
model_checkpoint = ModelCheckpoint('unet_membrane.hdf5', monitor='val_loss',verbose=1, save_best_only=True)
# The validation generator was previously built but never handed to fit().
model.fit(x=train_gen,validation_data=val_gen,epochs=20,callbacks=[model_checkpoint])
#https://www.tensorflow.org/api_docs/python/tf/keras/Model
#https://www.tensorflow.org/api_docs/python/tf/keras/Model


#*****


In [None]:
# Drive folder handed to the CheckpointManager created further down.
checkpoint_dir = '/content/drive/MyDrive/Colab Notebooks/Checkpoints'

# Cell output: whether the folder is reachable from this runtime.
os.path.exists(checkpoint_dir)


In [None]:
# Define the checkpoint directory and file path
# NOTE(review): checkpoint_path is not referenced by any later cell visible
# here; the CheckpointManager is given checkpoint_dir instead.
checkpoint_path = f'{checkpoint_dir}/model.ckpt'

In [None]:
#model = unet(input_size=(*dim,1)) # done above
# Create a checkpoint instance that tracks the model built in the training cell
checkpoint = tf.train.Checkpoint(model=model)

In [None]:
# Create a checkpoint manager to handle saving checkpoints
# (writes into checkpoint_dir; max_to_keep=3 is passed as the retention limit)
checkpoint_manager = tf.train.CheckpointManager(checkpoint, checkpoint_dir, max_to_keep=3)

In [None]:
# Save the checkpoint, numbered 1
checkpoint_manager.save(checkpoint_number=1)

# Presumably the path reported by the save call above:
# /content/drive/MyDrive/Colab Notebooks/Checkpoints/ckpt-1

In [None]:
# Folder on Drive that holds the exported model.
model_path ='/content/drive/MyDrive/Colab Notebooks/Modelfile'
# Create the folder if needed; the previous bare os.path.exists() call
# discarded its result and so checked nothing.
os.makedirs(model_path, exist_ok=True)
# Write the HDF5 file that the loading cells below expect
# (<Modelfile>/unet_custom.hdf5).
model.save(os.path.join(model_path, 'unet_custom.hdf5'))

In [None]:
!pip install gradio

In [None]:
# Saved-model file that the loading cell below reads. From here on model_path
# names this file rather than the Modelfile folder.
model_path ='/content/drive/MyDrive/Colab Notebooks/Modelfile/unet_custom.hdf5'

# Cell output: whether the file is reachable from this runtime.
os.path.exists(model_path)

In [None]:
!cd drive
!ls


In [None]:
import gradio as gr
import tensorflow as tf
import cv2
import numpy as np

# Load the UNet model
model_path ='/content/drive/MyDrive/Colab Notebooks/Modelfile/unet_custom.hdf5'
# Pass the variable, not the literal string "model_path". The model was
# compiled with a custom loss and metric, so Keras needs them by name to
# rebuild the compiled model.
model = tf.keras.models.load_model(
    model_path,
    custom_objects={'bce_dice_loss': bce_dice_loss, 'dice_coeff': dice_coeff},
)

In [None]:
# Validation generator for inference: fixed order (shuffle=False) and no
# augmentation, so predictions follow the generator's file list.
val_gen = CustomDataGenerator(path = val_path, batch_size = batch_size, dim = dim, n_channels = n_channels, shuffle = False, augmentation = False)



In [None]:
#testGene = testGenerator(val_path,target_size=dim) # I don't think this needs to be changed (the function does not use ImageDataGenerator nor augmentations)
# Model.predict accepts a Sequence directly; predict_generator is deprecated.
results = model.predict(val_gen,verbose=1)

In [None]:

# Define the image segmentation function
def _center_crop_or_pad(gray, target_h, target_w):
    """Centre-crop and/or zero-pad a 2-D array to ``(target_h, target_w)``."""
    src_h, src_w = gray.shape
    top = max((src_h - target_h) // 2, 0)
    left = max((src_w - target_w) // 2, 0)
    cropped = gray[top:top + target_h, left:left + target_w]

    crop_h, crop_w = cropped.shape
    pad_top = (target_h - crop_h) // 2
    pad_left = (target_w - crop_w) // 2
    canvas = np.zeros((target_h, target_w), dtype=gray.dtype)
    canvas[pad_top:pad_top + crop_h, pad_left:pad_left + crop_w] = cropped
    return canvas


def segment_head_circumference(image):
    """Segment one ultrasound image with the loaded model.

    Args:
        image: either a path to an image file or a pixel array (2-D
            grayscale, or 3-D with 1, 3 (RGB) or 4 (RGBA) channels). Which of
            the two the UI passes depends on how its image input is set up.

    Returns:
        A 2-D ``uint8`` array: 255 where the predicted probability exceeds
        0.5, 0 elsewhere.

    Raises:
        ValueError: if a file path is given and the file cannot be read.
    """
    if isinstance(image, str):
        gray = cv2.imread(image, cv2.IMREAD_GRAYSCALE)
        if gray is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise ValueError(f"Could not read image file: {image}")
    else:
        gray = np.asarray(image)
        if gray.ndim == 3:
            channels = gray.shape[-1]
            if channels == 1:
                gray = gray[:, :, 0]
            elif channels == 4:
                gray = cv2.cvtColor(gray, cv2.COLOR_RGBA2GRAY)
            else:
                gray = cv2.cvtColor(gray, cv2.COLOR_RGB2GRAY)

    # Bring the image to the spatial size the network was built for, using a
    # centred crop/pad like the training pipeline rather than rescaling.
    _, target_h, target_w, _ = model.input_shape
    if target_h is not None and target_w is not None:
        gray = _center_crop_or_pad(gray, target_h, target_w)

    # Scale to [0, 1] and add the batch and channel axes.
    batch = gray[np.newaxis, :, :, np.newaxis] / 255.0

    # Perform the segmentation
    prediction = np.squeeze(model.predict(batch))
    binary_image = np.where(prediction > 0.5, 255, 0).astype(np.uint8)

    return binary_image

# Define the input and output interfaces for Gradio.
# Components are taken from the top-level namespace; the gr.inputs / gr.outputs
# namespaces are deprecated and absent from newer Gradio releases.
# type="filepath" hands the function a path on disk, which is what it reads.
inputs = gr.Image(label="Ultrasound Image (PNG format)", type="filepath")
output = gr.Image(label="Segmented Head Circumference", type="pil")


# Create the Gradio interface
gr_interface = gr.Interface(
    fn=segment_head_circumference,
    inputs=inputs,
    outputs=output,
    title="Ultrasound Image Segmentation",
    description="Segment the head circumference in ultrasound images."
)

# Start the Gradio app from this notebook
gr_interface.launch()



In [None]:
# The output folder is not created anywhere else in the notebook; make sure it
# exists before the predictions are written into it.
predictions_dir = "/content/predictions"
os.makedirs(predictions_dir, exist_ok=True)
saveResult(predictions_dir,val_path+"Image",results)

In [None]:
import matplotlib.pyplot as plt

# Show the first prediction, then list the distinct values it contains
# (the cell output) to see how close to 0/1 the sigmoid outputs are.
plt.imshow(results[0,])
np.unique(results[0,])