## 1. Import Necessary Libraries

In [1]:
import pybullet as p
import pybullet_data
import time
import numpy as np
from PIL import Image
import torch
from transformers import AutoModel, AutoProcessor
import sys
import traceback

## 2. Configuration and Hyperparameters ⚙️

This block defines the key constants and settings that control the simulation's behavior, the VLA model to be used, and the criteria for success.

* **`FINED_TUNED_MODEL_PATH`**: This variable holds the path to the AI model that will control the robot. It's currently set to a local, fine-tuned version.
    * **Important Note**: for test orginal model just change it to `"IPEC-COMMUNITY/spatialvla-4b-224-pt"`.

* **`NUM_EPISODES`**: Defines the total number of times the simulation will run from start to finish.

* **`MAX_STEPS_PER_EPISODE`**: Sets the maximum number of actions the robot can take within a single episode. This prevents the simulation from getting stuck in an infinite loop.

* **Success Thresholds**: These values define the conditions for a successful action:
    * **`GRASP_THRESHOLD`**: The maximum distance (in meters) between the robot's gripper and an object to consider the grasp successful.
    * **`LIFT_THRESHOLD`**: The minimum height (in meters) an object must be lifted off the surface to count as a successful lift.
    * **`TRAY_XY_THRESHOLD`**: The maximum distance (in meters) in the XY plane from the center of the tray for a placement to be considered successful.

In [2]:
FINED_TUNED_MODEL_PATH = "ahrzeroday/Fine_tuned_SpatialVLA"
NUM_EPISODES = 100
MAX_STEPS_PER_EPISODE = 5
GRASP_THRESHOLD = 0.1
LIFT_THRESHOLD = 0.01
TRAY_XY_THRESHOLD = 0.2

## 3. Environment Setup and Evaluation Functions

This block contains all the necessary functions to set up the PyBullet simulation environment, reset it for each new episode, and check for various success conditions throughout the robotic task.

### 3.1. Environment Setup and Reset
These functions are responsible for creating and managing the state of the simulation world.

* **`setup_world()`**: A helper function that builds the static parts of the environment. It sets the gravity and loads the ground plane, the table, and the Franka Emika Panda robot into the scene.
* **`reset_environment()`**: This function that prepares the simulation for the start of a new episode. It performs several key actions:
    1.  **Full Reset**: Calls `p.resetSimulation()` to completely clear the previous state.
    2.  **Rebuild World**: Uses `setup_world()` to add the static elements back.
    3.  **Set Initial Pose**: Moves the robot's joints to a predefined starting configuration.
    4.  **Randomize Objects**: To ensure the model learns a generalizable skill, it places the cube and the tray in randomized positions on the left or right side of the table for each episode. It also randomizes the cube's color.
    5.  **Stabilize**: Runs the simulation for a few steps to allow the objects to settle before the agent begins its task.

### 3.2. Success Condition Checkers
These functions evaluate the robot's performance at different stages of the pick-and-place task. They translate abstract goals into measurable, physics-based conditions.

* **`check_grasp_proximity()`**: Checks if the robot's gripper is within a certain distance of the cube.
* **`check_successful_grasp()`**: Determines if the robot has securely closed its fingers on the cube AND lifted it off the table.
* **`check_release_over_tray()`**: Checks if the gripper is opening while positioned within the rectangular boundary (AABB) of the tray. This is a robust way to confirm the release location.
* **`check_task_success()`**: The final check to determine if the episode was successful, verifying that the cube is resting inside the tray.

In [3]:
def setup_world():
    """Sets up the static environment: ground, table, and robot."""
    p.setGravity(0, 0, -9.81)
    p.loadURDF("plane.urdf")
    p.loadURDF("table/table.urdf", basePosition=[0, 0, 0])
    # Load the Franka Panda robot with a fixed base.
    robot_id = p.loadURDF("franka_panda/panda.urdf", basePosition=[-0.45, 0, 0.625], useFixedBase=True)
    return robot_id

def reset_environment():
    """Resets the entire simulation and rebuilds the world for a new episode."""
    # Resetting the simulation provides a clean slate for each episode.
    p.resetSimulation()
    
    # Re-create the static world (ground, table, robot).
    robot_id = setup_world()

    # Set the robot's joints to a default starting pose.
    for i, angle in enumerate([0, -0.4, 0, -2.4, 0, 2.0, 0.8]):
        p.resetJointState(robot_id, i, targetValue=angle, targetVelocity=0)

    # Set the gripper fingers (joints 9 and 10) to an open position.
    p.setJointMotorControl2(robot_id, 9, p.POSITION_CONTROL, 0.04, force=50)
    p.setJointMotorControl2(robot_id, 10, p.POSITION_CONTROL, 0.04, force=50)

    # Define two zones on the table for object placement.
    left_zone = {'x': (0.1, 0.3), 'y': (0.1, 0.3)}
    right_zone = {'x': (0.1, 0.3), 'y': (-0.3, -0.1)}
    # Randomly assign the cube and tray to these zones.
    cube_zone, tray_zone = (left_zone, right_zone) if np.random.rand() > 0.5 else (right_zone, left_zone)

    # Spawn the cube at a random position within its assigned zone.
    initial_cube_pos = [np.random.uniform(*cube_zone['x']), np.random.uniform(*cube_zone['y']), 0.64]
    cube_id = p.loadURDF("cube_small.urdf", basePosition=initial_cube_pos)

    # Give the cube a random color for better generalization.
    p.changeVisualShape(cube_id, -1, rgbaColor=[np.random.uniform(0.2, 0.8), np.random.uniform(0.2, 0.8), np.random.uniform(0.2, 0.8), 1.0])

    # Spawn the tray at a random position within its assigned zone.
    tray_pos = [np.random.uniform(*tray_zone['x']), np.random.uniform(*tray_zone['y']), 0.63]
    tray_id = p.loadURDF("tray/tray.urdf", basePosition=tray_pos, globalScaling=0.6)

    # Run the simulation for a short period to let objects settle.
    for _ in range(50):
        p.stepSimulation()
        
    return robot_id, cube_id, tray_id


def check_grasp_proximity(robot_id, cube_id):
    """Checks if the gripper is close enough to the cube to attempt a grasp."""
    try:
        # Get the position of the end-effector (link 11).
        link_state = p.getLinkState(robot_id, 11)
        gripper_pos = np.array(link_state[0])
        cube_pos, _ = p.getBasePositionAndOrientation(cube_id)

        # Return True if the distance is within the defined threshold.
        return np.linalg.norm(gripper_pos - np.array(cube_pos)) < GRASP_THRESHOLD
    except p.error:
        return False

def check_successful_grasp(robot_id, cube_id, initial_cube_z):
    """Checks if the gripper is closed and the cube has been lifted."""
    try:
         # Check if the gripper fingers (joints 9 and 10) are closed.
        finger1_pos = p.getJointState(robot_id, 9)[0]
        finger2_pos = p.getJointState(robot_id, 10)[0]
        is_gripper_closed = finger1_pos < 0.01 and finger2_pos < 0.01
        if not is_gripper_closed: return False

        # Check if the cube has been lifted above its starting height.
        current_cube_pos, _ = p.getBasePositionAndOrientation(cube_id)
        is_cube_lifted = current_cube_pos[2] > (initial_cube_z + LIFT_THRESHOLD)
        return is_cube_lifted
    except p.error:
        return False

def check_release_over_tray(robot_id, tray_id, is_gripper_opening):
    """Checks if the gripper is opening while positioned over the tray's area."""
    if not is_gripper_opening:
        return False
    
    try:
        # Get the gripper's current XY position.
        link_state = p.getLinkState(robot_id, 11)
        gripper_pos = np.array(link_state[0])
        gx, gy = gripper_pos[0], gripper_pos[1]

        # Get the tray's Axis-Aligned Bounding Box (AABB) for a precise area check.
        aabb_min, aabb_max = p.getAABB(tray_id)
        min_x, min_y = aabb_min[0], aabb_min[1]
        max_x, max_y = aabb_max[0], aabb_max[1]
        
       # Check if the gripper's XY position is within the tray's rectangular bounds.
        is_over_tray = (min_x < gx < max_x) and (min_y < gy < max_y)
        
        return is_over_tray
    except p.error:
        return False

def check_task_success(cube_id, tray_id):
    """Checks if the cube is successfully placed inside the tray."""
    try:
        cube_pos, _ = p.getBasePositionAndOrientation(cube_id)
        tray_pos, _ = p.getBasePositionAndOrientation(tray_id)

        # Check if the XY distance between the cube and tray is small enough.
        xy_distance = np.linalg.norm(np.array(cube_pos[:2]) - np.array(tray_pos[:2]))
        # Check if the cube is resting at the correct height inside the tray.
        is_z_correct = (tray_pos[2] < cube_pos[2] < tray_pos[2] + 0.1)
        return xy_distance < TRAY_XY_THRESHOLD and is_z_correct
    except p.error:
        return False

## 4. Main Simulation and Evaluation

This is the key part that coordinates the whole process. It starts the simulation, and loads the VLA model, executes the evaluation episodes, drives the robot by model predictions, and, lastly, announces the results.

### 4.1. Initialization
First, the script connects to the PyBullet physics engine with a graphical user interface (GUI) and loads the pre-trained VLA model (either the local fine-tuned version or the one from Hugging Face) onto the GPU. It also defines the natural language prompt for the task and initializes a dictionary to store the evaluation results.

### 4.2. The Main Episode
The script iterates for a predefined `NUM_EPISODES`. In each episode:
1.  **Reset**: The environment is reset to a new, randomized configuration using `reset_environment()`.
2.  **Visualize**: The debug camera is positioned to provide a clear view of the scene.
3.  **Initialize Trackers**: Flags and variables for tracking the episode's progress (e.g., if a grasp was successful) are reset.

### 4.3. The Perception-Action Cycle
This is the core of the robot's intelligence, where it repeatedly perceives, thinks, and acts. For each step within an episode:
1.  **Perception**: It captures a 224x224 image from the virtual camera's perspective.
2.  **Inference**: The captured image and the text prompt are passed to the VLA model. The model predicts the next action for the robot's arm and gripper.
3.  **Action Processing**: The raw action predicted by the model is **smoothed** and **clipped**. Smoothing prevents jerky movements, and clipping ensures the robot doesn't try to move too far or too fast in a single step.
4.  **Execution**:
    * **Inverse Kinematics (IK)**: The script calculates the necessary robot joint angles to move the gripper to the desired target position.
    * **Motor Control**: It sends commands to the robot's motors to execute the calculated joint movements and gripper actions (open/close).

In [None]:
def main():
    p.connect(p.GUI)
    p.setAdditionalSearchPath(pybullet_data.getDataPath())

    print(f"Loading model from: {FINED_TUNED_MODEL_PATH}")
    try:
        # Load the processor and model from the specified path.
        processor = AutoProcessor.from_pretrained(FINED_TUNED_MODEL_PATH, trust_remote_code=True)
        model = AutoModel.from_pretrained(FINED_TUNED_MODEL_PATH, trust_remote_code=True, torch_dtype=torch.bfloat16).eval().to("cuda")
    except Exception as e:
        print(f"Fatal Error loading model: {e}")
        p.disconnect()

    prompt = "pick up the cube and put it in the tray"
    # A special key required by the processor for data normalization/denormalization.
    # Note: This key must match the one used during model training that "my_robot_dataset/1.0.0" was trained and used. for using orginal model from Hugging Face you should use "bridge_orig/1.0.0"
    unnorm_key = "my_robot_dataset/1.0.0"

    # Dictionary to store the results of the evaluation.
    evaluation_results = {
        'grasp_proximity': 0, 'successful_grasps': 0,
        'release_attempts': 0, 'successful_tasks': 0,
    }

    print(f"\n--- Starting Evaluation for {NUM_EPISODES} Episodes ---")

    for episode in range(NUM_EPISODES):
        print(f"\n--- Episode {episode + 1}/{NUM_EPISODES} ---")
        
        # Reset the environment and get IDs for the robot, cube, and tray.
        robot_id, cube_id, tray_id = reset_environment()
        
        # Configure the GUI camera for a consistent view.
        p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0)
        p.configureDebugVisualizer(p.COV_ENABLE_SHADOWS, 1)
        p.resetDebugVisualizerCamera(0.9, 60, -35, [0.1, 0, 0.65])
        
        # Store the initial height of the cube for the lift check.
        initial_cube_pos, _ = p.getBasePositionAndOrientation(cube_id)
        initial_cube_z = initial_cube_pos[2]
        time.sleep(1)

        # Flags to track if milestones were achieved in this episode.
        episode_proximity_achieved = False
        episode_grasp_successful = False
        episode_release_achieved = False

        # Variables for smoothing the robot's movement.
        smoothed_pos_delta = np.zeros(3)
        smoothing_alpha = 0.7 # Smoothing factor (higher means more responsive).

        try:
            for step in range(MAX_STEPS_PER_EPISODE):
                    # Capture an image from the virtual camera
                    print(f"  Attempt {step + 1}/{MAX_STEPS_PER_EPISODE}...")
                    view_matrix = p.computeViewMatrixFromYawPitchRoll(cameraTargetPosition=[0.1, 0, 0.65], distance=0.9, yaw=60, pitch=-35, roll=0, upAxisIndex=2)
                    projection_matrix = p.computeProjectionMatrixFOV(fov=60, aspect=1.0, nearVal=0.01, farVal=2.0)
                    img_data = p.getCameraImage(224, 224, view_matrix, projection_matrix)
                    image = Image.fromarray(np.reshape(img_data[2], (224, 224, 4))[:, :, :3])
                    
                    # Inference: Get action from the model
                    inputs = processor(images=[image], text=prompt, unnorm_key=unnorm_key, return_tensors="pt").to("cuda")
                    with torch.no_grad():
                        generation_outputs = model.predict_action(inputs)
                    actions_chunk = processor.decode_actions(generation_outputs, unnorm_key=unnorm_key)['actions']


                    # Execute the predicted actions with smoothing and clipping.
                    for action_delta in actions_chunk:
                        # Smooth the position delta to avoid jerky movements.
                        raw_pos_delta = action_delta[:3]
                        smoothed_pos_delta = smoothing_alpha * raw_pos_delta + (1 - smoothing_alpha) * smoothed_pos_delta

                        # Clip the movement magnitude to a safe maximum.
                        max_step_magnitude = 0.25
                        delta_magnitude = np.linalg.norm(smoothed_pos_delta)
                        clipped_pos_delta = smoothed_pos_delta / delta_magnitude * max_step_magnitude if delta_magnitude > max_step_magnitude else smoothed_pos_delta

                        # Get the current position and orientation of the gripper.
                        link_state = p.getLinkState(robot_id, 11, computeForwardKinematics=True)
                        current_pos, current_orn_quat = np.array(link_state[0]), np.array(link_state[1])
                        
                        # Calculate the target position.
                        target_pos = current_pos + clipped_pos_delta

                        # Determine gripper command (open or close).
                        gripper_cmd = action_delta[6]
                        is_opening = gripper_cmd <= 0.5
                        gripper_target = 0.04 if is_opening else 0.00

                        # Execute the movement over a short duration for smoothness.
                        for _ in range(30):
                            try:
                                joint_poses = p.calculateInverseKinematics(robot_id, 11, target_pos, current_orn_quat)
                                for i in range(7): p.setJointMotorControl2(robot_id, i, p.POSITION_CONTROL, joint_poses[i], force=120)
                                p.setJointMotorControl2(robot_id, 9, p.POSITION_CONTROL, gripper_target, force=50)
                                p.setJointMotorControl2(robot_id, 10, p.POSITION_CONTROL, gripper_target, force=50)
                                p.stepSimulation()
                                time.sleep(1/240.)
                            except p.error:
                                break  # Skip this movement if IK fails.

                        if not episode_proximity_achieved: episode_proximity_achieved = check_grasp_proximity(robot_id, cube_id)
                        if not episode_grasp_successful: episode_grasp_successful = check_successful_grasp(robot_id, cube_id, initial_cube_z)
                        if not episode_release_achieved: episode_release_achieved = check_release_over_tray(robot_id, tray_id, is_opening)

                    # If the task is done, end the episode early.
                    if check_task_success(cube_id, tray_id):
                        print("  Task successful in this attempt, moving to next episode.")
                        break
            
            final_task_success = check_task_success(cube_id, tray_id)
            if episode_proximity_achieved: evaluation_results['grasp_proximity'] += 1
            if episode_grasp_successful: evaluation_results['successful_grasps'] += 1
            if episode_release_achieved: evaluation_results['release_attempts'] += 1
            if final_task_success: evaluation_results['successful_tasks'] += 1
            print(f"  Episode {episode + 1} Report:")
            print(f"    - Grasp Proximity Met:   {'✅' if episode_proximity_achieved else '❌'}")
            print(f"    - Successful Grasp Met:  {'✅' if episode_grasp_successful else '❌'}")
            print(f"    - Release Over Tray Met: {'✅' if episode_release_achieved else '❌'}")
            print(f"    - Final Task Success:    {'✅' if final_task_success else '❌'}")

        except KeyboardInterrupt:
            print("\nUser terminated."); break
        except Exception as e:
            print(f"\nERROR IN EPISODE {episode + 1}: {e}"); traceback.print_exc(); break
            
    print("\n\n--- Final Evaluation Results ---")
    total_run = episode + 1 if 'episode' in locals() and episode is not None else 0
    if total_run > 0:
        proximity_accuracy = (evaluation_results['grasp_proximity'] / total_run) * 100
        grasp_success_rate = (evaluation_results['successful_grasps'] / total_run) * 100
        release_accuracy = (evaluation_results['release_attempts'] / total_run) * 100
        success_rate = (evaluation_results['successful_tasks'] / total_run) * 100
        print(f"Total Episodes Run: {total_run}")
        print("-" * 50)
        print(f"1. Grasp Proximity Accuracy: {proximity_accuracy:.2f}%")
        print(f"2. Successful Grasp Accuracy: {grasp_success_rate:.2f}%")
        print(f"3. Release Over Tray Accuracy: {release_accuracy:.2f}%")
        print(f"4. Overall Task Success Rate: {success_rate:.2f}%")
        print("-" * 50)
    else:
        print("No episodes were run.")


    if p.isConnected():
        p.disconnect()

if __name__ == "__main__":
    main()

Loading model from: ahrzeroday/Fine_tuned_SpatialVLA


Some kwargs in processor config are unused and will not have any effect: num_obs_steps, statistics, action_chunk_size, action_config, obs_delta, min_sigma, bin_policy, intrinsic_config. 


Add 0 TRANSLATION TOKENS, tokenizer vocab size 257152 / 265347
Add 0 ROTATION TOKENS to tokenizer, tokenizer vocab size 257152 / 265347
Add 0 GRIPPER TOKENS to tokenizer, tokenizer vocab size 257152 / 265347


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/176 [00:00<?, ?B/s]


--- Starting Evaluation for 100 Episodes ---

--- Episode 1/100 ---


Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.


  Attempt 1/5...


The 'batch_size' attribute of HybridCache is deprecated and will be removed in v4.49. Use the more precisely named 'self.max_batch_size' attribute instead.


  Attempt 2/5...
  Attempt 3/5...
  Attempt 4/5...
  Attempt 5/5...
  Episode 1 Report:
    - Grasp Proximity Met:   ✅
    - Successful Grasp Met:  ❌
    - Release Over Tray Met: ✅
    - Final Task Success:    ❌

--- Episode 2/100 ---
  Attempt 1/5...
  Attempt 2/5...
  Attempt 3/5...
  Attempt 4/5...
  Attempt 5/5...
  Episode 2 Report:
    - Grasp Proximity Met:   ❌
    - Successful Grasp Met:  ❌
    - Release Over Tray Met: ✅
    - Final Task Success:    ❌

--- Episode 3/100 ---
  Attempt 1/5...
  Attempt 2/5...
  Attempt 3/5...
  Attempt 4/5...
  Attempt 5/5...
  Episode 3 Report:
    - Grasp Proximity Met:   ✅
    - Successful Grasp Met:  ❌
    - Release Over Tray Met: ❌
    - Final Task Success:    ❌

--- Episode 4/100 ---
  Attempt 1/5...
  Attempt 2/5...
  Attempt 3/5...
  Attempt 4/5...
  Attempt 5/5...
  Episode 4 Report:
    - Grasp Proximity Met:   ✅
    - Successful Grasp Met:  ❌
    - Release Over Tray Met: ❌
    - Final Task Success:    ❌

--- Episode 5/100 ---
  Attempt