In [1]:
import pandas as pd
import numpy as np
from scipy.spatial.distance import cdist

# Parameters
SPEED_THRESHOLD = 4  # m/s
MIN_STOP_DURATION = 0.5  # seconds
DIST_THRESH = 3.5
EXPORT_INTERVAL = 0.16  # seconds
PED_BOX = (0.3, 0.3)
LIGHT_TRUCK_BOX = (6.10, 2.10)

# ----------------------------------
# Helpers
# ----------------------------------

def filter_stopped_vehicles(df, speed_col='speed', time_col='TimeStamp'):
    """Filter out stopped vehicles longer than threshold"""
    df = df.copy()
    df['time_diff'] = df.groupby('Track ID')[time_col].diff()
    df['is_stopped'] = df[speed_col] < SPEED_THRESHOLD
    stop_groups = (df['is_stopped'] != df['is_stopped'].shift()).cumsum()
    df['stop_duration'] = df.groupby(['Track ID', stop_groups])['time_diff'].cumsum()
    return df[~((df['is_stopped']) & (df['stop_duration'] >= MIN_STOP_DURATION))].drop(columns=['time_diff', 'is_stopped', 'stop_duration'])

def get_rotated_corners(x, y, heading, length, width):
    """Return coordinates of rotated bounding box corners"""
    half_l, half_w = length / 2, width / 2
    corners = np.array([
        [ half_l,  half_w],
        [ half_l, -half_w],
        [-half_l, -half_w],
        [-half_l,  half_w]
    ])
    rad = np.radians(heading)
    rot = np.array([
        [np.cos(rad), -np.sin(rad)],
        [np.sin(rad),  np.cos(rad)]
    ])
    return (corners @ rot.T) + np.array([x, y])

def get_closest_corners(row):
    """Get direction vector and min distance between closest corners"""
    ped_corners = get_rotated_corners(row['x_smooth_ped'], row['y_smooth_ped'], row['HA_ped'], *PED_BOX)
    light_truck_corners = get_rotated_corners(row['x_smooth_light_truck'], row['y_smooth_light_truck'], row['HA_light_truck'], *LIGHT_TRUCK_BOX)
    dists = cdist(ped_corners, light_truck_corners)
    idx = np.unravel_index(np.argmin(dists), dists.shape)
    min_dist = dists[idx]
    direction_vec = light_truck_corners[idx[1]] - ped_corners[idx[0]]
    return min_dist, direction_vec

def calculate_attc(row):
    """Compute ATTC using projection of relative motion on direction vector"""
    min_dist, direction_vec = get_closest_corners(row)
    norm = np.linalg.norm(direction_vec)
    if norm == 0:
        return np.inf
    unit_vec = direction_vec / norm

    rel_v = np.array([
        row['vx_smooth_ped'] - row['vx_smooth_light_truck'],
        row['vy_smooth_ped'] - row['vy_smooth_light_truck']
    ])
    rel_a = 0.5 * np.array([
        row['ax_ped'] - row['ax_light_truck'],
        row['ay_ped'] - row['ay_light_truck']
    ])

    closing_rate = -np.dot(rel_v, unit_vec) + np.dot(rel_a, unit_vec)
    return min_dist / closing_rate if closing_rate > 0 else np.inf

# ----------------------------------
# Load Data
# ----------------------------------

df_ped = pd.read_csv(r"D:\T\test_codeEVT\nd\ped_smooth.csv")
df_light_truck = pd.read_csv(r"D:\T\test_codeEVT\nd\LightTruck_smooth.csv")
df_light_truck['speed'] = np.sqrt(df_light_truck['vx_smooth']**2 + df_light_truck['vy_smooth']**2)

# Filter stopped vehicles
print(f"Original light_truckrcycle count: {len(df_light_truck)}")
df_light_truck = filter_stopped_vehicles(df_light_truck, speed_col='speed')
print(f"Filtered light_truckrcycle count: {len(df_light_truck)}")
print(df_light_truck['speed'].mean())
# Calculate yaw rates
for df in [df_ped, df_light_truck]:
    df['yaw_rate'] = df.groupby('Track ID').apply(
        lambda x: x['HA'].diff() / x['TimeStamp'].diff()
    ).reset_index(level=0, drop=True)

# Round timestamps
df_ped['Time_rounded'] = (df_ped['TimeStamp'] / EXPORT_INTERVAL).round() * EXPORT_INTERVAL
df_light_truck['Time_rounded'] = (df_light_truck['TimeStamp'] / EXPORT_INTERVAL).round() * EXPORT_INTERVAL


Original light_truckrcycle count: 51965
Filtered light_truckrcycle count: 7154
6.514972427036709


  df['yaw_rate'] = df.groupby('Track ID').apply(
  df['yaw_rate'] = df.groupby('Track ID').apply(


In [2]:
df_light_truck[df_light_truck['Track ID']==5675][['TimeStamp','speed']].head(50)

Unnamed: 0,TimeStamp,speed
16065,2057.4,1.068
16066,2057.56,1.068
16067,2057.72,0.975407
16068,2057.88,0.576078
16392,2109.72,5.208749
16393,2109.88,4.288111
16394,2110.04,2.312377
16395,2110.2,1.623321
16396,2110.36,2.379163
16524,2130.84,4.827403


In [3]:

# ----------------------------------
# Merge and Process
# ----------------------------------

merged = pd.merge(
    df_ped, 
    df_light_truck, 
    on='Time_rounded', 
    suffixes=('_ped', '_light_truck')
)

merged['Center_dist'] = np.hypot(
    merged['x_smooth_ped'] - merged['x_smooth_light_truck'],
    merged['y_smooth_ped'] - merged['y_smooth_light_truck']
)

# Filter by center threshold
results = merged[merged['Center_dist'] <= DIST_THRESH].copy()

# Calculate ATTC using corrected method
results['ATTC'] = results.apply(calculate_attc, axis=1)
results[['mindis', 'direction_vec']] = results.apply(
    lambda row: pd.Series(get_closest_corners(row)),
    axis=1
)

# Output
output = results[['Track ID_ped', 'Track ID_light_truck', 'TimeStamp_ped', 'ATTC','mindis']].rename(columns={
    'Track ID_ped': 'Ped_ID',
    'Track ID_light_truck': 'Light_truck_ID',
    'TimeStamp_ped': 'TimeStamp'
})
print(output)
len(results[(results['ATTC']<1) & (results['mindis']<1)][['Track ID_ped', 'Track ID_light_truck']].drop_duplicates())

       Ped_ID  Light_truck_ID  TimeStamp      ATTC    mindis
1427     1168            1200     426.56  1.255037  2.398210
1601     1285            1340     472.28       inf       NaN
1602     1285            1340     472.44  2.464585  2.869909
1603     1285            1340     472.60       inf  2.780909
1604     1285            1340     472.76       inf  2.608381
...       ...             ...        ...       ...       ...
31870   16154           15961    5527.52       inf  0.256236
31878   16154           16087    5528.80       inf  0.178941
31880   16154           16087    5528.96  0.035383  0.244750
31882   16154           16087    5529.12  0.117499  0.610610
31884   16154           16087    5529.28  0.214466  0.777671

[224 rows x 5 columns]


10

In [4]:
len(results[results['ATTC']<1][['Track ID_ped', 'Track ID_light_truck']].drop_duplicates())

20

In [5]:
len(results[(results['ATTC']<1) & (results['mindis']<1)][['Track ID_ped', 'Track ID_light_truck']].drop_duplicates())


10

In [6]:
results[(results['ATTC']<1) & (results['mindis']<1)][['Track ID_ped', 'Track ID_light_truck','TimeStamp_ped']].drop_duplicates()

Unnamed: 0,Track ID_ped,Track ID_light_truck,TimeStamp_ped
21470,9319,9301,3375.68
23281,10279,10285,3668.72
23283,10279,10285,3668.88
23567,10514,10129,3774.12
23983,10918,10910,3845.68
25701,11970,12034,4220.84
27086,12871,12813,4468.08
27099,12881,12813,4461.52
27102,12881,12813,4462.0
27103,12881,12813,4462.16


In [7]:
# Create new DataFrame with selected and renamed columns
conflict_summary = results[[
    'Track ID_ped',        # Pedestrian ID
    'Track ID_light_truck',       # Vehicle ID
    'Type_light_truck',           # Vehicle type
    'TimeStamp_ped',       # Timestamp
    'mindis',              # Minimum corner distance
    'ATTC'                 # ATTC value
]].copy()

# Rename columns
conflict_summary.rename(columns={
    'Track ID_ped': 'PED_ID',
    'Track ID_light_truck': 'VEH_ID',
    'Type_light_truck': 'TYPE',
    'TimeStamp_ped': 'TIMESTAMP',
    'mindis': 'MIN_COR_DIS',
    'ATTC': 'ATTC'
}, inplace=True)

# Preview
print(conflict_summary.head())


      PED_ID  VEH_ID          TYPE  TIMESTAMP  MIN_COR_DIS      ATTC
1427    1168    1200   Light Truck     426.56     2.398210  1.255037
1601    1285    1340   Light Truck     472.28          NaN       inf
1602    1285    1340   Light Truck     472.44     2.869909  2.464585
1603    1285    1340   Light Truck     472.60     2.780909       inf
1604    1285    1340   Light Truck     472.76     2.608381       inf


In [8]:
conflict_summary.to_csv(r"D:\T\test_codeEVT\ATTC_Data/Light_Truck_Ped.csv", index=False)