In [None]:
import os
import sys
import json
import joblib

import numpy as np
import pandas as pd
from tqdm import tqdm
import geopandas as gpd
import matplotlib.pyplot as plt

import torch

# Add the 'scripts' directory to Python Path
scripts_path=os.path.abspath(os.path.join(os.getcwd(), '..'))
if scripts_path not in sys.path:
    sys.path.append(scripts_path)

import evaluation.help_functions as hf
import evaluation.plot_functions as pf

from gnn.help_functions import compute_spearman_pearson
from gnn.models.point_net_transf_gat import PointNetTransfGAT

In [None]:
# Get the absolute path to the project root
project_root = os.path.abspath(os.path.join(os.getcwd(), '..', '..'))

# Paths, adjust as needed
run_path = os.path.join(project_root, "data", "runs_01_2025", "wannabe_best_6")
districts = gpd.read_file(os.path.join(project_root, "data", "visualisation", "districts_paris.geojson"))
base_case_path = os.path.join(project_root, "data", "links_and_stats", "pop_1pct_basecase_average_output_links.geojson")
result_path = 'results/'


# GNN Parameters
point_net_conv_layer_structure_local_mlp="256"
point_net_conv_layer_structure_global_mlp = "512"
gat_conv_layer_structure = "128,256,512"
dropout = 0.3
use_dropout = False
predict_mode_stats = False
in_channels = 5
out_channels = 1

links_base_case = gpd.read_file(base_case_path, crs="EPSG:4326")
data_created_during_training = os.path.join(run_path, 'data_created_during_training')

In [None]:
###########################################
### Load test data from the run itself! ###
###########################################

# Load scalers
scaler_x = joblib.load(os.path.join(data_created_during_training, 'test_x_scaler.pkl'))
scaler_pos = joblib.load(os.path.join(data_created_during_training, 'test_pos_scaler.pkl'))

# Load the test dataset created during training
test_set_dl = torch.load(os.path.join(data_created_during_training, 'test_dl.pt'))

# Load the DataLoader parameters
with open(os.path.join(data_created_during_training, 'test_loader_params.json'), 'r') as f:
    test_set_dl_loader_params = json.load(f)
    
# Remove or correct collate_fn if it is incorrectly specified
if 'collate_fn' in test_set_dl_loader_params and isinstance(test_set_dl_loader_params['collate_fn'], str):
    del test_set_dl_loader_params['collate_fn']  # Remove it to use the default collate function
    
test_set_loader = torch.utils.data.DataLoader(test_set_dl, **test_set_dl_loader_params)

In [None]:
point_net_conv_layer_structure_local_mlp = [int(x) for x in point_net_conv_layer_structure_local_mlp.split(',')]
point_net_conv_layer_structure_global_mlp = [int(x) for x in point_net_conv_layer_structure_global_mlp.split(',')]
gat_conv_layer_structure = [int(x) for x in gat_conv_layer_structure.split(',')]

model = PointNetTransfGAT(in_channels=in_channels, out_channels=out_channels,
              point_net_conv_layer_structure_local_mlp=point_net_conv_layer_structure_local_mlp, 
              point_net_conv_layer_structure_global_mlp = point_net_conv_layer_structure_global_mlp,
              gat_conv_layer_structure=gat_conv_layer_structure,
              dropout=dropout,
              use_dropout=use_dropout,
              predict_mode_stats=predict_mode_stats)

# Load the model state dictionary
model_path = os.path.join(run_path, 'trained_model/model.pth')
model.load_state_dict(torch.load(model_path), strict=False)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

loss_fct = torch.nn.MSELoss().to(dtype=torch.float32).to(device)

In [None]:
# Create gdfs for the entire test set
gdfs = []

for i in tqdm(range(len(test_set_loader.dataset))):
    my_test_data = test_set_loader.dataset[i]
    my_test_x = test_set_loader.dataset[i].x
    my_test_x = my_test_x.to('cpu')
    
    test_loss_my_test_data, r_squared_my_test_data, actual_vals_my_test_data, predictions_my_test_data, baseline_loss_my_test_data = hf.validate_model_on_test_set(model, my_test_data, loss_fct, device)
    inversed_x = scaler_x.inverse_transform(my_test_x)
    
    gdf = hf.data_to_geodataframe_with_og_values(data=my_test_data, original_gdf=links_base_case, predicted_values=predictions_my_test_data, inversed_x=inversed_x)
    gdf = gpd.sjoin(gdf, districts, how='left', op='intersects')
    gdf = gdf.rename(columns={"c_ar": "district"})
    gdf['capacity_reduction_rounded'] = gdf['capacity_reduction'].round(decimals=3)
    gdf['highway'] = gdf['highway'].map(hf.highway_mapping)
    
    gdfs.append(gdf)

### District Analysis
Do all districts appear with the same frequency in the test set? Investigated below.

In [None]:
# Analyze district-level patterns
def analyze_district_selection_bias(gdf_inputs):
    """
    Analyze if districts with higher volumes/variances are selected more often.
    
    Parameters:
    -----------
    gdf_inputs : list of GeoDataFrames
        Each GDF should contain:
        - district information
        - vol_base_case (base volume)
        - capacity_reduction (to identify selected districts)
    
    Returns:
    --------
    dict with district statistics and selection patterns
    """
    # Get first GDF as reference for base case
    base_gdf = gdf_inputs[0].copy()
    
    # Calculate district-level statistics
    district_stats = {}
    
    # For each district
    for district in base_gdf['district'].unique():
        district_mask = base_gdf['district'] == district
        district_data = base_gdf[district_mask]
        
        # Calculate base case statistics
        mean_volume = district_data['vol_base_case'].mean()
        total_volume = district_data['vol_base_case'].sum()
        volume_variance = district_data['vol_base_case'].var()
        n_roads = len(district_data)
        
        # Count selections across scenarios
        selections = 0
        for gdf in gdf_inputs:
            district_data = gdf[gdf['district'] == district]
            # Count how often this district has capacity reductions
            has_reduction = (district_data['capacity_reduction'] < 0).any()
            if has_reduction:
                selections += 1
        
        district_stats[district] = {
            'mean_volume': mean_volume,
            'total_volume': total_volume,
            'volume_variance': volume_variance,
            'n_roads': n_roads,
            'selection_frequency': selections,
            'selection_probability': selections / len(gdf_inputs)
        }
    
    # Convert to DataFrame for easier analysis
    df_stats = pd.DataFrame.from_dict(district_stats, orient='index')
    
    # Calculate correlations
    volume_selection_corr = df_stats['total_volume'].corr(df_stats['selection_frequency'])
    variance_selection_corr = df_stats['volume_variance'].corr(df_stats['selection_frequency'])
    
    # Sort districts by selection frequency
    df_stats_sorted = df_stats.sort_values('selection_frequency', ascending=False)
    
    return {
        'district_stats': df_stats_sorted,
        'correlations': {
            'volume_selection': volume_selection_corr,
            'variance_selection': variance_selection_corr
        }
    }

# Run the analysis
analysis = analyze_district_selection_bias(gdfs)

# Print results
print("\nDistrict Statistics (sorted by selection frequency):")
print(analysis['district_stats'])

print("\nCorrelations:")
print(f"Volume vs Selection Frequency: {analysis['correlations']['volume_selection']:.3f}")
print(f"Variance vs Selection Frequency: {analysis['correlations']['variance_selection']:.3f}")

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# Volume vs Selection Frequency
ax1.scatter(analysis['district_stats']['total_volume'], 
           analysis['district_stats']['selection_frequency'])
ax1.set_xlabel('Total Base Case Volume')
ax1.set_ylabel('Selection Frequency')
ax1.set_title('Volume vs Selection Frequency')

# Variance vs Selection Frequency
ax2.scatter(analysis['district_stats']['volume_variance'], 
           analysis['district_stats']['selection_frequency'])
ax2.set_xlabel('Base Case Volume Variance')
ax2.set_ylabel('Selection Frequency')
ax2.set_title('Variance vs Selection Frequency')

plt.tight_layout()
plt.show()

In [None]:
# Analyze district-level patterns focusing on volumes
def analyze_district_volume_bias(gdf_inputs):
    """
    Analyze if districts with higher volumes are selected more often.
    
    Parameters:
    -----------
    gdf_inputs : list of GeoDataFrames
        Each GDF should contain:
        - district information
        - vol_base_case (base volume)
        - capacity_reduction (to identify selected districts)
    
    Returns:
    --------
    dict with district statistics and selection patterns
    """
    # Get first GDF as reference for base case
    base_gdf = gdf_inputs[0].copy()
    
    # Calculate district-level statistics
    district_stats = {}
    
    # For each district
    for district in base_gdf['district'].unique():
        district_mask = base_gdf['district'] == district
        district_data = base_gdf[district_mask]
        
        # Calculate base case volume statistics
        mean_volume = district_data['vol_base_case'].mean()
        total_volume = district_data['vol_base_case'].sum()
        n_roads = len(district_data)
        
        # Count selections across scenarios
        selections = 0
        for gdf in gdf_inputs:
            district_data = gdf[gdf['district'] == district]
            # Count how often this district has capacity reductions
            has_reduction = (district_data['capacity_reduction'] < 0).any()
            if has_reduction:
                selections += 1
        
        district_stats[district] = {
            'mean_volume': mean_volume,
            'total_volume': total_volume,
            'n_roads': n_roads,
            'selection_frequency': selections,
            'selection_probability': selections / len(gdf_inputs)
        }
    
    # Convert to DataFrame for easier analysis
    df_stats = pd.DataFrame.from_dict(district_stats, orient='index')
    
    # Calculate correlation
    volume_selection_corr = df_stats['total_volume'].corr(df_stats['selection_frequency'])
    
    # Sort districts by volume
    df_stats_sorted = df_stats.sort_values('total_volume', ascending=False)
    
    return {
        'district_stats': df_stats_sorted,
        'correlation': volume_selection_corr
    }

# Run the analysis
volume_analysis = analyze_district_volume_bias(gdfs)

# Print results
print("\nDistrict Statistics (sorted by total volume):")
print(volume_analysis['district_stats'])

print("\nCorrelation between Volume and Selection Frequency:", 
      f"{volume_analysis['correlation']:.3f}")

# Plot relationship
plt.figure(figsize=(10, 6))
plt.scatter(volume_analysis['district_stats']['total_volume'], 
           volume_analysis['district_stats']['selection_frequency'])
plt.xlabel('Total Base Case Volume')
plt.ylabel('Selection Frequency')
plt.title('District Volume vs Selection Frequency')

# Add district labels to points
for idx, row in volume_analysis['district_stats'].iterrows():
    plt.annotate(f"District {idx}", 
                (row['total_volume'], row['selection_frequency']))

plt.tight_layout()
plt.show()

In [None]:
# Analyze volume distributions within districts
def analyze_within_district_volumes(gdf_inputs):
    """
    Analyze how volumes are distributed within each district.
    """
    # Get first GDF as reference for base case
    base_gdf = gdf_inputs[0].copy()
    
    # Calculate district-level statistics
    district_stats = {}
    
    # For each district
    for district in base_gdf['district'].unique():
        district_mask = base_gdf['district'] == district
        district_data = base_gdf[district_mask]
        
        # Calculate volume statistics
        volume_stats = {
            'mean': district_data['vol_base_case'].mean(),
            'median': district_data['vol_base_case'].median(),
            'std': district_data['vol_base_case'].std(),
            'q25': district_data['vol_base_case'].quantile(0.25),
            'q75': district_data['vol_base_case'].quantile(0.75),
            'skew': district_data['vol_base_case'].skew(),
            'n_roads': len(district_data),
            'n_high_volume': len(district_data[district_data['vol_base_case'] > district_data['vol_base_case'].mean()]),
            'total_volume': district_data['vol_base_case'].sum()
        }
        
        # Count selections across scenarios
        selections = 0
        for gdf in gdf_inputs:
            district_data = gdf[gdf['district'] == district]
            has_reduction = (district_data['capacity_reduction'] < 0).any()
            if has_reduction:
                selections += 1
        
        volume_stats['selection_frequency'] = selections
        district_stats[district] = volume_stats
    
    # Convert to DataFrame
    df_stats = pd.DataFrame.from_dict(district_stats, orient='index')
    
    # Sort by total volume
    df_stats_sorted = df_stats.sort_values('total_volume', ascending=False)
    
    # Calculate proportion of high-volume roads
    df_stats_sorted['prop_high_volume'] = df_stats_sorted['n_high_volume'] / df_stats_sorted['n_roads']
    
    return df_stats_sorted

# Run the analysis
district_volume_analysis = analyze_within_district_volumes(gdfs)

print("\nDistrict Volume Distribution Analysis:")
print(district_volume_analysis)

# Create visualization of volume distributions
plt.figure(figsize=(15, 10))

# Plot 1: Distribution skewness vs selection frequency
plt.subplot(2, 2, 1)
plt.scatter(district_volume_analysis['skew'], 
           district_volume_analysis['selection_frequency'])
plt.xlabel('Volume Distribution Skewness')
plt.ylabel('Selection Frequency')
plt.title('Skewness vs Selection Frequency')

# Plot 2: Proportion of high-volume roads vs selection frequency
plt.subplot(2, 2, 2)
plt.scatter(district_volume_analysis['prop_high_volume'], 
           district_volume_analysis['selection_frequency'])
plt.xlabel('Proportion of High-Volume Roads')
plt.ylabel('Selection Frequency')
plt.title('High-Volume Road Proportion vs Selection Frequency')

# Plot 3: Box plot of volume distributions by selection frequency quartile
plt.subplot(2, 2, 3)
district_volume_analysis['selection_quartile'] = pd.qcut(district_volume_analysis['selection_frequency'], 
                                                       q=4, labels=['Q1', 'Q2', 'Q3', 'Q4'])
selection_groups = district_volume_analysis.groupby('selection_quartile')
box_data = [group['mean'] for name, group in selection_groups]
plt.boxplot(box_data, labels=['Q1', 'Q2', 'Q3', 'Q4'])
plt.xlabel('Selection Frequency Quartile')
plt.ylabel('Mean Volume')
plt.title('Volume Distribution by Selection Frequency')

plt.tight_layout()
plt.show()

# Print summary statistics
print("\nSummary of findings:")
print("\n1. Volume distribution characteristics:")
print(f"Mean skewness across districts: {district_volume_analysis['skew'].mean():.2f}")
print(f"Mean proportion of high-volume roads: {district_volume_analysis['prop_high_volume'].mean():.2f}")

print("\n2. Correlations with selection frequency:")
print(f"Skewness correlation: {district_volume_analysis['skew'].corr(district_volume_analysis['selection_frequency']):.3f}")
print(f"High-volume proportion correlation: {district_volume_analysis['prop_high_volume'].corr(district_volume_analysis['selection_frequency']):.3f}")

# Group districts by selection frequency quartile and analyze volume patterns
quartile_stats = district_volume_analysis.groupby('selection_quartile').agg({
    'mean': 'mean',
    'std': 'mean',
    'skew': 'mean',
    'prop_high_volume': 'mean'
}).round(2)

print("\n3. Statistics by selection frequency quartile:")
print(quartile_stats)

In [None]:
def analyze_district_volumes(gdf_inputs):
    """
    Analyze volume distribution across districts with robust nan handling.
    
    Parameters:
    -----------
    gdf_inputs : list of GeoDataFrames
        Each GDF should contain:
        - district information
        - vol_base_case (base volume)
        - capacity_reduction (to identify selected districts)
    
    Returns:
    --------
    dict with district statistics
    """
    if not gdf_inputs or len(gdf_inputs) == 0:
        raise ValueError("Empty input list provided")

    # Get first GDF as reference
    base_gdf = gdf_inputs[0].copy()
    
    # Validate required columns
    required_columns = ['district', 'vol_base_case', 'capacity_reduction']
    if not all(col in base_gdf.columns for col in required_columns):
        raise ValueError(f"Missing required columns. Expected: {required_columns}")
    
    # Calculate district statistics
    district_stats = {}
    
    # For each district
    for district in base_gdf['district'].unique():
        if pd.isna(district):  # Skip if district is nan
            continue
            
        district_mask = base_gdf['district'] == district
        district_data = base_gdf[district_mask]
        
        # Calculate mean volume with nan handling
        volumes = district_data['vol_base_case'].dropna()
        if len(volumes) == 0:
            mean_volume = np.nan
        else:
            mean_volume = volumes.mean()
        
        # Count edges (excluding nan values)
        n_edges = len(district_data.dropna(subset=['vol_base_case']))
        
        # Count selections across all scenarios
        selections = 0
        for gdf in gdf_inputs:
            if 'district' not in gdf.columns or 'capacity_reduction' not in gdf.columns:
                continue
                
            district_data = gdf[gdf['district'] == district]
            # Count how often this district has capacity reductions, handling nans
            capacity_reductions = district_data['capacity_reduction'].dropna()
            if len(capacity_reductions) > 0 and (capacity_reductions < 0).any():
                selections += 1
        
        district_stats[district] = {
            'mean_volume': mean_volume,
            'n_edges': n_edges,
            'selection_frequency': selections
        }
    
    # Calculate overall statistics, excluding nan values
    valid_volumes = [stats['mean_volume'] for stats in district_stats.values() 
                    if not np.isnan(stats['mean_volume'])]
    
    if not valid_volumes:
        return {
            'district_stats': district_stats,
            'mean_volume': np.nan,
            'std_volume': np.nan,
            'threshold': np.nan,
            'high_volume_districts': {}
        }
    
    mean_volume = np.mean(valid_volumes)
    std_volume = np.std(valid_volumes)
    threshold = mean_volume + std_volume
    
    # Identify high-volume districts, excluding nan values
    high_volume_districts = {
        d: stats for d, stats in district_stats.items() 
        if not np.isnan(stats['mean_volume']) and stats['mean_volume'] > threshold
    }
    
    return {
        'district_stats': district_stats,
        'mean_volume': mean_volume,
        'std_volume': std_volume,
        'threshold': threshold,
        'high_volume_districts': high_volume_districts
    }

# Example usage:
stats = analyze_district_volumes(gdfs)
print(f"Mean volume across districts: {stats['mean_volume']:.2f}")
print(f"Standard deviation: {stats['std_volume']:.2f}")
print(f"Threshold for high-volume: {stats['threshold']:.2f}")
print("\nHigh-volume districts:")
for district, data in stats['high_volume_districts'].items():
    print(f"District {district}:")
    print(f"  Mean volume: {data['mean_volume']:.2f}")
    print(f"  Number of edges: {data['n_edges']}")
    print(f"  Selection frequency: {data['selection_frequency']}")

In [None]:
# Calculate correlation between volume and variance
volume_variance_corr = analysis['district_stats']['total_volume'].corr(analysis['district_stats']['volume_variance'])
print("\nCorrelation between Volume and Variance:", f"{volume_variance_corr:.3f}")

# Create scatter plot of volume vs variance
plt.figure(figsize=(10, 6))
plt.scatter(analysis['district_stats']['total_volume'], 
           analysis['district_stats']['volume_variance'])
plt.xlabel('Total Base Case Volume')
plt.ylabel('Volume Variance')
plt.title('District Volume vs Variance')

# Add district labels to points
for idx, row in analysis['district_stats'].iterrows():
    plt.annotate(f"District {idx}", 
                (row['total_volume'], row['volume_variance']))

plt.tight_layout()
plt.show()

### Analysis by Road Type

In [None]:
def analyze_volumes_by_capacity_reduction(gdf):
    """
    Compare volumes between roads with and without capacity reduction.
    
    Parameters:
    -----------
    gdf : GeoDataFrame
        Should contain:
        - vol_base_case (base volume)
        - capacity_reduction
    
    Returns:
    --------
    dict with comparison statistics
    """
    # Create masks for roads with and without capacity reduction
    has_reduction = gdf['capacity_reduction'] < 0
    no_reduction = gdf['capacity_reduction'] >= 0
    
    # Calculate statistics for each group
    reduced_stats = {
        'mean_volume': gdf[has_reduction]['vol_base_case'].mean(),
        'std_volume': gdf[has_reduction]['vol_base_case'].std(),
        'count': has_reduction.sum()
    }
    
    normal_stats = {
        'mean_volume': gdf[no_reduction]['vol_base_case'].mean(),
        'std_volume': gdf[no_reduction]['vol_base_case'].std(),
        'count': no_reduction.sum()
    }
    
    return {
        'with_reduction': reduced_stats,
        'without_reduction': normal_stats
    }

# Example usage:
comparison = analyze_volumes_by_capacity_reduction(gdfs[0])
print("\nComparison of roads with and without capacity reduction:")
print("\nRoads WITH capacity reduction:")
print(f"  Mean volume: {comparison['with_reduction']['mean_volume']:.2f}")
print(f"  Std volume: {comparison['with_reduction']['std_volume']:.2f}")
print(f"  Count: {comparison['with_reduction']['count']}")
print("\nRoads WITHOUT capacity reduction:")
print(f"  Mean volume: {comparison['without_reduction']['mean_volume']:.2f}")
print(f"  Std volume: {comparison['without_reduction']['std_volume']:.2f}")
print(f"  Count: {comparison['without_reduction']['count']}")

In [None]:
def perform_statistical_analysis(gdf):
    """
    Perform comprehensive statistical analysis comparing roads with and without capacity reduction.
    
    Parameters:
    -----------
    gdf : GeoDataFrame
        Should contain:
        - vol_base_case (base volume)
        - capacity_reduction
    
    Returns:
    --------
    dict with detailed statistical analysis
    """
    import scipy.stats as stats
    import numpy as np
    
    # Create masks for roads with and without capacity reduction
    has_reduction = gdf['capacity_reduction'] < 0
    no_reduction = gdf['capacity_reduction'] >= 0
    
    # Get volumes for each group
    volumes_reduced = gdf[has_reduction]['vol_base_case'].dropna()
    volumes_normal = gdf[no_reduction]['vol_base_case'].dropna()
    
    # Basic statistics
    stats_reduced = {
        'count': len(volumes_reduced),
        'mean': volumes_reduced.mean(),
        'median': volumes_reduced.median(),
        'std': volumes_reduced.std(),
        'q25': volumes_reduced.quantile(0.25),
        'q75': volumes_reduced.quantile(0.75),
        'skew': stats.skew(volumes_reduced),
        'kurtosis': stats.kurtosis(volumes_reduced)
    }
    
    stats_normal = {
        'count': len(volumes_normal),
        'mean': volumes_normal.mean(),
        'median': volumes_normal.median(),
        'std': volumes_normal.std(),
        'q25': volumes_normal.quantile(0.25),
        'q75': volumes_normal.quantile(0.75),
        'skew': stats.skew(volumes_normal),
        'kurtosis': stats.kurtosis(volumes_normal)
    }
    
    # Perform Mann-Whitney U test (non-parametric test for different distributions)
    mw_stat, mw_pval = stats.mannwhitneyu(volumes_reduced, volumes_normal, alternative='two-sided')
    
    # Calculate effect size (Cohen's d)
    pooled_std = np.sqrt(((stats_reduced['count'] - 1) * stats_reduced['std']**2 + 
                         (stats_normal['count'] - 1) * stats_normal['std']**2) / 
                        (stats_reduced['count'] + stats_normal['count'] - 2))
    cohens_d = (stats_reduced['mean'] - stats_normal['mean']) / pooled_std
    
    return {
        'with_reduction': stats_reduced,
        'without_reduction': stats_normal,
        'mann_whitney': {
            'statistic': mw_stat,
            'p_value': mw_pval
        },
        'cohens_d': cohens_d
    }

# Run the analysis
analysis = perform_statistical_analysis(gdfs[0])

# Print results
print("\nDetailed Statistical Analysis:")
print("\nRoads WITH capacity reduction:")
print(f"  Count: {analysis['with_reduction']['count']}")
print(f"  Mean: {analysis['with_reduction']['mean']:.2f}")
print(f"  Median: {analysis['with_reduction']['median']:.2f}")
print(f"  Std Dev: {analysis['with_reduction']['std']:.2f}")
print(f"  Q25-Q75: [{analysis['with_reduction']['q25']:.2f} - {analysis['with_reduction']['q75']:.2f}]")
print(f"  Skewness: {analysis['with_reduction']['skew']:.2f}")

print("\nRoads WITHOUT capacity reduction:")
print(f"  Count: {analysis['without_reduction']['count']}")
print(f"  Mean: {analysis['without_reduction']['mean']:.2f}")
print(f"  Median: {analysis['without_reduction']['median']:.2f}")
print(f"  Std Dev: {analysis['without_reduction']['std']:.2f}")
print(f"  Q25-Q75: [{analysis['without_reduction']['q25']:.2f} - {analysis['without_reduction']['q75']:.2f}]")
print(f"  Skewness: {analysis['without_reduction']['skew']:.2f}")

print("\nStatistical Tests:")
print(f"  Mann-Whitney U p-value: {analysis['mann_whitney']['p_value']:.10f}")
print(f"  Effect size (Cohen's d): {analysis['cohens_d']:.2f}")

# Print interpretation
print("\nInterpretation:")
print("1. Statistical Significance:")
if analysis['mann_whitney']['p_value'] < 0.05:
    print("  - The difference in volumes is statistically significant (p < 0.05)")
else:
    print("  - The difference in volumes is not statistically significant (p >= 0.05)")

print("\n2. Effect Size Interpretation:")
d = abs(analysis['cohens_d'])
if d < 0.2:
    effect = "negligible"
elif d < 0.5:
    effect = "small"
elif d < 0.8:
    effect = "medium"
else:
    effect = "large"
print(f"  - Cohen's d = {d:.2f} indicates a {effect} effect size")

In [None]:
def analyze_variances_by_road_type_and_reduction(gdfs):
    # Initialize dictionaries for each road type
    road_types = {
        1: "Primary",
        2: "Secondary",
        3: "Tertiary"
    }
    
    stats = {road_type: {
        'with_reduction': {'actual_changes': [], 'predictions': [], 'errors': []},
        'without_reduction': {'actual_changes': [], 'predictions': [], 'errors': []}
    } for road_type in road_types.keys()}
    
    # Collect values from all GDFs
    for gdf in gdfs:
        for road_type in road_types.keys():
            # Filter for specific road type
            road_type_mask = gdf['highway'] == road_type
            
            # Roads with capacity reduction
            reduced = (gdf['capacity_reduction_rounded'] < -1e-3) & road_type_mask
            stats[road_type]['with_reduction']['actual_changes'].extend(gdf.loc[reduced, 'vol_car_change_actual'])
            stats[road_type]['with_reduction']['predictions'].extend(gdf.loc[reduced, 'vol_car_change_predicted'])
            stats[road_type]['with_reduction']['errors'].extend(
                gdf.loc[reduced, 'vol_car_change_predicted'] - gdf.loc[reduced, 'vol_car_change_actual']
            )
            
            # Roads without capacity reduction
            not_reduced = (gdf['capacity_reduction_rounded'] >= -1e-3) & road_type_mask
            stats[road_type]['without_reduction']['actual_changes'].extend(gdf.loc[not_reduced, 'vol_car_change_actual'])
            stats[road_type]['without_reduction']['predictions'].extend(gdf.loc[not_reduced, 'vol_car_change_predicted'])
            stats[road_type]['without_reduction']['errors'].extend(
                gdf.loc[not_reduced, 'vol_car_change_predicted'] - gdf.loc[not_reduced, 'vol_car_change_actual']
            )
    
    # Calculate and print statistics for each road type
    for road_type in road_types.keys():
        print(f"\n{road_types[road_type]} Roads:")
        print("-" * 50)
        
        for reduction_type in ['with_reduction', 'without_reduction']:
            # Convert to numpy arrays
            actual = np.array(stats[road_type][reduction_type]['actual_changes'])
            errors = np.array(stats[road_type][reduction_type]['errors'])
            
            if len(actual) > 0:  # Only print if we have data
                print(f"\n{reduction_type.replace('_', ' ').title()}:")
                print(f"Number of observations: {len(actual)}")
                print(f"Variance of actual changes: {np.var(actual):.2f}")
                print(f"Mean absolute error: {np.mean(np.abs(errors)):.2f}")
                print(f"MSE: {np.mean(errors**2):.2f}")
                print(f"Mean actual change: {np.mean(actual):.2f}")
                print(f"Std of actual changes: {np.std(actual):.2f}")
                print(f"R² score: {1 - np.mean(errors**2)/np.var(actual):.3f}")

analyze_variances_by_road_type_and_reduction(gdfs)

In [None]:
def validate_model_with_interpretable_error(indices, gdfs):
    """
    Validate model performance across all test set observations for specific road types.
    
    Args:
        indices: Indices of roads to consider for each GDF
        gdfs: List of GeoDataFrames, each representing one test set observation
    """
    loss_fct_l1 = torch.nn.L1Loss()
    loss_fct_l2 = torch.nn.MSELoss()
    
    # Initialize lists to store values across all observations
    all_actual_vals = []
    all_predicted_vals = []
    mean_car_vols = []
    variances_base_case = []
    variances = []
    std_devs = []
    std_dev_multiplied = []
    cv_percents = []
    
    # Collect values from all GDFs
    i = 0
    for gdf in gdfs:
        indices = hf.get_road_type_indices(gdf)[road_type]
        
        if len(indices) > 0:  # Only process if we have roads of this type
            i += 1
            actual_vals = gdf.loc[indices, 'vol_car_change_actual']
            predicted_vals = gdf.loc[indices, 'vol_car_change_predicted']
            
            all_actual_vals.extend(actual_vals.to_numpy())
            all_predicted_vals.extend(predicted_vals.to_numpy())
            
            # Collect statistics
            mean_car_vols.append(gdf.loc[indices, 'vol_base_case'].mean())
            
            car_volumes = actual_vals.to_numpy()
            car_volume_variance = np.var(car_volumes)
            
            variances.append(car_volume_variance)
            
            variances_base_case.append(gdf.loc[indices, 'variance_base_case'].mean())
            std_devs.append(gdf.loc[indices, 'std_dev'].mean())
            std_dev_multiplied.append(gdf.loc[indices, 'std_dev_multiplied'].mean())
            cv_percents.append(gdf.loc[indices, 'cv_percent'].mean())
    
    # Convert to numpy arrays
    all_actual_vals = np.array(all_actual_vals)
    all_predicted_vals = np.array(all_predicted_vals)
    
    actual_mean = torch.mean(torch.tensor(all_actual_vals))
    
    # Calculate metrics
    spearman_corr, pearson_corr = compute_spearman_pearson(all_predicted_vals, all_actual_vals, is_np=True)    
    r_squared = hf.compute_r2_torch(preds=torch.tensor(all_predicted_vals), targets=torch.tensor(all_actual_vals))
    
    l1_loss = loss_fct_l1(torch.tensor(all_actual_vals), torch.tensor(all_predicted_vals))
    l2_loss = loss_fct_l2(torch.tensor(all_actual_vals), torch.tensor(all_predicted_vals))
    
    l1_naive = loss_fct_l1(torch.tensor(all_actual_vals), torch.full_like(torch.tensor(all_actual_vals), actual_mean))    
    l2_naive = loss_fct_l2(torch.tensor(all_actual_vals), torch.full_like(torch.tensor(all_actual_vals), actual_mean))
    
    # Calculate averages of statistics
    mean_car_vol = np.mean(mean_car_vols)
    variance = np.mean(variances)
    variance_base_case = np.mean(variances_base_case)
    std_dev = np.mean(std_devs)
    std_dev_multiplied = np.mean(std_dev_multiplied)
    cv_percent = np.mean(cv_percents)
    
    print(" ")
    print(f"Road Type: {road_type}")
    print(f"Number of observations: {len(all_actual_vals)/416}")
    print(f"Mean Car Volume: {mean_car_vol}")
    print(f"R-squared: {round(r_squared.item(), 2)}")
    print(f"MSE Loss: {l2_loss}")
    print(f"Naive MSE Loss: {l2_naive}")
    print(f"Variance Base Case: {variance_base_case}")
    
    
    # print(f"Variance over car volumes: {car_volume_variance}")
    print(f"Variance: {variance}")
    print(f"L1 Loss: {l1_loss}")
    print(f"Naive L1 loss: {l1_naive}")
    print(f"Standard Deviation Multiplied: {std_dev_multiplied}")
    print(f"Spearman Correlation: {spearman_corr}")
    print(f"Pearson Correlation: {pearson_corr}")
    print(f"Standard Deviation: {std_dev}")
    print(f"Coefficient of Variation: {cv_percent}")
    print(" ")
    
    return {
        'road_type': road_type,
        'number_of_observations': len(all_actual_vals),
        'mean_car_vol': mean_car_vol,
        'r_squared': r_squared,
        'mse': l2_loss,
        'naive_mse': l2_naive,
        'l1': l1_loss,
        'naive_l1': l1_naive,
        'variance': variance_base_case,
        'std_dev': std_dev,
        'std_dev_normalized': std_dev_multiplied,
        'spearman': spearman_corr,
        'pearson': pearson_corr,
        'cv_percent': cv_percent
    }

road_types = list(hf.get_road_type_indices(gdfs[0]).keys())

# Then calculate metrics for each road type
metrics_by_type = {}
for road_type in road_types:
    metrics_by_type[road_type] = validate_model_with_interpretable_error(road_type, gdfs)

In [None]:
selected_metrics = [       
            {
                'id': 'spearman',
                'label': 'Spearman\nCorrelation',
                'transform': lambda x: max(0, x),
                'y_pos': -0.05
            },
            
            {
                'id': 'r_squared',
                'label': 'R²',
                'transform': lambda x: max(0, x),
                'y_pos': -0.05
            },
            
            {
                'id': 'l1_ratio',
                'label': 'Normalized MAE',
                'transform': lambda x, max_ratio: (1 - x/max_ratio),
                'y_pos': -0.1
            },
            
            {
                'id': 'pearson',
                'label': 'Pearson\nCorrelation',
                'transform': lambda x: max(0, x),
                'y_pos': -0.05
            },

            {
                'id': 'error_distribution',
                'label': 'Error\nDistribution',
                'transform': lambda x: max(0, (x - 68) / (100 - 68)),  # Normalize relative to normal distribution baseline
                'y_pos': -0.05
            }
        ]

# colors = {
#     'Trunk Roads': '#d73027',         # Red
#     'Primary Roads': '#fc8d59',       # Orange
#     'Secondary Roads': '#fee090',     # Yellow
#     'Tertiary Roads': '#e0f3f8',      # Light blue
#     'Residential Streets': '#91bfdb',  # Medium blue
#     'Living Streets': '#4575b4',      # Dark blue
#     'P/S/T Roads with Capacity Reduction': '#999999',    
#     'P/S/T Roads with No Capacity Reduction': '#666666',
# }

# Professional color palette with good contrast and accessibility
colors = {
    'Trunk Roads': '#1f77b4',         # Muted blue
    'Primary Roads': '#2ca02c',       # Muted green
    'Secondary Roads': '#ff7f0e',     # Muted orange
    'Tertiary Roads': '#9467bd',      # Muted purple
    'Residential Streets': '#8c564b',  # Brown
    'Living Streets': '#e377c2',      # Pink
    'P/S/T Roads with Capacity Reduction': '#7f7f7f',    # Gray
    'P/S/T Roads with No Capacity Reduction': '#bcbd22', # Olive
}

# Define selected road types
selected_types = [
    # 'All Roads',
    'Trunk Roads',
    'Primary Roads',
    'Secondary Roads',
    'Tertiary Roads',
    'Residential Streets',
    'Living Streets',
    'P/S/T Roads with Capacity Reduction',
    'P/S/T Roads with No Capacity Reduction'
    # 'Primary Roads with Capacity Reduction',
    # 'Primary Roads with No Capacity Reduction',
    # 'Secondary Roads with Capacity Reduction',
    # 'Secondary Roads with No Capacity Reduction',
    # 'Tertiary Roads with Capacity Reduction',
    # 'Tertiary Roads with No Capacity Reduction'
]

In [None]:
pf.create_correlation_radar_plot_sort_by_r2(metrics_by_type, selected_metrics, result_path=result_path, save_it=True, selected_types=selected_types, colors=colors)

In [None]:
pf.create_error_vs_variability_scatterplots(metrics_by_type, result_path=result_path, save_it=True, selected_types=selected_types, colors=colors)

In [None]:
# Add to your notebook to investigate the distribution of errors
def analyze_error_distribution(metrics_by_type):
    road_type = "All roads"
    gdf_data = gdfs[0]  # Get first GDF for "All roads"
    
    actual = gdf_data['vol_car_change_actual']
    predicted = gdf_data['vol_car_change_predicted']
    
    # Calculate errors
    errors = predicted - actual
    
    # Plot error distribution
    plt.figure(figsize=(10, 6))
    plt.hist(errors, bins=50)
    plt.title("Distribution of Prediction Errors for All Roads")
    plt.xlabel("Prediction Error")
    plt.ylabel("Frequency")
    plt.show()
    
    # Print statistics
    print(f"Error Statistics for {road_type}:")
    print(f"Mean error: {errors.mean():.2f}")
    print(f"Median error: {errors.median():.2f}")
    print(f"Standard deviation of errors: {errors.std():.2f}")
    print(f"Percentage of errors within ±1 std: {(abs(errors) <= errors.std()).mean()*100:.1f}%")
    
analyze_error_distribution(metrics_by_type)