In [None]:
import math
import re
from os import path
from typing import Optional, Tuple, Dict, List

import matplotlib.pyplot as plt
from commonroad.common.util import Interval
from commonroad.scenario.scenario import Tag
from commonroad.visualization.mp_renderer import MPRenderer
from matplotlib.patches import Rectangle, Patch

from BatchConversion.AnalysisFunctions import middle_blue, gray, orange, blue, dark_blue, green
from OpenSCENARIO2CR.ConversionAnalyzer.DrivabilityAnalyzer import DrivabilityAnalyzer
from OpenSCENARIO2CR.ConversionAnalyzer.STLAnalyzer import STLAnalyzer
from OpenSCENARIO2CR.ConversionAnalyzer.SpotAnalyzer import SpotAnalyzer
from OpenSCENARIO2CR.OpenSCENARIOWrapper.Esmini.EsminiWrapperProvider import EsminiWrapperProvider
from OpenSCENARIO2CR.OpenSCENARIOWrapper.StoryBoardElement import EStoryBoardElementLevel
from OpenSCENARIO2CR.Osc2CrConverter import Osc2CrConverter
from OpenSCENARIO2CR.Osc2CrConverterResult import Osc2CrConverterResult
from OpenSCENARIO2CR.util.AbsRel import AbsRel
from OpenSCENARIO2CR.util.PPSBuilder import PPSBuilder

In [None]:
# OpenSCENARIO file to convert. The value below is a placeholder - point it at a real .xosc file.
scenario_path = "/path/to/esmini/resources/xosc/acc-test.xosc"
# When True, the viewer cell opens the scenario and then aborts the notebook run.
run_viewer = False
# When True, the scenario is rendered to "scenario.gif" before the conversion.
render_to_gif = False
# When True, the plotting cells create, save and show their figures.
draw_plots = True

# Fail early and say what to change, instead of a bare AssertionError without any context.
assert path.exists(scenario_path), (
    f"Scenario file not found: {scenario_path!r} - set scenario_path to an existing .xosc file"
)

In [None]:
# Setup EsminiWrapper
# This instance is the one assigned to converter.sim_wrapper further down.
esmini_wrapper = EsminiWrapperProvider().provide_esmini_wrapper()
# NOTE(review): min_time / max_time / grace_period look like simulation-time bounds in seconds -
# confirm against the wrapper before tuning them.
esmini_wrapper.min_time = 13
esmini_wrapper.max_time = 120.0
esmini_wrapper.grace_period = 1.0
# NOTE(review): presumably storyboard elements of this level are disregarded when the wrapper decides
# that the scenario has finished - verify.
esmini_wrapper.ignored_level = EStoryBoardElementLevel.ACT
esmini_wrapper.log_to_console = False
esmini_wrapper.log_to_file = False
# Fixed seed value so every run of the notebook configures the wrapper identically.
esmini_wrapper.random_seed = 0

In [None]:
if run_viewer:
    # Viewer mode uses its own wrapper instance, so the settings of esmini_wrapper
    # (the instance used for the conversion) are not touched.
    wrapper = EsminiWrapperProvider().provide_esmini_wrapper()
    wrapper.grace_period = None
    wrapper.max_time = 60.0
    wrapper.ignored_level = EStoryBoardElementLevel.ACT
    wrapper.view_scenario(scenario_path)
    # This cell always ends with an exception once the viewer returns, which also stops a
    # "Run All". The message states that this is intentional and how to get past it.
    raise RuntimeError(
        "run_viewer is enabled: stopping after viewing the scenario. "
        "Set run_viewer = False to run the conversion."
    )

In [None]:
# Optional step, controlled by the render_to_gif flag: have the esmini wrapper render the
# scenario and pass "scenario.gif" as the output name.
if render_to_gif:
    esmini_wrapper.render_scenario_to_gif(scenario_path, "scenario.gif")

In [None]:
# Configure the builder that is assigned to converter.pps_builder further down.
# NOTE(review): every setting is wrapped in AbsRel. EUsage.ABS presumably means "use the value as is"
# and EUsage.REL_ADD "add the value to a reference quantity" - confirm in OpenSCENARIO2CR.util.AbsRel.
pps_builder = PPSBuilder()
pps_builder.time_interval = AbsRel(Interval(-10, 0), AbsRel.EUsage.REL_ADD)
# NOTE(review): pos_* presumably describe a rectangular position region (length, width, rotation,
# centre); units are not visible in this notebook - verify.
pps_builder.pos_length = AbsRel(50, AbsRel.EUsage.ABS)
pps_builder.pos_width = AbsRel(10, AbsRel.EUsage.ABS)
pps_builder.pos_rotation = AbsRel(0, AbsRel.EUsage.REL_ADD)
pps_builder.pos_center_x = AbsRel(0, AbsRel.EUsage.REL_ADD)
pps_builder.pos_center_y = AbsRel(0, AbsRel.EUsage.REL_ADD)
pps_builder.velocity_interval = AbsRel(Interval(-5, 5), AbsRel.EUsage.REL_ADD)
# NOTE(review): None presumably leaves the orientation unconstrained - verify against PPSBuilder.
pps_builder.orientation_interval = None

In [None]:
# DrivabilityAnalyzer and STLAnalyzer are imported in the import cell at the top of the notebook,
# together with SpotAnalyzer, so that all dependencies are visible in one place.

# Scenario metadata: the strings below are placeholders that still have to be filled in.
converter = Osc2CrConverter(
    author="ADD AUTHOR HERE",
    affiliation="ADD AFFILIATION HERE",
    source="ADD SOURCE HERE",
    tags={Tag.SIMULATED},
)

# Plug in the simulation wrapper and the planning-problem-set builder configured in the cells above.
converter.sim_wrapper = esmini_wrapper
converter.pps_builder = pps_builder

converter.dt_cr = 0.1
converter.keep_ego_vehicle = True
converter.trim_scenario = False
converter.use_implicit_odr_file = True
converter.analyzers = {
    SpotAnalyzer: SpotAnalyzer(stride=1),
    DrivabilityAnalyzer: None,  # Will be initialized with default parameters
    STLAnalyzer: None,  # Will be initialized with default parameters
}
# If you only want to run with default parameters for analyzers you can also use:
# converter.analyzers = [EAnalyzer.SPOT_ANALYZER, EAnalyzer.DRIVABILITY_CHECKER, EAnalyzer.STL_MONITOR]

converter.dt_sim = 0.01
converter.odr_file_override = None
# Case-insensitive pattern containing "ego".
# NOTE(review): presumably matched against entity names to pick the ego vehicle - verify.
converter.ego_filter = re.compile(r".*ego.*", re.IGNORECASE)

In [None]:
# Run the conversion of the configured scenario file; the returned value is type-checked and
# unpacked in the following cell.
result = converter.run_conversion(scenario_path)

In [None]:
# The remaining cells need an Osc2CrConverterResult. If run_conversion handed back anything else,
# show that value in the failure message instead of raising a bare AssertionError.
assert isinstance(result, Osc2CrConverterResult), (
    f"Conversion did not return an Osc2CrConverterResult, got: {result!r}"
)

# Shorthands used by the plotting cells below.
scenario = result.scenario
pps = result.planning_problem_set
analysis = result.analysis
stats = result.statistics

In [None]:
def draw_obstacle_in_color(renderer, obstacle, color, trajectory_color: Optional = None, time: int = 0):
    """
    Draw a single dynamic obstacle with a custom fill colour.

    :param renderer: object providing ``draw_dynamic_obstacle``; must not be None
    :param obstacle: obstacle to draw; its ``prediction.final_time_step`` is used as the end of
        the drawn time range; must not be None
    :param color: face colour of the obstacle rectangle (drawn with a black edge); must not be None
    :param trajectory_color: face colour of the trajectory; with None the trajectory is not drawn
    :param time: first time step of the drawn time range
    """
    for required in (renderer, obstacle, color):
        assert required is not None

    show_trajectory = trajectory_color is not None
    trajectory_style = {
        "draw_trajectory": show_trajectory,
        "draw_continuous": True,
        "line_width": 1.5,
        # The renderer still receives a colour when the trajectory is hidden.
        "facecolor": trajectory_color if show_trajectory else "#000000",
    }
    rectangle_style = {"facecolor": color, "edgecolor": "black"}
    obstacle_style = {
        "draw_shape": True,
        "vehicle_shape": {"occupancy": {"shape": {"rectangle": rectangle_style}}},
    }
    params = {
        "time_begin": time,
        "time_end": obstacle.prediction.final_time_step,
        "trajectory": trajectory_style,
        "dynamic_obstacle": obstacle_style,
    }
    renderer.draw_dynamic_obstacle(obstacle, call_stack=(), draw_params=params)

In [None]:
def draw_overview(x_size=5, y_size=4):
    """
    Render the whole scenario from time step 0 into one wide figure, save it as
    "overview.png" and show it.

    The first dynamic obstacle is drawn in the highlight colours, every other dynamic
    obstacle in the normal colours; trajectories are drawn for all of them.

    :param x_size: base width; the figure is twice as wide
    :param y_size: base height; the figure is 0.7 times as high
    """
    highlight_fill, highlight_line = blue, "#005293"
    normal_fill, normal_line = orange, "#994d17"

    renderer = MPRenderer(figsize=(2 * x_size, 0.7 * y_size), plot_limits=[-10, 260, -30, 30])
    obstacles = scenario.dynamic_obstacles
    first_obstacle = obstacles[0]

    # Background pass: the scenario without obstacle shapes or trajectories; those are
    # drawn per obstacle below so that each one gets its own colours.
    background_params = {
        "trajectory": {"draw_trajectory": False},
        "dynamic_obstacle": {"draw_shape": False},
        "time_begin": 0,
        "time_end": first_obstacle.prediction.final_time_step,
    }
    scenario.draw(renderer, draw_params=background_params)

    draw_obstacle_in_color(renderer, first_obstacle, highlight_fill, highlight_line, time=0)
    for other in obstacles[1:]:
        draw_obstacle_in_color(renderer, other, normal_fill, normal_line, time=0)

    pps.draw(renderer)
    renderer.render()
    plt.tight_layout()
    plt.savefig("overview.png")
    plt.show()


def draw_with_spot(time_steps_with_spot: List[int], lim_y=10, x_offset=15, x_size=5, y_size=4):
    """
    For every given time step, create a two-row figure, save it as "step-<t>.png" and show it.

    Top row: the scenario at time step t, with the first dynamic obstacle in the highlight
    colours and all other dynamic obstacles in the normal colours, including trajectories.

    Bottom row: the same view, but while the scenario is drawn the predictions of the other
    obstacles are temporarily replaced by the SpotAnalyzer predictions stored for time step t.
    The original predictions are put back before the obstacles themselves are drawn (without
    trajectory) and, in any case, when the row is finished.

    :param time_steps_with_spot: time steps to plot; each must be a key of the SpotAnalyzer
        predictions and a valid index into the first obstacle's trajectory state list
    :param lim_y: half height of the plotted window; the half width is three times this value
    :param x_offset: shift of the plotted window along x
    :param x_size: figure width
    :param y_size: figure height
    """
    lim_x = lim_y * 3
    highlight_color = blue
    highlight_color_darker = "#005293"
    normal_color = orange
    normal_color_darker = "#994d17"
    for t in time_steps_with_spot:
        # x ticks: multiples of 10 inside the window around the first obstacle's x position at
        # index t (lower bound rounded up, upper bound rounded down).
        x_pos = scenario.dynamic_obstacles[0].prediction.trajectory.state_list[t].position[0]
        lower_tick = x_pos - lim_x + x_offset
        # NOTE(review): the reason for the extra "- 3" on the upper bound is not visible here.
        upper_tick = x_pos + lim_x + x_offset - 3
        lower_tick = math.ceil(lower_tick / 10) * 10
        upper_tick = math.floor(upper_tick / 10) * 10
        x_ticks = list(range(lower_tick, upper_tick + 1, 10))

        fig, axs = plt.subplots(2, 1, tight_layout=True, figsize=(x_size, y_size))
        assert isinstance(fig, plt.Figure)

        # --- top row: scenario with the original predictions ---
        # NOTE(review): with focus_obstacle_id set, plot_limits are presumably interpreted
        # relative to that obstacle - confirm in the MPRenderer documentation.
        rnd = MPRenderer(
            ax=axs[0],
            draw_params={"focus_obstacle_id": scenario.dynamic_obstacles[0].obstacle_id},
            plot_limits=[-lim_x + x_offset, lim_x + x_offset, -lim_y, lim_y],
        )
        # Background without obstacle shapes/trajectories; obstacles are drawn individually below.
        scenario.draw(rnd, draw_params={
            "trajectory": {"draw_trajectory": False},
            "dynamic_obstacle": {"draw_shape": False, },
            "time_begin": t
        })
        draw_obstacle_in_color(rnd, scenario.dynamic_obstacles[0], highlight_color, highlight_color_darker, time=t)
        for i in range(1, len(scenario.dynamic_obstacles)):
            draw_obstacle_in_color(rnd, scenario.dynamic_obstacles[i], normal_color, normal_color_darker, time=t)
        pps.draw(rnd)
        rnd.render()

        # --- bottom row: swap in the SpotAnalyzer predictions for time step t ---
        res = analysis[SpotAnalyzer][1][stats.ego_vehicle]
        # Original predictions by obstacle id, so the swap below can be undone.
        restore = {}
        for o_id, o_prediction in res.predictions[t].items():
            # The first dynamic obstacle keeps its own prediction.
            if o_id == scenario.dynamic_obstacles[0].obstacle_id:
                continue
            restore[o_id] = scenario.obstacle_by_id(o_id).prediction
            scenario.obstacle_by_id(o_id).prediction = o_prediction

        try:
            rnd = MPRenderer(
                ax=axs[1],
                draw_params={"focus_obstacle_id": scenario.dynamic_obstacles[0].obstacle_id},
                plot_limits=[-lim_x + x_offset, lim_x + x_offset, -lim_y, lim_y],
            )
            # Drawn while the swapped-in predictions are active.
            scenario.draw(rnd, draw_params={
                "trajectory": {"draw_trajectory": False},
                "dynamic_obstacle": {"draw_shape": False, },
                "time_begin": t
            })
            draw_obstacle_in_color(rnd, scenario.dynamic_obstacles[0], highlight_color, highlight_color_darker, time=t)
            # Put the original prediction back first, then draw the obstacle itself on top,
            # in a fixed colour and without its trajectory.
            for o_id, o_prediction in restore.items():
                scenario.obstacle_by_id(o_id).prediction = o_prediction
                draw_obstacle_in_color(rnd, scenario.obstacle_by_id(o_id), "#E37222", None, time=t)

            pps.draw(rnd)
            rnd.render()
            # Both rows share the same x ticks.
            axs[0].set_xticks(x_ticks)
            axs[1].set_xticks(x_ticks)
            fig.savefig(f"step-{t}.png")
            fig.show()
        finally:
            # Safety net: restore the original predictions even if drawing raised before the
            # loop above got to every obstacle.
            for o_id, o_prediction in restore.items():
                scenario.obstacle_by_id(o_id).prediction = o_prediction


def draw_no_spot(time_steps_no_spot: List[int], lim_y=10, x_offset=15, x_size=5, y_size=4):
    """
    For every given time step, render the scenario from that step on into its own figure,
    save it as "step-<t>.png" and show it.

    The first dynamic obstacle is drawn in the highlight colours and is the renderer's focus
    obstacle; all other dynamic obstacles are drawn in the normal colours.

    :param time_steps_no_spot: time steps to plot
    :param lim_y: half height of the plotted window; the half width is three times this value
    :param x_offset: shift of the plotted window along x
    :param x_size: figure width
    :param y_size: base height; the figure is half as high
    """
    lim_x = lim_y * 3
    highlight_fill, highlight_line = blue, "#005293"
    normal_fill, normal_line = orange, "#994d17"

    for step in time_steps_no_spot:
        obstacles = scenario.dynamic_obstacles
        first_obstacle = obstacles[0]

        renderer = MPRenderer(
            figsize=(x_size, 0.5 * y_size),
            draw_params={"focus_obstacle_id": first_obstacle.obstacle_id},
            plot_limits=[-lim_x + x_offset, lim_x + x_offset, -lim_y, lim_y],
        )

        # Background pass without obstacle shapes or trajectories; the obstacles are drawn
        # one by one afterwards so that each gets its own colours.
        background_params = {
            "trajectory": {"draw_trajectory": False},
            "dynamic_obstacle": {"draw_shape": False},
            "time_begin": step,
            "time_end": first_obstacle.prediction.final_time_step,
        }
        scenario.draw(renderer, draw_params=background_params)

        draw_obstacle_in_color(renderer, first_obstacle, highlight_fill, highlight_line, time=step)
        for other in obstacles[1:]:
            draw_obstacle_in_color(renderer, other, normal_fill, normal_line, time=step)

        pps.draw(renderer)
        renderer.render()
        plt.tight_layout()
        plt.savefig(f"step-{step}.png")
        plt.show()


# Scenario figures, only when plotting is enabled in the configuration cell.
if draw_plots:
    draw_overview()  # writes "overview.png"
    # Both calls below write "step-<t>.png", so their time steps must not overlap.
    draw_no_spot([60])
    draw_with_spot([45, 75])


In [None]:
def setup(y_lim, highlight_zero: bool):
    """
    Create a new 6x3 figure with the common time axis and the phase markers used by all
    time-series plots.

    Each phase gets dotted vertical lines at its boundaries and a gray bar in a small margin
    added below and above the requested y range.

    :param y_lim: sequence whose first two entries are the lower and upper y limit of the data
    :param highlight_zero: draw a thin black horizontal line at y = 0
    """
    lower, upper = y_lim[0], y_lim[1]
    # The margin (2.5 % of the y range) is added on both sides and holds the phase bars.
    margin = 0.025 * abs(upper - lower)

    plt.figure(figsize=(6, 3))
    plt.ylim([lower - margin, upper + margin])
    plt.xlim([-0.5, 22.8])
    if highlight_zero:
        plt.axhline(y=0, color="black", lw=0.6)

    # Per phase: x positions of the dotted markers, and (start, width) of the bars.
    phases = [
        ((5, 8.4), [(5, 3.4)]),  # swerve start, swerve end
        ((11, 13), [(11, 2)]),  # brake start, brake end
        ((17, 20, 21.3), [(17, 3), (20, 1.3)]),  # acc start, acc end / brake start, brake end
    ]
    axes = plt.gca()
    for marker_positions, bars in phases:
        for x_marker in marker_positions:
            plt.axvline(x=x_marker, color=gray, ls="dotted")
        # Bars in the bottom margin first, then in the top margin.
        for bar_y in (lower - margin, upper):
            for bar_start, bar_width in bars:
                axes.add_patch(Rectangle((bar_start, bar_y), bar_width, margin, linewidth=0, facecolor=gray))

    plt.xlabel("t [s]")
    # Leave room for the x label below the axes.
    plt.gcf().subplots_adjust(bottom=0.15)


def plot_ruleset(rule_set: str):
    """
    Plot one STL rule-set series for "Target" and "Ego" over time, save the figure as
    "<rule_set>.png" and show it.

    The time axis of each curve is derived from the length of the series that is plotted.
    Previously it was first built from the hard-coded ``r_g1`` series and then overwritten
    with the trajectory length of the last dynamic obstacle, which only worked while all of
    these lengths happened to be equal.

    :param rule_set: name of the attribute holding the series on the STLAnalyzer result
        entries, e.g. "r_g1"
    """
    stl_results = analysis[STLAnalyzer][1]
    setup([-1, 1], highlight_zero=True)
    # Plot order (Target first, Ego second) is kept so that Ego is drawn on top.
    for name, color in (("Target", orange), ("Ego", blue)):
        series = getattr(stl_results[name], rule_set)
        # Sample k is placed at k / 10 on the "t [s]" axis.
        # NOTE(review): the factor 10 presumably mirrors converter.dt_cr = 0.1 - confirm.
        times = [k / 10 for k in range(len(series))]
        plt.plot(times, series, color=color, label=name)
    plt.legend(loc="lower left")
    plt.ylabel(rule_set.upper())
    plt.savefig(f"{rule_set}.png")
    plt.show()


def plot_state_variable(variable: str, unit: str, highlight_zero: bool):
    """
    Plot one state attribute of the first two dynamic obstacles over time, save the figure as
    "state_<variable>.png" and show it.

    Dynamic obstacle 0 is labelled "Ego", dynamic obstacle 1 "Target". The y range covers the
    values of all dynamic obstacles plus 0.1 on each side.

    The time axis of each curve is derived from the length of that obstacle's own state list.
    Previously it was taken from the STL analyzer's ``r_g1`` series, which tied this plot to an
    unrelated analysis result and broke as soon as the lengths differed.

    :param variable: attribute name read from every trajectory state, e.g. "velocity"
    :param unit: unit shown in the y-axis label
    :param highlight_zero: passed on to ``setup``; draws a horizontal line at y = 0
    """
    values = {
        index: [getattr(state, variable) for state in obstacle.prediction.trajectory.state_list]
        for index, obstacle in enumerate(scenario.dynamic_obstacles)
    }
    values_min = min(min(series) for series in values.values())
    values_max = max(max(series) for series in values.values())
    setup([values_min - 0.1, values_max + 0.1], highlight_zero=highlight_zero)
    # Plot order (Target first, Ego second) is kept so that Ego is drawn on top.
    for index, name, color in ((1, "Target", orange), (0, "Ego", blue)):
        series = values[index]
        # Sample k is placed at k / 10 on the "t [s]" axis.
        # NOTE(review): the factor 10 presumably mirrors converter.dt_cr = 0.1 - confirm.
        times = [k / 10 for k in range(len(series))]
        plt.plot(times, series, color=color, label=name)
    plt.legend(loc="lower left")
    plt.ylabel(f"{variable} [{unit}]")
    plt.savefig(f"state_{variable}.png")
    plt.show()

def plot_running_elements():
    """
    Plot, per storyboard element level, during which time spans elements were running, save
    the figure as "runnning_events.png" and show it.

    Every level (except UNDEFINED_ELEMENT_TYPE) gets one row. A bar covers a span in which the
    number of running elements stayed constant and non-zero; its colour encodes that number
    (1, 2, 3, or 4 and more).
    """
    # Bar colour by number of running elements; 4 stands for "4 or more".
    colors = {
        1: middle_blue,
        2: blue,
        3: dark_blue,
        4: orange
    }

    def draw_rectangle(start_time: Tuple[float, int], end_t: float, level):
        """Draw one bar in the row of ``level`` from ``start_time[0]`` to ``end_t``,
        coloured by the running count ``start_time[1]``."""
        plt.gca().add_patch(Rectangle(
            (start_time[0], level.value - 0.25),
            abs(end_t - start_time[0]), 0.5,
            linewidth=0,
            # Clamp the count to the keys of `colors` (1..4). The lower bound used to be 0,
            # which is not a key and would have raised a KeyError.
            facecolor=colors[min(max(1, start_time[1]), 4)]
        ))

    setup([0.5, len(EStoryBoardElementLevel) - 0.5], highlight_zero=False)
    considered_levels = [
        lvl for lvl in EStoryBoardElementLevel if lvl is not EStoryBoardElementLevel.UNDEFINED_ELEMENT_TYPE
    ]
    # Row labels are the last six characters of each level name.
    plt.yticks([lvl.value for lvl in considered_levels], [lvl.name[-6:] for lvl in considered_levels])

    # Per level: (start time, running count) of the span currently being tracked, or None.
    lvl_start_times: Dict[EStoryBoardElementLevel, Optional[Tuple[float, int]]] = {
        lvl: None for lvl in considered_levels
    }
    last_t = 0
    for t, running_elements in sorted(result.running_storyboard_elements.items()):
        for lvl in considered_levels:
            running_count = running_elements[lvl]
            if running_count > 0:
                if lvl_start_times[lvl] is None:
                    # A new span starts at this sample.
                    lvl_start_times[lvl] = (t, running_count)
                elif lvl_start_times[lvl][1] != running_count:
                    # The count changed: close the span at the previous sample and start a new one.
                    draw_rectangle(lvl_start_times[lvl], last_t, lvl)
                    lvl_start_times[lvl] = (t, running_count)
            elif running_count == 0 and lvl_start_times[lvl] is not None:
                # Nothing is running any more: close the span at the previous sample.
                draw_rectangle(lvl_start_times[lvl], last_t, lvl)
                lvl_start_times[lvl] = None
        last_t = t

    # Close the spans that were still open at the last sample.
    for lvl, open_span in lvl_start_times.items():
        if open_span is not None:
            draw_rectangle(open_span, last_t, lvl)

    plt.legend(handles=[
        Patch(label="1", color=colors[1]),
        Patch(label="2", color=colors[2]),
        Patch(label="3", color=colors[3]),
        Patch(label="4+", color=colors[4]),
    ], ncol=4, bbox_to_anchor=(0, 1, 1, .101), mode="expand")
    # NOTE(review): the file name is misspelled ("runnning"); it is kept unchanged so that
    # anything that already picks up this file keeps working.
    plt.savefig("runnning_events.png")
    plt.show()

# Time-series figures, only when plotting is enabled in the configuration cell.
if draw_plots:
    # One figure per rule-set attribute, saved as "<name>.png".
    plot_ruleset("r_g1")
    plot_ruleset("r_g2")
    plot_ruleset("r_g3")
    # One figure per state attribute, saved as "state_<name>.png".
    plot_state_variable("velocity", "m/s", highlight_zero=False)
    # NOTE(review): CommonRoad states normally store angles in radians - confirm that "deg" is
    # the right unit for this axis label.
    plot_state_variable("steering_angle", "deg", highlight_zero=False)
    plot_running_elements()
