<a href="https://colab.research.google.com/github/Raswanth-Prasath/NGSIM-Driving-Behavior-Analysis/blob/main/NGSIM_Driving_Behavior_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Optional, Dict, Tuple

@dataclass
class CarFollowingPair:
    """
    Represents a validated car-following pair with all relevant information
    """
    leader_id: int
    follower_id: int
    
    start_frame: int
    end_frame: int
    lane_id: int
    
    metrics: Dict = None

    @property
    def duration(self) -> float:
        """Duration of car-following episode in seconds"""
        return (self.end_frame - self.start_frame + 1) * 0.1  # Convert frames to seconds

class CarFollowingIdentifier:
    """
    Identifies and validates car-following pairs in trajectory data
    """
    def __init__(self, min_duration: float = 20.0,
                 min_spacing: float = 2.0,
                 max_spacing: float = 100.0):
        """
        Initialize with validation criteria
        
        Parameters:
        min_duration: Minimum duration in seconds for valid car-following
        min_spacing: Minimum allowed spacing between vehicles (meters)
        max_spacing: Maximum allowed spacing between vehicles (meters)
        """
        self.min_duration = min_duration
        self.min_frames = int(min_duration * 10)  # Convert to frames (0.1s intervals)
        self.min_spacing = min_spacing
        self.max_spacing = max_spacing
        
    def identify_pairs(self, df: pd.DataFrame) -> List[CarFollowingPair]:
        """
        Main method to identify valid car-following pairs, with integrated leader-follower validation
        
        Parameters:
        df: DataFrame with columns for Vehicle_ID, Frame_ID, Lane_ID, LocalY, Leader_ID, Follower_ID
        
        Returns:
        List of validated CarFollowingPair objects
        """
        valid_pairs = []
        pairs_before_validation = 0
        pairs_after_validation = 0
        
        # Step 1: Group data by lane
        for lane_id, lane_data in df.groupby('Lane_ID'):
            # Skip special lanes (like merge lanes or shoulders)
            if lane_id > 6:  # Assuming regular lanes are 1-6
                continue
                
            # Step 2: Process each time window in the lane
            frames = sorted(lane_data['Frame_ID'].unique())
            
            # Step 3: Track ongoing pairs
            current_pairs = {}
            
            for frame in frames:
                frame_data = lane_data[lane_data['Frame_ID'] == frame]
                
                # Sort vehicles by position to identify leader-follower relationships
                frame_vehicles = frame_data.sort_values('LocalY', ascending=False)
                
                # Step 4: Check each consecutive pair of vehicles
                for i in range(len(frame_vehicles) - 1):
                    leader = frame_vehicles.iloc[i]
                    follower = frame_vehicles.iloc[i + 1]
                    
                    # Calculate spacing
                    spacing = leader['LocalY'] - follower['LocalY']
                    pairs_before_validation += 1
                    
                    # Get vehicle IDs for relationship validation
                    leader_id = leader['Vehicle_ID']
                    follower_id = follower['Vehicle_ID']
                    follower_leader_id = follower['Leader_ID']
                    leader_follower_id = leader['Follower_ID']
                    
                    # Validate the leader-follower relationship:
                    # 1. If explicit relationships exist (non -1), they must be correct
                    # 2. If no explicit relationships (-1), allow validation by spacing
                    valid_relationship = False
                    
                    if follower_leader_id != -1 or leader_follower_id != -1:
                        # At least one explicit relationship exists - it must be correct
                        leader_match = follower_leader_id == leader_id
                        follower_match = leader_follower_id == follower_id
                        valid_relationship = leader_match or follower_match
                    else:
                        # No explicit relationships - allow validation by spacing
                        valid_relationship = True
                    
                    # Both the relationship and spacing must be valid
                    if valid_relationship and self.min_spacing <= spacing <= self.max_spacing:
                        pairs_after_validation += 1
                        pair_id = (leader_id, follower_id)
                        
                        if pair_id not in current_pairs:
                            # Start new pair tracking
                            current_pairs[pair_id] = {
                                'start_frame': frame,
                                'current_frame': frame,
                                'lane_id': lane_id
                            }
                        else:
                            # Update existing pair
                            current_pairs[pair_id]['current_frame'] = frame
                    else:
                        # Invalid pair - end pair if exists
                        pair_id = (leader_id, follower_id)
                        self._check_and_add_pair(current_pairs, pair_id, valid_pairs)
                
            # Process any remaining pairs at the end of the lane
            for pair_id in list(current_pairs.keys()):
                self._check_and_add_pair(current_pairs, pair_id, valid_pairs)
        
        print(f"Pairs before validation: {pairs_before_validation}")
        print(f"Pairs after validation: {pairs_after_validation}")
        return valid_pairs
    
    def _check_and_add_pair(self, current_pairs: Dict, 
                           pair_id: Tuple[int, int],
                           valid_pairs: List[CarFollowingPair]) -> None:
        """
        Validates and adds a car-following pair if it meets duration criteria
        """
        if pair_id in current_pairs:
            pair_data = current_pairs[pair_id]
            duration_frames = pair_data['current_frame'] - pair_data['start_frame'] + 1
            
            if duration_frames >= self.min_frames:
                # Create validated pair
                valid_pairs.append(CarFollowingPair(
                    leader_id=pair_id[0],
                    follower_id=pair_id[1],
                    start_frame=pair_data['start_frame'],
                    end_frame=pair_data['current_frame'],
                    lane_id=pair_data['lane_id']
                ))
            
            # Remove pair from tracking
            del current_pairs[pair_id]
    
    def compute_pair_metrics(self, pair: CarFollowingPair, df: pd.DataFrame) -> Dict:
        """
        Computes detailed metrics for a validated car-following pair with proper handling of zero speeds
        
        Parameters:
        pair: CarFollowingPair object
        df: Original trajectory DataFrame
        
        Returns:
        Dictionary of computed metrics
        """
        # Get leader and follower trajectories
        leader_data = df[(df['Vehicle_ID'] == pair.leader_id) & 
                        (df['Frame_ID'] >= pair.start_frame) & 
                        (df['Frame_ID'] <= pair.end_frame)]
        
        follower_data = df[(df['Vehicle_ID'] == pair.follower_id) & 
                        (df['Frame_ID'] >= pair.start_frame) & 
                        (df['Frame_ID'] <= pair.end_frame)]
        
        # Compute spacing statistics
        spacing = leader_data['LocalY'].values - follower_data['LocalY'].values
        
        # Compute speed difference statistics
        speed_diff = leader_data['Speed'].values - follower_data['Speed'].values
        
        # Compute time headway with careful handling of zero speeds
        follower_speeds = follower_data['Speed'].values
        
        # Create a mask for non-zero speeds to avoid division by zero
        non_zero_speed_mask = follower_speeds > 0.01  # Small threshold to handle near-zero speeds
        
        # Only calculate time headway where speed is non-zero
        if np.any(non_zero_speed_mask):
            # Calculate time headway only for non-zero speeds
            valid_spacings = spacing[non_zero_speed_mask]
            valid_speeds = follower_speeds[non_zero_speed_mask]
            time_headway = valid_spacings / valid_speeds
            
            # Remove any extreme values that might still occur
            reasonable_headway_mask = (time_headway > 0) & (time_headway < 20)  # Filter unreasonable values
            valid_headway = time_headway[reasonable_headway_mask] if np.any(reasonable_headway_mask) else np.array([])
        else:
            # No valid speeds for calculation
            valid_headway = np.array([])
        
        # Handle empty arrays for edge cases
        if len(valid_headway) == 0:
            headway_stats = {
                'mean': np.nan,
                'std': np.nan,
                'min': np.nan,
                'max': np.nan
            }
        else:
            headway_stats = {
                'mean': np.mean(valid_headway),
                'std': np.std(valid_headway),
                'min': np.min(valid_headway),
                'max': np.max(valid_headway)
            }
        
        return {
            'spacing': {
                'mean': np.mean(spacing),
                'std': np.std(spacing),
                'min': np.min(spacing),
                'max': np.max(spacing)
            },
            'speed_difference': {
                'mean': np.mean(speed_diff),
                'std': np.std(speed_diff),
                'min': np.min(speed_diff),
                'max': np.max(speed_diff)
            },
            'time_headway': headway_stats
        }

def main():
    # Example usage
    # Read the data file
    df = pd.read_csv("D:\ASU Academics\Traffic Flow Theroy\MP-1\Reconstructed NGSIM I80-1 data\Data\DATA (NO MOTORCYCLES).txt", delimiter='\s+', header=None,
                     names=['Vehicle_ID', 'Frame_ID', 'Lane_ID', 'LocalY',
                           'Speed', 'Acceleration', 'Vehicle_Length',
                           'Vehicle_Class', 'Follower_ID', 'Leader_ID'])
    
    # Initialize identifier
    identifier = CarFollowingIdentifier(
        min_duration=20.0,  # 20 seconds minimum
        min_spacing=2.0,    # 2 meters minimum spacing
        max_spacing=100.0   # 100 meters maximum spacing
    )
    
    # Find car-following pairs
    all_pairs = identifier.identify_pairs(df)
    
    # Print summary
    print(f"Found {len(all_pairs)} valid car-following pairs")
    
    # Export all pairs to CSV
    export_pairs_to_csv(all_pairs, df, identifier, "all_car_following_pairs.csv")
    
    # Select the specific pairs we want
    selected_pairs = []
    selected_pair_ids = [
        (1087.0, 1101.0),  # Pair 57 - Aggressive Following with Large Speed Difference
        (120.0, 125.0),    # Pair 13 - Very Close Following
        (2066.0, 2074.0),  # Pair 96 - Conservative Following with Large Spacing
        (1463.0, 1478.0),  # Pair 714 - Lane 3 with Medium Following Distance
        (260.0, 267.0)     # Pair 238 - Stable Following with Small Speed Difference
    ]
    
    # Find these pairs in our identified pairs
    for pair in all_pairs:
        if (pair.leader_id, pair.follower_id) in selected_pair_ids:
            selected_pairs.append(pair)
    
    print(f"Selected {len(selected_pairs)} specific car-following pairs for analysis")
    
    # Export selected pairs to a separate CSV
    export_pairs_to_csv(selected_pairs, df, identifier, "selected_car_following_pairs.csv")
    
    # Analyze each selected pair
    for i, pair in enumerate(selected_pairs):
        print(f"\nPair {i+1}:")
        print(f"Leader ID: {pair.leader_id}")
        print(f"Follower ID: {pair.follower_id}")
        print(f"Duration: {pair.duration:.1f} seconds")
        print(f"Lane: {pair.lane_id}")
        
        # Compute and print metrics
        metrics = identifier.compute_pair_metrics(pair, df)
        print("\nMetrics:")
        for metric, values in metrics.items():
            print(f"\n{metric}:")
            for stat, value in values.items():
                print(f"  {stat}: {value:.2f}")

def export_pairs_to_csv(pairs, df, identifier, filename):
    """
    Export all car-following pairs with their metrics to a CSV file
    
    Parameters:
    pairs: List of CarFollowingPair objects
    df: Original trajectory DataFrame
    identifier: CarFollowingIdentifier instance for computing metrics
    filename: Name of the output CSV file
    """
    # Prepare a list to hold all pair data
    all_pairs_data = []
    
    # Process each pair
    for i, pair in enumerate(pairs):
        # Compute metrics for this pair
        metrics = identifier.compute_pair_metrics(pair, df)
        
        # Create a dictionary with all relevant data
        pair_data = {
            'pair_id': i + 1,
            'leader_id': pair.leader_id,
            'follower_id': pair.follower_id,
            'start_frame': pair.start_frame,
            'end_frame': pair.end_frame,
            'lane_id': pair.lane_id,
            'duration_seconds': pair.duration,
            'mean_spacing': metrics['spacing']['mean'],
            'std_spacing': metrics['spacing']['std'],
            'min_spacing': metrics['spacing']['min'],
            'max_spacing': metrics['spacing']['max'],
            'mean_speed_diff': metrics['speed_difference']['mean'],
            'std_speed_diff': metrics['speed_difference']['std'],
            'min_speed_diff': metrics['speed_difference']['min'],
            'max_speed_diff': metrics['speed_difference']['max'],
            'mean_time_headway': metrics['time_headway']['mean'],
            'std_time_headway': metrics['time_headway']['std'],
            'min_time_headway': metrics['time_headway']['min'],
            'max_time_headway': metrics['time_headway']['max']
        }
        
        all_pairs_data.append(pair_data)
    
    # Convert to DataFrame
    pairs_df = pd.DataFrame(all_pairs_data)
    
    # Save to CSV
    pairs_df.to_csv(filename, index=False)
    print(f"Exported {len(pairs)} car-following pairs to {filename}")
        
if __name__ == "__main__":
    main()

Pairs before validation: 966462
Pairs after validation: 963929
Found 2029 valid car-following pairs
Exported 2029 car-following pairs to all_car_following_pairs.csv
Selected 5 specific car-following pairs for analysis
Exported 5 car-following pairs to selected_car_following_pairs.csv

Pair 1:
Leader ID: 120.0
Follower ID: 125.0
Duration: 28.1 seconds
Lane: 1

Metrics:

spacing:
  mean: 17.80
  std: 6.47
  min: 10.29
  max: 28.33

speed_difference:
  mean: 0.28
  std: 1.40
  min: -3.13
  max: 3.17

time_headway:
  mean: 1.30
  std: 0.23
  min: 1.00
  max: 1.78

Pair 2:
Leader ID: 1087.0
Follower ID: 1101.0
Duration: 20.1 seconds
Lane: 1

Metrics:

spacing:
  mean: 37.24
  std: 17.02
  min: 17.05
  max: 65.33

speed_difference:
  mean: 2.25
  std: 1.86
  min: -2.29
  max: 5.32

time_headway:
  mean: 2.14
  std: 0.68
  min: 1.25
  max: 3.24

Pair 3:
Leader ID: 2066.0
Follower ID: 2074.0
Duration: 22.3 seconds
Lane: 1

Metrics:

spacing:
  mean: 45.91
  std: 8.10
  min: 35.46
  max: 64.52


Analyzes statistical properties of car-following pairs and generates comprehensive visualizations and analyses.

**Car-Following Visualization and Calibration**

The following code uses the car_following_pairs.csv file generated from the previous code to analyze the car-following pairs and plot the results.

In [6]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import logging

# Configure logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

def load_data(file_path):
    """Load and process the trajectory data"""
    try:
        # Read data with numbered columns first
        data = pd.read_csv(file_path, delimiter='\s+', header=None)
        
        # Rename columns to match expected format
        data.columns = ['Vehicle_ID', 'Frame_ID', 'Lane_ID', 'Local_Y', 
                       'Speed', 'Acceleration', 'Vehicle_Length',
                       'Vehicle_Class', 'Follower_ID', 'Leader_ID']
        
        logger.info(f"Successfully loaded data with {len(data)} rows")
        return data
        
    except Exception as e:
        logger.error(f"Error loading data: {e}")
        return pd.DataFrame()

def load_car_following_pairs(csv_file):
    """
    Load previously identified car-following pairs from a CSV file
    
    Parameters:
    csv_file: Path to the CSV file containing car-following pairs
    
    Returns:
    List of dictionaries with car-following pair information
    """
    try:
        # Load the CSV file
        pairs_df = pd.read_csv(csv_file)
        logger.info(f"Loaded {len(pairs_df)} car-following pairs from {csv_file}")
        
        # Convert dataframe rows to dictionaries for compatibility with existing code
        pairs = []
        for _, row in pairs_df.iterrows():
            pairs.append({
                'leader_id': row['leader_id'],
                'follower_id': row['follower_id'],
                'start_frame': row['start_frame'],
                'end_frame': row['end_frame'],
                'lane': row['lane_id'] if 'lane_id' in row else row.get('lane', 1),  # Handle both column naming conventions
                'duration': row['duration_seconds'] if 'duration_seconds' in row else 
                           (row['end_frame'] - row['start_frame'] + 1) * 0.1
            })
        
        return pairs
        
    except Exception as e:
        logger.error(f"Error loading car-following pairs: {e}")
        return []

def process_pair_data(data, selected_pairs):
    """Process data for selected car-following pairs"""
    pair_data = []
    
    for i, pair in enumerate(selected_pairs):
        # Get trajectory data for leader and follower
        frames = range(int(pair['start_frame']), int(pair['end_frame']) + 1)
        
        leader_data = data[
            (data['Vehicle_ID'] == pair['leader_id']) & 
            (data['Frame_ID'].isin(frames))
        ].sort_values('Frame_ID')
        
        follower_data = data[
            (data['Vehicle_ID'] == pair['follower_id']) & 
            (data['Frame_ID'].isin(frames))
        ].sort_values('Frame_ID')
        
        # Combine data
        merged_data = pd.merge(
            leader_data, 
            follower_data,
            on='Frame_ID',
            suffixes=('_leader', '_follower')
        )
        
        time_data = []
        for _, row in merged_data.iterrows():
            time_data.append({
                'time': (row['Frame_ID'] - pair['start_frame']) * 0.1,
                'leader_speed': row['Speed_leader'],
                'follower_speed': row['Speed_follower'],
                'spacing': row['Local_Y_leader'] - row['Local_Y_follower'],
                'relative_speed': row['Speed_leader'] - row['Speed_follower']
            })
        
        # Skip pairs with empty merged data
        if not time_data:
            logger.warning(f"Skipping pair {i+1} ({pair['leader_id']}-{pair['follower_id']}) due to empty merged data")
            continue
        
        pair_data.append({
            'pair_id': f"{pair['leader_id']}-{pair['follower_id']}",
            'time_data': pd.DataFrame(time_data)
        })
    
    return pair_data

def plot_pair_visualizations(pair_data, output_dir="."):
    """Create visualizations for car-following pairs"""
    for pair in pair_data:
        time_data = pair['time_data']
        
        fig, axes = plt.subplots(3, 1, figsize=(12, 15))
        fig.suptitle(f"Car-Following Pair Analysis: Pair {pair['pair_id']}", fontsize=16 )
        
        # Speed profiles
        axes[0].plot(time_data['time'], time_data['leader_speed'], 
                    label='Leader', color='#3B39FF', linewidth=2)
        axes[0].plot(time_data['time'], time_data['follower_speed'], 
                    label='Follower', color='#FF4040', linestyle='--', linewidth=2)
        axes[0].set_xlabel('Time (s)')
        axes[0].set_ylabel('Speed (m/s)')
        axes[0].set_title('Speed Profiles')
        axes[0].grid(True)
        axes[0].legend()
        
        # Space gap
        axes[1].plot(time_data['time'], time_data['spacing'], 
                    color='green', linewidth=2)
        axes[1].set_xlabel('Time (s)')
        axes[1].set_ylabel('Space Gap (m)')
        axes[1].set_title('Following Distance')
        axes[1].grid(True)
        
        # Relative speed
        axes[2].plot(time_data['time'], time_data['relative_speed'], 
                    color='purple', linewidth=2)
        axes[2].set_xlabel('Time (s)')
        axes[2].set_ylabel('Relative Speed (m/s)')
        axes[2].set_title('Relative Speed (Leader - Follower)')
        axes[2].grid(True)
        
        # Add summary statistics
        stats_text = (
            f"Mean Space Gap: {time_data['spacing'].mean():.1f}m\n"
            f"Mean Relative Speed: {time_data['relative_speed'].mean():.2f}m/s\n"
            f"Duration: {len(time_data)*0.1:.1f}s"
        )
        plt.figtext(0.02, 0.02, stats_text, fontsize=10, 
                   bbox=dict(facecolor='white', alpha=0.8))
        
        plt.tight_layout(rect=[0, 0.03, 1, 0.95])
        output_path = f"{output_dir}/pair_{pair['pair_id']}_analysis.png"
        plt.savefig(output_path, dpi=300, bbox_inches='tight')
        logger.info(f"Saved visualization to {output_path}")
        plt.close()

def main():
    # Define file paths
    data_file = "D:/ASU Academics/Traffic Flow Theroy/MP-1/Reconstructed NGSIM I80-1 data/Data/DATA (NO MOTORCYCLES).txt"
    pairs_csv = "selected_car_following_pairs.csv"  # Use the selected pairs CSV from first code
    output_dir = "visualizations"
    
    # Create output directory if it doesn't exist
    import os
    os.makedirs(output_dir, exist_ok=True)
    
    # Load trajectory data
    logger.info("Loading trajectory data...")
    data = load_data(data_file)
    
    if len(data) == 0:
        logger.error("Failed to load trajectory data")
        return
    
    # Load specific selected car-following pairs from CSV
    logger.info(f"Loading selected car-following pairs from {pairs_csv}...")
    pairs = load_car_following_pairs(pairs_csv)
    
    if len(pairs) == 0:
        logger.error(f"No car-following pairs found in {pairs_csv}")
        return
    
    # Process and visualize pairs
    logger.info("Processing and visualizing selected pairs...")
    pair_data = process_pair_data(data, pairs)
    
    if len(pair_data) == 0:
        logger.error("No valid pair data generated for visualization")
        return
    
    plot_pair_visualizations(pair_data, output_dir)
    
    logger.info(f"Analysis complete! Visualizations saved to {output_dir}/")

if __name__ == "__main__":
    main()

2025-02-22 16:01:17,872 - __main__ - INFO - Loading trajectory data...
2025-02-22 16:01:17,872 - __main__ - INFO - Loading trajectory data...
2025-02-22 16:01:17,872 - __main__ - INFO - Loading trajectory data...
2025-02-22 16:01:17,872 - __main__ - INFO - Loading trajectory data...
2025-02-22 16:01:17,872 - __main__ - INFO - Loading trajectory data...
2025-02-22 16:01:20,742 - __main__ - INFO - Successfully loaded data with 1055801 rows
2025-02-22 16:01:20,742 - __main__ - INFO - Successfully loaded data with 1055801 rows
2025-02-22 16:01:20,742 - __main__ - INFO - Successfully loaded data with 1055801 rows
2025-02-22 16:01:20,742 - __main__ - INFO - Successfully loaded data with 1055801 rows
2025-02-22 16:01:20,742 - __main__ - INFO - Successfully loaded data with 1055801 rows
2025-02-22 16:01:20,746 - __main__ - INFO - Loading selected car-following pairs from selected_car_following_pairs.csv...
2025-02-22 16:01:20,746 - __main__ - INFO - Loading selected car-following pairs from se

**Lane Change Analysis :** Count the lane-change occurrences in the dataset. Analyze where and when these lane changes occur, identifying any observable trends.

In [5]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

class LaneChangeAnalyzer:
    def __init__(self, min_duration: float = 0.5):
        self.min_duration = min_duration
        self.min_frames = int(min_duration * 10)
        self.lane_changes = []
        
    def analyze_trajectories(self, df: pd.DataFrame) -> None:
        """Analyzes trajectory data to identify lane changes"""
        # First, rename columns to match our expected format
        df.columns = ['Vehicle_ID', 'Frame_ID', 'Lane_ID', 'LocalY',
                     'Speed', 'Acceleration', 'Vehicle_Length',
                     'Vehicle_Class', 'Follower_ID', 'Leader_ID']
        
        # Group by vehicle
        for vehicle_id in df['Vehicle_ID'].unique():
            # Get vehicle's trajectory
            vehicle_data = df[df['Vehicle_ID'] == vehicle_id].sort_values('Frame_ID')
            
            # Initialize variables for tracking lane changes
            prev_lane = None
            start_frame = None
            from_lane = None
            
            # Analyze frame by frame
            for idx, row in vehicle_data.iterrows():
                curr_lane = row['Lane_ID']
                
                if prev_lane is not None and curr_lane != prev_lane:
                    if start_frame is None:
                        # Start of lane change
                        start_frame = row['Frame_ID']
                        from_lane = prev_lane
                    elif curr_lane != from_lane:
                        # End of lane change
                        end_frame = row['Frame_ID']
                        duration = (end_frame - start_frame) * 0.1
                        
                        if duration >= self.min_duration:
                            self.lane_changes.append({
                                'vehicle_id': vehicle_id,
                                'start_frame': start_frame,
                                'end_frame': end_frame,
                                'from_lane': from_lane,
                                'to_lane': curr_lane,
                                'position': row['LocalY'],
                                'speed': row['Speed'],
                                'duration': duration,
                                'direction': 'left' if curr_lane > from_lane else 'right'
                            })
                        
                        # Reset for next lane change but immediately start tracking a potential new lane change
                        start_frame = row['Frame_ID']  # Start tracking a new lane change immediately
                        from_lane = prev_lane  # The lane we just left might be the start of a new lane change
                
                prev_lane = curr_lane
    
    def generate_statistics(self) -> dict:
        """Generates summary statistics of lane changes"""
        if not self.lane_changes:
            return {
                'total_changes': 0,
                'direction_counts': {'left': 0, 'right': 0},
                'avg_duration': 0,
                'speed_stats': {'mean': 0, 'std': 0, 'min': 0, 'max': 0}
            }
            
        df_changes = pd.DataFrame(self.lane_changes)
        
        stats = {
            'total_changes': len(self.lane_changes),
            'direction_counts': df_changes['direction'].value_counts().to_dict(),
            'avg_duration': df_changes['duration'].mean(),
            'speed_stats': {
                'mean': df_changes['speed'].mean(),
                'std': df_changes['speed'].std(),
                'min': df_changes['speed'].min(),
                'max': df_changes['speed'].max()
            },
            'lane_transitions': pd.crosstab(
                df_changes['from_lane'],
                df_changes['to_lane']
            ).to_dict(),
            'position_dist': pd.cut(
                df_changes['position'],
                bins=np.arange(0, df_changes['position'].max() + 100, 100)
            ).value_counts().sort_index().to_dict()
        }
        
        return stats
    
    def plot_analysis(self, stats: dict) -> None:
        """Creates visualizations of lane change patterns"""
        fig = plt.figure(figsize=(15, 10))
        
        # 1. Direction Distribution (Top Left)
        plt.subplot(221)
        directions = list(stats['direction_counts'].keys())
        counts = list(stats['direction_counts'].values())
        plt.bar(directions, counts)
        plt.title('Lane Change Direction Distribution')
        plt.ylabel('Number of Lane Changes')
        
        # 2. Position Distribution (Top Right)
        plt.subplot(222)
        positions = list(stats['position_dist'].keys())
        position_counts = list(stats['position_dist'].values())
        plt.bar(range(len(positions)), position_counts)
        plt.title('Lane Change Position Distribution')
        plt.xlabel('Position (100m segments)')
        plt.ylabel('Number of Lane Changes')
        
        # 3. Lane Transitions Heatmap (Bottom Left)
        plt.subplot(223)
        if self.lane_changes:
            df_changes = pd.DataFrame(self.lane_changes)
            transition_matrix = pd.crosstab(
                df_changes['from_lane'],
                df_changes['to_lane']
            )
            sns.heatmap(transition_matrix, annot=True, fmt='d', cmap='YlOrRd')
            plt.title('Lane Change Transitions')
            plt.xlabel('To Lane')
            plt.ylabel('From Lane')
                    
        # 4. Speed Distribution (Bottom Right)
        plt.subplot(224)
        if self.lane_changes:
            speeds = [lc['speed'] for lc in self.lane_changes]
            plt.hist(speeds, bins=20)
            plt.title('Speed During Lane Changes')
            plt.xlabel('Speed (m/s)')
            plt.ylabel('Frequency')
                    
        plt.tight_layout()
        plt.savefig('lane_change_analysis.png', dpi=300, bbox_inches='tight')
        plt.close()

def main():
    # Read data file
    print("Reading data file...")
    df = pd.read_csv("D:/ASU Academics/Traffic Flow Theroy/MP-1/Reconstructed NGSIM I80-1 data/Data/DATA (NO MOTORCYCLES).txt", delimiter='\s+', header=None)
    
    # Initialize analyzer
    print("Analyzing lane changes...")
    analyzer = LaneChangeAnalyzer(min_duration=0.5)
    
    # Analyze trajectories
    analyzer.analyze_trajectories(df)
    
    # Generate statistics
    print("Generating statistics...")
    stats = analyzer.generate_statistics()
    
    # Print summary
    print("\nLane Change Analysis Summary:")
    print(f"Total lane changes: {stats['total_changes']}")
    print("\nDirection distribution:")
    for direction, count in stats['direction_counts'].items():
        print(f"{direction}: {count} ({count/stats['total_changes']*100:.1f}%)")
    print(f"\nAverage duration: {stats['avg_duration']:.2f} seconds")
    print("\nSpeed statistics during lane changes:")
    for stat, value in stats['speed_stats'].items():
        print(f"{stat}: {value:.2f} m/s")
    
    # Create visualizations
    print("\nCreating visualizations...")
    analyzer.plot_analysis(stats)
    print("Analysis complete. Visualizations saved as 'lane_change_analysis.png'")

if __name__ == "__main__":
    main()

Reading data file...
Analyzing lane changes...
Generating statistics...

Lane Change Analysis Summary:
Total lane changes: 204

Direction distribution:
right: 172 (84.3%)
left: 32 (15.7%)

Average duration: 15.26 seconds

Speed statistics during lane changes:
mean: 7.86 m/s
std: 2.44 m/s
min: 0.97 m/s
max: 14.77 m/s

Creating visualizations...
Analysis complete. Visualizations saved as 'lane_change_analysis.png'


**Time-Space Diagram :** Plot the lane-by-lane time-space diagram for all the NGSIM trajectory data. Based on the diagram, discuss traffic conditions and patterns of congestion.

In [28]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.collections import LineCollection
import seaborn as sns

class TimeSpaceDiagramAnalyzer:
    def __init__(self, data_file: str):
        """Initialize analyzer with data file path"""
        self.df = pd.read_csv(data_file, delimiter='\s+', header=None,
                             names=['Vehicle_ID', 'Frame_ID', 'Lane_ID', 'LocalY',
                                   'Speed', 'Acceleration', 'Vehicle_Length',
                                   'Vehicle_Class', 'Follower_ID', 'Leader_ID'])
        
        # Convert Frame_ID to time in seconds
        self.df['Time'] = self.df['Frame_ID'] * 0.1
        
    def plot_lane_diagram(self, lane_id: int, ax=None, cmap='viridis'):
        """Plot time-space diagram for a specific lane"""
        lane_data = self.df[self.df['Lane_ID'] == lane_id]
        
        lines = []
        colors = []
        
        for vehicle_id in lane_data['Vehicle_ID'].unique():
            vehicle_traj = lane_data[lane_data['Vehicle_ID'] == vehicle_id]
            if len(vehicle_traj) > 1:
                points = np.column_stack((vehicle_traj['Time'], 
                                       vehicle_traj['LocalY']))
                lines.append(points)
                colors.append(np.mean(vehicle_traj['Speed']))
        
        if not lines:
            return None
            
        lc = LineCollection(lines, cmap=plt.get_cmap(cmap))
        lc.set_array(np.array(colors))
        
        if ax is None:
            ax = plt.gca()
            
        line = ax.add_collection(lc)
        
        # Set axis limits
        times = lane_data['Time']
        positions = lane_data['LocalY']
        ax.set_xlim(times.min(), times.max())
        ax.set_ylim(positions.min(), positions.max())
        
        return line

    def create_full_diagram(self):
        """Create time-space diagrams for all lanes"""
        lanes = sorted(self.df['Lane_ID'].unique())
        n_lanes = len(lanes)
        
        fig, axes = plt.subplots(n_lanes, 1, figsize=(15, 4*n_lanes), sharex=True)
        if n_lanes == 1:
            axes = [axes]
            
        fig.suptitle('Time-Space Diagram by Lane', fontsize=16, y=0.92)
        
        for ax, lane_id in zip(axes, lanes):
            line = self.plot_lane_diagram(lane_id, ax=ax)
            if line is not None:
                plt.colorbar(line, ax=ax, label='Speed (m/s)')
            
            ax.set_ylabel('Position (m)')
            ax.set_title(f'Lane {lane_id}')
            ax.grid(True, linestyle='--', alpha=0.7)
            
        axes[-1].set_xlabel('Time (seconds)')
        
        plt.tight_layout()
        return fig, axes

    def plot_congestion_heatmap(self):
        """Create a heatmap showing average speeds by lane and time"""
        # Calculate time bins (5-minute intervals)
        time_bins = np.arange(0, self.df['Time'].max() + 300, 300)
        lanes = sorted(self.df['Lane_ID'].unique())
        
        # Create speed matrix
        speed_matrix = np.zeros((len(lanes), len(time_bins)-1))
        
        for i, lane_id in enumerate(lanes):
            lane_data = self.df[self.df['Lane_ID'] == lane_id]
            
            for j, (t_start, t_end) in enumerate(zip(time_bins[:-1], time_bins[1:])):
                mask = (lane_data['Time'] >= t_start) & (lane_data['Time'] < t_end)
                avg_speed = lane_data[mask]['Speed'].mean()
                speed_matrix[i, j] = avg_speed if not np.isnan(avg_speed) else 0
        
        # Create figure
        fig, ax = plt.subplots(figsize=(15, 5))
        
        # Create heatmap
        im = ax.imshow(speed_matrix, 
                      aspect='auto',
                      cmap='RdYlGn',
                      extent=[0, self.df['Time'].max()/60, len(lanes)-0.5, -0.5])
        
        # Add colorbar
        plt.colorbar(im, ax=ax, label='Average Speed (m/s)')
        
        # Configure axes
        ax.set_yticks(range(len(lanes)))
        ax.set_yticklabels([f'Lane {lane}' for lane in lanes])
        
        # Add time labels (in minutes)
        time_ticks = np.linspace(0, self.df['Time'].max()/60, 10)
        ax.set_xticks(time_ticks)
        ax.set_xticklabels([f'{t:.0f}' for t in time_ticks])
        
        plt.title('Traffic Speed Heatmap')
        plt.xlabel('Time (minutes)')
        plt.ylabel('Lane')
        
        return fig, ax

    def analyze_congestion(self):
        """Analyze congestion patterns"""
        congestion_threshold = 10  # m/s
        
        # Calculate average speeds in 5-minute windows
        self.df['time_window'] = pd.cut(self.df['Time'], 
                                      bins=np.arange(0, self.df['Time'].max() + 300, 300))
        
        speed_stats = self.df.groupby(['Lane_ID', 'time_window'])['Speed'].agg([
            'mean', 'std', 'count'
        ]).reset_index()
        
        # Identify congestion
        congestion = speed_stats[speed_stats['mean'] < congestion_threshold]
        
        # Calculate overall statistics
        lane_stats = self.df.groupby('Lane_ID')['Speed'].agg([
            'mean', 'std', 'min', 'max'
        ]).round(2)
        
        return {
            'congestion_periods': congestion,
            'lane_statistics': lane_stats,
            'congestion_threshold': congestion_threshold
        }

def main():
    # Initialize analyzer
    analyzer = TimeSpaceDiagramAnalyzer("D:/ASU Academics/Traffic Flow Theroy/MP-1/Reconstructed NGSIM I80-1 data/Data/DATA (NO MOTORCYCLES).txt")
    
    # Create time-space diagrams
    print("Creating time-space diagrams...")
    fig_ts, axes_ts = analyzer.create_full_diagram()
    fig_ts.savefig('time_space_diagram.png', dpi=100, bbox_inches='tight')
    plt.close(fig_ts)
    
    # Create congestion heatmap
    print("Creating congestion heatmap...")
    fig_heat, ax_heat = analyzer.plot_congestion_heatmap()
    fig_heat.savefig('congestion_heatmap.png', dpi=300, bbox_inches='tight')
    plt.close(fig_heat)
    
    # Analyze congestion
    print("\nAnalyzing congestion patterns...")
    stats = analyzer.analyze_congestion()
    
    # Print summary
    print("\nTraffic Analysis Summary:")
    print(f"\nCongestion threshold: {stats['congestion_threshold']} m/s")
    
    print("\nLane Statistics:")
    print(stats['lane_statistics'])
    
    print("\nCongestion Periods:")
    congestion = stats['congestion_periods']
    if not congestion.empty:
        for _, period in congestion.iterrows():
            print(f"Lane {period['Lane_ID']}: "
                  f"Time window {period['time_window']}, "
                  f"Average speed: {period['mean']:.1f} m/s")

if __name__ == "__main__":
    main()

Creating time-space diagrams...
Creating congestion heatmap...

Analyzing congestion patterns...

Traffic Analysis Summary:

Congestion threshold: 10 m/s

Lane Statistics:
          mean   std   min    max
Lane_ID                          
1        16.63  3.71  0.58  31.63
2         7.11  2.64  0.00  17.74
3         7.06  2.47  0.00  16.52
4         6.37  2.78  0.00  16.74
5         7.02  2.99  0.00  19.24
6         6.90  3.14  0.00  20.08
7         6.34  4.18  0.00  20.74
999      10.76  3.60  0.80  17.77

Congestion Periods:
Lane 2: Time window (0.0, 300.0], Average speed: 7.3 m/s
Lane 2: Time window (300.0, 600.0], Average speed: 7.5 m/s
Lane 2: Time window (600.0, 900.0], Average speed: 6.3 m/s
Lane 2: Time window (900.0, 1200.0], Average speed: 9.1 m/s
Lane 3: Time window (0.0, 300.0], Average speed: 7.6 m/s
Lane 3: Time window (300.0, 600.0], Average speed: 7.1 m/s
Lane 3: Time window (600.0, 900.0], Average speed: 6.4 m/s
Lane 3: Time window (900.0, 1200.0], Average speed: 8.2 m

  speed_stats = self.df.groupby(['Lane_ID', 'time_window'])['Speed'].agg([
