In [1]:
def dice_loss(y_true, y_pred):
    intersection = K.sum(y_true * y_pred)
    union = K.sum(y_true) + K.sum(y_pred)
    dice = (2. * intersection + K.epsilon()) / (union + K.epsilon())
    return 1 - dice
def f1(y_true, y_pred):
    def recall_m(y_true, y_pred):
        TP = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        Positives = K.sum(K.round(K.clip(y_true, 0, 1)))
        recall = TP / (Positives+K.epsilon())
        return recall
    
    def precision_m(y_true, y_pred):
        TP = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
        Pred_Positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
        precision = TP / (Pred_Positives+K.epsilon())
        return precision
    
    precision, recall = precision_m(y_true, y_pred), recall_m(y_true, y_pred)
    
    return 2*((precision*recall)/(precision+recall+K.epsilon()))

In [4]:
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model
import matplotlib.pyplot as plt
import networkx as nx
from skimage.morphology import skeletonize
from PIDController import PIDController
from OT2Eenv import OT2Env  # Updated environment class

def crop_petri_dish(image, patch_size):
    """
    Detect and crop the Petri dish from the image.
    """
    _, binary = cv2.threshold(image, 100, 255, cv2.THRESH_BINARY)
    contours, _ = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    largest_contour = max(contours, key=cv2.contourArea, default=None)
    if largest_contour is None:
        return None, None, False
    x, y, w, h = cv2.boundingRect(largest_contour)
    cropped_image = image[y:y + h, x:x + w]
    padded_image = pad_image(cropped_image, patch_size)
    return padded_image, (x, y, w, h), True

def pad_image(image, patch_size):
    height, width = image.shape[:2]
    pad_height = (patch_size[0] - height % patch_size[0]) % patch_size[0]
    pad_width = (patch_size[1] - width % patch_size[1]) % patch_size[1]
    return cv2.copyMakeBorder(image, 0, pad_height, 0, pad_width, cv2.BORDER_CONSTANT, value=0)

def split_image(image, num_parts):
    height, width = image.shape[:2]
    part_width = width // num_parts
    return [image[:, i * part_width:(i + 1) * part_width] for i in range(num_parts)]

def generate_mask(image, model, patch_size):
    height, width = image.shape[:2]
    pad_height = (patch_size[0] - height % patch_size[0]) % patch_size[0]
    pad_width = (patch_size[1] - width % patch_size[1]) % patch_size[1]
    padded_image = cv2.copyMakeBorder(image, 0, pad_height, 0, pad_width, cv2.BORDER_CONSTANT, value=0)
    patches = [padded_image[y:y + patch_size[0], x:x + patch_size[1]] / 255.0
               for y in range(0, padded_image.shape[0], patch_size[0])
               for x in range(0, padded_image.shape[1], patch_size[1])]
    patches = np.array(patches)[..., np.newaxis]
    predicted_patches = model.predict(patches)
    predicted_patches = (predicted_patches > 0.5).astype(np.uint8) * 255
    reconstructed_mask = np.zeros_like(padded_image, dtype=np.uint8)
    idx = 0
    for y in range(0, padded_image.shape[0], patch_size[0]):
        for x in range(0, padded_image.shape[1], patch_size[1]):
            reconstructed_mask[y:y + patch_size[0], x:x + patch_size[1]] = predicted_patches[idx].squeeze()
            idx += 1
    return reconstructed_mask[:height, :width]

def detect_root_tip_with_skeletonization(mask, kernel_size=10, closing_iterations=3, min_area=400):
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (kernel_size, kernel_size))
    improved_mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel, iterations=closing_iterations)
    num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(improved_mask, connectivity=8)
    largest_component = max([(label, stats[label, cv2.CC_STAT_AREA]) for label in range(1, num_labels)],
                            key=lambda x: x[1], default=(None, None))
    if largest_component[1] < min_area:
        raise ValueError("No valid root tip detected in the skeletonized mask.")
    skeleton = skeletonize((labels == largest_component[0]).astype(np.uint8))
    skeleton_pixels = np.argwhere(skeleton > 0)
    topmost_pixel = tuple(skeleton_pixels.min(axis=0))
    lengths = {tuple(p): np.linalg.norm(p - topmost_pixel) for p in skeleton_pixels}
    bottommost_pixel = max(lengths, key=lengths.get)
    return bottommost_pixel

def convert_pixel_to_mm(root_tip_pixel, image_height, plate_height_mm):
    scale = plate_height_mm / image_height
    return (root_tip_pixel[1] * scale, root_tip_pixel[0] * scale, 0)

def convert_to_robot_coordinates(root_tip_mm, plate_position_robot):
    return [root_tip_mm[0] / 1000 + plate_position_robot[0],
            root_tip_mm[1] / 1000 + plate_position_robot[1],
            root_tip_mm[2] / 1000 + plate_position_robot[2]]

def inoculate_with_pid(env, root_tips_robot):
    pid_x = PIDController(kp=10, ki=5, kd=0.01)
    pid_y = PIDController(kp=10, ki=5, kd=0.01)
    pid_z = PIDController(kp=10, ki=5, kd=0.01)
    for idx, root_tip in enumerate(root_tips_robot):
        print(f"Starting inoculation for root tip {idx + 1} at {root_tip}")
        terminated = False
        truncated = False
        env.goal_position = np.array(root_tip)
        while not (terminated or truncated):
            current_position = np.array(env.get_current_position())
            error_x = env.goal_position[0] - current_position[0]
            error_y = env.goal_position[1] - current_position[1]
            error_z = env.goal_position[2] - current_position[2]
            control_x = pid_x.compute(error_x)
            control_y = pid_y.compute(error_y)
            control_z = pid_z.compute(error_z)
            action = np.array([control_x, control_y, control_z], dtype=np.float32)
            obs, reward, terminated, truncated, info = env.step(action)
            env.render()
            xy_error = np.linalg.norm(env.goal_position[:2] - current_position[:2])
            if xy_error < 0.001:
                print(f"Inoculating at position {current_position[:2]} (XY Accuracy Met)")
                print("Simulating inoculum drop...")
                break
        print(f"Finished inoculation for root tip {idx + 1}\n")

# Main Workflow
if __name__ == "__main__":
    # Parameters
    model_path = r"C:\Users\Edopi\Desktop\2024-25b-fai2-adsai-EdoardoPierezza231412\datalab_tasks\Task8\Edoardo_231412_undet_model256px_data augmentation_1.h5"
    patch_size = (256, 256)
    plate_position_robot = [0.10775, 0.088 - 0.026, 0.057]
    image_height = 2816
    plate_height_mm = 150

    # Initialize variables to store results
    root_tips_mm = []
    root_tips_robot = []

    try:
        # Load the mask model
        model = load_model(model_path, custom_objects={"f1": f1, "dice_loss": dice_loss})

        # Capture the image and process it
        env = OT2Env(render=True)
        image_path = env.image()
        image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)

        if image is None:
            raise ValueError("Failed to load the image. Check the image path or capture process.")

        cropped_image, bbox, success = crop_petri_dish(image, patch_size)
        if not success:
            raise RuntimeError("Failed to detect and crop the Petri dish.")

        splits = split_image(cropped_image, num_parts=5)

        # Analyze all root tips
        for idx, split in enumerate(splits):
            print(f"Processing split {idx + 1} of {len(splits)}")
            try:
                mask = generate_mask(split, model, patch_size)
                root_tip_pixel = detect_root_tip_with_skeletonization(mask, kernel_size=10, closing_iterations=3, min_area=400)
                root_tip_mm = convert_pixel_to_mm(root_tip_pixel, image_height // len(splits), plate_height_mm // len(splits))
                root_tip_robot = convert_to_robot_coordinates(root_tip_mm, plate_position_robot)
                root_tips_robot.append(root_tip_robot)
                print(f"Split {idx + 1} root tip in robot coordinates: {root_tip_robot}")
            except Exception as e:
                print(f"Warning: No root detected in split {idx + 1}. Skipping to the next split. Error: {e}")

        # Perform inoculation for all detected root tips
        print("Starting inoculation process...")
        inoculate_with_pid(env, root_tips_robot)
        print("Inoculation process completed.")

    except Exception as e:
        print(f"An error occurred: {e}")

    finally:
        print("Closing the environment...")
        env.close()


Image captured and saved at: textures/_plates/038_43-13-ROOT1-2023-08-08_control_pH7_-Fe+B_col0_04-Fish Eye Corrected.png
Processing split 1 of 5
Split 1 root tip in robot coordinates: [0.1263468028419183, 0.11352753108348135, 0.057]
Processing split 2 of 5
Split 2 root tip in robot coordinates: [0.12490808170515097, 0.11640497335701598, 0.057]
Processing split 3 of 5
Split 3 root tip in robot coordinates: [0.12181749555950266, 0.11347424511545293, 0.057]
Processing split 4 of 5
Split 4 root tip in robot coordinates: [0.12480150976909414, 0.11832326820603908, 0.057]
Processing split 5 of 5
Split 5 root tip in robot coordinates: [0.1163823268206039, 0.12050799289520425, 0.057]
Starting inoculation process...
Starting inoculation for root tip 1 at [0.1263468028419183, 0.11352753108348135, 0.057]
Step 1 called.
Rendering: Goal Position [0.1263468  0.11352753 0.057     ]
Step 2 called.
Rendering: Goal Position [0.1263468  0.11352753 0.057     ]
Step 3 called.
Rendering: Goal Position [0.12