In [None]:
from d123.dataset.scene.scene_builder import ArrowSceneBuilder
from d123.dataset.scene.scene_filter import SceneFilter

from nuplan.planning.utils.multithreading.worker_sequential import Sequential
# from nuplan.planning.utils.multithreading.worker_ray import RayDistributed

In [None]:
# Scene selection for this notebook: pick a dataset split and, optionally,
# narrow it down to specific logs or scene tokens.
split = "nuplan_mini_val"

# Both filters are passed to SceneFilter as None here; presumably None means
# "do not filter on this field" — TODO confirm against SceneFilter.
log_names = None
scene_tokens = None

# Uncomment to restrict the run to one known log / scene.
# log_names = ["2021.06.07.12.54.00_veh-35_01843_02314"]
# scene_tokens = ["2283aea39bc1505e"]

scene_filter = SceneFilter(
    split_names=[split],
    log_names=log_names,
    scene_tokens=scene_tokens,
    # NOTE(review): the `_s` suffix suggests seconds — confirm against SceneFilter.
    duration_s=15.1,
    history_s=1.0,
)
# NOTE(review): machine-specific absolute path; the notebook will not run on
# another checkout without editing this line.
scene_builder = ArrowSceneBuilder("/home/daniel/d123_workspace/data")
worker = Sequential()
# worker = RayDistributed()
# `scenes` is indexed by later cells to pick the scene that gets simulated.
scenes = scene_builder.get_scenes(scene_filter, worker)

print(f"Number of scenes: {len(scenes)}")


In [None]:
from d123.simulation.controller.action_controller import ActionController
from d123.simulation.controller.motion_model.kinematic_bicycle_model import KinematicBicycleModel
from d123.simulation.observation.log_replay_observation import LogReplayObservation
from d123.simulation.simulation_2d_setup import Simulation2DSetup
from d123.simulation.simulation_2d import Simulation2D


from d123.simulation.time_controller.log_time_controller import LogTimeController


def get_simulation_2d_setup():
    """
    Assemble the `Simulation2DSetup` used by this notebook.

    The setup combines a `LogTimeController`, a `LogReplayObservation` and an
    `ActionController` wrapping a `KinematicBicycleModel`, all built with
    their default constructor arguments.
    :return: a freshly constructed `Simulation2DSetup`.
    """
    # Components are instantiated in the same order the setup receives them.
    log_time_controller = LogTimeController()
    log_replay_observation = LogReplayObservation()
    bicycle_action_controller = ActionController(KinematicBicycleModel())

    return Simulation2DSetup(
        time_controller=log_time_controller,
        observations=log_replay_observation,
        ego_controller=bicycle_action_controller,
    )



# Build the setup once and hand it to the simulation object; `simulation2d`
# is the notebook-wide simulation instance used by the cells below.
simulation_2d_setup = get_simulation_2d_setup()
simulation2d = Simulation2D(simulation_2d_setup)

In [None]:
# reset
from typing import List
from d123.simulation.gym.environment.gym_observation.raster.raster_gym_observation import RasterGymObservation
from d123.simulation.gym.environment.gym_observation.raster.raster_renderer import RasterRenderer
from d123.simulation.gym.environment.helper.environment_area import RectangleEnvironmentArea
from d123.simulation.planning.abstract_planner import PlannerInput
from d123.simulation.planning.planner_output.action_planner_output import ActionPlannerOutput


# Raster observation builder; the same environment area instance is shared
# by the observation and its renderer.
environment_area = RectangleEnvironmentArea()
gym_observation = RasterGymObservation(environment_area, RasterRenderer(environment_area), inference=True)
gym_observation.reset()


# `info` is passed unchanged to every get_gym_observation call in this notebook.
last_action = [0.0, 0.0]  # placeholder
info = {"last_action": last_action}

# Collected observations, one per simulation state (initial state + each step).
rasters = []

# NOTE(review): hard-coded scene index; raises IndexError if the filter above
# yields fewer than 1339 scenes.
idx = 1338
planner_initialization, current_planner_input = simulation2d.reset(scenes[idx])
rasters.append(gym_observation.get_gym_observation(current_planner_input, planner_initialization, info))


# TODO: Implement action planner output
# TODO: Further test the simulation object.


def _get_action(planner_input: PlannerInput) -> ActionPlannerOutput:
    """
    Placeholder planner: return the same fixed action for every input.

    :param planner_input: current planner input; only `history.current_state` is read.
    :return: `ActionPlannerOutput` built from the constants 0.5 and 0.0 and the current ego state.
    """
    # `current_state` unpacks into exactly two items; only the first is used.
    current_ego_state, _unused = planner_input.history.current_state
    # NOTE(review): meaning/units of 0.5 and 0.0 depend on ActionPlannerOutput's
    # positional parameters — confirm against its definition.
    return ActionPlannerOutput(0.5, 0.0, current_ego_state)


# Closed-loop rollout: query the placeholder planner, advance the simulation
# by one step, and record the resulting observation until the simulation
# reports that it is no longer running.
while simulation2d.is_simulation_running():

    # 1. trigger planner
    planner_output = _get_action(current_planner_input)

    # 2. step simulation
    current_planner_input = simulation2d.step(planner_output)
    # `planner_initialization` and `info` are the values created at reset time.
    rasters.append(gym_observation.get_gym_observation(current_planner_input, planner_initialization, info))

In [None]:
# NOTE(review): despite its name this holds the first history *sample*; the
# plotting helper in this notebook reads `.ego_state` and `.detections` from
# entries of `history.data`.
initial_ego_state = simulation2d.history.data[0]
# Presumably the fields available on each history sample:
# iteration
# ego_state
# planner_output
# detections

# Number of samples recorded in the simulation history (displayed as cell output).
len(simulation2d.history.data)

In [None]:
from pathlib import Path
from typing import List, Union

import numpy as np
import numpy.typing as npt
from PIL import Image


from d123.common.visualization.color.color import  (
    BLACK,
    DARK_GREY,
    DARKER_GREY,
    ELLIS_5,
    LIGHT_GREY,
    NEW_TAB_10,
)


def numpy_images_to_gif(
    numpy_images: List[npt.NDArray[np.uint8]], gif_path: Union[str, Path], duration: int = 50
) -> None:
    """
    Helper function to convert images into a GIF file.
    :param numpy_images: non-empty list of images as uint8 numpy arrays.
    :param gif_path: output path for the GIF file.
    :param duration: display time of each frame in milliseconds (Pillow's GIF
        `duration` save option), defaults to 50
    :raises ValueError: if `numpy_images` is empty.
    """
    # A GIF needs at least one frame; fail with a clear message instead of an
    # opaque IndexError. `len()` is used so array-like inputs also work.
    if len(numpy_images) == 0:
        raise ValueError("numpy_images must contain at least one image to write a GIF.")

    first_frame, *remaining_frames = (Image.fromarray(img) for img in numpy_images)
    # loop=0 makes the GIF repeat forever.
    first_frame.save(gif_path, save_all=True, append_images=remaining_frames, duration=duration, loop=0)


def image_to_rbg(image: npt.NDArray[np.uint8]) -> npt.NDArray[np.uint8]:
    """
    Helper function to convert an observation image to RGB format.

    Every pixel starts white. The channel masks below are then painted one
    after another, so a later entry overwrites an earlier one wherever both
    masks are set.
    :param image: 3-D array indexed as `image[channel]`; channels 0-3 and 5-8 are read.
    :return: uint8 RGB array (last axis of size 3), with the first axis
        reversed and then rotated by 90 degrees via `np.rot90`.
    """
    _, dim_a, dim_b = image.shape
    canvas = np.full((dim_a, dim_b, 3), 255, dtype=np.uint8)

    # (mask, color) pairs in paint order — the order matters, see docstring.
    paint_order = (
        # drivable area
        (image[0] > 0, LIGHT_GREY),
        (image[1] > 0, DARK_GREY),
        (image[2] > 0, BLACK),
        (image[5] > 0, DARKER_GREY),
        # channel 3 is matched on exact values rather than "> 0"
        (image[3] == 80, NEW_TAB_10[4]),
        (image[3] == 255, NEW_TAB_10[2]),
        # (image[4] > 0, ELLIS_5[1]),
        (image[6] > 0, ELLIS_5[4]),
        (image[7] > 0, NEW_TAB_10[6]),
        (image[8] > 0, ELLIS_5[0]),
    )
    for mask, color in paint_order:
        canvas[mask] = color.rgb

    return np.rot90(canvas[::-1])


# Colorize the "bev_semantics" entry of every collected observation and write
# the frames to a GIF.
# NOTE(review): machine-specific absolute output path.
numpy_images_to_gif(
    [image_to_rbg(raster["bev_semantics"]) for raster in rasters],
    gif_path="/home/daniel/d123_workspace/d123/notebooks/simulation_2d.gif",
    duration=100,
)

In [None]:
from pathlib import Path
from typing import Optional, Tuple, Union

import matplotlib.animation as animation
import matplotlib.pyplot as plt
from tqdm import tqdm

from d123.common.visualization.matplotlib.observation import (
    add_box_detections_to_ax,
    add_default_map_on_ax,
    add_ego_vehicle_to_ax,
    add_traffic_lights_to_ax,
)
from d123.dataset.scene.abstract_scene import AbstractScene
from d123.simulation.history.simulation_history import Simulation2DHistory, Simulation2DHistorySample
from d123.simulation.planning.abstract_planner import PlannerInitialization


def _plot_simulation_history_sample_on_ax(
    ax: plt.Axes,
    simulation_history: Simulation2DHistory,
    iteration: int = 0,
    radius: float = 80,
) -> plt.Axes:
    """
    Draw one sample of a simulation history onto an existing axis.

    The map, traffic lights, box detections and ego vehicle are added in that
    order, and the view is a square of side `2 * radius` centered on the ego
    center point.
    :param ax: axis to draw on; it is modified in place.
    :param simulation_history: history providing `data[iteration]` and `scene.map_api`.
    :param iteration: index into `simulation_history.data`, defaults to 0
    :param radius: half-width of the view, also forwarded to the map plot, defaults to 80
    :return: the same axis that was passed in.
    """
    history_sample = simulation_history.data[iteration]
    scene_map = simulation_history.scene.map_api

    ego = history_sample.ego_state
    # planner_output = history_sample.planner_output
    observed = history_sample.detections

    view_center = ego.center.point_2d

    # Static context first, dynamic agents afterwards.
    add_default_map_on_ax(ax, scene_map, view_center, radius=radius, route_lane_group_ids=None)
    add_traffic_lights_to_ax(ax, observed.traffic_light_detections, scene_map)
    add_box_detections_to_ax(ax, observed.box_detections)
    add_ego_vehicle_to_ax(ax, ego)

    # Crop the view around the ego center.
    ax.set_xlim(view_center.x - radius, view_center.x + radius)
    ax.set_ylim(view_center.y - radius, view_center.y + radius)
    ax.set_aspect("equal", adjustable="box")

    ax.set_title(f"Iteration {iteration}")
    return ax


def plot_simulation_history_at_iteration(
    simulation_history: Simulation2DHistory,
    iteration: int = 0,
    radius: float = 80,
) -> Tuple[plt.Figure, plt.Axes]:
    """
    Create a new 10x10 inch figure showing one iteration of a simulation history.

    :param simulation_history: history to visualize.
    :param iteration: index of the history sample to draw, defaults to 0
    :param radius: half-width of the view around the ego vehicle, defaults to 80
    :return: the created figure and its single axis.
    """
    figure, axis = plt.subplots(figsize=(10, 10))
    # The helper draws in place, so its return value is not needed here.
    _plot_simulation_history_sample_on_ax(axis, simulation_history, iteration, radius)
    return figure, axis


def render_simulation_history_animation(
    simulation_history: Simulation2DHistory,
    output_path: Union[str, Path],
    start_idx: int = 0,
    end_idx: Optional[int] = None,
    step: int = 10,
    fps: float = 20.0,
    dpi: int = 300,
    format: str = "mp4",
    radius: float = 100,
) -> None:
    """
    Render a simulation history as an animation file.

    One frame is drawn per selected history iteration with
    `_plot_simulation_history_sample_on_ax`. The result is written to
    `<output_path>/<log_name>_<token>.<format>` using matplotlib's "ffmpeg" writer.

    :param simulation_history: history to render; its scene is opened for the
        duration of the rendering and closed afterwards, even on failure.
    :param output_path: output *directory*; created (including parents) if missing.
    :param start_idx: first history iteration to render, defaults to 0
    :param end_idx: exclusive end iteration. None renders up to the end of the
        history; values beyond the history length are clamped to it.
    :param step: stride between rendered iterations, defaults to 10
    :param fps: frames per second of the written animation, defaults to 20.0
    :param dpi: resolution passed to the animation writer, defaults to 300
    :param format: file extension, either "mp4" or "gif", defaults to "mp4"
    :param radius: view half-width forwarded to the per-frame plot, defaults to 100
    :raises AssertionError: if `format` is not "mp4" or "gif".
    """
    assert format in ["mp4", "gif"], "Format must be either 'mp4' or 'gif'."
    output_path = Path(output_path)
    output_path.mkdir(parents=True, exist_ok=True)

    scene = simulation_history.scene
    scene.open()
    fig = None
    pbar = None
    try:
        num_samples = len(simulation_history)
        # Honor a caller-supplied end index (previously it was always
        # overwritten), but never run past the recorded history.
        end_idx = num_samples if end_idx is None else min(end_idx, num_samples)
        frames = list(range(start_idx, end_idx, step))

        fig, ax = plt.subplots(figsize=(10, 10))
        # Margins are a property of the figure and survive ax.clear(), so they
        # are set once on `fig` instead of per frame on the implicit current figure.
        fig.subplots_adjust(left=0.05, right=0.95, top=0.95, bottom=0.05)

        pbar = tqdm(total=len(frames), desc=f"Rendering {scene.log_name} as {format}")

        def update(i):
            # Redraw the whole axis for history iteration `i`.
            ax.clear()
            _plot_simulation_history_sample_on_ax(ax, simulation_history, i, radius)
            pbar.update(1)

        ani = animation.FuncAnimation(fig, update, frames=frames, repeat=False)
        ani.save(
            output_path / f"{scene.log_name}_{scene.token}.{format}",
            writer="ffmpeg",
            fps=fps,
            dpi=dpi,
        )
    finally:
        # Release the progress bar, the figure and the scene even if rendering failed.
        if pbar is not None:
            pbar.close()
        if fig is not None:
            plt.close(fig)
        scene.close()


# Render every recorded iteration (step=1) of the simulation run above as an
# mp4 at 20 fps. `output_path` is a directory; the file name is derived from
# the scene's log name and token.
# NOTE(review): machine-specific absolute output path.
render_simulation_history_animation(
    simulation_history=simulation2d.history,
    output_path=Path("/home/daniel/d123_workspace/d123/notebooks/animations"),
    format="mp4",
    step=1,
    fps=20.0,
)