In [1]:
import os

# Runtime switches must be in the environment BEFORE torch / jax (jax is pulled
# in by openpi) are imported: settings such as JAX_PLATFORMS and PYTORCH_JIT are
# read when those libraries are imported, so assigning them afterwards is too late.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
os.environ["JAX_PLATFORMS"] = "cpu"
# NOTE(review): PyTorch documents `TORCHDYNAMO_DISABLE` as the switch that turns
# TorchDynamo off; confirm that `TORCH_DISABLE_DYNAMO` is actually honored.
os.environ["TORCH_DISABLE_DYNAMO"] = "1"
os.environ["PYTORCH_JIT"] = "0"

# The remaining imports intentionally come after the environment setup above.
import sys
import pathlib
import numpy as np
import torch
from typing import Dict, Any

from openpi.training import config as _config
from openpi.training import data_loader as _data_loader
from openpi.training import checkpoints as _checkpoints
from openpi import transforms as _transforms

print("✅ 导入完成")


  from .autonotebook import tqdm as notebook_tqdm


✅ 导入完成


In [2]:
# Run configuration
config_name = "pi05_pick_blue_bottle_libero_downsample4x"
local_data_path = "/home/wyz/.cache/huggingface/lerobot/your_hf_username/pick_blue_bottle_libero_downsample4x"
output_txt_file = "pick_blue_bottle_actions_normalization_comparison.txt"

# Upper bound on the number of samples to inspect; a falsy value (e.g. None)
# means "inspect every sample".
max_samples = 10

# Echo the effective settings so they are recorded next to the cell's output.
print(
    "\n".join(
        [
            "配置:",
            f"  Config: {config_name}",
            f"  Data path: {local_data_path}",
            f"  Output file: {output_txt_file}",
            f"  Max samples: {max_samples or 'All'}",
        ]
    )
)


配置:
  Config: pi05_pick_blue_bottle_libero_downsample4x
  Data path: /home/wyz/.cache/huggingface/lerobot/your_hf_username/pick_blue_bottle_libero_downsample4x
  Output file: pick_blue_bottle_actions_normalization_comparison.txt
  Max samples: 10


In [3]:
# Resolve the training config, then load the dataset and normalization stats
print("Loading config and dataset...")
config = _config.get_config(config_name)
data_config = config.data.create(config.assets_dirs, config.model)

# Imported here, right next to its only use in this cell
from datasets import load_dataset

# Prefer the "data" sub-folder when it exists; otherwise scan the dataset root
data_dir = pathlib.Path(local_data_path) / "data"
data_dir = data_dir if data_dir.exists() else pathlib.Path(local_data_path)

# Collect parquet shards recursively, in sorted path order
parquet_files = sorted(data_dir.rglob("*.parquet"))
if not parquet_files:
    # No shard found by the scan: let `datasets` try the directory itself, and
    # only if that fails fall back to the repo id from the data config.
    try:
        hf_dataset = load_dataset("parquet", data_dir=str(data_dir), split="train")
    except Exception as load_error:
        print(f"Warning: Could not load from {data_dir}: {load_error}")
        hf_dataset = load_dataset(data_config.repo_id, split="train")
else:
    print(f"Found {len(parquet_files)} parquet files")
    hf_dataset = load_dataset("parquet", data_files=list(map(str, parquet_files)), split="train")

print(f"Dataset length: {len(hf_dataset)}")

# Normalization statistics are read from the checkpoint's "assets" directory
ckpt_dir = pathlib.Path("/home/wyz/openpi/checkpoints/pi05_pick_blue_bottle_libero_downsample4x/pick_blue_bottle_finetune/20000")
norm_stats = _checkpoints.load_norm_stats(ckpt_dir / "assets", data_config.asset_id)
print("✅ Loaded norm stats from checkpoint")


Loading config and dataset...
Found 101 parquet files
Dataset length: 8472
✅ Loaded norm stats from checkpoint


In [8]:
# 处理样本并收集归一化前后的 actions
def to_numpy(x):
    """Convert ``x`` to a NumPy array.

    Torch tensors are detached from the autograd graph and moved to host memory
    before conversion, because ``Tensor.numpy()`` raises for tensors that require
    grad. Any other input is passed through ``np.asarray``.

    Args:
        x: A ``torch.Tensor`` or anything ``np.asarray`` accepts.

    Returns:
        A ``numpy.ndarray`` holding the values of ``x``.
    """
    if isinstance(x, torch.Tensor):
        # detach() first: numpy() refuses tensors with requires_grad=True.
        return x.detach().cpu().numpy()
    return np.asarray(x)

def process_sample(raw_sample, sample_idx, episode_idx, frame_idx):
    """Return one sample's action before and after normalization.

    The raw action is read from ``raw_sample["actions"]``. The sample is then run
    through the repack transforms, the data-transform inputs and finally
    ``Normalize``, all taken from the module-level ``data_config`` and
    ``norm_stats``. If either action has more than one dimension, only its first
    row is kept.

    Returns ``None`` when the sample has no "actions" entry; otherwise a dict with
    keys ``sample_idx``, ``episode_idx``, ``frame_idx``, ``raw_action`` and
    ``normalized_action`` (the last is ``None`` when the transformed sample no
    longer contains "actions").
    """
    # Nothing to compare without an action
    if "actions" not in raw_sample:
        return None

    # Raw action as an array: use the object's own numpy() export when it has
    # one, otherwise wrap anything that is not already an ndarray.
    raw_action = raw_sample["actions"]
    if hasattr(raw_action, 'numpy'):
        raw_action = raw_action.numpy()
    else:
        raw_action = raw_action if isinstance(raw_action, np.ndarray) else np.array(raw_action)
    # Keep only the first row when the action is not 1D
    if len(raw_action.shape) > 1:
        raw_action = raw_action[0]

    # Working copy of the sample; values exposing numpy() are exported, the rest
    # are passed through unchanged.
    sample_dict = {
        key: (value.numpy() if not isinstance(value, np.ndarray) and hasattr(value, 'numpy') else value)
        for key, value in raw_sample.items()
    }

    # The repack transform needs a "prompt" key; LeRobot datasets may store the
    # instruction under "task" instead.
    if "prompt" not in sample_dict:
        if "task" not in sample_dict:
            sample_dict["prompt"] = "pick blue bottle"  # default prompt
        else:
            task_text = sample_dict["task"]
            if isinstance(task_text, bytes):
                task_text = task_text.decode('utf-8')
            sample_dict["prompt"] = str(task_text)

    # Numeric entries must be numpy arrays before the transforms run
    for numeric_key in ("actions", "state"):
        if numeric_key in sample_dict and not isinstance(sample_dict[numeric_key], np.ndarray):
            sample_dict[numeric_key] = np.asarray(sample_dict[numeric_key])

    def _run_pipeline(data, steps):
        # Feed `data` through each transform in order
        for step in steps:
            data = step(data)
        return data

    # Repack transforms first, then the data-transform inputs
    sample_dict = _run_pipeline(sample_dict, data_config.repack_transforms.inputs)
    sample_dict = _run_pipeline(sample_dict, data_config.data_transforms.inputs)

    # Normalization needs the action as a numpy array again
    if "actions" in sample_dict and not isinstance(sample_dict["actions"], np.ndarray):
        sample_dict["actions"] = np.asarray(sample_dict["actions"])

    # Normalize with the loaded statistics
    sample_dict = _transforms.Normalize(norm_stats, use_quantiles=data_config.use_quantile_norm)(sample_dict)

    # Normalized action, reduced to its first row like the raw one
    normalized_action = None
    if "actions" in sample_dict:
        normalized_action = to_numpy(sample_dict["actions"])
        if len(normalized_action.shape) > 1:
            normalized_action = normalized_action[0]

    return dict(
        sample_idx=sample_idx,
        episode_idx=episode_idx,
        frame_idx=frame_idx,
        raw_action=raw_action,
        normalized_action=normalized_action,
    )

# Run the pipeline over the leading samples of the dataset
print("Processing samples...")
results = []
dataset_size = len(hf_dataset)
# A falsy max_samples means "process the whole dataset"
num_samples_to_process = min(max_samples, dataset_size) if max_samples else dataset_size

for i in range(num_samples_to_process):
    raw_sample = hf_dataset[i]
    # Fall back to the running index when the dataset has no episode/frame columns
    result = process_sample(
        raw_sample,
        i,
        raw_sample.get("episode_index", i),
        raw_sample.get("frame_index", i),
    )
    if result is not None:
        results.append(result)

    # Progress line every 100 samples
    processed_count = i + 1
    if not processed_count % 100:
        print(f"  Processed {processed_count}/{num_samples_to_process} samples...")

print(f"✅ Processed {len(results)} samples")


Processing samples...
✅ Processed 10 samples


In [9]:
# Save the raw-vs-normalized action comparison to a UTF-8 text report
heavy_rule = "=" * 100
light_rule = "-" * 100


def _action_block_lines(title, action):
    """Yield the title, shape, values and min/max/mean lines for one action."""
    yield title
    yield f"    Shape: {action.shape}\n"
    yield f"    Values: {action}\n"
    yield f"    Min: {action.min():.6f}, Max: {action.max():.6f}, Mean: {action.mean():.6f}\n"
    yield "\n"


def _dim_change_lines(raw_action, normalized_action):
    """Yield one line per shared dimension with the absolute and relative change."""
    for dim in range(min(len(raw_action), len(normalized_action))):
        raw_val = raw_action[dim]
        norm_val = normalized_action[dim]
        change = norm_val - raw_val
        # A relative change is only reported when the raw value is not ~0
        if abs(raw_val) > 1e-10:
            change_pct = (change / (abs(raw_val) + 1e-10)) * 100
        else:
            change_pct = 0
        yield f"    Dim {dim:2d}: {raw_val:10.6f} -> {norm_val:10.6f} (变化: {change:10.6f}, {change_pct:6.2f}%)\n"


def _stat_lines(overall_title, per_dim_title, actions):
    """Yield overall and per-dimension min/max/mean/std lines for stacked actions."""
    yield overall_title
    yield f"  Shape: {actions.shape}\n"
    yield f"  Min: {actions.min():.6f}\n"
    yield f"  Max: {actions.max():.6f}\n"
    yield f"  Mean: {actions.mean():.6f}\n"
    yield f"  Std:  {actions.std():.6f}\n"
    yield "\n"
    yield per_dim_title
    for dim in range(actions.shape[1]):
        dim_values = actions[:, dim]
        yield f"  Dim {dim:2d}: min={dim_values.min():10.6f}, max={dim_values.max():10.6f}, mean={dim_values.mean():10.6f}, std={dim_values.std():10.6f}\n"


print(f"Writing results to {output_txt_file}...")

with open(output_txt_file, 'w', encoding='utf-8') as f:
    # Title banner
    f.write(f"{heavy_rule}\n")
    f.write("Pick Blue Bottle 数据集 - Actions 归一化前后对比\n")
    f.write(f"{heavy_rule}\n\n")

    # Run metadata
    f.write(f"数据集: {config_name}\n")
    f.write(f"数据路径: {local_data_path}\n")
    f.write(f"总样本数: {len(results)}\n")
    f.write(f"归一化方法: {'Quantile' if data_config.use_quantile_norm else 'Z-score'}\n")
    f.write(f"\n{heavy_rule}\n\n")

    # Normalization statistics for the actions
    if "actions" in norm_stats:
        stats = norm_stats["actions"]
        f.write("归一化统计信息 (Actions):\n")
        f.write(f"{light_rule}\n")
        if not data_config.use_quantile_norm:
            f.write(f"  Mean: {stats.mean}\n")
            f.write(f"  Std:  {stats.std}\n")
        elif stats.q01 is not None and stats.q99 is not None:
            f.write(f"  Q01: {stats.q01}\n")
            f.write(f"  Q99: {stats.q99}\n")
        f.write(f"\n{heavy_rule}\n\n")

    # Per-sample comparison
    for position, result in enumerate(results, start=1):
        f.write(f"样本 {position} / {len(results)}\n")
        f.write(f"{light_rule}\n")
        f.write(f"  样本索引: {result['sample_idx']}\n")
        f.write(f"  Episode索引: {result['episode_idx']}\n")
        f.write(f"  Frame索引: {result['frame_idx']}\n")
        f.write("\n")

        raw_action = result['raw_action']
        normalized_action = result['normalized_action']

        f.writelines(_action_block_lines("  原始 Action (归一化前):\n", raw_action))

        if normalized_action is None:
            f.write("  归一化后的 Action: None (处理失败)\n")
        else:
            f.writelines(_action_block_lines("  归一化后的 Action:\n", normalized_action))
            # Change of every dimension
            f.write("  每个维度的变化:\n")
            f.writelines(_dim_change_lines(raw_action, normalized_action))

        f.write(f"\n{heavy_rule}\n\n")

    # Aggregate statistics
    f.write(f"\n{heavy_rule}\n")
    f.write("汇总统计\n")
    f.write(f"{heavy_rule}\n\n")

    if results:
        all_raw_actions = np.array([r['raw_action'] for r in results if r['raw_action'] is not None])
        all_normalized_actions = np.array([r['normalized_action'] for r in results if r['normalized_action'] is not None])

        if len(all_raw_actions) > 0:
            f.writelines(_stat_lines("所有原始 Actions 的统计:\n", "每个维度的原始 Actions 统计:\n", all_raw_actions))
            f.write("\n")

        if len(all_normalized_actions) > 0:
            f.writelines(_stat_lines("所有归一化 Actions 的统计:\n", "每个维度的归一化 Actions 统计:\n", all_normalized_actions))

print(f"✅ 结果已保存到: {output_txt_file}")


Writing results to pick_blue_bottle_actions_normalization_comparison.txt...
✅ 结果已保存到: pick_blue_bottle_actions_normalization_comparison.txt


In [None]:
# Show an on-screen digest of the first few processed samples
summary_rule = "=" * 100


def _print_action_block(title, action):
    """Print the title, shape, values and min/max/mean of one action."""
    print(title)
    print(f"    Shape: {action.shape}")
    print(f"    Values: {action}")
    print(f"    Min: {action.min():.6f}, Max: {action.max():.6f}, Mean: {action.mean():.6f}")


print("\n" + summary_rule)
print("前 5 个样本的摘要")
print(summary_rule)

for position, result in enumerate(results[:5], start=1):
    print(f"\n样本 {position}:")
    print(f"  Episode: {result['episode_idx']}, Frame: {result['frame_idx']}")

    raw_action = result['raw_action']
    normalized_action = result['normalized_action']

    _print_action_block("  原始 Action (归一化前):", raw_action)

    if normalized_action is None:
        print("  归一化 Action: None (处理失败)")
        continue

    _print_action_block("  归一化 Action:", normalized_action)

    # Change of every dimension
    print("  每个维度的变化:")
    for dim in range(min(len(raw_action), len(normalized_action))):
        raw_val = raw_action[dim]
        norm_val = normalized_action[dim]
        change = norm_val - raw_val
        # A relative change is only reported when the raw value is not ~0
        if abs(raw_val) > 1e-10:
            change_pct = (change / (abs(raw_val) + 1e-10)) * 100
        else:
            change_pct = 0
        print(f"    Dim {dim:2d}: {raw_val:10.6f} -> {norm_val:10.6f} (变化: {change:10.6f}, {change_pct:6.2f}%)")

print("\n" + summary_rule)
print(f"✅ 完整结果已保存到: {output_txt_file}")
print(f"   共处理了 {len(results)} 个样本")
print(summary_rule)
