In [None]:
from grid.robot.locomotion.isaac_locomotion import IsaacLocomotion
locomotionisaacsim_0 = IsaacLocomotion()

In [None]:
from grid.model.perception.depth.midas import MIDAS
from grid.utils.types import Image, Position, Velocity
from grid.utils.logger import log

import numpy as np
import time
import cv2

locomotionisaacsim_0.run()

In [None]:
def move(agent, speed, rad):
    """
    Moves the agent for a short duration using a given linear speed and angular velocity.
    """
    start_time = time.time()
    while time.time() - start_time < 0.1:
        agent.moveByVelocity(Velocity(speed, 0, 0), Velocity(0, 0, rad))


def get_formatted_midas_image(rgb_image: Image, depth_model):
    """
    Converts an RGB image to a normalized depth image using the depth model.
    """
    depth_image = depth_model.run(rgb_image.data)
    formatted = (depth_image * 255 / np.max(depth_image)).astype("uint8")
    return formatted


def divide_into_grid(depth_image: np.ndarray, num_horizontal_patches: int, num_vertical_patches: int):
    """
    Splits a depth image into a grid of patches.
    """
    patches = []
    patch_height = depth_image.shape[0] // num_vertical_patches
    patch_width = depth_image.shape[1] // num_horizontal_patches
    for v in range(num_vertical_patches):
        for h in range(num_horizontal_patches):
            patch = depth_image[v * patch_height:(v + 1) * patch_height,
                                h * patch_width:(h + 1) * patch_width]
            patches.append(patch)
    return patches


def compute_safety_metric(patches: list):
    """
    Computes a safety metric (mean depth) for each patch.
    """
    return [np.mean(patch) for patch in patches]


def determine_best_patch(safety_metrics, num_horizontal_patches, num_vertical_patches):
    """
    Finds the patch with the lowest mean depth (i.e. the safest patch).
    """
    metrics_grid = np.array(safety_metrics).reshape((num_vertical_patches, num_horizontal_patches))
    best_patch_index = np.unravel_index(np.argmin(metrics_grid), metrics_grid.shape)
    return best_patch_index


def check_blocked(patches: list, means: list, num_horizontal_patches: int, num_vertical_patches: int,
                  obs_threshold_ratio: float = 0.5, dist_threshold: float = 100):
    """
    Checks if the center patch is too blocked or if all patches have high depth values.
    """
    np_patches = np.array(patches)
    odd = len(patches) % 2
    middle_i = [len(patches) // 2]
    if not odd:
        middle_i.append(len(patches) // 2 - 1)
    for i in middle_i:
        patch = np_patches[i]
        patch[patch < 100] = 0
        patch[patch > 0] = 1
        print(f"Center patch has {np.mean(patch)} blocked ratio")
        if np.mean(patch) > obs_threshold_ratio:
            return True, -1
    np_means = np.array(means)
    print(f"Means: {np_means}")
    if np_means.all() > dist_threshold:
        return True, -1
    else:
        return False, determine_best_patch(means, num_horizontal_patches, num_vertical_patches)


def map_patch_to_steering(patch_index, num_horizontal_patches, num_vertical_patches):
    """
    Maps the chosen patch index to steering commands (yaw and pitch).
    """
    yaw = (patch_index[1] - num_horizontal_patches // 2) * (180 / num_horizontal_patches)
    pitch = (patch_index[0] - num_vertical_patches // 2) * (180 / num_vertical_patches)
    print("Steering command (yaw, pitch):", yaw, pitch)
    return yaw, pitch

In [None]:
def safenav_main(agent, rgb_image, depth_model):
    """
    Processes the RGB image to produce a depth map, divides it into patches, computes safety metrics,
    checks for obstacles, and steers the agent accordingly.
    """
    try:
        grid_size = 3  # Number of horizontal patches (vertical patches is set to 1)
        depth_image = get_formatted_midas_image(rgb_image, depth_model)
        log("grid/safenav/depth_image", Image(depth_image))
        
        patches_img = divide_into_grid(depth_image, grid_size, 1)
        means = compute_safety_metric(patches_img)
        blocked, best_patch_index = check_blocked(patches_img, means, grid_size, 1)
        
        if blocked:
            print("Path is blocked! Rotating in place.")
            move(agent, 0, 1)  # Rotate if blocked
            return

        yaw, pitch = map_patch_to_steering(best_patch_index, grid_size, 1)
        yaw_clip = (-30, 30)
        yaw = np.clip(yaw, yaw_clip[0], yaw_clip[1])
        print("Clipped yaw:", yaw)

        yaw_rad = np.deg2rad(yaw)
        move(agent, 1, -yaw_rad)
    except KeyboardInterrupt:
        raise
    except Exception as e:
        print(f"Error in safenav_main: {e}")


def main_navigation_loop(agent, camera_intrinsics=[128, 128, 128, 128]):
    """
    Continuously captures images from the agent, processes them for depth, and navigates safely.
    """
    depth = MIDAS()

    while True:
        rgb_image = agent.getImage()
        if rgb_image is None or rgb_image.data is None:
            print("RGB image is none, waiting...")
            time.sleep(1)
            continue
        
        log("grid/rgb_image", rgb_image)
        safenav_main(agent, rgb_image, depth)

In [None]:
def safenav_main(agent, rgb_image, depth_model):
    """
    Processes the RGB image to produce a depth map, divides it into patches, computes safety metrics,
    checks for obstacles, and steers the agent accordingly.
    """
    try:
        grid_size = 3  # Number of horizontal patches (vertical patches is set to 1)
        depth_image = get_formatted_midas_image(rgb_image, depth_model)
        log("grid/safenav/depth_image", Image(depth_image))
        
        patches_img = divide_into_grid(depth_image, grid_size, 1)
        means = compute_safety_metric(patches_img)
        blocked, best_patch_index = check_blocked(patches_img, means, grid_size, 1)
        
        if blocked:
            print("Path is blocked! Rotating in place.")
            move(agent, 0, 1)  # Rotate if blocked
            return

        yaw, pitch = map_patch_to_steering(best_patch_index, grid_size, 1)
        yaw_clip = (-30, 30)
        yaw = np.clip(yaw, yaw_clip[0], yaw_clip[1])
        print("Clipped yaw:", yaw)

        yaw_rad = np.deg2rad(yaw)
        move(agent, 1, -yaw_rad)
    except KeyboardInterrupt:
        raise
    except Exception as e:
        print(f"Error in safenav_main: {e}")


def main_navigation_loop(agent, camera_intrinsics=[128, 128, 128, 128]):
    """
    Continuously captures images from the agent, processes them for depth, and navigates safely.
    """
    depth = MIDAS()

    while True:
        rgb_image = agent.getImage()
        if rgb_image is None or rgb_image.data is None:
            print("RGB image is none, waiting...")
            time.sleep(1)
            continue
        
        log("grid/rgb_image", rgb_image)
        safenav_main(agent, rgb_image, depth)

In [None]:
def start():
    """
    Initializes and starts the safe navigation loop.
    """
    agent = locomotionisaacsim_0
    agent.run()
    main_navigation_loop(agent)

# Begin safe exploration using the navigation loop
start()