# Detector Validation


In [1]:
# ruff: noqa: E402
%load_ext autoreload
%autoreload 2

# find the root of the project
import os
from pathlib import Path
import sys
import polars as pl

ROOT = Path(os.getcwd()).parent
while not ROOT.joinpath(".git").exists():
    ROOT = ROOT.parent

# add the root to the python path
sys.path.append(str(ROOT))

## Read in the Processed Trajectories

The detector is in EBL1 Index 2


In [2]:
radar_df = (
    pl.scan_parquet(
        ROOT / "data" / "vppc_ext_data.parquet",
    )
    # .filter((pl.col("lane") == "EBL1") & (pl.col("lane_index") == 1))
    .sort("epoch_time_cst")
    .set_sorted("epoch_time_cst")
    .collect()
)

radar_df.head()

epoch_time_cst,vehicle_id,lane,lane_index,association_distance,s_smooth,s_velocity_smooth,length_s,Harper
"datetime[ms, US/Central]",u64,str,u16,f32,f64,f32,f64,bool
2023-03-12 16:41:27.300 CDT,0,"""WBL1""",0,,1044.388272,22.171509,3.613847,False
2023-03-12 16:41:27.300 CDT,1,"""EBL1""",0,,643.46732,21.330873,5.367028,False
2023-03-12 16:41:27.300 CDT,2,"""EBL1""",1,,1084.416652,16.189831,4.784425,True
2023-03-12 16:41:27.300 CDT,3,"""EBL1""",0,,920.065366,23.284044,9.615423,False
2023-03-12 16:41:27.300 CDT,4,"""EBL1""",0,,1205.912551,3.290045,1.690091,True


## Read in the Traffic Signals

In [3]:
import polars as pl
from src.plotting.traffic_signals import open_signal_df, add_signals_to_plot


signal_df = open_signal_df(ROOT / "data" / "tl_logs" / "03_13_2023_TL.csv", offset=2.9)

## Plot the Time Space Data

In [4]:
import plotly.graph_objects as go
from src.plotting.time_space import plot_time_space
from src.radar import BasicRadar
from datetime import timedelta

tl = pl.lit("2023-03-13 06:50:00").str.strptime(
    pl.Datetime(time_unit="ns", time_zone="US/Central"),
)
th = pl.lit("2023-03-13 07:00:00").str.strptime(
    pl.Datetime(time_unit="ns", time_zone="US/Central"),
)

plot_radar_df = (
    radar_df
    # .pipe(BasicRadar.add_cst_timezone)
    .filter(
        pl.col("epoch_time_cst").is_between(
            tl,
            th,
        )
        & (pl.col("lane").str.contains("E"))
        & (pl.col("lane_index") == 0)
    ).with_columns(pl.col("s_smooth").max() - pl.col("s_smooth"))
)


fig = plot_time_space(
    plot_radar_df,
    s_col="s_smooth",
    vehicle_col="vehicle_id",
)


fig = add_signals_to_plot(
    signal_df.filter(
        pl.col("Timestamp").is_between(
            tl - timedelta(seconds=120),
            th + timedelta(seconds=120),
        )
        & (pl.col("Phase") == 6)
    ),
    fig=fig,
    s_mapping={63082002: 1230, 63082003: 830, 63082004: 450},
)

fig.update_layout(
    # use journal paper font
    font_family="Times New Roman",
    # set the font size
    font_size=22,
    # set the x_limit
    xaxis=dict(
        range=[
            plot_radar_df["epoch_time_cst"].min(),
            plot_radar_df["epoch_time_cst"].max(),
        ],
    ),
    yaxis=dict(
        range=[200, 1500],
        title="S Distance [m]",
    ),
    # set the size of the figure
    width=1000,
    height=600,
)


# save the figure as png
fig.write_image(ROOT / "data" / "figures" / "time_space.png")

# display the image
from ipywidgets import Image

Image(value=open(ROOT / "data" / "figures" / "time_space.png", "rb").read())

Image(value=b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x03\xe8\x00\x00\x02X\x08\x06\x00\x00\x00\xf1\x1e\xcc…

In [5]:
# this is a a stupid way to deal with datetimes, but hack it till you make it
tl = pl.lit("2023-03-13 06:57:00").str.strptime(
    pl.Datetime(time_unit="ns", time_zone="US/Central"),
)

tl = plot_radar_df.filter(pl.col("epoch_time_cst") > tl)["epoch_time_cst"].min()


fig.update_layout(
    # use journal paper font
    font_family="Times New Roman",
    # set the font size
    font_size=22,
    # set the x_limit
    xaxis=dict(
        range=[tl, plot_radar_df["epoch_time_cst"].max()],
    ),
    yaxis=dict(
        range=[200, 1400],
        title="S Distance [m]",
    ),
    # set the size of the figure
    width=1000,
    height=600,
)

fig.write_image(ROOT / "data" / "figures" / "time_space_small.png")

# fig.show()

## Plot Speed and Volume Data

In [None]:
# get the volume at Harper & Average Speed

avg_speed = (
    radar_df.filter(
        pl.col("s_velocity_smooth").is_between(0, 35)
        & pl.col("s_velocity_smooth").is_not_nan()
    )
    .group_by_dynamic("epoch_time_cst", by="lane", every="30m")
    .agg(pl.col("s_velocity_smooth").mean())
)


volume = (
    radar_df.filter(
        pl.col("s_velocity_smooth").is_between(0, 35)
        & pl.col("s_velocity_smooth").is_not_nan()
    )
    .with_columns(
        pl.col('lane').replace({
            'EBL1': 950,
            'WBL1': 950
        }).cast(float).alias('s_dist')
    )
    .with_columns(
        (pl.col('s_smooth') >= pl.col('s_dist')).alias('gt')
    )
    .filter(
        # indicative of a detector cross
        ((pl.col('gt') == 0).any() & (pl.col('gt') == 1).any()).over('vehicle_id')
    )
    .group_by_dynamic("epoch_time_cst", by="lane", every="30m")
    .agg(pl.col("vehicle_id").n_unique() * 2)
)

In [None]:
import matplotlib.pyplot as plt
import scienceplots
import seaborn as sns

engine_color = [0.65, 0.65, 0.65]
gps_color = [0, 0, 0.75]
plutron_color = [0, 0, 0]
alabama = [165, 30, 54]
alabama = [i / 255 for i in alabama]

plt.style.use(["science", "ieee"])


fig, ax = plt.subplots(figsize=(6, 3))
ax2 = ax.twinx()

sns.lineplot(
    data=avg_speed.with_columns(pl.col("epoch_time_cst").dt.time().cast(str))
    .sort("epoch_time_cst")
    .to_pandas().drop_duplicates(["epoch_time_cst", 'lane']),
    x="epoch_time_cst",
    y="s_velocity_smooth",
    hue="lane",
    ax=ax2,
    legend=False,
    palette=[alabama, engine_color, plutron_color],
    linestyle='--'
    # style="-"
    # dash the lines
    # dashes=[(2, 2), (2, 2)]
)

plot_ = sns.lineplot(
    data=volume.with_columns(pl.col("epoch_time_cst").dt.time().cast(str))
    .sort("epoch_time_cst")
    .to_pandas().drop_duplicates(["epoch_time_cst", 'lane']),
    x="epoch_time_cst",
    y="vehicle_id",
    hue="lane",
    ax=ax,
    legend=True,
    palette=[alabama, engine_color, plutron_color],
    # style="-"
    # dash the lines
    # dashes=[(2, 2), (2, 2)]
)

for ind, label in enumerate(plot_.get_xticklabels()):
    if ind % 5 == 0:  # every 10th label is kept
        label.set_visible(True)
    else:
        label.set_visible(False)

# remove the title from the legend

handles, labels = ax.get_legend_handles_labels()
ax.legend(handles=handles, labels=labels)

# ax.set_xticklabels(volume['epoch_time_cst'][::10], rotation=45)
ax.set_ylabel("Volume [veh/hr]")
ax2.set_ylabel("Average Speed [m/s]")
ax.set_xlabel("Time")

# plt.locator_params(axis='x', nbins=5)
# plt.gcf().autofmt_xdate()

In [None]:
avg_speed.with_columns(pl.col("epoch_time_cst").dt.time())