# 07. Interactive Gold Set Curator (Full Schema)
**Semantic-Drive Evaluation Tool**

**Objective:** Manually verify ~100 frames to create the Ground Truth.
**Features:** Tabbed UI, Full WOD-E2E Schema support, Persistence.

**Workflow:**
1. Loads `consensus_final.jsonl` (Model Outputs).
2. Loads `gold_annotations_master.json` (Your Work) if it exists.
3. **Tabbed Interface:** Verify ODD, Topology, Agents, Causal logic, and WOD-E2E tags in separate tabs.
4. Saves to disk immediately on every confirmation.

In [1]:
import sys
import os
import json
import pandas as pd
import ipywidgets as widgets
from IPython.display import display, clear_output
from PIL import Image

# Put the parent directory on the import path so the `src` package can be imported below
project_root = os.path.abspath('..')
sys.path.append(project_root)

from src.data.loader import NuScenesLoader

# Create the shared NuScenes loader used later to resolve camera image paths
print("üöÄ Loading NuScenes Database...")
loader = NuScenesLoader(version="v1.0-trainval", dataroot="../nuscenes_data")
print("‚úÖ NuScenes Ready.")

🚀 Loading NuScenes Database...
Loading NuScenes v1.0-trainval database from ../nuscenes_data...
✅ NuScenes Ready.


In [2]:
# --- 1. SCHEMA DEFINITIONS (Strict Enums) ---
# Allowed values for every categorical annotation field, keyed by field name.
# Each list is passed verbatim as the `options` of the matching widget in the
# GUI setup cell, so only values listed here can be selected by the curator.
# NOTE(review): these are meant to match src/model/prompts.py exactly; that file
# is not visible from this notebook, so confirm the two stay in sync.

SCHEMA_OPTIONS = {
    # A. ODD (single choice per field)
    "weather": ["clear", "overcast", "rain", "heavy_rain", "snow", "fog"],
    "time_of_day": ["day", "night", "dawn_dusk"],
    "lighting_condition": ["nominal", "glare_high", "shadow_contrast", "pitch_black", "streetlights_only"],
    "road_surface_friction": ["dry", "wet", "icy", "snowy", "muddy", "gravel"],
    "sensor_integrity": ["nominal", "lens_flare", "droplets_on_lens", "dirt_on_lens", "motion_blur", "sun_glare"],

    # B. Topology ("traffic_controls" is shown as a multi-select list; the rest are single choice)
    "scene_type": ["urban_street", "highway", "intersection", "highway_ramp", "parking_lot", "construction_zone", "rural_road"],
    "lane_configuration": ["straight", "curve", "merge_left", "merge_right", "roundabout", "intersection_4way", "intersection_t_junction"],
    "drivable_area_status": ["nominal", "restricted_by_static_obstacle", "blocked_by_dynamic_object"],
    "traffic_controls": ["green_light", "red_light", "yellow_light", "stop_sign", "yield_sign", "police_manual", "none"],

    # C. Actors (single choice per field)
    "vru_status": ["none", "legal_crossing", "jaywalking_fast", "jaywalking_hesitant", "roadside_static", "cyclist_in_lane"],
    "lead_vehicle_behavior": ["none", "nominal", "braking_suddenly", "stalled", "turning"],
    "adjacent_vehicle_behavior": ["none", "nominal", "cutting_in_aggressive", "drifting", "tailgating"],
    "special_agent_class": ["none", "police_car", "ambulance", "fire_truck", "school_bus", "construction_machinery"],

    # D. Causal (single choice per field; the numeric risk score is a separate slider)
    "primary_challenge": ["none", "occlusion_risk", "prediction_uncertainty", "violation_of_map_topology", "perception_degradation", "rule_violation"],
    "ego_required_action": ["lane_keep", "slow_down", "stop", "nudge_around_static_obstacle", "yield", "emergency_brake", "lane_change", "unprotected_turn"],
    "blocking_factor": ["none", "construction_barrier", "pedestrian", "vehicle", "debris", "flood"],

    # E. Tags (Multi-Select)
    "wod_e2e_tags": ["construction", "intersection_complex", "vru_hazard", "fod_debris", "weather_adverse", "special_vehicle", "lane_diversion", "sensor_failure"]
}

# Search Filters
# Labels offered by the "Filter:" dropdown. load_subset() compares the selected
# label against these exact strings, so renaming an entry here requires the
# same change in that function.
FILTER_OPTIONS = [
    "All Candidates",
    "High Risk (Score > 7)",
    "Construction Zones",
    "Adverse Weather",
    "VRU Hazards",
    "Special Vehicles",
    "Nominal/Clear"
]

In [4]:
import os
import pandas as pd
import json

# --- 2. DATA LOADING & PERSISTENCE ---
# Input: Judge output, one JSON object per line. Output: the curator's saved annotations.
SOURCE_FILE = "../output/consensus_final.jsonl"
MASTER_FILE = "../output/gold_annotations_master.json"

# Verified records keyed by token; stays empty unless an earlier session exists on disk
GOLD_SET = {}

# Load Existing Work
if not os.path.exists(MASTER_FILE):
    print("‚ú® Starting fresh session.")
else:
    with open(MASTER_FILE, 'r') as f:
        GOLD_SET = json.load(f)
    print(f"üìÇ Resumed session: {len(GOLD_SET)} verified frames loaded.")
# Load Candidates
def load_candidates(source_file=None):
    """Read the Judge's JSONL output into a DataFrame of review candidates.

    Args:
        source_file: Path of the JSONL file to read. Defaults to the
            module-level ``SOURCE_FILE`` when omitted.

    Returns:
        A DataFrame with one row per usable record and the columns
        ``token``, ``search_blob`` (the whole record serialised to
        lower-case JSON, for keyword filtering), ``risk_score`` (int, 0 when
        the record carries none) and ``full_json`` (the parsed record).
        The columns are present even when no rows could be loaded.
    """
    columns = ["token", "search_blob", "risk_score", "full_json"]
    path = SOURCE_FILE if source_file is None else source_file

    if not os.path.exists(path):
        print(f"‚ö†Ô∏è File not found: {path}")
        # Keep the column schema so later column lookups work on an empty frame.
        return pd.DataFrame(columns=columns)

    rows = []
    skipped = []
    with open(path, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                continue  # blank lines are separators, not malformed records
            try:
                item = json.loads(line)
                rows.append({
                    "token": item['token'],
                    # Convert whole object to string for easy keyword search
                    "search_blob": json.dumps(item).lower(),
                    "risk_score": int(item.get('scenario_criticality', {}).get('risk_score', 0)),
                    "full_json": item,
                })
            except (ValueError, KeyError, TypeError, AttributeError):
                # Invalid JSON, missing token, or a non-numeric / mis-shaped
                # risk score: skip the record but remember where it was so the
                # loss is visible instead of silent.
                skipped.append(line_no)

    if skipped:
        print(f"Skipped {len(skipped)} malformed line(s) in {path}: {skipped}")
    return pd.DataFrame(rows, columns=columns)

# Module-level candidate table; load_subset() filters it when "Fetch" is clicked.
df = load_candidates()
print(f"‚úÖ Loaded {len(df)} candidate frames from Judge.")

📂 Resumed session: 5 verified frames loaded.
✅ Loaded 11 candidate frames from Judge.


In [None]:
import ipywidgets as widgets

# --- 3. GUI WIDGET SETUP ---
# Shared appearance for the schema fields: fixed label width, near-full-width controls
style = {'description_width': '140px'}
layout_full = widgets.Layout(width='95%')


def _schema_dropdown(field, label):
    """Build a single-choice Dropdown whose options are SCHEMA_OPTIONS[field]."""
    return widgets.Dropdown(
        options=SCHEMA_OPTIONS[field],
        description=label,
        style=style,
        layout=layout_full,
    )


# --- A. ODD Tab ---
w_weather = _schema_dropdown('weather', "Weather:")
w_time = _schema_dropdown('time_of_day', "Time:")
w_light = _schema_dropdown('lighting_condition', "Lighting:")
w_friction = _schema_dropdown('road_surface_friction', "Surface:")
w_sensor = _schema_dropdown('sensor_integrity', "Sensor Integrity:")
tab_odd = widgets.VBox([w_weather, w_time, w_light, w_friction, w_sensor])

# --- B. Topology Tab ---
w_scene = _schema_dropdown('scene_type', "Scene Type:")
w_lane = _schema_dropdown('lane_configuration', "Lane Config:")
w_drivable = _schema_dropdown('drivable_area_status', "Drivable Status:")
# Several traffic controls can apply at once, hence a multi-select list
w_traffic = widgets.SelectMultiple(
    options=SCHEMA_OPTIONS['traffic_controls'],
    description="Traffic Controls:",
    style=style,
    layout=layout_full,
)
tab_topo = widgets.VBox([w_scene, w_lane, w_drivable, w_traffic])

# --- C. Agents Tab ---
w_vru = _schema_dropdown('vru_status', "VRU Status:")
w_lead = _schema_dropdown('lead_vehicle_behavior', "Lead Veh:")
w_adj = _schema_dropdown('adjacent_vehicle_behavior', "Adj Veh:")
w_special = _schema_dropdown('special_agent_class', "Special Agent:")
tab_agents = widgets.VBox([w_vru, w_lead, w_adj, w_special])

# --- D. Causal Tab ---
w_challenge = _schema_dropdown('primary_challenge', "Primary Challenge:")
w_action = _schema_dropdown('ego_required_action', "Ego Action:")
w_blocker = _schema_dropdown('blocking_factor', "Blocking Factor:")
w_risk = widgets.IntSlider(min=0, max=10, description="Risk Score:", style=style, layout=layout_full)
tab_causal = widgets.VBox([w_challenge, w_action, w_blocker, w_risk])

# --- E. Tags Tab ---
# Gets its own taller layout so the tag list is readable without much scrolling
w_tags = widgets.SelectMultiple(
    options=SCHEMA_OPTIONS['wod_e2e_tags'],
    description="WOD Tags:",
    style=style,
    layout=widgets.Layout(width='95%', height='140px'),
)
tab_tags = widgets.VBox([w_tags])

# --- MASTER TAB WIDGET ---
tabs = widgets.Tab(children=[tab_odd, tab_topo, tab_agents, tab_causal, tab_tags])
for position, title in enumerate(['A. ODD', 'B. Topology', 'C. Agents', 'D. Causal', 'E. Tags']):
    tabs.set_title(position, title)

# --- Navigation & Info ---
dropdown_filter = widgets.Dropdown(options=FILTER_OPTIONS, description='Filter:', style={'description_width': 'initial'})
btn_load = widgets.Button(description="Fetch", icon="search", layout=widgets.Layout(width='100px'))
btn_prev = widgets.Button(description="< Prev", icon="arrow-left")
btn_next = widgets.Button(description="Next >", icon="arrow-right")
btn_save = widgets.Button(description="‚úÖ CONFIRM & SAVE", button_style='success', layout=widgets.Layout(width='98%'))

# Read-only displays and the two output areas (image preview, log messages)
lbl_status = widgets.Label("")
lbl_token = widgets.Text(description="Token:", disabled=True)
lbl_desc = widgets.Textarea(description="Judge Desc:", disabled=True, layout=widgets.Layout(width='98%', height='60px'))
out_img = widgets.Output()
out_msg = widgets.Output()

In [6]:
# --- 4. LOGIC CONTROLLER ---
# Navigation state shared by the callbacks below through `global` declarations.
current_subset = []  # candidate records (dicts) selected by the active filter
current_idx = 0  # position of the displayed frame within current_subset

def create_front_panorama(token, resize_h=300):
    """Stitch the three front-facing camera images of a frame side by side.

    Args:
        token: Sample token handed to ``loader.get_camera_paths``.
        resize_h: Height in pixels each camera image is scaled to (aspect
            ratio preserved) before stitching.

    Returns:
        An RGB ``PIL.Image`` ordered front-left / front / front-right, or
        ``None`` when the path lookup fails or none of the images is usable.
    """
    try:
        paths = loader.get_camera_paths(token)
    except Exception:
        # Best-effort preview: a failed lookup must not break the UI, but
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        return None

    tiles = []
    for cam in ("CAM_FRONT_LEFT", "CAM_FRONT", "CAM_FRONT_RIGHT"):
        if cam not in paths or not os.path.exists(paths[cam]):
            continue
        try:
            # The context manager closes the file handle; resize() returns an
            # independent in-memory image that outlives it.
            with Image.open(paths[cam]) as img:
                aspect = img.width / img.height
                new_w = int(resize_h * aspect)
                tiles.append(img.resize((new_w, resize_h), Image.Resampling.LANCZOS))
        except OSError:
            # Unreadable or truncated file: treat it like a missing camera.
            continue

    if not tiles:
        return None

    pano = Image.new('RGB', (sum(tile.width for tile in tiles), resize_h))
    x_offset = 0
    for tile in tiles:
        pano.paste(tile, (x_offset, 0))
        x_offset += tile.width
    return pano

def load_subset(_):
    """Rebuild the working list of candidates for the selected filter and show the first one.

    Args:
        _: The clicked button (unused; required by the ``on_click`` signature).
    """
    global current_subset, current_idx
    cat = dropdown_filter.value

    # Keyword filters: regular expressions searched in each record's
    # lower-cased JSON text (the `search_blob` column).
    keyword_patterns = {
        "Construction Zones": "construction|cone|drum|barrier",
        "Adverse Weather": "rain|wet|glare|fog|night",
        "VRU Hazards": "vru|pedestrian|cyclist|jaywalking",
        "Special Vehicles": "police|ambulance|bus|truck",
    }

    if df.empty:
        # Nothing can match, and an empty candidate table may not even have the
        # columns used below, so skip the lookups entirely.
        current_subset = []
    elif cat == "All Candidates":
        current_subset = df.to_dict('records')
    elif cat == "High Risk (Score > 7)":
        # Strictly greater than 7, as the label promises.
        current_subset = df[df['risk_score'] > 7].to_dict('records')
    elif cat in keyword_patterns:
        matches = df['search_blob'].str.contains(keyword_patterns[cat])
        current_subset = df[matches].to_dict('records')
    elif cat == "Nominal/Clear":
        current_subset = df[df['risk_score'] < 3].to_dict('records')

    current_idx = 0
    render_frame()

def _set_choice(widget, value, default):
    """Select ``value`` in a single-choice widget, or ``default`` when it is not an option.

    Returns True when ``value`` could be used as-is.
    """
    if value in widget.options:
        widget.value = value
        return True
    widget.value = default
    return False


def _set_multi(widget, values):
    """Select the entries of ``values`` that are valid options; return the rejected ones."""
    if isinstance(values, str):
        values = [values]  # a lone string is one selection, not a sequence of characters
    elif not isinstance(values, (list, tuple)):
        values = []
    accepted, rejected = [], []
    for entry in values:
        if entry not in widget.options:
            rejected.append(entry)
        elif entry not in accepted:
            accepted.append(entry)
    widget.value = tuple(accepted)
    return rejected


def render_frame():
    """Display the candidate at ``current_idx``.

    Pre-fills every annotation widget from the curator's saved record when the
    token is already in ``GOLD_SET``, otherwise from the model prediction, and
    draws the front-camera panorama. Each widget is set independently: a value
    outside the schema resets only that widget to its default and is reported
    in ``out_msg``, so no widget is left holding the previous frame's value.
    """
    if not current_subset:
        with out_img:
            clear_output()
            print("No candidates found.")
        return

    item = current_subset[current_idx]
    token = item['token']
    j = item['full_json']
    lbl_token.value = token
    lbl_desc.value = j.get('description', 'No desc')

    # --- STATUS CHECK ---
    saved = GOLD_SET[token] if token in GOLD_SET else None
    if saved is not None:
        lbl_status.value = "‚úÖ ALREADY VERIFIED (Loading your edits)"
        lbl_status.style = dict(text_color='green', font_weight='bold')
    else:
        lbl_status.value = "‚ö†Ô∏è PENDING REVIEW (Showing Model Prediction)"
        lbl_status.style = dict(text_color='orange')

    def section(key):
        # Model prediction first; a saved record overrides it section by section.
        value = j.get(key, {})
        if saved is not None:
            value = saved.get(key, value)
        return value if isinstance(value, dict) else {}

    odd = section('odd_attributes')
    topo = section('road_topology')
    agents = section('key_interacting_agents')
    causal = section('scenario_criticality')
    if saved is not None:
        tags = saved.get('wod_e2e_tags', [])
    else:
        tags = j.get('wod_e2e_tags', [])

    # --- POPULATE WIDGETS ---
    # (widget, source section, field name, default when missing or invalid)
    single_choice = (
        # A. ODD
        (w_weather, odd, 'weather', 'clear'),
        (w_time, odd, 'time_of_day', 'day'),
        (w_light, odd, 'lighting_condition', 'nominal'),
        (w_friction, odd, 'road_surface_friction', 'dry'),
        (w_sensor, odd, 'sensor_integrity', 'nominal'),
        # B. Topology
        (w_scene, topo, 'scene_type', 'urban_street'),
        (w_lane, topo, 'lane_configuration', 'straight'),
        (w_drivable, topo, 'drivable_area_status', 'nominal'),
        # C. Agents
        (w_vru, agents, 'vru_status', 'none'),
        (w_lead, agents, 'lead_vehicle_behavior', 'none'),
        (w_adj, agents, 'adjacent_vehicle_behavior', 'none'),
        (w_special, agents, 'special_agent_class', 'none'),
        # D. Causal
        (w_challenge, causal, 'primary_challenge', 'none'),
        (w_action, causal, 'ego_required_action', 'lane_keep'),
        (w_blocker, causal, 'blocking_factor', 'none'),
    )
    rejected = []
    for widget, source, field, default in single_choice:
        value = source.get(field, default)
        if not _set_choice(widget, value, default):
            rejected.append(f"{field}={value!r}")

    for bad in _set_multi(w_traffic, topo.get('traffic_controls', [])):
        rejected.append(f"traffic_controls={bad!r}")
    for bad in _set_multi(w_tags, tags):
        rejected.append(f"wod_e2e_tags={bad!r}")

    raw_risk = causal.get('risk_score', 0)
    try:
        w_risk.value = int(raw_risk)
    except (TypeError, ValueError):
        w_risk.value = 0
        rejected.append(f"risk_score={raw_risk!r}")

    if rejected:
        with out_msg:
            print(
                f"Widget Population Error: {token[:6]} had values outside the schema, "
                f"reset to defaults: {', '.join(rejected)}"
            )

    # Show Image
    with out_img:
        clear_output(wait=True)
        pano = create_front_panorama(token)
        if pano is not None:
            display(pano)
        else:
            # clear_output(wait=True) only clears once new output arrives, so
            # always emit something or the previous frame's image stays visible.
            print("No camera images available for this frame.")

def save_entry(_):
    """Store the current widget state as the verified record for the displayed frame.

    The record is added to ``GOLD_SET`` under the token shown in ``lbl_token``
    and the whole set is rewritten to ``MASTER_FILE``. Afterwards the next
    frame is shown; on the last frame the current one is re-rendered so its
    status reflects the save.

    Args:
        _: The clicked button (unused; required by the ``on_click`` signature).
    """
    token = lbl_token.value
    if not token:
        # No frame has been rendered yet; do not create a record keyed by "".
        with out_msg:
            print("Nothing to save: fetch a subset and load a frame first.")
        return

    # Construct the Gold Record
    record = {
        "token": token,
        "odd_attributes": {
            "weather": w_weather.value,
            "time_of_day": w_time.value,
            "lighting_condition": w_light.value,
            "road_surface_friction": w_friction.value,
            "sensor_integrity": w_sensor.value,
        },
        "road_topology": {
            "scene_type": w_scene.value,
            "lane_configuration": w_lane.value,
            "drivable_area_status": w_drivable.value,
            "traffic_controls": list(w_traffic.value),
        },
        "key_interacting_agents": {
            "vru_status": w_vru.value,
            "lead_vehicle_behavior": w_lead.value,
            "adjacent_vehicle_behavior": w_adj.value,
            "special_agent_class": w_special.value,
        },
        "scenario_criticality": {
            "primary_challenge": w_challenge.value,
            "ego_required_action": w_action.value,
            "blocking_factor": w_blocker.value,
            "risk_score": w_risk.value,
        },
        "wod_e2e_tags": list(w_tags.value),
    }

    # Persist to Memory & Disk
    GOLD_SET[token] = record
    # Write to a sibling temp file and swap it in, so an interrupted write
    # cannot truncate the existing annotations.
    tmp_path = MASTER_FILE + ".tmp"
    with open(tmp_path, 'w') as f:
        json.dump(GOLD_SET, f, indent=2)
    os.replace(tmp_path, MASTER_FILE)

    with out_msg:
        print(f"üíæ Saved {token[:6]} | Verified Total: {len(GOLD_SET)}")

    if current_idx < len(current_subset) - 1:
        next_frame(None)
    else:
        # Last frame of the subset: stay here but refresh the status label.
        render_frame()

def next_frame(_):
    """Advance to the following candidate and redraw; do nothing on the last one."""
    global current_idx
    last_position = len(current_subset) - 1
    if current_idx >= last_position:
        return
    current_idx = current_idx + 1
    render_frame()

def prev_frame(_):
    """Step back to the preceding candidate and redraw; do nothing on the first one."""
    global current_idx
    if current_idx <= 0:
        return
    current_idx = current_idx - 1
    render_frame()

# --- 5. BINDING & LAYOUT ---
# Connect each button to its callback.
btn_load.on_click(load_subset)
btn_next.on_click(next_frame)
btn_prev.on_click(prev_frame)
btn_save.on_click(save_entry)

nav_bar = widgets.HBox([dropdown_filter, btn_load, btn_prev, btn_next])
# lbl_desc is filled by render_frame(); it has to be part of the layout for the
# Judge's description to be visible while annotating.
status_bar = widgets.VBox([lbl_token, lbl_status, lbl_desc])

# Left: image preview with navigation and frame info. Right: the annotation form.
left_col = widgets.VBox([widgets.HTML("<h4>üñºÔ∏è Visuals</h4>"), out_img, nav_bar, status_bar])
right_col = widgets.VBox([widgets.HTML("<h4>üìù Annotation Schema</h4>"), tabs, btn_save, out_msg])

ui = widgets.HBox([left_col, right_col], layout=widgets.Layout(border='solid 1px #ccc', padding='10px'))
display(ui)

HBox(children=(VBox(children=(HTML(value='<h4>🖼️ Visuals</h4>'), Output(), HBox(children=(Dropdown(description…