In [None]:
import torch
from core.networks.embeddUnet import ConditionalUnet1D
from core.diffusion.diffusion import PlaneDiffusionPolicy, build_networks_from_config, build_noise_scheduler_from_config
from core.datasets import plane_dataset_embeed
from config.plane_test_embeed import PlaneTestEmbedConfig
import matplotlib.pyplot as plt
import numpy as np

import io
from pathlib import Path
from typing import Union
from utils.dataset_utils import sample_start_goal, random_rectangles, rectangles_to_grid, validate_path_collision_free, in_collision

import imageio.v3 as iio   

In [2]:

def create_test_scenario(bounds, cell_size, origin, rng, max_rectangles, device='cuda'):
    """Create a test scenario with a 3x2 rectangular obstacle in the center of an 8x8 grid"""
    nx, ny = (
    int((hi - lo) / cell_size) for lo, hi in bounds
)

    rects = random_rectangles(
        bounds=bounds,
        rng=rng,
        max_rectangles=max_rectangles,
    )
    
    grid = rectangles_to_grid(
        nx=nx,
        ny=ny,
        bounds=bounds,
        cell_size=cell_size,
        rects=rects,
    )
    
    collision = True
    while collision:
        start, goal = sample_start_goal(
            grid=grid,
            bounds=bounds,
            cell_size=cell_size,
            origin=origin,
            rng=rng,
        )
        
        collision = in_collision(goal, grid, cell_size, origin) or in_collision(start, grid, cell_size, origin)
    
    obstacle_map = torch.from_numpy(grid.astype(np.float32)).unsqueeze(0).unsqueeze(0).to(device)
    start_tensor = torch.from_numpy(start).unsqueeze(0).to(device)
    goal_tensor = torch.from_numpy(goal).unsqueeze(0).to(device)
    
    

    return start_tensor, goal_tensor, obstacle_map

def generate_path(policy: PlaneDiffusionPolicy, start, goal, obstacles):
    device = start.device
    obs_dim = start.shape[1]
    pred_horizon = policy.config["horizon"]

    # Fake observation sample (all zeros)
    fake_obs_sample = np.zeros((pred_horizon, obs_dim), dtype=np.float32)


    start_np = start.cpu().numpy()[0]
    goal_np = goal.cpu().numpy()[0]
    env_cond = np.concatenate([start_np, goal_np])  # [2 * obs_dim]


    map_cond = obstacles[0].cpu().numpy()  # shape: [1, 8, 8]


    obs_dict = {
        "sample": fake_obs_sample,
        "env": env_cond,
        "map": map_cond,
    }


    trajectory, trajectory_all = policy.predict_action(obs_dict)  # shape: [pred_horizon, obs_dim]

    return torch.tensor(trajectory, device=device).unsqueeze(0), trajectory_all  # [1, H, obs_dim]

import matplotlib.pyplot as plt
import numpy as np

def visualize_result(trajectory, start, goal, obstacles, save_path=None):
    """Visualize the generated trajectory with obstacle map overlay"""
    # Convert to numpy
    traj_np = trajectory[0].cpu().numpy()
    start_np = start[0].cpu().numpy()
    goal_np = goal[0].cpu().numpy()
    obs_np = obstacles[0, 0].cpu().numpy()  # shape: [8, 8]
    
    # Create figure
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
    
    # Plot 1: Trajectory with obstacles
    ax1.imshow(obs_np, origin='lower', cmap='gray_r', extent=[0, 8, 0, 8], alpha=0.5)  # transpose to match axis

    # ax1.plot(traj_np[:, 0], traj_np[:, 1], 'b-', linewidth=2, label='Generated Path')
    # ax1.plot(traj_np[:, 0], traj_np[:, 1], 'bo', markersize=3, alpha=0.6)
    ax1.plot(start_np[0], start_np[1], 'go', markersize=10, label='Start')
    ax1.plot(goal_np[0], goal_np[1], 'ro', markersize=10, label='Goal')
    ax1.set_xlim(0, 8)
    ax1.set_ylim(0, 8)
    ax1.set_title('Generated Trajectory')
    ax1.set_xticks(np.arange(0, 9))
    ax1.set_yticks(np.arange(0, 9))
    ax1.grid(True, alpha=0.3)
    ax1.legend()
    
    # Plot 2: Trajectory over time
    # ax2.plot(range(len(traj_np)), traj_np[:, 0], 'r-', label='X coordinate')
    # ax2.plot(range(len(traj_np)), traj_np[:, 1], 'b-', label='Y coordinate')
    ax2.set_xlabel('Time step')
    ax2.set_ylabel('Position')
    ax2.set_title('Trajectory Components')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    
    plt.tight_layout()
    
    if save_path:
        plt.savefig(save_path, dpi=150, bbox_inches='tight')
        print(f"Plot saved to {save_path}")
    
    plt.show()


In [3]:
def visualize_trajectory_gif(
    action_history: Union[np.ndarray, torch.Tensor],
    start: Union[np.ndarray, torch.Tensor],
    goal: Union[np.ndarray, torch.Tensor],
    obstacles: Union[np.ndarray, torch.Tensor],
    save_path: str = "trajectory_evolution.gif",
    fps: int = 5,
):

    action_history = (
        action_history.detach().cpu().numpy()
        if isinstance(action_history, torch.Tensor)
        else np.asarray(action_history)
    )
    start = start.detach().cpu().numpy() if isinstance(start, torch.Tensor) else np.asarray(start)
    goal = goal.detach().cpu().numpy() if isinstance(goal, torch.Tensor) else np.asarray(goal)
    obstacles = (
        obstacles.detach().cpu().numpy() if isinstance(obstacles, torch.Tensor) else np.asarray(obstacles)
    )

    start = start.squeeze()
    goal = goal.squeeze()
    if obstacles.ndim == 4:                      # [B, C, H, W]
        obstacles = obstacles[0, 0]
    elif obstacles.ndim == 3:                    # [C, H, W]
        obstacles = obstacles[0]

    Path(save_path).parent.mkdir(parents=True, exist_ok=True)

    frames = []
    for step_idx, traj in enumerate(action_history):
        fig, ax = plt.subplots(figsize=(5, 5))

        ax.imshow(
            obstacles,
            cmap="gray_r",
            origin="lower",
            extent=[0, obstacles.shape[1], 0, obstacles.shape[0]],
            alpha=0.25,
        )


        ax.plot(traj[:, 0], traj[:, 1], "b-", linewidth=2, label="Path")
        ax.scatter(traj[:, 0], traj[:, 1], c="blue", s=12, alpha=0.7)


        ax.scatter(start[0], start[1], c="green", s=80, marker="o", label="Start")
        ax.scatter(goal[0], goal[1], c="red", s=80, marker="o", label="Goal")


        ax.set_xlim(0, obstacles.shape[1])
        ax.set_ylim(0, obstacles.shape[0])
        ax.set_title(f"Diffusion step {step_idx}/{len(action_history) - 1}")
        ax.grid(True, alpha=0.3)
        ax.legend(loc="upper right", fontsize="small")


        buf = io.BytesIO()
        plt.savefig(buf, format="png", dpi=120, bbox_inches="tight")
        plt.close(fig)
        buf.seek(0)
        frames.append(iio.imread(buf))  


    iio.imwrite(save_path, frames, duration=1 / fps)  
    print(f"GIF Save to: {Path(save_path).resolve()}")

In [4]:
def multiple_path_analysis(
    policy: PlaneDiffusionPolicy,
    num_paths: int = 5,
):
    trajectory_list = np.zeros((num_paths, policy.config["horizon"], start.shape[1]), dtype=np.float32)
    trajectory_all_list = np.zeros((num_paths, policy.config["horizon"], start.shape[1]), dtype=np.float32)
    start, goal, obstacles = np.zeros((1, 2), dtype=np.float32), np.zeros((1, 2), dtype=np.float32), np.zeros((1, 8, 8), dtype=np.float32)
    for idx in range(num_paths):
        start, goal, obstacles = create_test_scenario(
            bounds=((0, 0), (8, 8)),
            cell_size=1.0,
            origin=(0, 0),
            rng=np.random.default_rng(),
            max_rectangles=5,
        )
        trajectory, trajectory_all = generate_path(policy, start, goal, obstacles)
        trajectory_list[idx] = trajectory.cpu().numpy()[0]
        trajectory_all_list[idx] = trajectory_all.cpu().numpy()[0]
    return trajectory_list, trajectory_all_list
        


In [5]:
# Complete code for batch visualization using your dataset_utils logic

import torch
from core.networks.embeddUnet import ConditionalUnet1D
from core.diffusion.diffusion import PlaneDiffusionPolicy, build_networks_from_config, build_noise_scheduler_from_config
from core.datasets import plane_dataset_embeed
from config.plane_test_embeed import PlaneTestEmbedConfig
import matplotlib.pyplot as plt
import numpy as np

import io
from pathlib import Path
from typing import Union, List, Tuple
from utils.dataset_utils import sample_start_goal, random_rectangles, rectangles_to_grid, validate_path_collision_free, in_collision

import imageio.v3 as iio


def evaluate_path_success(trajectory, start, goal, obstacles, 
                         goal_threshold=0.5, cell_size=1.0):
    """评估路径是否成功，使用你的dataset_utils函数"""
    # 转换为numpy
    if isinstance(trajectory, torch.Tensor):
        traj_np = trajectory.cpu().numpy()
    else:
        traj_np = trajectory
    
    if isinstance(start, torch.Tensor):
        start_np = start.cpu().numpy()
    else:
        start_np = start
        
    if isinstance(goal, torch.Tensor):
        goal_np = goal.cpu().numpy()
    else:
        goal_np = goal
    
    if isinstance(obstacles, torch.Tensor):
        grid = obstacles.cpu().numpy()
    else:
        grid = obstacles
    
    # 处理维度
    if traj_np.ndim == 3:  # [1, H, 2]
        traj_np = traj_np[0]
    if start_np.ndim == 2:  # [1, 2]
        start_np = start_np[0]
    if goal_np.ndim == 2:  # [1, 2] 
        goal_np = goal_np[0]
    if grid.ndim == 4:  # [1, 1, H, W]
        grid = grid[0, 0]
    elif grid.ndim == 3:  # [1, H, W]
        grid = grid[0]
    
    # 设置参数以匹配dataset_utils
    bounds = np.array([[0.0, 8.0], [0.0, 8.0]])
    origin = bounds[:, 0]
    
    # 1. 检查是否到达目标
    final_pos = traj_np[-1]
    goal_distance = np.linalg.norm(final_pos - goal_np)
    reached_goal = goal_distance < goal_threshold
    
    # 2. 使用你的validate_path_collision_free函数检查路径是否无碰撞
    collision_free = validate_path_collision_free(traj_np, grid, cell_size, origin)
    
    # 3. 计算其他指标
    path_length = np.sum(np.linalg.norm(np.diff(traj_np, axis=0), axis=1))
    straight_line_distance = np.linalg.norm(goal_np - start_np)
    efficiency = straight_line_distance / path_length if path_length > 0 else 0
    
    success = reached_goal and collision_free
    
    return {
        'success': success,
        'reached_goal': reached_goal,
        'collision_free': collision_free,
        'goal_distance': goal_distance,
        'path_length': path_length,
        'efficiency': efficiency,
        'straight_line_distance': straight_line_distance
    }

def visualize_batch_scenarios(
    policy: PlaneDiffusionPolicy,
    num_scenarios: int = 16,
    save_path: str = "batch_scenarios_visualization.png",
    device: str = "cuda"
):
    """
    使用你的dataset_utils逻辑生成多个场景并可视化
    """
    # 设置参数
    bounds = np.array([[0.0, 8.0], [0.0, 8.0]])  # x_bounds, y_bounds
    cell_size = 1.0
    origin = bounds[:, 0]  # [0.0, 0.0]
    max_rectangles = (1, 3)  # 随机1-3个矩形
    rng = np.random.default_rng()
    
    # 计算网格布局
    cols = 4
    rows = (num_scenarios + cols - 1) // cols
    
    # 创建图形
    fig, axes = plt.subplots(rows, cols, figsize=(16, 4 * rows))
    if rows == 1:
        axes = axes.reshape(1, -1)
    elif cols == 1:
        axes = axes.reshape(-1, 1)
    
    # 存储统计信息
    results = []
    success_count = 0
    
    print(f"使用你的逻辑生成 {num_scenarios} 个场景...")
    
    for i in range(num_scenarios):
        row = i // cols
        col = i % cols
        
        # 安全地获取axis
        if rows == 1 and cols == 1:
            ax = axes
        elif rows == 1:
            ax = axes[col]
        elif cols == 1:
            ax = axes[row]
        else:
            ax = axes[row, col]
        
        try:
            # 使用你的逻辑生成测试场景
            start, goal, obstacles = create_test_scenario(
                bounds=bounds,
                cell_size=cell_size,
                origin=origin,
                rng=rng,
                max_rectangles=max_rectangles,
                device=device
            )
            
            print(f"场景 #{i+1}: 起点 {start[0].cpu().numpy()}, 终点 {goal[0].cpu().numpy()}")
            print("Grid", obstacles)
            
            # 生成路径
            trajectory, _ = generate_path(policy, start, goal, obstacles)
            
            # 使用修复后的评估函数
            eval_result = evaluate_path_success(trajectory[0], start[0], goal[0], obstacles)
            results.append(eval_result)
            
            if eval_result['success']:
                success_count += 1
            
            # 转换为numpy用于可视化
            traj_np = trajectory[0].cpu().numpy()
            start_np = start[0].cpu().numpy()
            goal_np = goal[0].cpu().numpy()
            obs_np = obstacles[0, 0].cpu().numpy()
            
            # 绘制障碍物
            ax.imshow(obs_np, origin='lower', cmap='gray_r', 
                     extent=[0, 8, 0, 8], alpha=0.3)
            
            # 选择路径颜色
            if eval_result['success']:
                path_color = 'blue'
                path_style = '-'
                status = 'SUCCESS'
            elif eval_result['collision_free']:
                path_color = 'orange' 
                path_style = '-'
                status = 'NO GOAL'
            else:
                path_color = 'red'
                path_style = '--'
                status = 'COLLISION'
            
            # 绘制路径
            ax.plot(traj_np[:, 0], traj_np[:, 1], path_style, 
                   color=path_color, linewidth=2, alpha=0.8)
            ax.scatter(traj_np[:, 0], traj_np[:, 1], 
                      c=path_color, s=8, alpha=0.6)
            
            # 绘制起点和终点
            ax.scatter(start_np[0], start_np[1], c='green', s=100, 
                      marker='o', edgecolors='black', linewidth=2, label='Start')
            ax.scatter(goal_np[0], goal_np[1], c='red', s=100, 
                      marker='s', edgecolors='black', linewidth=2, label='Goal')
            
            # 设置图形属性
            ax.set_xlim(0, 8)
            ax.set_ylim(0, 8)
            ax.set_aspect('equal')
            ax.grid(True, alpha=0.2)
            
            # 添加标题
            title = f'#{i+1}: {status}\n'
            title += f'Goal: {eval_result["goal_distance"]:.2f} | '
            title += f'Eff: {eval_result["efficiency"]:.2f}'
            ax.set_title(title, fontsize=9, pad=5)
            
            # 移除刻度标签以节省空间
            ax.set_xticks([])
            ax.set_yticks([])
            
        except Exception as e:
            print(f"场景 #{i+1} 发生错误: {str(e)}")
            # 处理错误情况
            ax.text(0.5, 0.5, f'Error:\n{str(e)[:50]}...', 
                   ha='center', va='center', transform=ax.transAxes,
                   fontsize=8, color='red')
            ax.set_title(f'#{i+1}: ERROR', fontsize=9)
            ax.set_xlim(0, 1)
            ax.set_ylim(0, 1)
            
            results.append({'success': False, 'error': str(e)})
    
    # 隐藏多余的子图
    for i in range(num_scenarios, rows * cols):
        row = i // cols
        col = i % cols
        if rows == 1 and cols > 1:
            axes[col].set_visible(False)
        elif cols == 1 and rows > 1:
            axes[row].set_visible(False)
        elif rows > 1 and cols > 1:
            axes[row, col].set_visible(False)
    
    # 计算统计信息
    valid_results = [r for r in results if 'error' not in r]
    success_rate = success_count / len(valid_results) if valid_results else 0
    avg_goal_dist = np.mean([r['goal_distance'] for r in valid_results]) if valid_results else 0
    avg_efficiency = np.mean([r['efficiency'] for r in valid_results]) if valid_results else 0
    collision_count = sum(1 for r in valid_results if not r['collision_free'])
    collision_rate = collision_count / len(valid_results) if valid_results else 0
    
    # 添加总体标题
    suptitle = f'Diffusion Policy Evaluation: {num_scenarios} Scenarios\n'
    suptitle += f'Success Rate: {success_rate:.1%} | '
    suptitle += f'Collision Rate: {collision_rate:.1%} | '
    suptitle += f'Avg Goal Distance: {avg_goal_dist:.2f} | '
    suptitle += f'Avg Efficiency: {avg_efficiency:.2f}'
    
    fig.suptitle(suptitle, fontsize=14, y=0.98)
    
    # 调整布局
    plt.tight_layout()
    plt.subplots_adjust(top=0.90, hspace=0.3, wspace=0.2)
    
    # 保存图像
    plt.savefig(save_path, dpi=200, bbox_inches='tight')
    print(f"可视化结果已保存到: {save_path}")
    
    plt.show()
    
    # 打印详细统计
    print(f"\n{'='*50}")
    print("详细统计信息:")
    print(f"{'='*50}")
    print(f"总场景数: {num_scenarios}")
    print(f"有效场景数: {len(valid_results)}")
    print(f"成功场景数: {success_count}")
    print(f"成功率: {success_rate:.2%}")
    print(f"碰撞场景数: {collision_count}")
    print(f"碰撞率: {collision_rate:.2%}")
    if valid_results:
        print(f"平均目标距离: {avg_goal_dist:.3f}")
        print(f"平均路径效率: {avg_efficiency:.3f}")
        print(f"平均路径长度: {np.mean([r['path_length'] for r in valid_results]):.3f}")
    
    return results



In [None]:
ckpt_path = '/Users/yulinli/Desktop/Exp/diffusion_policy/ckpt_ep999.ckpt'
device = "cpu"


config = PlaneTestEmbedConfig()
config_dict = config.to_dict()
net = ConditionalUnet1D(
    input_dim=config.network_config["unet_config"]["action_dim"],
    global_cond_dim=config.network_config["vit_config"]["num_classes"] + config.network_config["mlp_config"]["embed_dim"],
    network_config=config.network_config
)
scheduler = build_noise_scheduler_from_config(config_dict)

policy = PlaneDiffusionPolicy(model=net, noise_scheduler=scheduler, config=config_dict, device=device)
policy.load_weights(ckpt_path)


# Add this cell to run the evaluation


# 如果需要更多场景的评估
print("\n" + "="*50)
print("运行更大规模的评估...")
large_evaluation = visualize_batch_scenarios(
    policy=policy, 
    num_scenarios=25, 
    save_path="large_evaluation_25_scenarios.png",
    device=device
)