In [None]:
import pickle
import polars as pl


# Load the pickled trajectory table from the working directory.
# NOTE(security): `pickle.load` can execute arbitrary code while deserialising;
# only run this cell on a `data.pkl` that comes from a trusted source.
with open('./data.pkl', 'rb') as f:
    df = pickle.load(f)

# Display the loaded object (later cells call `.filter` / `.select` on it with
# polars expressions, so it is presumably a polars DataFrame — TODO confirm).
df

In [None]:
# Shorthand for `pl.col`, so a column can be referenced as `c.<column_name>`.
c = pl.col

In [None]:
import json
from collections import OrderedDict
import os
import numpy as np

# Trajectories within 300m behind an actively controlled AV 

In [None]:
# Keep every trajectory whose smallest `distance_to_upstream_av_meters` sample is
# below 300, i.e. that is at least 'once' within 300m behind an actively controlled AV.
# NOTE(review): this treats upstream distances as positive, whereas the "away" cell
# tests the same column with `< -2000` and the nearest-vehicle cell uses `arg_max`
# when POS == 'upstream' (both suggest negative values) — confirm the sign convention.
df_once_within = df.filter(pl.col('distance_to_upstream_av_meters').list.min() < 300)
# Display the filtered trajectories.
df_once_within

In [None]:
# For each trajectory in `df_once_within`, list which samples are within 300m of the
# upstream AV and the timestamps of those samples. The result is only displayed,
# it is not assigned to a variable.
(df_once_within
    .with_columns(
        # "within_indcs": indices of the samples whose `distance_to_upstream_av_meters` is below 300.
        within_indcs=pl.col('distance_to_upstream_av_meters')
            .map_elements(lambda dist: list(np.argwhere(np.array(dist) < 300)[:, 0]),
                          return_dtype=list[int]),
    )
    .with_columns(
        # "within_timestamps": the entries of `timestamp` at the positions listed in "within_indcs".
        within_timestamps=pl.struct(['timestamp', 'within_indcs'])
            .map_elements(lambda x: list(np.array(x['timestamp'])[x['within_indcs']]),
                          return_dtype=list[float]),
    )
)

# Trajectories within 300m ahead of an actively controlled AV

In [None]:
# Keep every trajectory that is at least 'once' within 300m ahead of an actively
# controlled AV. A sample counts as "within" when its
# `distance_to_downstream_av_meters` is greater than -300 (the same per-sample test
# the following cell applies), so "at least once" means the LARGEST sample must
# exceed -300. The previous `list.min() > -300` only kept trajectories whose every
# sample satisfied the test.
# NOTE(review): this assumes downstream distances are negative; the nearest-vehicle
# cell uses `arg_min` when POS == 'downstream', which suggests positive values —
# confirm the sign convention.
# NOTE: this rebinds `df_once_within`, replacing the upstream selection made earlier.
df_once_within = df.filter(pl.col('distance_to_downstream_av_meters').list.max() > -300)
# Display the filtered trajectories.
df_once_within

In [None]:
# For each trajectory in `df_once_within`, list which samples are within 300m of the
# downstream AV and the timestamps of those samples. The result is only displayed,
# it is not assigned to a variable.
(df_once_within
    .with_columns(
        # "within_indcs": indices of the samples whose `distance_to_downstream_av_meters` is greater than -300.
        within_indcs=pl.col('distance_to_downstream_av_meters')
            .map_elements(lambda dist: list(np.argwhere(np.array(dist) > -300)[:, 0]),
                          return_dtype=list[int]),
    )
    .with_columns(
        # "within_timestamps": the entries of `timestamp` at the positions listed in "within_indcs".
        within_timestamps=pl.struct(['timestamp', 'within_indcs'])
            .map_elements(lambda x: list(np.array(x['timestamp'])[x['within_indcs']]),
                          return_dtype=list[float]),
    )
)

# Trajectories more than 2000m away from an AV 

In [None]:
# Keep every trajectory that is at least 'once' more than 2000m away from an
# actively controlled AV, on either side:
#   * downstream: some sample is greater than 2000  -> the largest sample exceeds 2000
#   * upstream:   some sample is less than -2000    -> the smallest sample is below -2000
# The upstream branch previously used `list.max() < -2000`, which only matched
# trajectories whose every sample was beyond 2000m and so disagreed with both the
# downstream branch and the per-sample `< -2000` test applied in the following cell.
df_once_away = df.filter(
    (pl.col('distance_to_downstream_av_meters').list.max() > 2000) |
    (pl.col('distance_to_upstream_av_meters').list.min() < -2000)
)
# Display the filtered trajectories.
df_once_away

In [None]:
# For each trajectory in `df_once_away`, list the samples (and their timestamps) that
# are more than 2000m away from the downstream / upstream AV. Either index list may
# be empty, because the filter above only requires one of the two sides to match.
# The result is only displayed, it is not assigned to a variable.
(df_once_away
     .with_columns(
        # "daway_indcs": indices of the samples whose `distance_to_downstream_av_meters` is greater than 2000.
         daway_indcs=pl.col('distance_to_downstream_av_meters')
            .map_elements(lambda dist: list(np.argwhere(np.array(dist) > 2000)[:, 0]),
                          return_dtype=list[int]),
        # "uaway_indcs": indices of the samples whose `distance_to_upstream_av_meters` is less than -2000.
         uaway_indcs=pl.col('distance_to_upstream_av_meters')
            .map_elements(lambda dist: list(np.argwhere(np.array(dist) < -2000)[:, 0]),
                          return_dtype=list[int]),
     )
     .with_columns(
        # "daway_timestamps": the entries of `timestamp` at the positions listed in "daway_indcs".
         daway_timestamps=pl.struct(['timestamp', 'daway_indcs'])
            .map_elements(lambda x: list(np.array(x['timestamp'])[x['daway_indcs']]),
                          return_dtype=list[float]),
        # "uaway_timestamps": the entries of `timestamp` at the positions listed in "uaway_indcs".
         uaway_timestamps=pl.struct(['timestamp', 'uaway_indcs'])
            .map_elements(lambda x: list(np.array(x['timestamp'])[x['uaway_indcs']]),
                          return_dtype=list[float]),
     )
)

In [None]:
# Show the trajectories whose `downstream_av_id` list contains at least one missing
# (None) id. `return_dtype` is given explicitly, like every other `map_elements`
# call in this notebook, so polars does not have to infer the output type.
df.filter(
    c.downstream_av_id.map_elements(
        lambda ids: any(i is None for i in ids),
        return_dtype=pl.Boolean,
    )
)

# Trajectories exactly behind/ahead of an active AV

In [None]:
# Which neighbouring AV the cells below analyse. The value is interpolated into the
# column names `{POS}_av_id` and `distance_to_{POS}_av_meters`, and later decides
# whether the nearest vehicle is picked with `arg_min` ('downstream') or `arg_max`
# (anything else). Swap the two lines below to analyse the other side.
# POS = 'downstream'
POS = 'upstream'

def new_av(aid):
    """
    Locate the positions at which the AV id changes along a trajectory.

    Missing ids (``None``) are mapped to the sentinel ``-1`` before comparing, so
    entering or leaving a stretch without an AV also counts as a change.

    Returns a list with every index ``i`` (``i >= 1``) for which
    ``aid[i] != aid[i - 1]``; the list is empty when the id never changes or when
    ``aid`` has fewer than two entries.
    """
    ids = np.array([-1 if a is None else a for a in aid], dtype=int)
    # Compare each id with its predecessor; True at k means ids[k + 1] starts a new run.
    differs_from_previous = ids[1:] != ids[:-1]
    return list(np.flatnonzero(differs_from_previous) + 1)


# Build one row per trajectory in which every per-sample list (timestamp, AV id,
# distance, integer tick, sample index) is cut into consecutive segments, one segment
# per stretch during which the trajectory has the same {POS} AV id.
df_split_av_ = (df
    .select(
        pl.col('trajectory_id'),
        pl.col('timestamp'),
        pl.col(f'{POS}_av_id'),
        pl.col(f'distance_to_{POS}_av_meters'),
    )
    # Use short, side-independent column names for the rest of the pipeline.
    .rename({'trajectory_id': 'tid', f'{POS}_av_id': 'avid', f'distance_to_{POS}_av_meters': 'dist'})
    # Remove all the trajectories whose AV id list holds nothing but nulls.
    .filter(pl.col('avid').list.drop_nulls().list.len() != 0)
    .with_columns(
        # Since the timestamps of different trajectories do not align, snap every timestamp to an
        # integer tick of 1/100 of the timestamp unit (10 ms if timestamps are in seconds — TODO confirm).
        # The first sample is rounded on its own and every sample is then rounded as an offset from it.
        timestamp_int=pl.col('timestamp')
            .map_elements(lambda ts: [int(ts[0] * 100 + 0.5) + int((t - ts[0]) * 100 + 0.5) for t in ts],
                          return_dtype=list[int]),
        # Running index 0..n-1 of the samples of a trajectory. Will be used to reference each point of the trajectory.
        trajectory_point_idx=pl.col('timestamp')
            .map_elements(lambda ts: list(range(len(ts))),
                          return_dtype=list[int]),
    )
    .with_columns(
        # Indices at which the trajectory switches to a different up/down-stream AV id
        # (missing ids count as their own id, see `new_av`).
        new_av=pl.col('avid')
            .map_elements(new_av, return_dtype=list[int])
    )
    .with_columns(
        # Split timestamps at the change indices: one sub-list per AV the trajectory follows / leads.
        timestamp=pl.struct('timestamp', 'new_av')
            .map_elements(lambda x: [list(l) for l in np.split(x['timestamp'], x['new_av'])],
                          return_dtype=list[list[float]]),
        # Split avid at the change indices and keep one id per segment.
        # Taking the first id is enough because all ids inside a segment are equal (the split points are the id changes).
        avid=pl.struct('avid', 'new_av')
            .map_elements(lambda x: [l[0] for l in np.split(x['avid'], x['new_av'])],
                          return_dtype=list[int]),
        # Split dist at the change indices.
        dist=pl.struct('dist', 'new_av')
            .map_elements(lambda x: [list(l) for l in np.split(x['dist'], x['new_av'])],
                          return_dtype=list[list[float]]),
        # Split timestamp_int at the change indices.
        timestamp_int=pl.struct('timestamp_int', 'new_av')
            .map_elements(lambda x: [list(l) for l in np.split(x['timestamp_int'], x['new_av'])],
                          return_dtype=list[list[int]]),
        # Split trajectory_point_idx at the change indices.
        trajectory_point_idx=pl.struct('trajectory_point_idx', 'new_av')
            .map_elements(lambda x: [list(l) for l in np.split(x['trajectory_point_idx'], x['new_av'])],
                          return_dtype=list[list[int]]),
    )
)
# Display the per-trajectory segment lists.
df_split_av_

In [None]:
# Turn the per-trajectory segment lists into one row per (trajectory, AV) segment.
df_split_av = (
    df_split_av_
    # The split indices are no longer needed once the lists have been cut.
    .drop('new_av')
    # Explode all segmented columns together so that they stay aligned:
    #   timestamp / timestamp_int / dist / trajectory_point_idx: list[list[x]] -> list[x] per row
    #   avid:                                                    list[avid]    -> avid per row
    .explode(['timestamp', 'timestamp_int', 'avid', 'dist', 'trajectory_point_idx'])
    # Discard the segments during which the trajectory had no AV id.
    .filter(pl.col('avid').is_not_null())
)
df_split_av

In [None]:
import itertools
def interpolate(arr, factor=4):
    """
    Linearly up-sample a sequence by inserting evenly spaced points between neighbours.

    Every pair of consecutive values ``(a, b)`` contributes ``a`` followed by
    ``factor - 1`` intermediate points ``a + (b - a) * k / factor``; the final value
    of ``arr`` is appended unchanged. A sequence of ``n >= 1`` values therefore
    yields ``factor * (n - 1) + 1`` values.

    Parameters:
        arr: sequence of numbers (anything supporting ``len``, ``arr[-1]`` and
            ``np.array(arr)``).
        factor: number of steps each original interval is divided into. Defaults to
            4, which reproduces the 0.25 / 0.50 / 0.75 points used by this notebook.

    Returns:
        A list with the up-sampled values; an empty list for empty input (which
        previously raised ``IndexError``).

    Raises:
        ValueError: if ``factor`` is smaller than 1.
    """
    if factor < 1:
        raise ValueError('factor must be >= 1')
    # `len(...) == 0` rather than truthiness, so array-like inputs work as well.
    if len(arr) == 0:
        return []
    last = arr[-1]
    arr = np.array(arr)
    prv = arr[:-1]
    dff = arr[1:] - prv
    # One column per sub-step: the original point first, then the intermediate points.
    columns = [prv] + [prv + dff * frac for frac in np.arange(1, factor) / factor]
    # Stacking on the last axis and flattening interleaves the columns per interval.
    return list(np.stack(columns, axis=-1).flatten()) + [last]
    

# Up-sample every (trajectory, AV) segment by a factor of 4 so that segments of
# different trajectories share common integer ticks.
# NOTE(review): the four per-row lists only end up with equal lengths when
# consecutive original samples are exactly 4 ticks apart in `timestamp_int`
# (ticks are 1/100 of the timestamp unit) — confirm this holds for the data.
df_interpolate_ = (df_split_av
    .with_columns(
        # Every integer tick from the first to the last tick of the segment.
        timestamp_int=pl.col('timestamp_int').map_elements(lambda ts: list(range(ts[0], ts[-1] + 1)), return_dtype=list[int]),
        # Linearly interpolated timestamps (3 extra points between each pair of samples).
        timestamp=pl.col('timestamp').map_elements(interpolate, return_dtype=list[float]),
        # Linearly interpolated distances (3 extra points between each pair of samples).
        dist=pl.col('dist').map_elements(interpolate, return_dtype=list[float]),
        # Index on the 4x finer grid: original sample i maps to 4 * i, interpolated points fall in between.
        # Materialised as a list (not a bare `range`) to match the declared list[int] and the `timestamp_int` column.
        trajectory_point_idx_interp=pl.col('trajectory_point_idx').map_elements(lambda r: list(range(r[0] * 4, r[-1] * 4 + 1)), return_dtype=list[int]),
    )
)

df_interpolate_

In [None]:
# Flatten the up-sampled segments into one row per (trajectory, AV, tick).
df_interpolate = (
    df_interpolate_
    # The original trajectory_point_idx has a different length than the up-sampled
    # columns, so it cannot be exploded together with them; drop it.
    .drop('trajectory_point_idx')
    # Explode the four up-sampled list columns in lockstep.
    .explode(['timestamp', 'dist', 'timestamp_int', 'trajectory_point_idx_interp'])
)
df_interpolate

In [None]:
# Collect, for every AV id and every integer tick, all vehicles that have a point there.
df_interpolate_grouped = (
    df_interpolate
    .group_by('avid', 'timestamp_int')
    # Gather every remaining column into a list per (avid, timestamp_int) group.
    .agg(pl.all())
    .with_columns(
        # Position, inside each group's lists, of the vehicle nearest to the AV:
        # the smallest distance when POS is 'downstream', the largest one otherwise.
        nearest_idx=(
            pl.col('dist').list.arg_max()
            if POS != 'downstream'
            else pl.col('dist').list.arg_min()
        )
    )
)
df_interpolate_grouped

In [None]:
# For every (avid, timestamp_int) group keep only the nearest vehicle, then restrict
# the result to points that exist in the original (non-interpolated) trajectories.
df_nearest = (df_interpolate_grouped
    .with_columns(
        # Replace each per-group list by the entry of the nearest vehicle (position `nearest_idx`).
        timestamp=pl.col('timestamp').list.get(pl.col('nearest_idx')),
        trajectory_id=pl.col('tid').list.get(pl.col('nearest_idx')),
        dist=pl.col('dist').list.get(pl.col('nearest_idx')),
        trajectory_point_idx_interp=pl.col('trajectory_point_idx_interp').list.get(pl.col('nearest_idx')),
    )
    # `tid` (still a list) is superseded by the scalar `trajectory_id`; the index is no longer needed.
    .drop('nearest_idx', 'tid')
    # Only include the non-interpolated trajectory points: original sample i sits at index 4 * i on the finer grid.
    .filter(pl.col('trajectory_point_idx_interp') % 4 == 0)
    .sort('timestamp')
    # Map the finer-grid index back to the original sample index.
    # (trajectory_id, trajectory_point_idx) can now be used to reference the trajectory points in the original dataframe.
    .with_columns(trajectory_point_idx=pl.col('trajectory_point_idx_interp') // 4)
)
# Display the nearest-vehicle table.
df_nearest

In [None]:
# One row per AV id, with every other column of `df_nearest` gathered into a list.
# The result is only displayed, it is not assigned to a variable.
(
    df_nearest
    .group_by('avid')
    .agg(pl.all())
)