In [None]:

import os

from tensorflow.keras.preprocessing.image import ImageDataGenerator

import cv2
import matplotlib.pyplot as plt
from patchify import patchify, unpatchify

import numpy as np

## Helper function

In [None]:
#!pip list

In [None]:
def padder(image, patch_size):
    h = image.shape[0]
    w = image.shape[1]
    height_padding = ((h // patch_size) + 1) * patch_size - h
    width_padding = ((w // patch_size) + 1) * patch_size - w

    top_padding = int(height_padding/2)
    bottom_padding = height_padding - top_padding

    left_padding = int(width_padding/2)
    right_padding = width_padding - left_padding

    padded_image = cv2.copyMakeBorder(image, top_padding, bottom_padding, left_padding, right_padding, cv2.BORDER_CONSTANT, value=[0, 0, 0])

    return padded_image

## Loading the data in patches

In [None]:
# Set the paths and directories
patch_dir = 'real_plus_sim_data/data_patched_256'

# Define patch size
patch_size = 256

### Train

In [None]:
# Data augmentation for the training set
train_image_datagen = ImageDataGenerator(rescale=1./255, 
    horizontal_flip=True,  # Add horizontal flip for augmentation
    vertical_flip=True                                     
)

train_image_generator = train_image_datagen.flow_from_directory(
    f'{patch_dir}/train_masks',
    target_size=(patch_size, patch_size),
    batch_size=32,
    class_mode=None,
    color_mode='grayscale',
    seed=42)

# Training masks
train_mask_datagen = ImageDataGenerator(rescale=1./255, 
    horizontal_flip=True,  # Add horizontal flip for augmentation
    vertical_flip=True
)

train_mask_generator = train_mask_datagen.flow_from_directory(
    f'{patch_dir}/train_images',
    target_size=(patch_size, patch_size),
    batch_size=32,
    class_mode=None,
    color_mode='grayscale',
    seed=42)

train_generator = zip(train_image_generator, train_mask_generator)

### Validation

In [None]:
# Data augmentation for the test set
val_image_datagen = ImageDataGenerator(rescale=1./255,    
                                       horizontal_flip=True,  # Add horizontal flip for augmentation
    vertical_flip=True  # Add horizontal flip for augmentation
)

val_image_generator = val_image_datagen.flow_from_directory(
    f'{patch_dir}/val_masks',
    target_size=(patch_size, patch_size),
    batch_size=32,
    class_mode=None,
    color_mode='grayscale',
    seed=42)

# Test masks
val_mask_datagen = ImageDataGenerator(rescale=1./255,  
    horizontal_flip=True,  # Add horizontal flip for augmentation
    vertical_flip=True  # Add horizontal flip for augmentation
)

val_mask_generator = val_mask_datagen.flow_from_directory(
    f'{patch_dir}/val_images',
    target_size=(patch_size, patch_size),
    batch_size=32,
    class_mode=None,
    color_mode='grayscale',
    seed=42)

val_generator = zip(val_image_generator, val_mask_generator)

### Test

In [None]:
# Data augmentation for the test set
test_image_datagen = ImageDataGenerator()

test_image_generator = test_image_datagen.flow_from_directory(
    f'{patch_dir}/test_masks',
    target_size=(patch_size, patch_size),
    batch_size=32,
    class_mode=None,
    color_mode='grayscale',
    seed=42)

# Test masks
test_mask_datagen = ImageDataGenerator()

test_mask_generator = test_mask_datagen.flow_from_directory(
    f'{patch_dir}/test_images',
    target_size=(patch_size, patch_size),
    batch_size=32,
    class_mode=None,
    color_mode='grayscale',
    seed=42)

test_generator = zip(test_image_generator, test_mask_generator)

## Model architecture

In [None]:
import tensorflow as tf
import keras.backend as K

In [None]:
# Let's implement two custom metrics f1 score and iou
def f1(y_true, y_pred):
    def recall_m(y_true, y_pred):
        TP = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        Positives = K.sum(K.round(K.clip(y_true, 0, 1)))
        recall = TP / (Positives+K.epsilon())
        return recall
    
    def precision_m(y_true, y_pred):
        TP = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        Pred_Positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
        precision = TP / (Pred_Positives+K.epsilon())
        return precision
    
    precision, recall = precision_m(y_true, y_pred), recall_m(y_true, y_pred)
    
    return 2*((precision*recall)/(precision+recall+K.epsilon()))

In [None]:
def iou(y_true, y_pred):
    def f(y_true, y_pred):
        threshold = 0.5 
        y_pred_binary = K.round(y_pred + 0.5 - threshold)
        
        intersection = K.sum(K.abs(y_true * y_pred_binary), axis=[1,2,3])
        total = K.sum(K.square(y_true), [1,2,3]) + K.sum(K.square(y_pred_binary), [1,2,3])
        union = total - intersection
        return (intersection + K.epsilon()) / (union + K.epsilon())
    
    return K.mean(f(y_true, y_pred), axis=-1)

In [None]:
# U-Net model
# Author: Sreenivas Bhattiprolu
# This code is coming from the videos at the beginning
from keras.models import Model
import keras.backend as K
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Conv2DTranspose, BatchNormalization, Dropout, Lambda

def simple_unet_model(IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS):
# Build the model
    inputs = Input((IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS))
    s = inputs

    # Contraction path
    c1 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(s)
    c1 = Dropout(0.1)(c1)
    c1 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c1)
    p1 = MaxPooling2D((2, 2))(c1)
    
    c2 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p1)
    c2 = Dropout(0.1)(c2)
    c2 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c2)
    p2 = MaxPooling2D((2, 2))(c2)
     
    c3 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p2)
    c3 = Dropout(0.2)(c3)
    c3 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c3)
    p3 = MaxPooling2D((2, 2))(c3)
     
    c4 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p3)
    c4 = Dropout(0.2)(c4)
    c4 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c4)
    p4 = MaxPooling2D(pool_size=(2, 2))(c4)
     
    c5 = Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(p4)
    c5 = Dropout(0.3)(c5)
    c5 = Conv2D(256, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c5)
    
    # Expansive path 
    u6 = Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(c5)
    u6 = concatenate([u6, c4])
    c6 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u6)
    c6 = Dropout(0.2)(c6)
    c6 = Conv2D(128, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c6)
     
    u7 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(c6)
    u7 = concatenate([u7, c3])
    c7 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u7)
    c7 = Dropout(0.2)(c7)
    c7 = Conv2D(64, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c7)
     
    u8 = Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(c7)
    u8 = concatenate([u8, c2])
    c8 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u8)
    c8 = Dropout(0.1)(c8)
    c8 = Conv2D(32, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c8)
     
    u9 = Conv2DTranspose(16, (2, 2), strides=(2, 2), padding='same')(c8)
    u9 = concatenate([u9, c1], axis=3)
    c9 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(u9)
    c9 = Dropout(0.1)(c9)
    c9 = Conv2D(16, (3, 3), activation='relu', kernel_initializer='he_normal', padding='same')(c9)
     
    outputs = Conv2D(1, (1, 1), activation='sigmoid')(c9)
     
    model = Model(inputs=[inputs], outputs=[outputs])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy', f1, iou])
    model.summary()
    
    return model

## Model trainning

In [None]:
from keras.callbacks import EarlyStopping, ModelCheckpoint

# Define the model
model = simple_unet_model(patch_size, patch_size, 1)

# Define callbacks
early_stopping = EarlyStopping(monitor='val_f1', mode = 'max', patience=35, restore_best_weights=True)
model_checkpoint = ModelCheckpoint('occlusion_rs_1_256.h5', monitor='val_f1', mode = 'max', save_best_only=True)

In [None]:
import tensorflow as tf
tf.config.list_physical_devices('GPU')

In [None]:
tf.device('/GPU:0')

In [None]:
history = model.fit(
    train_generator,
    steps_per_epoch=len(train_image_generator),  # Use len(train_image_generator) instead
    epochs=400,  # Adjust the number of epochs as needed
    validation_data=val_generator,
    validation_steps=len(val_image_generator),  # Use len(val_image_generator) instead
    callbacks=[early_stopping]
)

## Evaluation

## Error analysis

In [None]:
# Evaluate the model on the test set
score = model.evaluate(test_generator, steps=len(test_image_generator))

# Print the evaluation metrics
print(f'Test Loss: {score[0]}')
print(f'Test Accuracy: {score[1]}')
print(f'Test F1 Score: {score[2]}')
print(f'Test IoU: {score[3]}')

In [None]:
# Save the best model
model.save(f"occlusion_rs_1_256.h5")

In [None]:
from keras.models import load_model

# Load the best model saved during training, providing custom metrics
model = load_model("occlusion_rs_1_256.h5", custom_objects={'f1': f1, 'iou': iou})

In [None]:
def process_images_in_folder(folder_path, num_images):
    # Get a list of all image files in the specified folder
    image_files = [f for f in os.listdir(folder_path) if f.endswith(('.png', '.tif', '.jpeg'))]

    # Select a random sample of image files
    selected_files = np.random.choice(image_files, num_images, replace=False)

    for image_file in selected_files:
        # Construct the full path of the image file
        image_path = os.path.join(folder_path, image_file)

        # Read the image
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)

        # Apply your existing code
        #patch_size = 512
        image = padder(image, patch_size)
        patches = patchify(image, (patch_size, patch_size), step=patch_size)
        i, j = patches.shape[0], patches.shape[1]
        patches = patches.reshape(-1, patch_size, patch_size, 1)
        preds = model.predict(patches / 255)
        preds = preds.reshape(i, j, patch_size, patch_size)
        predicted_mask = unpatchify(preds, (image.shape[0], image.shape[1]))
        predicted_mask = predicted_mask > 0.5

        # Plot the images with the image file name as the title
        fig, ax = plt.subplots(1, 2, figsize=(11, 6))
        ax[0].imshow(image, cmap="gray")
        ax[1].imshow(predicted_mask, cmap="gray")
        ax[0].axis("off")
        ax[0].set_title("Original image")
        ax[1].axis("off")
        ax[1].set_title(f"Predicted root mask - {image_file}")

        plt.show()

In [None]:
folder_path = 'p_w'
num_images = 1

process_images_in_folder(folder_path, num_images)


In [None]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
from patchify import patchify, unpatchify

def process_images_in_folder(folder_path, num_images, refinement_steps=5):
    # Get a list of all image files in the specified folder
    image_files = [f for f in os.listdir(folder_path) if f.endswith(('.png', '.tif', '.jpeg'))]

    # Select a random sample of image files
    selected_files = np.random.choice(image_files, num_images, replace=False)

    for image_file in selected_files:
        # Construct the full path of the image file
        image_path = os.path.join(folder_path, image_file)

        # Read the image
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)

        # Assume 'padder' function is defined elsewhere to adjust image dimensions as needed
        #patch_size = 512
        image = padder(image, patch_size)  # Assuming 'padder' adjusts the image size as required
        patches = patchify(image, (patch_size, patch_size), step=patch_size)
        i, j = patches.shape[0], patches.shape[1]
        patches = patches.reshape(-1, patch_size, patch_size, 1)

        # Initial prediction
        preds = model.predict(patches / 255)
        
        # Iteratively refine predictions
        for _ in range(refinement_steps):
            # Reshape predictions to match the patches layout, then predict again
            refined_patches = preds.reshape(-1, patch_size, patch_size, 1)
            preds = model.predict(refined_patches)
            preds = preds > 0.5  # Apply threshold to get binary mask

        # Final processing of predictions
        preds = preds.reshape(i, j, patch_size, patch_size)
        predicted_mask = unpatchify(preds, (image.shape[0], image.shape[1]))
        #predicted_mask = predicted_mask > 0.5  # Apply threshold to get binary mask

        # Plot the images with the image file name as the title
        fig, ax = plt.subplots(1, 2, figsize=(11, 6))
        ax[0].imshow(image, cmap="gray")
        ax[1].imshow(predicted_mask, cmap="gray")
        ax[0].axis("off")
        ax[0].set_title("Original image")
        ax[1].axis("off")
        ax[1].set_title(f"Predicted root mask - {image_file}")

        plt.show()


In [None]:
folder_path = 'p_w'
num_images = 1

process_images_in_folder(folder_path, num_images, refinement_steps=4)

In [None]:
folder_path = 'real_occlusions_test_images'
num_images = 32

process_images_in_folder(folder_path, num_images, refinement_steps=8)