# Path Visualization

In [None]:
import os
import re
from math import isnan, nan
from typing import cast

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.artist import Artist
from matplotlib.axes import Axes
from matplotlib.ticker import ScalarFormatter

## Data Loading

In [None]:
def create_route():
    return {
        "__plot_handle": None,
        "Origin_x": nan,
        "Origin_y": nan,
        "Initial_dir": nan,
        "End_dir": nan,
        "Left_right_sign": nan,
        "PathNum": nan,
        "Path": [
            {
                "PathType": nan,
                "TraceType": nan,
                "EntryPoint_x": nan,
                "EntryPoint_y": nan,
                "ExitPoint_x": nan,
                "ExitPoint_y": nan,
                "CenterPoint_x": nan,
                "CenterPoint_y": nan,
                "R": nan,
                "X_data": [nan] * 10,
                "Y_data": [nan] * 10,
            }
            for _i in range(8)
        ],
    }


def create_route_pair():
    return {
        "front": create_route(),
        "rear": create_route(),
    }


route_pairs = []

ROUTE_PATTERN = re.compile(
    r"""(?x)
        g(?P<route_type>Front|Rear)Route\.
        (?P<key>\w+)\ =\ (?P<value>-?\d+(?:\.\d+)?)
    """
)
PATH_PATTERN = re.compile(
    r"""(?x)
        g(?P<route_type>Front|Rear)Route\.
        Path\[(?P<path_index>\d+)\]\.
        (?P<key>\w+)\ =\ (?P<value>-?\d+(?:\.\d+)?)
    """
)
DATA_PATTERN = re.compile(
    r"""(?x)
        g(?P<route_type>Front|Rear)Route\.
        Path\[(?P<path_index>\d+)\]\.
        (?P<key>\w+)\[(?P<list_index>\d+)\]\ =\ (?P<value>-?\d+(?:\.\d+)?)
    """
)

candidate_log_files = [
    file_name
    for file_name in os.listdir(".")
    if file_name.startswith("tracking_control") and file_name.endswith(".log")
]
if len(candidate_log_files) != 1:
    raise RuntimeError(
        f"Unexpected log file candidate(s): {candidate_log_files!r}"
    )

current_route_pair = create_route_pair()
route_pairs.append(current_route_pair)

with open(candidate_log_files[0], "r", encoding="utf-8") as log_file:
    for line in log_file:
        if "INFO" not in line:
            continue

        line = line.strip()
        try:
            escape_seq_len = line.index("[ INFO]")
            if escape_seq_len > 0:
                line = line[escape_seq_len:-escape_seq_len]
        except ValueError:
            pass

        if match := ROUTE_PATTERN.search(line):
            route_type = match.group("route_type").lower()
            path_index = -1
            list_index = -1
            key = match.group("key")
            value = match.group("value")
        elif match := PATH_PATTERN.search(line):
            route_type = match.group("route_type").lower()
            path_index = int(match.group("path_index"))
            list_index = -1
            key = match.group("key")
            value = match.group("value")
        elif match := DATA_PATTERN.search(line):
            route_type = match.group("route_type").lower()
            path_index = int(match.group("path_index"))
            list_index = int(match.group("list_index"))
            key = match.group("key")
            value = match.group("value")
        else:
            continue

        if "." in value:
            value = float(value)
        else:
            value = int(value)

        route = current_route_pair[route_type]
        if path_index == -1:
            if key not in route:
                continue
            if (
                (not isnan(route[key]))
                and (route[key] != value)
                and (route[key] > 0)
            ):
                current_route_pair = create_route_pair()
                route_pairs.append(current_route_pair)
                route = current_route_pair[route_type]
            route[key] = value
        else:
            path = route["Path"][path_index]
            if key not in path:
                continue
            if list_index == -1:
                if not isnan(path[key]) and path[key] != value:
                    current_route_pair = create_route_pair()
                    route_pairs.append(current_route_pair)
                    route = current_route_pair[route_type]
                    path = route["Path"][path_index]
                path[key] = value
            else:
                list_ = path[key]
                if not isnan(list_[list_index]) and list_[list_index] != value:
                    current_route_pair = create_route_pair()
                    route_pairs.append(current_route_pair)
                    route = current_route_pair[route_type]
                    path = route["Path"][path_index]
                    list_ = path[key]
                list_[list_index] = value

print(f"Loaded {candidate_log_files[0]!r}. ({len(route_pairs)} route pair(s) in total.)")

In [None]:
df_raw = pd.read_csv(
    "tracking_control_node.csv",
    index_col=0,
    parse_dates=["timestamp"],
)
df_plot = df_raw.query("valid == 1")

asc_flag = "x_vf" in df_plot.columns
if asc_flag:
    front_label, rear_label = "vf", "vr"
else:
    front_label, rear_label = "f", "r"

In [None]:
if asc_flag:
    TRANS_MAP_DIR_SUFFIX = "agv_ns_ros/data/AGV_Map/"
else:
    TRANS_MAP_DIR_SUFFIX = "agv_ns_ros_agv/data/AGV_Map/"

CWD = os.getcwd()
current_dir = os.path.dirname(CWD)
while (parent_dir := os.path.dirname(current_dir)) != current_dir:
    current_dir = parent_dir
    map_dir_path = os.path.join(current_dir, TRANS_MAP_DIR_SUFFIX)
    if os.path.exists(map_dir_path):
        TRANS_MAP_DIR = map_dir_path
        break
else:
    raise FileNotFoundError("Failed to find the transponder map file!")

trans_map_candidates = [
    file_name
    for file_name in os.listdir(TRANS_MAP_DIR)
    if file_name.startswith("TransMap") and file_name.endswith(".csv")
]
if len(trans_map_candidates) < 1:
    raise FileNotFoundError("Transponder map file not found!")
else:
    selected_trans_map_file_name = sorted(trans_map_candidates)[-1]

TRANS_MAP_PATH = os.path.join(TRANS_MAP_DIR, selected_trans_map_file_name)
print(f'Selected "{os.path.relpath(TRANS_MAP_PATH, CWD)}".')
df_trans_map = pd.read_csv(
    TRANS_MAP_PATH,
    names=["TransID", "AbsX", "AbsY", "LaneNB1", "LaneNB2"],
    index_col=0,
)
df_trans_map["x"] = df_trans_map["AbsX"] / 1000
df_trans_map["y"] = df_trans_map["AbsY"] / 1000


def plot_transponders(
    ax: Axes,
    xlim: tuple[float, float],
    ylim: tuple[float, float],
) -> None | Artist:
    handle = None
    for id in df_trans_map.index:
        x = cast(np.float64, df_trans_map.loc[id, "x"])
        y = cast(np.float64, df_trans_map.loc[id, "y"])
        if not ((xlim[0] < x < xlim[1]) and (ylim[0] < y < ylim[1])):
            continue
        handle = ax.plot(
            x,
            y,
            "+",
            ms=5,
            color="purple",
            alpha=0.5,
            label="Transponders",
        )[0]
    return handle

## Entry/Exit Points

In [None]:
for i, route_pair in enumerate(route_pairs):
    print(f"======== route_pair[{i}] ========")

    first_front_path = route_pair["front"]["Path"][0]
    first_path_dir = np.array([
        first_front_path["ExitPoint_x"] - first_front_path["EntryPoint_x"],
        first_front_path["ExitPoint_y"] - first_front_path["EntryPoint_y"],
    ])
    initial_vehicle_dir = np.array([
        df_plot["x_" + front_label].iloc[0] - df_plot["x_" + rear_label].iloc[0],
        df_plot["y_" + front_label].iloc[0] - df_plot["y_" + rear_label].iloc[0],
    ])
    if np.dot(first_path_dir, initial_vehicle_dir) > 0:
        target_axle = "front"
        target_route_label = front_label
    else:
        target_axle = "rear"
        target_route_label = rear_label

    target_route = route_pair[target_axle]
    print(f"PathNum: {target_route['PathNum']:d}")
    if target_route["PathNum"] <= 0:
        continue

    target_paths = target_route["Path"]

    for route_type, route in route_pair.items():
        for j, path in enumerate(route["Path"]):
            if isnan(path["TraceType"]):
                break
            print(
                "%sRoute.Path[%d]: (%7.3f, %7.3f) -> (%7.3f, %7.3f)" % (
                    route_type.title(),
                    j,
                    path["EntryPoint_x"],
                    path["EntryPoint_y"],
                    path["ExitPoint_x"],
                    path["ExitPoint_y"],
                )
            )

print(
    "Final Position (%s): (%7.3f, %7.3f)" % (
        target_axle,  # type: ignore
        df_plot[f"x_{target_route_label}"].iloc[-1],  # type: ignore
        df_plot[f"y_{target_route_label}"].iloc[-1],  # type: ignore
    )
)

fig = plt.figure(figsize=(6, 3), dpi=150)
fig.set_facecolor("#fff")
ax = fig.add_subplot()

plot_slice = slice(-min(len(df_plot), 200), None)
last_path = [
    path for path in target_paths if not isnan(path["TraceType"])  # type: ignore
][-1]
target_axis = (
    "y"
    if np.isclose(last_path["ExitPoint_x"], last_path["EntryPoint_x"])
    else "x"
)
target_label = f"{target_axis}_{target_route_label}"  # type: ignore
v_plot = df_plot[target_label]
v_ref = last_path[f"ExitPoint_{target_axis}"]
ax.plot(
    df_plot["timestamp"].iloc[plot_slice],
    v_plot.iloc[plot_slice],
    "b.-",
    ms=2,
    alpha=0.3,
    label="Actual Position",
)

ax.axhline(v_ref, ls="--", color="gold", alpha=0.8, label="Target Position")

for label in ax.get_xticklabels():
    label.set_rotation(15)

ax.yaxis.set_major_formatter(ScalarFormatter(useOffset=False))

ax.set(
    xlabel="Timestamp",
    ylabel=target_label,
)
ax.legend()
ax.grid()

## Visualization

In [None]:
PLOT_PADDING = 2
ROUTE_PAIR_SLICE = slice(None, None)
DF_SLICE = slice(None, None)


def visualize(items: list[tuple[str, tuple[str, str]]]) -> None:
    """
    Args:
        items (list[tuple[str, tuple[str, str]]]):
            (route_type, (design_color, actual_color)), ...
    """

    fig = plt.figure(figsize=(5, 5), dpi=150)
    fig.set_facecolor("#fff")
    ax = fig.add_subplot()

    handles = []
    x_min = y_min = np.inf
    x_max = y_max = -np.inf

    for route_type, colors in items:
        for route_pair in route_pairs[ROUTE_PAIR_SLICE]:
            route = route_pair[route_type]
            plot_kwargs = dict(
                lw=6,
                ms=12,
                color=colors[0],
                alpha=0.5,
                label=f"Design path ({route_type})",
            )
            # draw circle reference first
            for path in route["Path"]:
                if isnan(path["TraceType"]):
                    break
                if path["TraceType"] == 2:  # circle
                    theta = np.linspace(0, 2 * np.pi, 100)
                    x = path["R"] * np.cos(theta) + path["CenterPoint_x"]
                    y = path["R"] * np.sin(theta) + path["CenterPoint_y"]
                    ax.plot(x, y, "--",
                            color="gray", lw=1, alpha=0.5)  # type: ignore
                    ax.plot(
                        path["CenterPoint_x"],
                        path["CenterPoint_y"],
                        "+",
                        color="gray",
                        alpha=0.6,
                    )
                    x_min, y_min = min(x_min, x.min()), min(y_min, y.min())
                    x_max, y_max = max(x_max, x.max()), max(y_max, y.max())
            # draw other paths
            for path in route["Path"]:
                if isnan(path["TraceType"]):
                    break
                match path["TraceType"]:
                    case 1:  # straight line
                        x = np.array(
                            [path["EntryPoint_x"], path["ExitPoint_x"]])
                        y = np.array(
                            [path["EntryPoint_y"], path["ExitPoint_y"]])
                        route["__plot_handle"] = ax.plot(
                            x,
                            y,
                            ".-",
                            **plot_kwargs,  # type: ignore
                        )[0]
                        x_min, y_min = min(x_min, x.min()), min(y_min, y.min())
                        x_max, y_max = max(x_max, x.max()), max(y_max, y.max())
                    case 2:  # circle
                        pass
                    case 3 | 4:  # arbitrary curve
                        x0 = route["Origin_x"]
                        y0 = route["Origin_y"]
                        x1 = np.array(path["X_data"])
                        y1 = np.array(path["Y_data"])
                        theta = route["Initial_dir"]
                        sign = route["Left_right_sign"]
                        x = x1 * np.cos(theta) - sign * y1 * np.sin(theta) + x0
                        y = x1 * np.sin(theta) + sign * y1 * np.cos(theta) + y0
                        route["__plot_handle"] = ax.plot(
                            x,
                            y,
                            ".-",
                            **plot_kwargs,  # type: ignore
                        )[0]
                        x_min, y_min = min(x_min, x.min()), min(y_min, y.min())
                        x_max, y_max = max(x_max, x.max()), max(y_max, y.max())
                    case _ as v:
                        raise ValueError(f"unexpected trace type: {v!r}")

    handles.extend(
        filter(
            bool,
            [
                route_pairs[0][route_type]["__plot_handle"]
                for route_type, _ in items
            ]
        )
    )

    for route_type, colors in items:
        route_label = front_label if route_type == "front" else rear_label
        x = df_plot["x_" + route_label].iloc[DF_SLICE]
        y = df_plot["y_" + route_label].iloc[DF_SLICE]
        handles.append(
            ax.plot(x, y, ".-", ms=2, color=colors[1], alpha=0.5,
                    label=f"Actual path ({route_type})")[0]
        )
        x_min, y_min = min(x_min, x.min()), min(y_min, y.min())
        x_max, y_max = max(x_max, x.max()), max(y_max, y.max())

    x_mid = (x_min + x_max) / 2
    y_mid = (y_min + y_max) / 2
    radius = max(x_max - x_mid, y_max - y_mid)
    x_min, x_max = x_mid - radius, x_mid + radius
    y_min, y_max = y_mid - radius, y_mid + radius
    xlim = (x_min - PLOT_PADDING, x_max + PLOT_PADDING)
    ylim = (y_min - PLOT_PADDING, y_max + PLOT_PADDING)

    handle_transponders = plot_transponders(ax, xlim=xlim, ylim=ylim)
    if handle_transponders:
        handles.append(handle_transponders)

    if ("obstacle_x" in df_plot) and ("obstacle_y" in df_plot):
        df_obstacle = df_plot.loc[df_plot["obstacle_exists"] == 1]
        if len(df_obstacle):
            heading = df_obstacle["heading"]
            cos_heading = np.cos(heading)
            sin_heading = np.sin(heading)
            x_obstacle_rel = df_obstacle["obstacle_x"]
            y_obstacle_rel = -df_obstacle["obstacle_y"]
            x_obstacle_abs = df_obstacle["x_center"] \
                + x_obstacle_rel * cos_heading - y_obstacle_rel * sin_heading
            y_obstacle_abs = df_obstacle["y_center"] \
                + x_obstacle_rel * sin_heading + y_obstacle_rel * cos_heading
            handles.append(
                ax.plot(x_obstacle_abs.iloc[DF_SLICE],
                        y_obstacle_abs.iloc[DF_SLICE],
                        "co", ms=3, alpha=0.3, label="Obstacle Positions")[0]
            )

    ax.set(
        xlabel="x",
        ylabel="y",
        xlim=xlim,
        ylim=ylim,
        aspect="equal",
    )
    ax.legend(handles=handles)
    ax.grid(alpha=0.5)

### Front & Rear

In [None]:
visualize([
    ("front", ("red", "gold")),
    ("rear", ("blue", "cyan")),
])

### Front Only

In [None]:
visualize([
    ("front", ("red", "gold")),
])

### Rear Only

In [None]:
visualize([
    ("rear", ("blue", "cyan")),
])