# Compute Metrics on Trajectories

In [1]:
import omega_prime
import polars as pl
import altair as alt
import numpy as np


r = omega_prime.Recording.from_file("example_files/alks_cut-in.osi", xodr_path="example_files/straight_500m.xodr")

In [2]:
r._df["idx"].unique()

idx
u32
0
1


In [3]:
mm = omega_prime.metrics.MetricManager(
    metrics=[
        omega_prime.metrics.timegaps_and_min_timgaps,
        omega_prime.metrics.vel,
        omega_prime.metrics.distance_traveled,
        omega_prime.metrics.p_timegaps_and_min_p_timgaps,
    ],
)
mm

computes columns: ['vel', 'distance_traveled'] - computes properties ['timegaps', 'min_timegaps', 'p_timegaps', 'min_p_timegaps'] - args [<Parameter "ego_id">, <Parameter "time_buffer=2000000000.0">, <Parameter "ego_id">, <Parameter "time_buffer=2000000000.0">]

In [4]:
df, metrics = mm.compute(r, ego_id=0, time_buffer=10e9)
list(metrics.keys())

['timegaps', 'min_timegaps', 'p_timegaps', 'min_p_timegaps']

In [5]:
(
    alt.Chart(metrics["p_timegaps"]).mark_line().encode(alt.X("total_nanos_ego:Q"), alt.Y("p_timegap"), color="idx:N")
    + alt.Chart(metrics["timegaps"])
    .mark_line(strokeDash=[10, 2])
    .encode(alt.X("total_nanos_ego:Q"), alt.Y("timegap"), color="idx:N")
    + alt.Chart(metrics["min_timegaps"])
    .mark_rule(strokeDash=[1, 1])
    .encode(
        alt.Y("min_timegap"),
        color="idx:N",
    )
    .properties(title="timegap")
).interactive()

In [6]:
max_nanos = np.max(metrics["p_timegaps"].select(pl.max("total_nanos_ego", "total_nanos")).to_numpy())
min_nanos = np.min(metrics["p_timegaps"].select(pl.min("total_nanos_ego", "total_nanos")).to_numpy())

(
    alt.Chart(metrics["timegaps"].select(pl.col("total_nanos_ego", "total_nanos", "idx")))
    .mark_line(strokeDash=[1, 1])
    .encode(alt.X("total_nanos_ego:Q"), alt.Y("total_nanos:Q"), color="idx:N")
    + alt.Chart()
    .mark_rule(color="red")
    .encode(x=alt.datum(min_nanos), x2=alt.datum(max_nanos), y2=alt.datum(max_nanos), y=alt.datum(min_nanos))
    + (
        alt.Chart(metrics["p_timegaps"].select(pl.col("total_nanos_ego", "total_nanos", "idx")))
        .mark_line()
        .encode(alt.X("total_nanos_ego:Q"), alt.Y("total_nanos:Q"), color="idx:N")
    )
).properties(title="time of smallest timegap").interactive()

# Create your own metric

create a function that takes the dataframe from a Recording (`Recording._df`) and returns the same dataframe and a dictonary with additional dataframes (dictionary needs to be always return but can be empty if necessary). Decorate it with `@omega_prime.metrics.metric()` and add it to your `MetricManager`

In [7]:
@omega_prime.metrics.metric(computes_properties=["test_property"])
def compute_stuff(df):
    return df, {"test_property": df.group_by("idx").agg(pl.col("idx").count().alias("count"))}


compute_stuff(r._df)

(<LazyFrame at 0x217788D0ED0>, {'test_property': <LazyFrame at 0x2177C659190>})

In [8]:
type(compute_stuff)

omega_prime.metrics.Metric

In [9]:
?omega_prime.metrics.Metric

[31mInit signature:[39m
omega_prime.metrics.Metric(
    compute_func: collections.abc.Callable[[polars.lazyframe.frame.LazyFrame, ...], tuple[polars.lazyframe.frame.LazyFrame, dict[str, polars.lazyframe.frame.LazyFrame]]],
    computes_columns: list[str] = <factory>,
    computes_properties: list[str] = <factory>,
    requires_columns: list[str] = <factory>,
    requires_properties: list[str] = <factory>,
    computes_intermediate_columns: list[str] = <factory>,
    computes_intermediate_properties: list[str] = <factory>,
) -> [38;5;28;01mNone[39;00m
[31mDocstring:[39m      Metric(compute_func: collections.abc.Callable[[polars.lazyframe.frame.LazyFrame, ...], tuple[polars.lazyframe.frame.LazyFrame, dict[str, polars.lazyframe.frame.LazyFrame]]], computes_columns: list[str] = <factory>, computes_properties: list[str] = <factory>, requires_columns: list[str] = <factory>, requires_properties: list[str] = <factory>, computes_intermediate_columns: list[str] = <factory>, computes_interm

In [10]:
mm = omega_prime.metrics.MetricManager(
    metrics=[compute_stuff],
)
mm

computes columns: [] - computes properties ['test_property'] - args []

In [11]:
df, metrics_dict = mm.compute(r)
metrics_dict

{'test_property': shape: (2, 2)
 ┌─────┬───────┐
 │ idx ┆ count │
 │ --- ┆ ---   │
 │ u32 ┆ u32   │
 ╞═════╪═══════╡
 │ 0   ┆ 305   │
 │ 1   ┆ 305   │
 └─────┴───────┘}