
# Lane Change Initiation Detection using Lateral Position (NGSIM Data)
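This notebook detects where lane changes begin in vehicle trajectories from the NGSIM datasets, using lateral position data.
The lateral position signal is smoothed with a Savitzky-Golay filter; for each vehicle, the start of the lane change is then located by walking backward from the frame where the lane label switches (lane 2 to lane 3) for as long as the smoothed position keeps changing by more than a small threshold.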


In [None]:

import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import savgol_filter
from pathlib import Path

# Configuration
# Input trajectory CSV; passed to load_data() by the driver cell.
DATA_PATH = Path('data/follower_decision_sorted_merged_lane_changing.csv')
# Destination CSV for the detected initiation rows (its parent folder is
# created by the driver cell before writing).
OUTPUT_CSV = Path('output/lane_change_initiation_points.csv')
# Savitzky-Golay `window_length`, i.e. the number of samples (rows) per window.
# NOTE(review): older SciPy releases reportedly require an odd window_length --
# confirm against the installed SciPy version.
SMOOTH_WINDOW = 100
# Savitzky-Golay `polyorder` (degree of the fitted polynomial).
POLY_ORDER = 2
# The backward search for the initiation point keeps going while the smoothed
# lateral position changes by more than this amount between neighbouring rows.
# Units follow the input data -- TODO confirm.
LATERAL_THRESHOLD = 0.01
# Applied to the array of unique vehicle IDs to pick which vehicles are plotted.
VEHICLE_SAMPLE = slice(7, 10)  # Modify this range to plot more/other vehicles


In [None]:

def load_data(file_path):
    """Read the trajectory CSV and coerce its numeric columns.

    Args:
        file_path: Path (or anything ``pandas.read_csv`` accepts) of the CSV.

    Returns:
        The parsed DataFrame with ``frame_number`` and ``lateral_position``
        converted by ``pandas.to_numeric``; values that cannot be parsed
        become NaN because of ``errors='coerce'``.
    """
    table = pd.read_csv(file_path)
    for numeric_column in ('frame_number', 'lateral_position'):
        table[numeric_column] = pd.to_numeric(table[numeric_column], errors='coerce')
    return table


In [None]:

def _smooth_values(values, window_length, polyorder):
    """Savitzky-Golay smooth one 1-D float array, shrinking the window for short input."""
    sample_count = len(values)
    if window_length <= sample_count:
        return savgol_filter(values, window_length=window_length, polyorder=polyorder)
    # The filter's default 'interp' mode rejects a window longer than the data,
    # so fall back to the largest odd window that still fits.
    window = sample_count if sample_count % 2 else sample_count - 1
    if window <= polyorder:
        # Too few samples to fit the polynomial: pass the data through unchanged.
        return values.copy()
    return savgol_filter(values, window_length=window, polyorder=polyorder)


def apply_smoothing(df, column='lateral_position', new_col='smoothed_lateral_position',
                    group_col='vehicle_ID', window_length=None, polyorder=None):
    """Add a Savitzky-Golay smoothed copy of ``column`` to ``df``.

    Smoothing is done separately for every ``group_col`` value so the filter
    window never mixes samples of different vehicles at trajectory
    boundaries. Rows are smoothed in their existing order within each group;
    they are assumed to already be in time order.

    Args:
        df: DataFrame holding ``column``; it is modified in place.
        column: Name of the column to smooth.
        new_col: Name of the column that receives the smoothed values.
        group_col: Column identifying independent trajectories. When it is
            ``None`` or absent from ``df`` the whole column is filtered as a
            single signal. Rows whose group key is missing (NaN) keep their
            unsmoothed value.
        window_length: Filter window in samples; defaults to ``SMOOTH_WINDOW``.
        polyorder: Polynomial order; defaults to ``POLY_ORDER``.

    Returns:
        The same ``df`` object, with ``new_col`` added or overwritten.
    """
    window_length = SMOOTH_WINDOW if window_length is None else window_length
    polyorder = POLY_ORDER if polyorder is None else polyorder

    raw = df[column].to_numpy(dtype=float)
    if group_col is not None and group_col in df.columns:
        smoothed = raw.copy()
        # GroupBy.indices maps each group key to the integer row positions of
        # that group, which lets us write results back without index alignment.
        for positions in df.groupby(group_col, sort=False).indices.values():
            smoothed[positions] = _smooth_values(raw[positions], window_length, polyorder)
    else:
        smoothed = _smooth_values(raw, window_length, polyorder)

    df[new_col] = smoothed
    return df


In [None]:

def find_lane_change_initiation(vehicle_df, from_lane=2, to_lane=3, threshold=None):
    """Locate the row where a vehicle's first lane change starts.

    The first pair of neighbouring rows whose ``lane_no_lane_change`` value
    goes from ``from_lane`` to ``to_lane`` marks the lane change. Starting
    there, the search walks backward for as long as the smoothed lateral
    position differs from the next-later row by more than ``threshold``; the
    earliest row reached is reported as the initiation point.

    Args:
        vehicle_df: Rows of a single vehicle, already in frame order, with
            ``lane_no_lane_change`` and ``smoothed_lateral_position`` columns.
        from_lane: Lane value before the change.
        to_lane: Lane value after the change.
        threshold: Minimum row-to-row change in smoothed lateral position
            that still counts as lateral movement; defaults to
            ``LATERAL_THRESHOLD``.

    Returns:
        The initiation row as a ``pandas.Series``, or ``None`` when no
        ``from_lane`` -> ``to_lane`` transition exists.
    """
    if len(vehicle_df) < 2:
        return None
    if threshold is None:
        threshold = LATERAL_THRESHOLD

    # Pull each column out once; per-element .iloc on a Series is slow.
    lanes = vehicle_df['lane_no_lane_change'].to_numpy()
    change_idx = next(
        (idx for idx, (before, after) in enumerate(zip(lanes[:-1], lanes[1:]), start=1)
         if before == from_lane and after == to_lane),
        None,
    )
    if change_idx is None:
        return None

    positions = vehicle_df['smoothed_lateral_position'].to_numpy()
    start_idx = change_idx
    for idx in range(change_idx - 1, -1, -1):
        # start_idx is always idx + 1 here, so this compares neighbouring rows.
        if abs(positions[idx] - positions[start_idx]) > threshold:
            start_idx = idx
        else:
            break
    return vehicle_df.iloc[start_idx]


In [None]:

def detect_all_initiations(df):
    """Collect the lane change initiation row of every vehicle in ``df``.

    Each vehicle's rows are sorted by ``frame_number`` and passed to
    ``find_lane_change_initiation``; vehicles for which it returns ``None``
    are skipped.

    Args:
        df: Trajectory table with ``vehicle_ID`` and ``frame_number`` columns,
            plus whatever columns ``find_lane_change_initiation`` reads.

    Returns:
        A DataFrame with one row per detected initiation, in order of each
        vehicle's first appearance in ``df``. When nothing is detected the
        result is empty but still carries the columns of ``df``, so callers
        can filter on ``vehicle_ID`` without a ``KeyError``.
    """
    initiation_points = []
    # One grouped pass instead of re-scanning the whole frame per vehicle;
    # sort=False keeps the vehicles in order of first appearance.
    for _, vehicle_rows in df.groupby('vehicle_ID', sort=False):
        vehicle_df = vehicle_rows.sort_values(by='frame_number')
        initiation_point = find_lane_change_initiation(vehicle_df)
        if initiation_point is not None:
            initiation_points.append(initiation_point)

    if not initiation_points:
        # pd.DataFrame([]) would have no columns at all.
        return pd.DataFrame(columns=df.columns)
    return pd.DataFrame(initiation_points)


In [None]:

def plot_vehicle_lane_changes(df, initiation_df, vehicle_ids):
    """Plot smoothed lateral position per vehicle and mark initiation points.

    Args:
        df: Trajectory table with ``vehicle_ID``, ``frame_number`` and
            ``smoothed_lateral_position`` columns.
        initiation_df: Detected initiation rows. It may be empty, including an
            empty frame without any columns, in which case only the
            trajectories are drawn.
        vehicle_ids: Iterable of vehicle IDs to draw.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    # An empty detection result may not even have a vehicle_ID column.
    can_mark = 'vehicle_ID' in initiation_df.columns
    # The curve and the y-axis label use the smoothed signal, so the marker
    # uses it too (when available) to sit on the plotted line.
    marker_column = (
        'smoothed_lateral_position'
        if 'smoothed_lateral_position' in initiation_df.columns
        else 'lateral_position'
    )

    _, ax = plt.subplots(figsize=(10, 6))
    for vehicle_id in vehicle_ids:
        vehicle_df = df[df['vehicle_ID'] == vehicle_id].sort_values(by='frame_number')
        ax.plot(vehicle_df['frame_number'], vehicle_df['smoothed_lateral_position'],
                label=f'Vehicle {vehicle_id}')
        if not can_mark:
            continue
        initiation_df_vehicle = initiation_df[initiation_df['vehicle_ID'] == vehicle_id]
        if not initiation_df_vehicle.empty:
            ax.scatter(initiation_df_vehicle['frame_number'],
                       initiation_df_vehicle[marker_column],
                       color='red', label=f'Initiation {vehicle_id}', zorder=5)
    ax.set_xlabel('Frame')
    ax.set_ylabel('Smoothed Lateral Position')
    ax.set_title('Lane Change Initiation Points (Sample Vehicles)')
    ax.legend()
    plt.tight_layout()
    plt.show()


In [None]:

# Driver cell: runs the whole pipeline (load -> smooth -> detect -> plot -> export).

# Create the output folder up front (no error if it already exists).
OUTPUT_CSV.parent.mkdir(parents=True, exist_ok=True)

# Load the trajectory table and add the 'smoothed_lateral_position' column.
df = load_data(DATA_PATH)
df = apply_smoothing(df)

# Detect initiation points (at most one row per vehicle).
initiation_df = detect_all_initiations(df)

# Plot results for sample vehicles: VEHICLE_SAMPLE slices the unique vehicle
# IDs, which pandas returns in order of first appearance.
sample_ids = df['vehicle_ID'].unique()[VEHICLE_SAMPLE]
plot_vehicle_lane_changes(df, initiation_df, sample_ids)

# Export to CSV (the DataFrame index is not written).
initiation_df.to_csv(OUTPUT_CSV, index=False)
print("Saved initiation points to:", OUTPUT_CSV)
# Last expression of the cell, so the notebook displays the first rows.
initiation_df.head()
