In [2]:
!pip install matplotlib
import logging
import time
import torch
import numpy as np
from pathlib import Path
from typing import Dict, Any
from IPython.display import display, clear_output
import matplotlib.pyplot as plt

from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig
from lerobot.robots.so100_follower import SO100Follower, SO100FollowerConfig
from lerobot.policies.factory import make_policy
from lerobot.configs.policies import PreTrainedConfig
from lerobot.utils.control_utils import predict_action
from lerobot.utils.utils import get_safe_torch_device
from lerobot.datasets.utils import build_dataset_frame

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("✅ All libraries imported successfully!")


Collecting matplotlib
  Downloading matplotlib-3.10.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Collecting contourpy>=1.0.1 (from matplotlib)
  Downloading contourpy-1.3.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.5 kB)
Collecting cycler>=0.10 (from matplotlib)
  Downloading cycler-0.12.1-py3-none-any.whl.metadata (3.8 kB)
Collecting fonttools>=4.22.0 (from matplotlib)
  Downloading fonttools-4.59.0-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (107 kB)
Collecting kiwisolver>=1.3.1 (from matplotlib)
  Downloading kiwisolver-1.4.8-cp310-cp310-manylinux_2_12_x86_64.manylinux2010_x86_64.whl.metadata (6.2 kB)
Collecting pyparsing>=2.3.1 (from matplotlib)
  Downloading pyparsing-3.2.3-py3-none-any.whl.metadata (5.0 kB)
Downloading matplotlib-3.10.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (8.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.6/8.6 MB[0m [31m48.4 MB/s[

  from .autonotebook import tqdm as notebook_tqdm


✅ All libraries imported successfully!


In [8]:
# Your specific configuration
POLICY_REPO_ID = "adungus/act_all"  # Your HuggingFace model
ROBOT_PORT = "/dev/ttyACM1"         # Your robot port
TASK_DESCRIPTION = "Pick objects and put them in the box"
INFERENCE_DURATION = 30.0           # How long to run inference (seconds)
FPS = 30                           # Control frequency

# Camera configuration - we'll try RealSense first, then fallback to OpenCV
USE_REALSENSE = True               # Set to False to use OpenCV camera directly

print(f"Policy: {POLICY_REPO_ID}")
print(f"Robot port: {ROBOT_PORT}")
print(f"Task: {TASK_DESCRIPTION}")
print(f"Camera type: {'RealSense' if USE_REALSENSE else 'OpenCV'}")


Policy: adungus/act_all
Robot port: /dev/ttyACM1
Task: Pick objects and put them in the box
Camera type: RealSense


In [11]:
def create_robot_config(robot_port: str, use_realsense: bool = True) -> SO100FollowerConfig:
    """Create robot configuration with camera setup."""
    
    if use_realsense:
        # Try RealSense camera configuration
        cameras = {
            "main": RealSenseCameraConfig(
                serial_number_or_name="auto",  # Will use first available RealSense
                width=640,
                height=480,
                fps=30
            )
        }
    else:
        # OpenCV camera configuration (fallback)
        cameras = {
            "main": OpenCVCameraConfig(
                index_or_path=Path("/dev/video0"),  # Your camera device
                width=640,
                height=480,
                fps=30
            )
        }
    
    return SO100FollowerConfig(
        port=robot_port,
        id="inference_robot",
        cameras=cameras
    )

# Create robot configuration
try:
    robot_config = create_robot_config(ROBOT_PORT, USE_REALSENSE)
    robot = SO100Follower(robot_config)
    print("✅ Robot configuration created!")
    print(f"Robot type: {robot.name}")
    print(f"Port: {ROBOT_PORT}")
except Exception as e:
    print(f"❌ Failed to create robot config: {e}")
    if USE_REALSENSE:
        print("Trying OpenCV fallback...")
        robot_config = create_robot_config(ROBOT_PORT, False)
        robot = SO100Follower(robot_config)
        print("✅ Robot configuration created with OpenCV camera!")


❌ Failed to create robot config: name 'rs' is not defined
Trying OpenCV fallback...
✅ Robot configuration created with OpenCV camera!


In [12]:
# Connect to robot
try:
    print("🤖 Connecting to robot...")
    robot.connect()
    print("✅ Robot connected successfully!")
    print(f"Robot is connected: {robot.is_connected}")
    
    # Get observation and action features
    observation_features = robot.observation_features
    action_features = robot.action_features
    print(f"📊 Observation features: {list(observation_features.keys())}")
    print(f"🎮 Action features: {list(action_features.keys())}")
    
except Exception as e:
    print(f"❌ Failed to connect to robot: {e}")
    print("Make sure:")
    print("1. Robot is connected to the correct port")
    print("2. You have permissions to access the port")
    print("3. No other process is using the robot")

# Load pre-trained policy
try:
    print(f"\n🧠 Loading policy from {POLICY_REPO_ID}...")
    
    # Load policy configuration
    policy_config = PreTrainedConfig.from_pretrained(POLICY_REPO_ID)
    print(f"Policy type: {policy_config.type}")
    
    # Create policy
    policy = make_policy(policy_config, ds_meta=None)
    
    # Get device
    device = get_safe_torch_device(policy_config.device if policy_config.device else "auto")
    print(f"✅ Policy loaded successfully on device: {device}")
    
except Exception as e:
    print(f"❌ Failed to load policy: {e}")
    print("Make sure:")
    print("1. You have internet connection")
    print("2. The repository ID is correct")
    print("3. You have access to the repository (if private)")


🤖 Connecting to robot...
❌ Failed to connect to robot: 
Could not connect on port '/dev/ttyACM1'. Make sure you are using the correct port.
Try running `python -m lerobot.find_port`

Make sure:
1. Robot is connected to the correct port
2. You have permissions to access the port
3. No other process is using the robot

🧠 Loading policy from adungus/act_all...
Policy type: act
❌ Failed to load policy: Either one of a dataset metadata or a sim env must be provided.
Make sure:
1. You have internet connection
2. The repository ID is correct
3. You have access to the repository (if private)


In [None]:
# Ask for confirmation before starting
print("⚠️  SAFETY CHECK:")
print("1. Is the robot workspace clear?")
print("2. Are you ready to stop the robot if needed?")
print("3. Is the emergency stop accessible?")

confirmation = input("\nType 'yes' to proceed with inference: ")

if confirmation.lower() == 'yes':
    print(f"🚀 Starting inference for {INFERENCE_DURATION}s...")
    print("Press 'Interrupt' to stop early if needed!")
    
    start_time = time.time()
    loop_duration = 1.0 / FPS
    step_count = 0
    
    try:
        while time.time() - start_time < INFERENCE_DURATION:
            loop_start = time.time()
            
            # Get current observation
            observation = robot.get_observation()
            
            # Convert to policy format
            observation_frame = {}
            for key, value in observation.items():
                observation_frame[f"observation.{key}"] = value
            
            # Predict action
            action_values = predict_action(
                observation_frame,
                policy,
                device,
                policy.config.use_amp,
                task=TASK_DESCRIPTION,
                robot_type=robot.robot_type,
            )
            
            # Convert to robot action format
            action = {key: action_values[i].item() for i, key in enumerate(robot.action_features)}
            
            # Send action to robot
            sent_action = robot.send_action(action)
            
            step_count += 1
            
            # Show live updates every second
            if step_count % FPS == 0:
                elapsed = time.time() - start_time
                clear_output(wait=True)
                print(f"⏱️  Step {step_count} | Elapsed: {elapsed:.1f}s / {INFERENCE_DURATION}s")
                print(f"Current action: {action}")
                print(f"Progress: {elapsed/INFERENCE_DURATION*100:.1f}%")
                
                # Show progress bar
                progress = int(elapsed / INFERENCE_DURATION * 30)
                bar = "█" * progress + "░" * (30 - progress)
                print(f"[{bar}]")
            
            # Maintain control frequency
            loop_time = time.time() - loop_start
            if loop_time < loop_duration:
                time.sleep(loop_duration - loop_time)
    
    except KeyboardInterrupt:
        print("\n🛑 Inference interrupted by user")
    
    except Exception as e:
        print(f"\n❌ Error during inference: {e}")
        import traceback
        traceback.print_exc()
    
    finally:
        elapsed = time.time() - start_time
        print(f"\n✅ Inference completed!")
        print(f"Total time: {elapsed:.1f}s")
        print(f"Total steps: {step_count}")
        print(f"Average FPS: {step_count/elapsed:.1f}")

else:
    print("❌ Inference cancelled by user")


In [None]:
try:
    print("🧹 Cleaning up...")
    
    if robot.is_connected:
        robot.disconnect()
        print("✅ Robot disconnected successfully")
    else:
        print("ℹ️  Robot was already disconnected")
    
    print("✅ Cleanup complete!")
    
except Exception as e:
    print(f"⚠️  Error during cleanup: {e}")

print("\n🎉 Real-world inference session completed!")
print("\n### Next Steps:")
print("1. **Adjust parameters**: Modify INFERENCE_DURATION, FPS, or camera settings")
print("2. **Try different tasks**: Change TASK_DESCRIPTION to test different behaviors")
print("3. **Analyze performance**: Review the action outputs and robot behavior")
print("4. **Record data**: Use the record command to collect more training data")
