# Analyze the CAN bus data from nuScenes

In [None]:
import matplotlib.pyplot as plt

# Font sizes shared by every figure produced in this notebook.
SMALL_SIZE = 14
MEDIUM_SIZE = 16
BIGGER_SIZE = 18

# default text size
plt.rc("font", size=SMALL_SIZE)
# axes: small title, medium x/y labels
plt.rc("axes", titlesize=SMALL_SIZE, labelsize=MEDIUM_SIZE)
# tick labels on both axes
plt.rc(("xtick", "ytick"), labelsize=SMALL_SIZE)
# legend entries
plt.rc("legend", fontsize=SMALL_SIZE)
# figure-level title
plt.rc("figure", titlesize=BIGGER_SIZE)

In [None]:
from avapi.nuscenes import nuScenesManager


# Root directory of the nuScenes data on this machine.
# Alternative location: "/data/shared/nuScenes"
data_dir = "/home/data/nuScenes"

# Scene manager for the "v1.0-mini" split and the dataset of its first scene.
NSM = nuScenesManager(split="v1.0-mini", data_dir=data_dir)
NDM = NSM.get_scene_dataset_by_index(0)

# Look-ahead offsets that get added to frame timestamps further below
# (presumably seconds -- TODO confirm the timestamp unit).
dts_ahead = list(range(1, 6))

### Show trajectory

In [None]:
import numpy as np
import matplotlib.pyplot as plt


# Pair every frame of the scene with its timestamp and its ego state.
frames = NDM.get_frames(sensor=None)
ts_ego_tuple = [
    (NDM.get_timestamp(frame=frame), NDM.get_ego(frame=frame)) for frame in frames
]

# Unpack the pairs into arrays: time, plus the first two components of the
# ego position and velocity vectors.
t = np.array([stamp for stamp, _ego in ts_ego_tuple])
xy = np.array([ego.position.x[:2] for _stamp, ego in ts_ego_tuple])
vel = np.array([ego.velocity.x[:2] for _stamp, ego in ts_ego_tuple])

# Trajectory figure, with the first and last samples highlighted.
fig, ax_traj = plt.subplots(1, 1, figsize=(7, 6))
ax_traj.plot(xy[:, 0], xy[:, 1])
ax_traj.scatter(xy[0, 0], xy[0, 1], color="green", marker="o", label="start")
ax_traj.scatter(xy[-1, 0], xy[-1, 1], color="red", marker="o", label="end")
ax_traj.set_title("Trajectory")
ax_traj.set_xlabel("X Position")
ax_traj.set_ylabel("Y Position")
ax_traj.legend()
ax_traj.axis("equal")
ax_traj.grid()
plt.show()

# Speed figure: norm of the two velocity components over time.
speed = np.linalg.norm(vel, axis=1)
fig, ax_speed = plt.subplots(1, 1, figsize=(7, 4))
ax_speed.plot(t, speed)
ax_speed.set_title("Speed Profile")
ax_speed.set_xlabel("Time (s)")
ax_speed.set_ylabel("Speed (m/s)")
ax_speed.grid()
plt.show()

### Show control signals

In [None]:
import matplotlib.pyplot as plt

# Collect the control signal of every frame in the scene.
signals = [NDM.get_control_signal(frame, bounded=False) for frame in NDM.frames]
signal_times = [s.timestamp for s in signals]

# One stacked panel per control channel: (attribute, line color, legend label).
channels = (
    ("brake", "blue", "Brake"),
    ("steer", "green", "Steer"),
    ("throttle", "orange", "Throttle"),
)
fig, axs = plt.subplots(3, 1, sharex=True)
for ax, (attr, color, label) in zip(axs, channels):
    ax.plot(
        signal_times,
        [getattr(s, attr) for s in signals],
        color=color,
        label=label,
    )
    ax.set_ylabel("Signal")
    ax.legend(loc="upper left")

# The title goes on the top panel, the shared time label on the bottom one.
axs[0].set_title("Control signals over time")
axs[2].set_xlabel("Time (s)")
plt.tight_layout()
plt.show()

### Write a simple rule-based classifier for the meta actions

In [None]:
from avlm.actions import Lateral, Longitudinal


def get_actions_by_lookahead(dt_ahead: float):
    """Classify lateral/longitudinal meta actions for every frame of the scene.

    For each frame, the ego state at that frame is compared with the ego state
    at the frame found ``dt_ahead`` later. Frames whose look-ahead time falls
    more than 0.1 past the last dataset timestamp get ``None`` for both
    actions and are left out of the returned plot arrays.

    Reads the notebook globals ``NDM``, ``frames``, ``ts_ego_tuple`` and
    ``vel``.

    Args:
        dt_ahead: offset added to each frame timestamp to find the future
            frame (presumably seconds -- TODO confirm the timestamp unit).

    Returns:
        Tuple ``(ts_plot, xy_plot, vel_plot, meta_actions)``. The first three
        are arrays restricted to frames that have a look-ahead frame;
        ``meta_actions`` is a dict with keys ``"lat"`` and ``"lon"``, each a
        list with one entry per frame (``None`` where no look-ahead exists).
    """
    # first two position components of every ego state
    positions = np.array([entry[1].position.x[:2] for entry in ts_ego_tuple])

    lat_hist = []
    lon_hist = []
    kept_ts = []
    kept_xy = []
    kept_vel = []
    for frame, pos_i, vel_i in zip(frames, positions, vel):
        t_now = NDM.get_timestamp(frame=frame)
        t_future = t_now + dt_ahead
        has_lookahead = t_future <= NDM.timestamps[-1] + 0.1

        if not has_lookahead:
            # no future frame available: keep the per-frame lists aligned
            lat_hist.append(None)
            lon_hist.append(None)
            continue

        # ego state now and at the look-ahead frame
        frame_future = NDM.get_frame_at_timestamp(t_future)
        ego_now = NDM.get_ego(frame=frame, agent=None)
        ego_future = NDM.get_ego(frame=frame_future, agent=None)

        # rule-based classification from the pair of states
        lat_hist.append(
            Lateral.evaluate(agent_current=ego_now, agent_future=ego_future)
        )
        lon_hist.append(
            Longitudinal.evaluate(agent_current=ego_now, agent_future=ego_future)
        )

        kept_ts.append(t_now)
        kept_xy.append(pos_i)
        kept_vel.append(vel_i)

    meta_actions = {"lat": lat_hist, "lon": lon_hist}
    return (
        np.asarray(kept_ts),
        np.asarray(kept_xy),
        np.asarray(kept_vel),
        meta_actions,
    )

In [None]:
from matplotlib import colormaps as cmaps
from collections import defaultdict


def plot_state_transition(meta_actions, ts_plot):
    """Plot the lateral and longitudinal meta actions as state-vs-time panels.

    The top panel shows ``meta_actions["lat"]`` and the bottom one
    ``meta_actions["lon"]``. ``None`` entries are dropped before plotting, so
    the number of remaining entries must equal ``len(ts_plot)``.

    Args:
        meta_actions: dict with keys ``"lat"`` and ``"lon"`` holding the
            per-frame action lists (entries convertible with ``int``, or
            ``None``).
        ts_plot: x-values for the non-``None`` entries.
    """
    histories = (meta_actions["lat"], meta_actions["lon"])
    _, axes = plt.subplots(2, 1, sharex=True, figsize=(10, 10))
    panels = zip(axes, ("summer", "winter"), histories, (Lateral, Longitudinal))
    for ax, cmap_name, history, enum_cls in panels:
        # one color per member of the action class
        n_states = len(enum_cls)
        palette = cmaps[cmap_name].resampled(n_states)(range(n_states))

        # black connecting line with state-colored markers on top
        levels = [int(action) for action in history if action is not None]
        marker_colors = [palette[level] for level in levels]
        ax.plot(ts_plot, levels, color="black")
        ax.scatter(ts_plot, levels, c=marker_colors)

        # label the y-axis with the member names instead of their integers
        members = list(enum_cls)
        ax.set_title(enum_cls.__name__)
        ax.set_xlabel("Time (s)")
        ax.set_yticks([int(member) for member in members])
        ax.set_yticklabels([str(member).split(".")[-1] for member in members])
        ax.grid()
    plt.tight_layout()
    plt.show()


def overlay_action_on_plot(
    x_plot, y_plot, xlabel, ylabel, ax, cmap_name, action_hist, action_class
):
    """Draw a curve on ``ax`` colored by the action active along it.

    The curve ``(x_plot, y_plot)`` is cut into runs of consecutive points that
    share the same entry in ``action_hist``; each run is drawn as one line in
    the color assigned to that action. The first and last points are marked as
    "start" and "end", and the legend shows one entry per distinct action.

    Args:
        x_plot: x-values of the curve.
        y_plot: y-values of the curve, same length as ``x_plot``.
        xlabel: label for the x-axis.
        ylabel: label for the y-axis.
        ax: matplotlib axes to draw on.
        cmap_name: name of the matplotlib colormap to take the colors from.
        action_hist: per-point actions, indexed like ``x_plot``. It may be
            longer than ``x_plot``; entries at indices below ``len(x_plot)``
            must be convertible with ``int``.
        action_class: class of the actions; it must support ``len`` and
            provides the panel title via ``__name__``. Assumes
            ``int(action)`` lies in ``range(len(action_class))`` -- TODO
            confirm against the action definitions.

    Note:
        The axes aspect is set to "equal" regardless of what is plotted.
    """
    # one RGBA color per member of the action class, indexed by int(action)
    n_classes = len(action_class)
    cmap = cmaps[cmap_name].resampled(n_classes)(range(n_classes))

    n_points = len(x_plot)
    n_actions = len(action_hist)

    # plot different segments
    lines = defaultdict(list)
    i = 1
    while i < n_points:
        # Advance j to the first index whose action differs from the run that
        # starts at i. The bound is checked BEFORE indexing so a run reaching
        # the end of action_hist ends there instead of raising IndexError.
        j = i + 1
        while j < n_actions and action_hist[j] == action_hist[i]:
            j += 1
        action_label = str(action_hist[i]).split(".")[-1]
        # NOTE: the -1 assigns the "ambiguous" region to the most recent segment
        lines[action_label].append(
            ax.plot(
                x_plot[i - 1 : j],
                y_plot[i - 1 : j],
                color=cmap[int(action_hist[i])],
                label=action_label,
            )[0]
        )
        i = j

    # add start/end points
    line_start = ax.scatter(
        x_plot[0], y_plot[0], color="green", marker="o", label="start"
    )
    line_end = ax.scatter(x_plot[-1], y_plot[-1], color="red", marker="o", label="end")

    # adjust axes; all lines of one action are grouped into a single tuple
    # handle so each action appears once in the legend
    ax.set_title(action_class.__name__)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.legend(
        list(map(tuple, lines.values())) + [line_start, line_end],
        list(lines.keys()) + ["start", "end"],
        bbox_to_anchor=(0.9, 1.0),
    )
    ax.grid()
    ax.set_aspect("equal")


def overlay_on_speed_profile(ts_plot, vel_plot, meta_actions):
    """Plot the speed profile twice, colored by lateral and longitudinal action.

    Speed is the norm of each row of ``vel_plot``. The top panel is colored by
    ``meta_actions["lat"]`` and the bottom one by ``meta_actions["lon"]``.

    Args:
        ts_plot: x-values (time) of the profile.
        vel_plot: 2-D array with one velocity vector per row.
        meta_actions: dict with keys ``"lat"`` and ``"lon"`` holding the
            per-frame action lists.
    """
    speed = np.linalg.norm(vel_plot, axis=1)
    histories = (meta_actions["lat"], meta_actions["lon"])
    _, axes = plt.subplots(2, 1, sharex=True, figsize=(12, 8))
    panels = zip(axes, ("summer", "winter"), histories, (Lateral, Longitudinal))
    for ax, cmap_name, history, enum_cls in panels:
        overlay_action_on_plot(
            ts_plot,
            speed,
            "Time (s)",
            "Speed (m/s)",
            ax,
            cmap_name,
            history,
            enum_cls,
        )
    plt.tight_layout()
    plt.show()


def overlay_on_trajectory(xy_plot, meta_actions):
    """Plot the trajectory twice, colored by lateral and longitudinal action.

    The left panel is colored by ``meta_actions["lat"]`` and the right one by
    ``meta_actions["lon"]``; both share the y-axis.

    Args:
        xy_plot: 2-D array whose first two columns are the x and y positions.
        meta_actions: dict with keys ``"lat"`` and ``"lon"`` holding the
            per-frame action lists.
    """
    xs, ys = xy_plot[:, 0], xy_plot[:, 1]
    histories = (meta_actions["lat"], meta_actions["lon"])
    _, axes = plt.subplots(1, 2, sharey=True, figsize=(8, 8))
    panels = zip(axes, ("summer", "winter"), histories, (Lateral, Longitudinal))
    for ax, cmap_name, history, enum_cls in panels:
        overlay_action_on_plot(
            xs,
            ys,
            "X Position (m)",
            "Y Position (m)",
            ax,
            cmap_name,
            history,
            enum_cls,
        )
    plt.tight_layout()
    plt.show()

#### Plot for every look-ahead value in `dts_ahead`

In [None]:
# width of the "=" banner printed before each look-ahead's plots
n_print = 30
for dt_ahead in dts_ahead:
    print(f"{'='*n_print}\nMAKE PLOTS FOR dt={dt_ahead} LOOKAHEAD\n{'='*n_print}")

    # classify every frame using this look-ahead
    lookahead_result = get_actions_by_lookahead(dt_ahead=dt_ahead)
    ts_plot, xy_plot, vel_plot, meta_actions = lookahead_result

    # state-vs-time diagram of the actions
    plot_state_transition(ts_plot=ts_plot, meta_actions=meta_actions)

    # speed profile colored by action
    overlay_on_speed_profile(
        meta_actions=meta_actions, ts_plot=ts_plot, vel_plot=vel_plot
    )

    # trajectory colored by action
    overlay_on_trajectory(meta_actions=meta_actions, xy_plot=xy_plot)