In [None]:
import glob
import os
from enum import Enum
from pathlib import Path

import geopandas as gpd
import pandas as pd
from shapely.wkt import loads

# Interactive Issues Investigation Notebook

This notebook helps you **visually investigate evaluation results** on an interactive map.

**Prerequisites:** Run `eval.py` first to generate issues files.

**What this notebook does:**
1. Loads pre-computed issues files from `evaluation_results/`
2. Filters issues by OSM tag (name, oneway, lanes, etc.)
3. Visualizes issues on an interactive map with color coding
4. Allows drilling down into specific issue types (FP, FN, Mismatch)
5. Supports comparing multiple models

**Note:** This notebook does NOT re-run evaluation - it uses the results from `eval.py`.

---

## Constants

In [None]:
class MapFeatures(Enum):
    """OSM tag keys this notebook can investigate.

    Each member's value is the literal tag string that is compared against
    the `tag` column of an issues file.
    """
    STREET_NAME = "name"
    ONEWAY = "oneway"
    TURN_LANES = "turn:lanes"
    TURN_LANES_FWD = "turn:lanes:forward"
    TURN_LANES_BWD = "turn:lanes:backward"
    LANES = "lanes"
    LANES_FWD = "lanes:forward"
    LANES_BWD = "lanes:backward"
    MAXSPEED = "maxspeed"
    MAXSPEED_FWD = "maxspeed:forward"
    MAXSPEED_BWD = "maxspeed:backward"


# Plain list of the same tag strings, in enum definition order.
# Derived from MapFeatures so the two constants cannot drift apart.
MAP_FEATURES = [feature.value for feature in MapFeatures]

## Utility functions

In [None]:
def load_ways_geometry(file_path):
    """Read the ways CSV and return each way's id with its parsed geometry.

    Every value of the CSV's 'geometry' column is passed through
    `shapely.wkt.loads`. Only the 'osmid' and 'geometry' columns are returned.
    """
    ways_table = pd.read_csv(file_path)
    parsed_geometry = ways_table['geometry'].apply(loads)
    ways_table['geometry'] = parsed_geometry
    return ways_table.loc[:, ['osmid', 'geometry']]

def load_issues_file(issues_csv_path):
    """Read an issues CSV produced by eval.py and print a short summary.

    Prints the number of rows loaded and a table of row counts per
    (tag, issue_type) pair, then returns the DataFrame unchanged.

    Raises:
        FileNotFoundError: if `issues_csv_path` does not exist.
    """
    if os.path.exists(issues_csv_path):
        loaded = pd.read_csv(issues_csv_path)
    else:
        raise FileNotFoundError(f"Issues file not found: {issues_csv_path}")

    print(f"Loaded {len(loaded)} issues from {issues_csv_path}")

    # Summary table: one row per tag, one column per issue type.
    print("\nIssue Summary:")
    per_tag_and_type = loaded.groupby(['tag', 'issue_type']).size()
    print(per_tag_and_type.unstack(fill_value=0))

    return loaded

def filter_issues_by_tag(issues_df, tag):
    """Return the issue rows for one OSM tag and print that tag's metrics.

    Args:
        issues_df: DataFrame with at least 'tag' and 'issue_type' columns.
        tag: tag string to keep (compared for equality with the 'tag' column).

    Returns:
        A copy of the matching rows, or None (after printing a notice) when
        no row has that tag.
    """
    tag_rows = issues_df.loc[issues_df['tag'] == tag].copy()

    if not len(tag_rows):
        print(f"No issues found for tag: {tag}")
        return None

    kinds = tag_rows['issue_type']
    tp_count = int((kinds == 'TP').sum())
    # A 'Mismatch' row is counted on both the FP side and the FN side.
    fp_count = int(kinds.isin(['FP', 'Mismatch']).sum())
    fn_count = int(kinds.isin(['FN', 'Mismatch']).sum())

    # Each ratio falls back to 0 when its denominator would be zero.
    predicted_total = tp_count + fp_count
    if predicted_total > 0:
        precision = tp_count / predicted_total
    else:
        precision = 0

    actual_total = tp_count + fn_count
    if actual_total > 0:
        recall = tp_count / actual_total
    else:
        recall = 0

    if (precision + recall) > 0:
        f1 = (2 * precision * recall) / (precision + recall)
    else:
        f1 = 0

    report_lines = (
        f"\nMetrics for '{tag}':",
        f"  TP: {tp_count}, FP: {fp_count}, FN: {fn_count}",
        f"  Precision: {precision:.2%}",
        f"  Recall: {recall:.2%}",
        f"  F1 Score: {f1:.2%}",
    )
    for report_line in report_lines:
        print(report_line)

    return tag_rows

def create_issue_geodataframe(issues_df, ways_geometry_df):
    """Attach way geometries to issue rows and return a map-ready GeoDataFrame.

    Args:
        issues_df: DataFrame of issues. The columns 'osmid', 'tag',
            'issue_type', 'ground_truth' and 'prediction' are read.
        ways_geometry_df: DataFrame with 'osmid' and 'geometry' columns.

    Returns:
        GeoDataFrame (CRS 'EPSG:4326') with the issue rows that have a
        geometry, plus two added columns: 'color' (line colour per issue
        type) and 'popup_text' (HTML summary of the row). The result may be
        empty when no issue matches a way with geometry.
    """
    # Left merge keeps every issue; rows whose osmid has no geometry are
    # dropped right after.
    merged = issues_df.merge(ways_geometry_df, on='osmid', how='left')
    merged = merged[merged['geometry'].notna()]

    gdf = gpd.GeoDataFrame(merged, geometry='geometry', crs='EPSG:4326')

    color_map = {
        'TP': 'green',
        'FP': 'red',
        'FN': 'orange',
        'Mismatch': 'purple'
    }
    # Unknown issue types would otherwise map to NaN, which is not a usable
    # colour value; fall back to a neutral gray for them.
    gdf['color'] = gdf['issue_type'].map(color_map).fillna('gray')

    # Build the popup text row by row from the columns directly.
    # DataFrame.apply(axis=1) returns a DataFrame (not a Series) on an empty
    # frame, which cannot be assigned to a single column; a list
    # comprehension simply yields an empty list in that case.
    gdf['popup_text'] = [
        f"<b>OSMID:</b> {osmid}<br>"
        f"<b>Tag:</b> {tag}<br>"
        f"<b>Issue Type:</b> {issue_type}<br>"
        f"<b>Ground Truth:</b> {ground_truth}<br>"
        f"<b>Prediction:</b> {prediction}"
        for osmid, tag, issue_type, ground_truth, prediction in zip(
            gdf['osmid'],
            gdf['tag'],
            gdf['issue_type'],
            gdf['ground_truth'],
            gdf['prediction'],
        )
    ]

    return gdf

def plot_issues_map(gdf, title=None):
    """Render the issues in `gdf` on an interactive map.

    Args:
        gdf: GeoDataFrame with 'color' and 'popup_text' columns.
        title: optional text printed after the map is built (it is printed
            to the cell output, not drawn on the map).

    Returns:
        The object returned by `gdf.explore(...)`, or None (after printing
        "No data to plot") when `gdf` has no rows.
    """
    if not len(gdf):
        print("No data to plot")
        return None

    explore_options = dict(
        tiles="cartodb positron",
        color=gdf['color'],
        tooltip='popup_text',
        popup=True,
        style_kwds={'weight': 7, 'opacity': 0.7},
    )
    issues_map = gdf.explore(**explore_options)

    if title:
        print(f"\n{title}")
    return issues_map

def filter_by_issue_type(gdf, issue_types):
    """Keep only the rows whose 'issue_type' is one of the requested types.

    Args:
        gdf: GeoDataFrame with issues
        issue_types: str or list of str ('TP', 'FP', 'FN', 'Mismatch');
            a single string is treated as a one-element list

    Returns:
        A copy of the matching rows. The number of rows kept is printed.
    """
    wanted = [issue_types] if isinstance(issue_types, str) else issue_types

    keep_mask = gdf['issue_type'].isin(wanted)
    subset = gdf.loc[keep_mask].copy()

    label = '/'.join(wanted)
    print(f"Filtered to {len(subset)} {label} issues")
    return subset

## Step 1: Load the ways geometry data

In [None]:
# Load ways geometry (only needed once per kernel session).
# The path is relative to the notebook's working directory.
# `ways_geometry` holds only the 'osmid' and 'geometry' columns and is reused
# by the later cell that attaches geometry to the filtered issues.
ways_geometry = load_ways_geometry('../metadata/ways.csv')
print(f"Loaded geometry for {len(ways_geometry)} ways")

## Step 2: Load the issues file (generated by eval.py)

In [None]:
# Load issues file from evaluation_results directory
# You can change this to any issues file generated by eval.py
# (load_issues_file raises FileNotFoundError if the path does not exist and
# prints a per-tag / per-issue-type count table on success).
issues_file = '../evaluation_results/issues_claude_3.5sonnet.csv'

issues_df = load_issues_file(issues_file)

## Step 3: Filter issues by a specific OSM tag

In [None]:
# Choose which tag to investigate
# Options: 'name', 'oneway', 'lanes', 'lanes:forward', 'lanes:backward',
#          'turn:lanes', 'turn:lanes:forward', 'turn:lanes:backward',
#          'maxspeed', 'maxspeed:forward', 'maxspeed:backward'
# (the same strings are listed in MAP_FEATURES / MapFeatures above)

tag_to_investigate = 'name'  # Change this to investigate different tags

# Prints TP/FP/FN counts with precision, recall and F1 for the tag.
# `filtered_issues` is None when the tag has no rows; the cells below check
# for that before using it.
filtered_issues = filter_issues_by_tag(issues_df, tag_to_investigate)

## Step 4: Create GeoDataFrame with geometry

In [None]:
# Attach geometry to the filtered issues. Skipped when the tag had no rows,
# in which case `issues_gdf` is not (re)assigned by this cell.
if filtered_issues is not None:
    issues_gdf = create_issue_geodataframe(filtered_issues, ways_geometry)
    print(f"\nCreated GeoDataFrame with {len(issues_gdf)} issues")

## Step 5: Visualize on Map

**Color Legend:**
- 🟢 **Green** = True Positive (TP) - Correct prediction
- 🔴 **Red** = False Positive (FP) - Predicted when it shouldn't have
- 🟠 **Orange** = False Negative (FN) - Missed prediction
- 🟣 **Purple** = Mismatch - Both exist but the value is wrong (counts as both FP and FN)

# Plot all issues for the selected tag
# NOTE(review): this snippet has no code-cell marker in front of it; confirm
# it lives in a code cell and not in the markdown cell above.
if filtered_issues is not None:
    all_issues_map = plot_issues_map(issues_gdf, title=f"All Issues for '{tag_to_investigate}'")
    # Jupyter only auto-renders a cell's final top-level expression, so a map
    # built inside an `if` block must be displayed explicitly.
    if all_issues_map is not None:
        display(all_issues_map)

In [None]:
# Example 1: Show only False Negatives (missed predictions)
# Reset `m` first: the next cell calls display(m), which would otherwise hit
# an undefined name (fresh kernel) or a stale map (earlier run) whenever the
# selected tag has no issues.
m = None
if filtered_issues is not None:
    fn_only = filter_by_issue_type(issues_gdf, 'FN')
    m = plot_issues_map(fn_only, title=f"False Negatives for '{tag_to_investigate}'")


In [None]:
# Render the False Negatives map that the previous cell assigned to `m`.
display(m)

In [None]:
# Example 2: Show only False Positives (extra/wrong predictions)
# plot_issues_map returns None (after printing "No data to plot") when there
# are no FP rows, so `m` may be None when it is displayed here.
if filtered_issues is not None:
    fp_only = filter_by_issue_type(issues_gdf, 'FP')
    m = plot_issues_map(fp_only, title=f"False Positives for '{tag_to_investigate}'")
    display(m)


In [None]:
# Example 3: Show only Mismatches (wrong values)
# plot_issues_map returns None (after printing "No data to plot") when there
# are no Mismatch rows, so `m` may be None when it is displayed here.
if filtered_issues is not None:
    mismatch_only = filter_by_issue_type(issues_gdf, 'Mismatch')
    m = plot_issues_map(mismatch_only, title=f"Mismatches for '{tag_to_investigate}'")
    display(m)


In [None]:
# Example 4: Show all errors (exclude True Positives)
if filtered_issues is not None:
    errors_only = filter_by_issue_type(issues_gdf, ['FP', 'FN', 'Mismatch'])
    errors_map = plot_issues_map(errors_only, title=f"All Errors for '{tag_to_investigate}'")
    # Jupyter only auto-renders a cell's final top-level expression, so a map
    # built inside an `if` block must be displayed explicitly.
    if errors_map is not None:
        display(errors_map)


## (Optional) Inspect Specific OSMIDs

You can look up details for specific ways:


In [None]:
# Look up every issue row (all tags) recorded for one way
osmid_to_check = 1329887723  # Change this to any osmid you want to investigate

osmid_issues = issues_df.loc[issues_df['osmid'] == osmid_to_check]
if len(osmid_issues) == 0:
    print(f"No issues found for OSMID {osmid_to_check}")
else:
    # Plain-text table without the index column.
    print(f"\nIssues for OSMID {osmid_to_check}:")
    print(osmid_issues[['tag', 'issue_type', 'ground_truth', 'prediction']].to_string(index=False))


## (Optional) Compare Multiple Models

You can load and compare issues from different model evaluations:


In [None]:
# Load and compare metrics from different models.
# `glob` is imported with the other modules in the notebook's imports cell.

# Find all issues files. glob.glob returns paths in arbitrary order, so sort
# them to get the same model order in the comparison table on every run.
issues_files = sorted(glob.glob('../evaluation_results/issues_*.csv'))
print(f"Found {len(issues_files)} issues files:\n")
for file in issues_files:
    print(f"  {file}")

# Compare metrics for a specific tag across all models
comparison_tag = 'name'
comparison_results = []

for file in issues_files:
    # 'issues_<model>.csv' -> '<model>'. Slice off the prefix instead of using
    # str.replace, which would also strip 'issues_' occurring later in the name.
    model_name = Path(file).stem[len('issues_'):]
    df = pd.read_csv(file)
    tag_issues = df[df['tag'] == comparison_tag]

    if len(tag_issues) == 0:
        # Say so explicitly instead of silently leaving the model out.
        print(f"No '{comparison_tag}' issues in {file}; skipping {model_name}")
        continue

    issue_kinds = tag_issues['issue_type']
    tp = int((issue_kinds == 'TP').sum())
    # A 'Mismatch' row counts as both a false positive and a false negative.
    fp = int(issue_kinds.isin(['FP', 'Mismatch']).sum())
    fn = int(issue_kinds.isin(['FN', 'Mismatch']).sum())

    # Each ratio falls back to 0 when its denominator would be zero.
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    comparison_results.append({
        'model': model_name,
        'tag': comparison_tag,
        'precision': f"{precision:.2%}",
        'recall': f"{recall:.2%}",
        'f1': f"{f1:.2%}"
    })

if comparison_results:
    comparison_df = pd.DataFrame(comparison_results)
    print(f"\nComparison for '{comparison_tag}' tag across models:")
    print(comparison_df.to_string(index=False))
else:
    print(f"\nNo '{comparison_tag}' issues found in any issues file; nothing to compare.")
