## Config

In [None]:
import os

# Unconditionally overwrite the dataset-related environment variables for this
# kernel (original author's note: forcing them is essential in Jupyter notebooks).
os.environ.update(
    {
        'NUPLAN_MAPS_ROOT': '/mnt/hdd8/navsim_dataset/maps',
        'NUPLAN_MAP_VERSION': 'nuplan-maps-v1.0',
        'OPENSCENE_DATA_ROOT': '/mnt/hdd8/navsim_dataset',
    }
)

# Read the values back through os.getenv to confirm what the process now sees.
print("\nAfter setting:")
print(f"NUPLAN_MAPS_ROOT: {os.getenv('NUPLAN_MAPS_ROOT')}")
print(f"NUPLAN_MAP_VERSION: {os.getenv('NUPLAN_MAP_VERSION')}")
print(f"OPENSCENE_DATA_ROOT: {os.getenv('OPENSCENE_DATA_ROOT')}")

In [None]:
from pathlib import Path

import hydra
from hydra.utils import instantiate

from navsim.common.dataloader import SceneLoader
from navsim.common.dataclasses import SceneFilter, SensorConfig

SPLIT = "mini"  # one of: "mini", "test", "trainval"
FILTER = "all_scenes"  # name of the scene-filter config composed below

# Hydra keeps a process-wide singleton. Re-running this cell in the same kernel
# would otherwise fail with "GlobalHydra is already initialized", so reset it first.
from hydra.core.global_hydra import GlobalHydra

if GlobalHydra.instance().is_initialized():
    GlobalHydra.instance().clear()

hydra.initialize(
    config_path="../navsim/planning/script/config/common/train_test_split/scene_filter"
)
cfg = hydra.compose(config_name=FILTER)
scene_filter: SceneFilter = instantiate(cfg)

# Index os.environ directly so a missing variable fails with a KeyError naming it,
# instead of the opaque TypeError that Path(None) would raise.
openscene_data_root = Path(os.environ["OPENSCENE_DATA_ROOT"])

# Positional arguments are labelled with the role the original author gave them.
scene_loader = SceneLoader(
    openscene_data_root / f"navsim_logs/{SPLIT}", # data_path
    openscene_data_root / f"sensor_blobs/{SPLIT}", # original_sensor_path
    scene_filter,
    openscene_data_root / "warmup_two_stage/sensor_blobs", # synthetic_sensor_path
    openscene_data_root / "warmup_two_stage/synthetic_scene_pickles", # synthetic_scenes_path
    sensor_config=SensorConfig.build_all_sensors(),
)

## Print PDM with single token

In [None]:
# Pick one random token from a metric cache and load the matching scene and
# metric-cache entry for it.
# Point `workspace_cache` at any other metric-cache directory to inspect that one.

import numpy as np
from pathlib import Path
from navsim.common.dataloader import MetricCacheLoader

workspace_cache = Path("/mnt/hdd5/Qiaoceng/navsim_workspace/exp/metric_cache")
cache_loader = MetricCacheLoader(workspace_cache)
available_tokens = cache_loader.tokens
# print(f"🎯 Available tokens # in cache: {len(available_tokens)}")

# The draw is unseeded, so every run of this cell may select a different token.
# NOTE(review): the token comes from the metric cache only; presumably it must also
# be known to `scene_loader` (built in the config cell) for the lookup below to
# succeed — confirm both cover the same split/filter.
token = np.random.choice(available_tokens)
scene = scene_loader.get_scene_from_token(token)
print(f"🎯 Selected token from cache: {token}")

# Load the metric-cache entry for the same token; the scoring cells read it.
metric_cache = cache_loader.get_from_token(token)
print("✅ Successfully loaded scene and metric cache!")

In [None]:
# Build the agent under test and draw the bird's-eye-view plot for the selected
# scene, titled with the scene token.

from navsim.agents.constant_velocity_agent import ConstantVelocityAgent
from navsim.agents.transfuser.transfuser_agent import TransfuserAgent
from navsim.agents.transfuser.transfuser_config import TransfuserConfig
from navsim.visualization.plots import plot_bev_with_agent
import matplotlib.pyplot as plt

# Alternative baseline agent; uncomment to use it instead of the Transfuser agent.
# agent = ConstantVelocityAgent()
config = TransfuserConfig()
# The checkpoint path is relative, so it resolves against the kernel's working directory.
config.checkpoint_path = "../model/LatentTransfuser_mini/0830.ckpt"
# NOTE(review): this cell never calls `agent.initialize()`. Presumably this
# TransfuserAgent loads the checkpoint weights in its constructor — confirm,
# otherwise the plot and scores below come from an untrained model.
agent = TransfuserAgent(config, scene)
fig, ax = plot_bev_with_agent(scene, agent)
plt.title(f"Scene token: {token}")
plt.show()

In [None]:
# Compute PDM score

from navsim.evaluate.pdm_score import pdm_score
from navsim.planning.simulation.planner.pdm_planner.simulation.pdm_simulator import PDMSimulator
from navsim.planning.simulation.planner.pdm_planner.scoring.pdm_scorer import PDMScorer, PDMScorerConfig
from navsim.traffic_agents_policies.navsim_IDM_traffic_agents import NavsimIDMTrafficAgents
from navsim.planning.simulation.observation.navsim_idm_agents import NavsimIDMAgents
from nuplan.planning.simulation.trajectory.trajectory_sampling import TrajectorySampling

print("🚀 Setting up PDM score components...")

# Ask the agent for its trajectory; agents that declare `requires_scene` are
# additionally handed the scene itself.
agent_input = scene.get_agent_input()
trajectory = (
    agent.compute_trajectory(agent_input, scene)
    if agent.requires_scene
    else agent.compute_trajectory(agent_input)
)

# The simulator and the scorer are built from the same sampling object; the traffic
# policy and pdm_score read the sampling back from the simulator.
proposal_sampling = TrajectorySampling(num_poses=40, interval_length=0.1)
simulator = PDMSimulator(proposal_sampling=proposal_sampling)

scorer_config = PDMScorerConfig(
    # metric weights
    progress_weight=5.0,
    ttc_weight=5.0,
    lane_keeping_weight=2.0,
    history_comfort_weight=2.0,
    two_frame_extended_comfort_weight=2.0,
    # driving-direction settings
    driving_direction_horizon=1.0,
    driving_direction_compliance_threshold=2.0,
    driving_direction_violation_threshold=6.0,
    # lane-keeping settings
    lane_keeping_deviation_limit=0.5,
    lane_keeping_horizon_window=2.0,
    # remaining thresholds and switches
    stopped_speed_threshold=5e-03,
    future_collision_horizon_window=1.0,
    progress_distance_threshold=5.0,
    human_penalty_filter=True,
)
scorer = PDMScorer(proposal_sampling=proposal_sampling, config=scorer_config)

# Background-traffic observation that the traffic policy below wraps.
idm_agents = NavsimIDMAgents(
    target_velocity=10,
    min_gap_to_lead_agent=1.0,
    headway_time=1.5,
    accel_max=1.0,
    decel_max=2.0,
    minimum_path_length=20,
    radius=100,
    idm_snap_threshold=3.0,
    open_loop_detections_types=[],
    add_open_loop_parked_vehicles=True,
    planned_trajectory_samples=None,
    planned_trajectory_sample_interval=None,
)
traffic_agents_policy = NavsimIDMTrafficAgents(
    idm_agents_observation=idm_agents,
    future_trajectory_sampling=simulator.proposal_sampling,
)

print("📊 Computing PDM score...")
score_result = pdm_score(
    metric_cache=metric_cache,
    model_trajectory=trajectory,
    future_sampling=simulator.proposal_sampling,
    simulator=simulator,
    scorer=scorer,
    traffic_agents_policy=traffic_agents_policy,
)

print("✅ PDM score computation done!")

In [None]:
# Redraw the BEV plot and overlay the PDM score and ego progress above the axes.

# Always (re)bind `score_df` from the current `score_result`. Previously it was only
# assigned when `score_result` was a 2-tuple, so any other shape either raised a
# NameError or silently reused a stale `score_df` left over from an earlier run.
# Indexing instead of unpacking also avoids clobbering IPython's `_` variable.
# NOTE(review): assumes a tuple result carries the score DataFrame as its first
# element — confirm against pdm_score's return value.
score_df = score_result[0] if isinstance(score_result, tuple) else score_result

fig, ax = plot_bev_with_agent(scene, agent)
plt.subplots_adjust(top=0.8)  # Adjust top to make space for the text
plt.title(f"Scene token: {token}")

plt.figtext(0.5, 0.855,
           f"PDM Score: {score_df['pdm_score'].iloc[0]:.3f}, Ego Progress: {score_df['ego_progress'].iloc[0]:.3f}",
           ha='center', fontsize=8)

plt.show()

In [None]:
# Experimental cell: approximate a full Stage 2 PDM score
# (original author's note: this code has not been verified yet).
from navsim.planning.script.run_pdm_score import compute_final_scores
import pandas as pd

# Start from the Stage 1 result; copy it so the original score DataFrame stays untouched.
stage1_df = score_df.copy()

# Manually derive a `two_frame_extended_comfort` value by comparing the human
# trajectory stored in the metric cache with the model trajectory.
try:
    from navsim.planning.simulation.planner.pdm_planner.scoring.pdm_comfort_metrics import ego_is_two_frame_extended_comfort

    human_traj = metric_cache.human_trajectory
    model_traj = trajectory

    if human_traj is not None:
        print(f"🔍 檢查軌跡格式:")
        print(f"   Human trajectory shape: {human_traj.poses.shape}")
        print(f"   Model trajectory shape: {model_traj.poses.shape}")

        # Only the prefix that both trajectories share is compared.
        min_len = min(len(human_traj.poses), len(model_traj.poses))

        # Both pose arrays need at least three columns (per the original author's
        # comments: x, y, heading).
        if human_traj.poses.shape[1] >= 3 and model_traj.poses.shape[1] >= 3:
            if human_traj.poses.shape[1] == 3:
                # Only three pose columns are available, so the real comfort metric is
                # not computed; fall back to a simplified proxy instead.
                print("⚠️ 軌跡只包含位置和朝向，嘗試計算速度...")

                # Simplified approach: use a ConstantVelocityAgent trajectory as the
                # baseline to compare the model trajectory against.
                cv_agent = ConstantVelocityAgent()
                cv_agent_input = scene.get_agent_input()
                cv_trajectory = cv_agent.compute_trajectory(cv_agent_input)

                # Fix: also clamp to the constant-velocity trajectory's own length.
                # Previously both arrays were sliced with a length derived from the
                # human/model trajectories only, so a shorter CV trajectory produced
                # mismatched shapes in the subtraction below.
                min_len = min(min_len, len(cv_trajectory.poses))
                model_poses = model_traj.poses[:min_len]
                cv_poses = cv_trajectory.poses[:min_len]

                print(f"   改用 CV agent 比較，軌跡長度: {min_len}")

                # Per-pose Euclidean distance over the first two columns, used as a
                # stand-in for extended comfort.
                position_diff = np.sqrt(np.sum((model_poses[:, :2] - cv_poses[:, :2])**2, axis=1))
                mean_diff = np.mean(position_diff)

                # Map the mean distance to (0, 1]: the smaller the difference, the
                # higher the score. This is a simplified heuristic, not the real metric.
                extended_comfort_score = np.exp(-mean_diff)

                print(f"   平均位置差異: {mean_diff:.3f} m")
                print(f"   Extended Comfort 近似分數: {extended_comfort_score:.4f}")

            else:
                print("✅ 軌跡包含足夠的狀態資訊")
                # Add a leading batch axis of size 1 to each pose array.
                human_states = human_traj.poses[:min_len].reshape(1, min_len, -1)
                model_states = model_traj.poses[:min_len].reshape(1, min_len, -1)
                # NOTE(review): assumes 0.1 s spacing between poses — TODO confirm
                # against the trajectories' actual sampling interval.
                time_points = np.arange(min_len) * 0.1

                # Compute the two-frame extended comfort metric.
                extended_comfort_result = ego_is_two_frame_extended_comfort(
                    human_states, model_states, time_points
                )
                extended_comfort_score = float(extended_comfort_result[0])
        else:
            print("❌ 軌跡格式不正確，使用預設值")
            extended_comfort_score = 0.0

        # Store the value in the Stage 1 copy, overwriting any existing column.
        stage1_df['two_frame_extended_comfort'] = extended_comfort_score

        # Compute the final Stage 2 scores.
        stage2_df = compute_final_scores(stage1_df)

        print("🎯 Stage 2 PDM Score (包含 Extended Comfort):")
        print(f"   Final PDM Score: {stage2_df['pdm_score'].iloc[0]:.4f}")
        print(f"   Two-Frame Extended Comfort: {stage2_df['two_frame_extended_comfort'].iloc[0]:.4f}")

    else:
        print("❌ 無法計算 Stage 2: 缺少 human trajectory")

except Exception as e:
    # Keep the cell best-effort, but show the traceback so failures can be diagnosed
    # (matches the error handling of the final debug cell).
    print(f"❌ Stage 2 計算失敗: {e}")
    import traceback
    traceback.print_exc()

## Check Agent Load

In [None]:
# Sanity-check the `agent` created earlier in the notebook: report its type, whether
# its `_transfuser_model` attribute is set, and the configured checkpoint path.
# NOTE(review): a non-None `_transfuser_model` only shows the attribute is populated;
# it presumably does not prove checkpoint weights were loaded — confirm.
try:
    if 'agent' not in globals():
        print("⚠️ Agent 尚未創建")
    else:
        print(f"🤖 Agent 檢查:")
        print(f"   Agent 類型: {type(agent).__name__}")
        if not hasattr(agent, '_transfuser_model'):
            print(f"   Agent 沒有 _transfuser_model 屬性")
        else:
            model_loaded = agent._transfuser_model is not None
            print(f"   模型已載入: {model_loaded}")
            # The path is only printed when both `config` and its
            # `checkpoint_path` attribute exist.
            if hasattr(agent, 'config') and hasattr(agent.config, 'checkpoint_path'):
                print(f"   模型路徑: {agent.config.checkpoint_path}")
except Exception as check_error:
    print(f"❌ Agent 檢查失敗: {check_error}")

In [None]:
# Inspect the checkpoint file on disk: print its path, whether it exists and,
# if it does, its size in MiB.
model_path = Path("../model/LatentTransfuser_mini/0830.ckpt")

print(f"📁 模型文件檢查:")
print(f"   路徑: {model_path}")
print(f"   存在: {model_path.exists()}")

if model_path.exists():
    size_mb = model_path.stat().st_size / (1024 ** 2)
    print(f"   大小: {size_mb:.1f} MB")

In [None]:
import torch
# Check that the checkpoint can be deserialized by PyTorch on the CPU and list its
# top-level keys. torch.load unpickles the file, so only point this at checkpoints
# from a trusted source.
try:
    model_data = torch.load(
        "../model/LatentTransfuser_mini/0830.ckpt",
        map_location='cpu',
    )
    print("✅ 模型文件可以被 PyTorch 載入")
    print(f"   模型keys: {list(model_data.keys())}")
except Exception as load_error:
    # Best-effort diagnostic: report the failure instead of stopping the notebook.
    print(f"❌ 模型文件載入失敗: {load_error}")

In [None]:
# Debug how a TransfuserAgent gets constructed and what the checkpoint file contains.
try:
    print("🔧 調試 TransfuserAgent 載入過程...")

    # Build a second, throwaway agent using the same checkpoint path as the main one.
    test_config = TransfuserConfig()
    test_config.checkpoint_path = "../model/LatentTransfuser_mini/0830.ckpt"

    print("📝 開始創建 TransfuserAgent...")
    test_agent = TransfuserAgent(test_config, scene)

    # Report only underscore-prefixed attributes whose name mentions "model".
    print(f"🔍 Agent 屬性檢查:")
    model_attr_names = [
        name
        for name in dir(test_agent)
        if name.startswith('_') and 'model' in name.lower()
    ]
    for attr in model_attr_names:
        value = getattr(test_agent, attr, None)
        print(f"   {attr}: {type(value)} = {value}")

    # Load the raw checkpoint and report how many entries its state dict holds.
    # Nothing is assigned to the agent here, despite the printed message.
    print(f"🛠️ 嘗試手動設置模型...")
    model_data = torch.load(
        "../model/LatentTransfuser_mini/0830.ckpt",
        map_location='cpu',
    )
    print(f"   State dict keys: {len(model_data['state_dict'])} 個參數")

except Exception as debug_error:
    print(f"❌ 調試失敗: {debug_error}")
    import traceback
    traceback.print_exc()