# Ground Projection

In [None]:
import os
import cv2
import numpy as np

from dt_computer_vision.camera import CameraModel
from dt_computer_vision.camera.calibration.extrinsics.boards import CalibrationBoardDD24

this_dir: str = os.path.abspath("")
assets_dir: str = os.path.join(this_dir, "..", "assets")
image_fpath: str = os.path.join(
    assets_dir, "extrinsics/dd24/real-world/scenario0/image-0.png"
)
image: np.ndarray = cv2.imread(image_fpath)
board = CalibrationBoardDD24

# Load camera model from YAML file
yaml_file = os.path.join(
    assets_dir,
    "extrinsics",
    "dd24",
    "real-world",
    "scenario1",
    "calibration-intrinsic-dd24.yaml",
)

with open(yaml_file, "r") as file:
    yaml_content = file.read()
    camera = CameraModel.from_ros_calibration(yaml_content)

assert image.shape == (
    camera.height,
    camera.width,
    3,
), f"Image shape: {image.shape}, Camera shape: {*camera.get_shape(), 3}"

In [None]:
# Load ground homography from YAML file
import os
import yaml

from dt_computer_vision.camera.homography import Homography
import numpy as np

this_dir: str = os.path.abspath("")
assets_dir: str = os.path.join(this_dir, "..", "assets")

yaml_file = os.path.join(
    assets_dir,
    "extrinsics",
    "dd24",
    "real-world",
    "scenario0",
    "homography.yaml",
)

with open(yaml_file, "r") as file:
    yaml_content = file.read()
    homography = yaml.safe_load(yaml_content)

H = Homography(np.array(homography["homography"]).reshape(3, 3))

print(H)

In [None]:
from matplotlib import pyplot as plt

from dt_computer_vision.camera.homography import interpolate_homography

VIRTUAL_CAMERA_HEIGHT = 0.3

image = cv2.imread(
    os.path.join(assets_dir, "extrinsics/dd24/real-world/scenario0/image-0.png")
)

R2 = np.eye(3)
tvec2 = np.array([0, 0.0, VIRTUAL_CAMERA_HEIGHT]).reshape(3, 1)

H_dt = interpolate_homography(H, tvec2, R2, camera)

print("Interpolated homography:", H_dt)

camera.H = H_dt
camera.__post_init__()
rectified = camera.rectifier.rectify(image)

img = cv2.warpPerspective(rectified, H_dt, (camera.width, camera.height))
plt.imshow(img)


In [None]:
# iterate over each pixel of the rectified image

destination_img = np.zeros((camera.height, camera.width, 3), dtype=np.uint8)

print(H_dt.inverse)

for y in range(camera.height):
    for x in range(camera.width):
        # project the pixel to the ground plane
        p = np.array([x, y, 1]).reshape(3, 1)
        p_ground = H_dt.inverse @ p
        p_ground /= p_ground[2]

        if 0 <= p_ground[0] < camera.width and 0 <= p_ground[1] < camera.height:
            destination_img[y, x] = rectified[int(p_ground[1]), int(p_ground[0])]

plt.imshow(destination_img)

# Debugging pipeline

This pipeline is used for debugging purposes, as projecting each image is computationally expensive.

The pipeline for each image is:

1. Rectification
1. Projection (using the synthetized `H_dt` homography)
1. Motion vectors computation.
1. Debug visualization

In [None]:
from typing import List, Optional
import cv2
import numpy as np

import imageio
from IPython.display import Image, display
import io

from dt_computer_vision.camera.types import Rectifier

# Load the image
image = cv2.imread(
    os.path.join(assets_dir, "extrinsics/dd24/real-world/scenario0/image-0.png")
)


def simulate_translation(
    image, H_dt : Optional[Homography], camera : CameraModel, rectifier: Optional[Rectifier], translation_time : float = 3.0, num_frames : int = 30
) -> List:
    # Get the image width and height
    height, width = image.shape[:2]

    # Define the translation amount
    translation_amount = int(width / 2)

    # Create a black image with the same size as the original image
    black_image = np.zeros_like(image)

    # Generate the sequence of translated images
    sequence = []
    if rectifier is not None:
        # Apply the rectification to the image
        rectified_image = rectifier.rectify(image)

    for type in range(num_frames):
        # Calculate the translation amount for the current frame
        translation = int((type / num_frames) * translation_amount)

        # Create a translation matrix
        M = np.array([[1, 0, translation], [0, 1, 0]], dtype=np.float32)

        if H_dt is not None:
            # Apply the homography to the image
            image = cv2.warpPerspective(rectified_image, H_dt, (camera.width, camera.height))

        # Apply the translation to the original image
        translated_image = cv2.warpAffine(
            image,
            M,
            (width, height),
            borderMode=cv2.BORDER_CONSTANT,
            borderValue=(0, 0, 0),
        )

        # Combine the translated image with the black image to fill the missing pixels
        final_image = cv2.bitwise_or(translated_image, black_image)

        # Add the final image to the sequence
        sequence.append(final_image)

    return sequence


translation_time = 3

sequence_top_view = simulate_translation(
    image, H_dt, camera, camera.rectifier, translation_time, num_frames=30
)


def gif_from_images(images: List, duration: float) -> Image:
    buffer = io.BytesIO()
    imageio.mimsave(buffer, images, format="GIF", duration=duration)
    buffer.seek(0)
    return Image(data=buffer.read(), format="GIF")


# Display the gif
display(gif_from_images(sequence_top_view, translation_time / len(sequence_top_view)))


In [None]:

def gif_from_images(images: List, duration: float) -> Image:
    buffer = io.BytesIO()
    imageio.mimsave(buffer, images, format="GIF", duration=duration)
    buffer.seek(0)
    return Image(data=buffer.read(), format="GIF")


# Display the gif
display(gif_from_images(sequence_top_view, translation_time / len(sequence_top_view)))


In [None]:
import cv2

from IPython.display import display

# Compute optical flow using the OpticalFlow class
from dt_computer_vision.optical_flow.optical_flow import OpticalFlow


config = {
    "process_frequency": 20,
    "track_len": 10,
    "detect_interval": 5,
    "img_scale": 1,
}

VIS_SCALE = int(1 / config["img_scale"])

# Create an instance of the OpticalFlow class
optical_flow = OpticalFlow(
    track_len=config["track_len"],
    detect_interval=config["detect_interval"],
    resize_scale=config["img_scale"],
)

SEQUENCE_FRAME_COUNT = len(sequence_top_view)
delta_t = translation_time / SEQUENCE_FRAME_COUNT

debug_images = []
debug_strs = []
disp_vectors = []
vel_motion_vectors = []
features_locations = []

# Iterate over sequence_top_view picking two consecutive frames
for image in sequence_top_view:
    # Compute the optical flow
    displacements_array, velocities_arr, locations_px, debug_str = (
        optical_flow.compute_motion_vectors(image, delta_t)
    )

    vis = optical_flow.create_debug_visualization(
        image, locations_px, debug_str
    )

    debug_images.append(vis)
    features_locations.append(locations_px)
    disp_vectors.append(displacements_array)
    vel_motion_vectors.append(velocities_arr)
    debug_strs.append(debug_str)
print(f"The debug image has shape: {debug_images[0].shape}")

# Display the optical flow visualization
display(gif_from_images(debug_images, delta_t))

# Production pipeline

1. Motion vectors computation
1. Rectification of motion vectors
1. Projection of motion vectors
1. Debug visualization

In [None]:
"""
Generate a sequence of unprojected images equivalent to the sequence of projected images.
"""
# Load the image
image = cv2.imread(
    os.path.join(assets_dir, "extrinsics/dd24/real-world/scenario0/image-0.png")
)

sequence = simulate_translation(image, H_dt, camera, camera.rectifier, translation_time)

sequence = [
    camera.rectifier.distort(
        cv2.warpPerspective(image, np.linalg.inv(H_dt), (camera.height, camera.width))
    )
    for image in sequence
]

display(gif_from_images(sequence, delta_t))


In [None]:
from dt_computer_vision.camera.types import Pixel
from dt_computer_vision.optical_flow.optical_flow import OpticalFlow

optical_flow = OpticalFlow(
    track_len=config["track_len"],
    detect_interval=config["detect_interval"],
    resize_scale=config["img_scale"],
)

VIS_SCALE = 1


SEQUENCE_FRAME_COUNT = len(sequence)
delta_t = translation_time / SEQUENCE_FRAME_COUNT

debug_images = []
debug_strs = []
disp_vectors = []
vel_motion_vectors = []
features_locations = []
debug_images_projected = []

# Iterate over sequence picking two consecutive frames
for image in sequence:
    # Rectify the image
    image = camera.rectifier.rectify(image)

    # Compute the optical flow
    displacements_array, velocities_arr, locations_px, debug_str = (
        optical_flow.compute_motion_vectors(image, delta_t)
    )

    projected_motion_vectors, projected_locations = optical_flow.project_motion_vectors(
        velocities_arr, locations_px, camera, H_dt
    )

    locations_px_resized = [
        Pixel(*(loc / optical_flow.resize_scale).as_array()) for loc in locations_px
    ]

    projected_image = cv2.warpPerspective(image, H_dt, (camera.width, camera.height))

    vis_original = optical_flow.create_debug_visualization(
        image, locations_px_resized, debug_str, motion_vectors=velocities_arr
    )

    vis = optical_flow.create_debug_visualization(
        projected_image,
        projected_locations,
        debug_str,
        motion_vectors=velocities_arr,
    )

    debug_images.append(vis_original)
    debug_images_projected.append(vis)
    features_locations.append(locations_px_resized)
    disp_vectors.append(displacements_array)
    vel_motion_vectors.append(velocities_arr)
    debug_strs.append(debug_str)

# Display the optical flow visualizations side by side
display(gif_from_images(debug_images, delta_t))

In [None]:
# Display the projected motion vectors
display(gif_from_images(debug_images_projected, delta_t))

In [None]:
# Measure the performance of the optical flow algorithm

import time

t_start = time.perf_counter()

base_homography_pixel_per_meter = 900

velocities_px_s = []
velocities_m_s = []

for image in sequence:
    # Rectify the image
    image = camera.rectifier.rectify(image)

    # Compute the optical flow
    displacements_array, velocities_arr, locations_px, debug_str = (
        optical_flow.compute_motion_vectors(image, delta_t)
    )

    projected_motion_vectors, projected_locations = optical_flow.project_motion_vectors(
        velocities_arr, locations_px, camera, H_dt
    )
    velocity = optical_flow.compute_velocity_vector(velocities_arr)
    velocities_px_s.append(velocity)
    velocities_m_s.append(velocity / base_homography_pixel_per_meter)
    
t_end = time.perf_counter()

print(f"Optical flow ran at {len(sequence) / (t_end - t_start)} FPS")
print(f"Mean velocity in pixels per second: {np.mean(velocities_px_s)}")
print(f"Mean velocity in meters per second: {np.mean(velocities_m_s)}")