In [3]:
import xml.etree.ElementTree as ET
import pandas as pd
from collections import defaultdict

def process_fcd_data(fcd_file_path):
    # Parse the XML file
    tree = ET.parse(fcd_file_path)
    root = tree.getroot()
    
    # Vehicle tracking dictionaries
    vehicle_paths = {}  # {vehicle_id: [list of lanes]}
    vehicle_entry_lanes = {}  # {vehicle_id: entry_lane}
    vehicle_exit_lanes = {}  # {vehicle_id: exit_lane}
    
    # Process all timesteps
    for timestep in root.findall('timestep'):
        for vehicle in timestep.findall('vehicle'):
            vehicle_id = vehicle.get('id')
            lane = vehicle.get('lane')
            
            # Record the lane for this vehicle at this timestep
            if vehicle_id not in vehicle_paths:
                vehicle_paths[vehicle_id] = []
            
            # Only add the lane if it's different from the last recorded lane
            if not vehicle_paths[vehicle_id] or vehicle_paths[vehicle_id][-1] != lane:
                vehicle_paths[vehicle_id].append(lane)
            
            # Record entry lane (first lane the vehicle appears on)
            if vehicle_id not in vehicle_entry_lanes:
                vehicle_entry_lanes[vehicle_id] = lane
    
    # Determine exit lanes (last lane the vehicle appears on)
    for vehicle_id, lanes in vehicle_paths.items():
        if lanes:
            vehicle_exit_lanes[vehicle_id] = lanes[-1]
    
    return vehicle_entry_lanes, vehicle_exit_lanes, vehicle_paths

def determine_relation(entry_lane, exit_lane):
    # Map entry lanes to approach identifiers
    approach_map = {
        'Ai': 'A',
        'Bi': 'B',
        'Ci': 'C',
        'Di': 'D'
    }
    
    # Map exit lanes to movement direction
    direction_map = {
        # From A (Piastowska North)
        ('Ai', 'Bo'): 'L',  # Left turn to B (Reymonta East)
        ('Ai', 'Co'): 'W',  # Straight to C (Piastowska South)
        ('Ai', 'Do'): 'P',  # Right turn to D (Buszka West)
        
        # From B (Reymonta East)
        ('Bi', 'Ao'): 'P',  # Right turn to A (Piastowska North)
        ('Bi', 'Co'): 'L',  # Left turn to C (Piastowska South)
        ('Bi', 'Do'): 'W',  # Straight to D (Buszka West)
        
        # From C (Piastowska South)
        ('Ci', 'Ao'): 'W',  # Straight to A (Piastowska North)
        ('Ci', 'Bo'): 'P',  # Right turn to B (Reymonta East)
        ('Ci', 'Do'): 'L',  # Left turn to D (Buszka West)
        
        # From D (Buszka West)
        ('Di', 'Ao'): 'L',  # Left turn to A (Piastowska North)
        ('Di', 'Bo'): 'W',  # Straight to B (Reymonta East)
        ('Di', 'Co'): 'P',  # Right turn to C (Piastowska South)
    }
    
    # Extract approach from entry lane (e.g., 'Ai_0' -> 'Ai')
    entry_approach = entry_lane.split('_')[0]
    # Extract approach from exit lane (e.g., 'Bo_0' -> 'Bo')
    exit_approach = exit_lane.split('_')[0]
    
    # Get the relation code
    relation_key = (entry_approach, exit_approach)
    if relation_key in direction_map:
        approach = approach_map.get(entry_approach, '?')
        direction = direction_map[relation_key]
        return f"{approach}{direction}"
    
    return "Unknown"

def calculate_flow_rates(vehicle_entry_lanes, vehicle_exit_lanes, simulation_time_seconds):
    # Count the number of vehicles for each relation
    relation_counts = defaultdict(int)
    
    for vehicle_id, entry_lane in vehicle_entry_lanes.items():
        if vehicle_id in vehicle_exit_lanes:
            exit_lane = vehicle_exit_lanes[vehicle_id]
            relation = determine_relation(entry_lane, exit_lane)
            relation_counts[relation] += 1
    
    # Calculate vehicles per hour
    hours = simulation_time_seconds / 3600
    relation_flow_rates = {relation: count / hours for relation, count in relation_counts.items()}
    
    # Convert to passenger car equivalent per hour (E/h)
    # Assuming all vehicles are of type 'type1' with PCE=1
    relation_pce_rates = relation_flow_rates.copy()
    
    return relation_counts, relation_flow_rates, relation_pce_rates

def main(fcd_file_path, simulation_time_seconds):
    # Process the FCD data
    vehicle_entry_lanes, vehicle_exit_lanes, vehicle_paths = process_fcd_data(fcd_file_path)
    
    # Calculate flow rates
    relation_counts, relation_flow_rates, relation_pce_rates = calculate_flow_rates(
        vehicle_entry_lanes, vehicle_exit_lanes, simulation_time_seconds)
    
    # Create a DataFrame for the results
    data = []
    for relation, count in relation_counts.items():
        data.append({
            'Relacja': relation,
            'Liczba samochodów': count,
            'P/h': relation_flow_rates[relation],
            'E/h': relation_pce_rates[relation]
        })
    
    df = pd.DataFrame(data)
    
    # Sort by relation
    df = df.sort_values('Relacja')
    
    # Display the results
    print("Traffic Flow Analysis Results:")
    print(df.to_string(index=False))
    
    # Save the results to a CSV file
    df.to_csv('traffic_flow_analysis.csv', index=False)
    print("\nResults saved to traffic_flow_analysis.csv")

if __name__ == "__main__":
    # Path to the FCD XML file
    fcd_file_path = "fcd_rano.xml"
    
    # Total simulation time in seconds
    # Note: This should be adjusted based on the actual simulation duration
    simulation_time_seconds = 3600  # Assuming a 1-hour simulation
    
    main(fcd_file_path, simulation_time_seconds)

Traffic Flow Analysis Results:
Relacja  Liczba samochodów   P/h   E/h
     AL                370 370.0 370.0
     AP                 15  15.0  15.0
     AW                400 400.0 400.0
     BL                 32  32.0  32.0
     BP                193 193.0 193.0
     BW                 13  13.0  13.0
     CL                 29  29.0  29.0
     CP                 37  37.0  37.0
     CW                457 457.0 457.0
     DL                 14  14.0  14.0
     DP                 15  15.0  15.0
     DW                 20  20.0  20.0
Unknown                 18  18.0  18.0

Results saved to traffic_flow_analysis.csv
