# Deploy Trained SmolVLA Policy

<img src="./media/rollout3.gif" width="480" height="360">

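Deploy the trained SmolVLA policy in the MuJoCo simulation and measure its task success rate over repeated rollouts with randomized block positions.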

In [1]:
!pip install transformers==4.50.3
!pip install num2words
!pip install accelerate
!pip install safetensors>=0.4.3

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple


## Step 3. Deploy

In [2]:
# Dataset imports - moved from lerobot.common.datasets
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.datasets.utils import write_json, serialize_dict, dataset_to_policy_features
from lerobot.datasets.factory import resolve_delta_timestamps

# SmolVLA policy imports - moved from lerobot.common.policies
from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig
from lerobot.policies.smolvla.modeling_smolvla import SmolVLAPolicy

# Config types - moved from lerobot.configs
from lerobot.configs.types import FeatureType

# Standard imports remain the same
import numpy as np
import torch
from PIL import Image
import torchvision


  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Prefer the GPU when one is available; fall back to CPU so the notebook
# still runs on machines without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [4]:
dataset_name = 'remove_red_block_from_plate_so100_smolvla_mujoco'

# Load the dataset metadata from the local root. The previous try/except retried
# the exact same call in its except branch, which only hid the first error.
dataset_metadata = LeRobotDatasetMetadata(dataset_name, root='./demo_data_language')

# Split the dataset features into policy outputs (actions) and inputs (everything else).
features = dataset_to_policy_features(dataset_metadata.features)
output_features = {key: ft for key, ft in features.items() if ft.type is FeatureType.ACTION}
input_features = {key: ft for key, ft in features.items() if key not in output_features}

# Policies are initialized with a configuration class, in this case `SmolVLAConfig`.
# Only the input/output features and the action chunking values are overridden;
# everything else keeps its default.
cfg = SmolVLAConfig(
    input_features=input_features,
    output_features=output_features,
    chunk_size=5,
    n_action_steps=5,
)
delta_timestamps = resolve_delta_timestamps(cfg, dataset_metadata)




In [5]:
# Restore the trained SmolVLA policy from the local checkpoint directory,
# passing the dataset statistics along to `from_pretrained`.
policy_path = './ckpt/smolvla_omy/checkpoints/last/pretrained_model'
policy = SmolVLAPolicy.from_pretrained(policy_path, dataset_stats=dataset_metadata.stats)

# Alternative kept for reference (loads from the Hub using `cfg` instead of the local checkpoint):
# policy = SmolVLAPolicy.from_pretrained("DragonHu/lerobot_remove_block_uniform_ramdom_smolvla_base", config=cfg, dataset_stats=dataset_metadata.stats)

# Move the policy to the selected device; as the last expression of the cell,
# the returned module is also displayed.
policy.to(device)


Reducing the number of VLM layers to 16 ...
Loading weights from local directory


SmolVLAPolicy(
  (normalize_inputs): Normalize(
    (buffer_observation_state): ParameterDict(
        (mean): Parameter containing: [torch.cuda.FloatTensor of size 6 (cuda:0)]
        (std): Parameter containing: [torch.cuda.FloatTensor of size 6 (cuda:0)]
    )
    (buffer_observation_block_pose): ParameterDict(
        (mean): Parameter containing: [torch.cuda.FloatTensor of size 7 (cuda:0)]
        (std): Parameter containing: [torch.cuda.FloatTensor of size 7 (cuda:0)]
    )
  )
  (normalize_targets): Normalize(
    (buffer_action): ParameterDict(
        (mean): Parameter containing: [torch.cuda.FloatTensor of size 7 (cuda:0)]
        (std): Parameter containing: [torch.cuda.FloatTensor of size 7 (cuda:0)]
    )
  )
  (unnormalize_outputs): Unnormalize(
    (buffer_action): ParameterDict(
        (mean): Parameter containing: [torch.cuda.FloatTensor of size 7 (cuda:0)]
        (std): Parameter containing: [torch.cuda.FloatTensor of size 7 (cuda:0)]
    )
  )
  (model): VLAFlowMat

In [6]:
# Build the red-block removal test environment from the SO100 scene file.
from mujoco_env.y_env2_removeBlock_Test_random import EnvRemoveBlockTestRandom

xml_path = './asset/scene_remove_block_so100.xml'
PnPEnv = EnvRemoveBlockTestRandom(
    xml_path,
    seed=25,                            # fixed seed handed to the environment
    sampling_method='latin_hypercube',  # how random block positions are drawn
    random_block_position=True,
    plate_flat_radius=0.102,
    action_type='joint_angle',
)



-----------------------------------------------------------------------------
name:[Tabletop] dt:[0.002] HZ:[500]
 n_qpos:[27] n_qvel:[24] n_qacc:[24] n_ctrl:[6]
 integrator:[RK4]

n_body:[18]
 [0/18] [world] mass:[0.00]kg
 [1/18] [front_object_table] mass:[1.00]kg
 [2/18] [camera] mass:[0.00]kg
 [3/18] [camera2] mass:[0.00]kg
 [4/18] [camera3] mass:[0.00]kg
 [5/18] [Base] mass:[0.56]kg
 [6/18] [Rotation_Pitch] mass:[0.12]kg
 [7/18] [Upper_Arm] mass:[0.16]kg
 [8/18] [Lower_Arm] mass:[0.15]kg
 [9/18] [Wrist_Pitch_Roll] mass:[0.07]kg
 [10/18] [Fixed_Jaw] mass:[0.09]kg
 [11/18] [camera_center] mass:[0.00]kg
 [12/18] [Moving_Jaw] mass:[0.02]kg
 [13/18] [body_obj_plate_11] mass:[0.00]kg
 [14/18] [object_plate_11] mass:[0.10]kg
 [15/18] [body_obj_mug_6] mass:[0.00]kg
 [16/18] [object_mug_6] mass:[0.08]kg
 [17/18] [body_obj_block_red] mass:[0.30]kg
body_total_mass:[2.66]kg

n_geom:[101]
geom_names:['floor', 'front_object_table', None, None, None, None, None, None, None, None, None, None, Non

In [7]:
from torchvision import transforms
# Approach 1: Using torchvision.transforms
def get_default_transform(image_size: int = 224):
    """
    Build the image preprocessing pipeline applied before policy inference.

    The pipeline contains a single ``ToTensor`` step, which turns a PIL image
    with pixel values in [0, 255] into a FloatTensor in [0.0, 1.0] laid out
    as C x H x W.

    Note: ``image_size`` is accepted but not used by this function; no
    resizing happens here.
    """
    to_tensor = transforms.ToTensor()
    return transforms.Compose([to_tensor])

In [8]:
# Test configuration -- adjust these values as needed.

# Number of evaluation rounds to run.
TEST_ROUNDS = 20

# Per-round timeout, in seconds.
TIMEOUT_SECONDS = 90

# Hand the timeout to the environment.
PnPEnv.task_timeout = TIMEOUT_SECONDS

# Summarize the configuration, including the worst-case total duration in minutes.
print(
    "测试配置:",
    f"- 测试轮数: {TEST_ROUNDS}",
    f"- 单轮超时时间: {TIMEOUT_SECONDS} 秒",
    f"- 预计最大测试时间: {TEST_ROUNDS * TIMEOUT_SECONDS / 60:.1f} 分钟",
    "准备开始测试...",
    sep="\n",
)

# Report the random-placement settings when the environment exposes them.
if hasattr(PnPEnv, "get_random_position_info"):
    random_info = PnPEnv.get_random_position_info()
    print(
        "随机位置配置:",
        f"- 采样方法: {random_info['sampling_method']}",
        f"- Plate平整半径: {random_info['plate_flat_radius']:.3f} m",
        f"- Block角点距离: {random_info['block_corner_distance']:.3f} m",
        f"- 安全生成半径: {random_info['safe_generation_radius']:.3f} m",
        sep="\n",
    )


测试配置:
- 测试轮数: 20
- 单轮超时时间: 90 秒
- 预计最大测试时间: 30.0 分钟
准备开始测试...
随机位置配置:
- 采样方法: latin_hypercube
- Plate平整半径: 0.102 m
- Block角点距离: 0.021 m
- 安全生成半径: 0.081 m


In [9]:
# Main evaluation loop: roll the policy out in the simulator for TEST_ROUNDS rounds.
step = 0
IMG_TRANSFORM = get_default_transform()


def _to_policy_image(frame, size=(256, 256)):
    """Turn one camera frame into a batched image tensor on `device`.

    The frame is wrapped in a PIL image, resized to `size`, passed through
    IMG_TRANSFORM and given a leading batch dimension.
    """
    resized = Image.fromarray(frame).resize(size)
    return IMG_TRANSFORM(resized).unsqueeze(0).to(device)


# Start the test session and put the policy into a clean evaluation state.
PnPEnv.start_testing(max_rounds=TEST_ROUNDS)
policy.reset()
policy.eval()

print(f"开始进行 {TEST_ROUNDS} 轮SmolVLA任务成功率测试...")

while PnPEnv.env.is_viewer_alive() and PnPEnv.is_testing:
    PnPEnv.step_env()
    # Only query the policy when the environment's loop gate fires (HZ=20).
    if PnPEnv.env.loop_every(HZ=20):
        # Timeout check: record the round as a timed-out failure.
        if PnPEnv.check_timeout():
            print(f"任务超时！已运行 {PnPEnv.task_timeout} 秒")
            all_tests_completed = PnPEnv.handle_task_completion(success=False, timeout=True)
            if all_tests_completed:
                break
            # Reset the policy and the step counter for the next round.
            policy.reset()
            step = 0
            continue

        # Success check: record the round as successful.
        success = PnPEnv.check_success()
        if success:
            print('任务成功完成!')
            all_tests_completed = PnPEnv.handle_task_completion(success=True, timeout=False)
            if all_tests_completed:
                break
            # Reset the policy and the step counter for the next round.
            policy.reset()
            step = 0
            continue

        # Current joint state (first six entries) and camera frames.
        state = PnPEnv.get_joint_state()[:6]
        image, wrist_image = PnPEnv.grab_image()

        data = {
            # Convert through a single ndarray: torch.tensor([ndarray]) takes the
            # slow list-of-arrays path and emits a UserWarning. Shape (1, n) and
            # dtype are the same as before.
            'observation.state': torch.tensor(np.asarray(state)).unsqueeze(0).to(device),
            'observation.image': _to_policy_image(image),
            'observation.wrist_image': _to_policy_image(wrist_image),
            'task': [PnPEnv.instruction],
        }

        # Select an action and keep the first seven components of the first batch entry.
        action = policy.select_action(data)
        action = action[0, :7].cpu().detach().numpy()

        # Apply the action in the environment.
        _ = PnPEnv.step(action)

        # Render, tagging the frame with the current round when progress info is available.
        progress = PnPEnv.get_test_progress()
        if progress:
            PnPEnv.render(idx=progress['current_round'])
        else:
            PnPEnv.render()

        step += 1

        # Print a progress line every 20 policy steps.
        if step % 20 == 0 and progress:
            print(f"测试进度: {progress['current_round']}/{progress['total_rounds']}, "
                  f"已运行时间: {progress['elapsed_time']:.1f}s, "
                  f"当前任务: {progress['current_task']}")

print("测试完成！")
# Show whatever results exist, even if the run was interrupted early.
if hasattr(PnPEnv, 'test_stats') and PnPEnv.test_stats['total_tests'] > 0:
    final_results = PnPEnv.print_test_results()
    print("\n📋 最终测试结果摘要:")
    print(f"🎯 成功率: {final_results['success_rate']:.2f}%")
    print(f"📊 总测试次数: {final_results['total_tests']}")
    print(f"✅ 成功次数: {final_results['successful_tests']}")
    print(f"❌ 失败次数: {final_results['failed_tests']}")
    print(f"⏰ 超时次数: {final_results['timeout_tests']}")

    # Timing summary over the successful rounds.
    if 'success_times' in final_results and final_results['success_times']:
        success_times = final_results['success_times']
        avg_time = sum(success_times) / len(success_times)
        print(f"⏱️ 平均成功时间: {avg_time:.1f}秒")
        print(f"⚡ 最快成功时间: {min(success_times):.1f}秒")
        print(f"🐌 最慢成功时间: {max(success_times):.1f}秒")

    print("🔚 MuJoCo窗口已自动关闭，测试结束")

随机位置测试启动: method=latin_hypercube, safe_radius=0.081m
开始测试循环，总共将进行 20 轮测试
ik_err:[0.0163] is higher than ik_err_th:[0.0100].
You may want to increase max_ik_tick:[1000]
随机red block位置: [0.300, -0.235, 0.850]
DONE INITIALIZATION
开始进行 20 轮SmolVLA任务成功率测试...


  'observation.state': torch.tensor([state]).to(device),


测试进度: 1/20, 已运行时间: 1.9s, 当前任务: remove_red_block
测试进度: 1/20, 已运行时间: 3.5s, 当前任务: remove_red_block
测试进度: 1/20, 已运行时间: 5.1s, 当前任务: remove_red_block
测试进度: 1/20, 已运行时间: 6.7s, 当前任务: remove_red_block
测试进度: 1/20, 已运行时间: 8.3s, 当前任务: remove_red_block
测试进度: 1/20, 已运行时间: 9.8s, 当前任务: remove_red_block
测试进度: 1/20, 已运行时间: 11.4s, 当前任务: remove_red_block
测试进度: 1/20, 已运行时间: 12.9s, 当前任务: remove_red_block
测试进度: 1/20, 已运行时间: 14.5s, 当前任务: remove_red_block
测试进度: 1/20, 已运行时间: 16.0s, 当前任务: remove_red_block
红块移除任务完成！
任务成功完成!
第 1 轮测试成功！用时: 16.4秒
准备进行第 2 轮测试...
ik_err:[0.0163] is higher than ik_err_th:[0.0100].
You may want to increase max_ik_tick:[1000]
随机red block位置: [0.283, -0.217, 0.850]
DONE INITIALIZATION
测试进度: 2/20, 已运行时间: 1.7s, 当前任务: remove_red_block
测试进度: 2/20, 已运行时间: 3.2s, 当前任务: remove_red_block
测试进度: 2/20, 已运行时间: 4.7s, 当前任务: remove_red_block
测试进度: 2/20, 已运行时间: 6.2s, 当前任务: remove_red_block
测试进度: 2/20, 已运行时间: 7.7s, 当前任务: remove_red_block
测试进度: 2/20, 已运行时间: 9.2s, 当前任务: remove_red_block
测试进度: 2/20, 已运行时间: 10.

In [10]:
# Detailed analysis report of the evaluation results
import numpy as np

if not hasattr(PnPEnv, 'test_stats') or PnPEnv.test_stats['total_tests'] <= 0:
    print("没有测试数据可供分析")
else:
    results = PnPEnv.test_stats
    total, success = results['total_tests'], results['successful_tests']
    failed, timeout = results['failed_tests'], results['timeout_tests']
    success_times = results.get('success_times', [])

    # Report banner
    print("\n" + "=" * 60)
    print("SmolVLA 任务成功率测试 - 详细分析报告")
    print("=" * 60)

    # Basic counts
    print(
        "📊 基本统计:",
        f"   总测试轮数: {total}",
        f"   成功完成: {success} 轮",
        f"   失败: {failed} 轮",
        f"   超时: {timeout} 轮",
        sep="\n",
    )

    # Success rate (total is known to be positive in this branch)
    success_rate = success / total * 100
    print(f"\n🎯 成功率: {success_rate:.2f}%")

    # Timing of the successful rounds
    if success_times:
        avg_time = sum(success_times) / len(success_times)
        min_time = min(success_times)
        max_time = max(success_times)
        print(
            "\n⏱️ 成功任务时间分析:",
            f"   平均完成时间: {avg_time:.1f}秒",
            f"   最快完成时间: {min_time:.1f}秒",
            f"   最慢完成时间: {max_time:.1f}秒",
            f"   所有成功时间: {[f'{t:.1f}s' for t in success_times]}",
            sep="\n",
        )

        # Speed rating based on the average completion time
        print(
            "   ⚡ 速度评估: 非常快速" if avg_time <= 30
            else "   🚀 速度评估: 较快" if avg_time <= 60
            else "   🐌 速度评估: 一般" if avg_time <= 90
            else "   🐢 速度评估: 较慢"
        )

    # Failure breakdown
    if failed > 0:
        timeout_rate = timeout / failed * 100
        print(
            "\n❌ 失败分析:",
            f"   失败率: {(failed / total * 100):.2f}%",
            f"   超时导致的失败: {timeout} 轮 ({timeout_rate:.1f}% of failures)",
            f"   其他原因失败: {failed - timeout} 轮",
            sep="\n",
        )

    # Block position analysis
    block_records = results.get('block_position_records', [])
    success_positions = results.get('block_positions_success', [])
    failure_positions = results.get('block_positions_failure', [])
    timeout_positions = results.get('block_positions_timeout', [])

    if not block_records:
        print("\n🧱 方块位置分析: 暂无记录")
    else:
        print("\n🧱 方块位置分析:")
        print(f"   记录总数: {len(block_records)}")

        def summarize_positions(label, positions):
            """Print the count, X/Y ranges and mean position for one outcome group."""
            if positions:
                coords = np.array([np.array(p) for p in positions])
                xs, ys = coords[:, 0], coords[:, 1]
                print(f"   {label}: {len(positions)} 次")
                print(f"      X 范围: [{xs.min():.3f}, {xs.max():.3f}]")
                print(f"      Y 范围: [{ys.min():.3f}, {ys.max():.3f}]")
                print(f"      平均位置: [{xs.mean():.3f}, {ys.mean():.3f}, {coords[:, 2].mean():.3f}]")
            else:
                print(f"   {label}: 无记录")

        summarize_positions("✅ 成功回合的方块位置", success_positions)
        summarize_positions("❌ 失败回合的方块位置", failure_positions)
        summarize_positions("⏰ 超时回合的方块位置", timeout_positions)

        # Per-round listing, mapping the stored result key to its display label
        result_labels = {'success': '成功', 'failure': '失败', 'timeout': '超时'}
        print("   每轮方块位置:")
        for record in block_records:
            pos = record.get('position', [0.0, 0.0, 0.0])
            if isinstance(pos, np.ndarray):
                pos = pos.tolist()
            outcome = record.get('result')
            print(
                f"      第{record.get('round', 0):02d}轮："
                f"{result_labels.get(outcome, outcome)}"
                f"，坐标为 ({pos[0]:.3f}, {pos[1]:.3f}, {pos[2]:.3f})"
            )

    # Overall rating derived from the success rate
    print("\n📈 综合性能评估:")
    print(
        "   🟢 优秀 - 模型表现非常好" if success_rate >= 80
        else "   🟡 良好 - 模型表现较好，有改进空间" if success_rate >= 60
        else "   🟠 一般 - 模型需要进一步优化" if success_rate >= 40
        else "   🔴 较差 - 模型需要重新训练或调整"
    )

    # Suggestions
    print("\n💡 优化建议:")
    if timeout > 0:
        print("   - 考虑优化模型推理速度或增加超时时间")
    if success_rate < 70:
        print("   - 考虑收集更多训练数据")
        print("   - 检查模型训练参数")
        print("   - 验证环境设置是否正确")
    if success_times and avg_time > 60:
        print("   - 考虑优化动作选择策略以提高执行效率")

    print("=" * 60)
    print("🎉 测试完成！MuJoCo窗口已自动关闭")



SmolVLA 任务成功率测试 - 详细分析报告
📊 基本统计:
   总测试轮数: 20
   成功完成: 10 轮
   失败: 10 轮
   超时: 10 轮

🎯 成功率: 50.00%

⏱️ 成功任务时间分析:
   平均完成时间: 13.3秒
   最快完成时间: 10.0秒
   最慢完成时间: 27.4秒
   所有成功时间: ['16.4s', '11.8s', '10.0s', '11.5s', '11.6s', '10.8s', '27.4s', '11.1s', '10.8s', '11.8s']
   ⚡ 速度评估: 非常快速

❌ 失败分析:
   失败率: 50.00%
   超时导致的失败: 10 轮 (100.0% of failures)
   其他原因失败: 0 轮

🧱 方块位置分析:
   记录总数: 20
   ✅ 成功回合的方块位置: 10 次
      X 范围: [0.267, 0.344]
      Y 范围: [-0.237, -0.143]
      平均位置: [0.306, -0.200, 0.850]
   ❌ 失败回合的方块位置: 10 次
      X 范围: [0.255, 0.379]
      Y 范围: [-0.266, -0.140]
      平均位置: [0.322, -0.177, 0.850]
   ⏰ 超时回合的方块位置: 10 次
      X 范围: [0.255, 0.379]
      Y 范围: [-0.266, -0.140]
      平均位置: [0.322, -0.177, 0.850]
   每轮方块位置:
      第01轮：成功，坐标为 (0.300, -0.235, 0.850)
      第02轮：成功，坐标为 (0.283, -0.217, 0.850)
      第03轮：成功，坐标为 (0.308, -0.236, 0.850)
      第04轮：成功，坐标为 (0.312, -0.187, 0.850)
      第05轮：成功，坐标为 (0.312, -0.143, 0.850)
      第06轮：成功，坐标为 (0.325, -0.182, 0.850)
      第07轮：超时，坐标为 (0.255

In [11]:
# [Optional] Single-test mode, for debugging.
# Run this cell if you only want one rollout to check that the model works.

# The rollout code below is disabled by being wrapped in a string literal;
# remove the surrounding triple quotes to run a single test.
"""
print("开始单次测试模式...")
PnPEnv.reset(seed=0)
policy.reset()
policy.eval()
step = 0
IMG_TRANSFORM = get_default_transform()

while PnPEnv.env.is_viewer_alive():
    PnPEnv.step_env()
    if PnPEnv.env.loop_every(HZ=20):
        # 检查任务是否完成
        success = PnPEnv.check_success()
        if success:
            print('单次测试成功完成!')
            break
        
        # 获取当前环境状态
        state = PnPEnv.get_joint_state()[:6]
        # 获取当前图像
        image, wrist_image = PnPEnv.grab_image()
        image = Image.fromarray(image)
        image = image.resize((256, 256))
        image = IMG_TRANSFORM(image)
        wrist_image = Image.fromarray(wrist_image)
        wrist_image = wrist_image.resize((256, 256))
        wrist_image = IMG_TRANSFORM(wrist_image)
        
        data = {
            'observation.state': torch.tensor([state]).to(device),
            'observation.image': image.unsqueeze(0).to(device),
            'observation.wrist_image': wrist_image.unsqueeze(0).to(device),
            'task': [PnPEnv.instruction],
        }
        
        # 选择动作
        action = policy.select_action(data)
        action = action[0,:7].cpu().detach().numpy()
        
        # 执行动作
        _ = PnPEnv.step(action)
        PnPEnv.render()
        step += 1
        
        # 显示步数
        if step % 50 == 0:
            print(f"已执行 {step} 步")
"""

# As written, this cell only prints a notice that the single-test code is available but disabled.
print("单次测试模式代码已准备就绪（当前被注释）")


单次测试模式代码已准备就绪（当前被注释）


In [12]:
# [Optional] Manually close the MuJoCo window.
# Run this cell if you need to close the viewer by hand during testing.

# The code below is disabled by being wrapped in a string literal;
# remove the surrounding triple quotes to close the window manually.
"""
if hasattr(PnPEnv, 'close_viewer'):
    PnPEnv.close_viewer()
    print("MuJoCo窗口已手动关闭")
else:
    print("环境不支持关闭窗口功能")
"""

# As written, this cell only prints a notice that the close-window code is available but disabled.
print("手动关闭窗口功能已准备就绪（当前被注释）")


手动关闭窗口功能已准备就绪（当前被注释）


In [None]:
# policy.push_to_hub(
#     repo_id='Jeongeun/omy_pnp_smolvla',
#     commit_message='Add trained policy for PnP task',
# )