# General Embodied Intelligence Robot Framework — VLA-Model-Powered Agent Digital Worker System

EmbodiedAgentsSys is a ROS2-based, general-purpose embodied intelligence robot framework that supports Agent digital worker systems built on VLA (Vision-Language-Action) models.
## Features

### VLA Multi-Model Support
- Adapters for LeRobot, ACT, GR00T, and other VLA models
- Unified VLA interface design for easy extension

### Rich Skills Library
- Atomic skills: grasp, place, reach, joint motion, inspect
- Skill-chain orchestration and task-planning support

### Event-Driven Architecture
- Asynchronous, non-blocking execution
- Event bus for loosely coupled component communication
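As a rough illustration of the publish/subscribe pattern these bullets describe (a hypothetical miniature, not the framework's actual `EventBus` API, which lives in `agents.events.bus` and uses a richer `Event` type):

```python
import asyncio
from collections import defaultdict

class MiniEventBus:
    """Hypothetical minimal bus: async handlers keyed by event type."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def subscribe(self, event_type, handler):
        # Register an async handler for an event type
        self._handlers[event_type].append(handler)

    async def publish(self, event_type, data):
        # Invoke all handlers concurrently; publisher never blocks on one handler
        await asyncio.gather(*(h(data) for h in self._handlers[event_type]))

async def main():
    bus = MiniEventBus()
    received = []

    async def on_started(data):
        received.append(data)

    bus.subscribe("skill.started", on_started)
    await bus.publish("skill.started", {"skill": "grasp"})
    return received

print(asyncio.run(main()))  # [{'skill': 'grasp'}]
```

Because handlers are awaited rather than called inline, components only share event names, not references to each other.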
### Task Planning Capabilities
- Rule-based task planning
- LLM-driven intelligent task decomposition
## Core Execution Loop (Phase 1)
- Hardware abstraction layer: unified arm interface + multi-vendor adapters
- Skills registry + capability gap detection (YAML-driven)
- Scene specification + voice-interaction filling
- Dual-format execution plans (machine-readable YAML + human-readable Markdown)
- Automatic failure-data recording + training-script auto-generation
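To make the capability-gap-detection idea concrete, here is a hypothetical miniature of the mechanism. The real implementation is `RobotCapabilityRegistry` / `GapDetectionEngine`, driven by `skills_registry.yaml`; the names and dict shapes below are illustrative only:

```python
# Hypothetical sketch: which skills each robot type supports.
# In the framework this table is loaded from skills_registry.yaml.
SKILLS_BY_ROBOT = {
    "arm": {"manipulation.grasp", "manipulation.place", "manipulation.reach"},
}

def annotate_steps(steps, robot_type):
    """Mark each plan step as executable ('pending') or a hard gap ('gap')."""
    supported = SKILLS_BY_ROBOT.get(robot_type, set())
    return [
        {**step, "status": "pending" if step["action"] in supported else "gap"}
        for step in steps
    ]

plan_steps = [
    {"action": "manipulation.grasp", "object": "red_part"},
    {"action": "navigation.goto", "target": "area_b"},  # an arm cannot navigate
]
annotated = annotate_steps(plan_steps, robot_type="arm")
print([s["status"] for s in annotated])  # ['pending', 'gap']
```

Steps flagged as gaps are what feed the failure-data recording and training-script generation described above.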
### VLA Adapters

| Adapter | Description | Status |
|---|---|---|
| VLAAdapterBase | VLA adapter base class | ✅ |
| LeRobotVLAAdapter | LeRobot framework adapter | ✅ |
| ACTVLAAdapter | ACT (Action Chunking Transformer) adapter | ✅ |
| GR00TVLAAdapter | GR00T diffusion-transformer adapter | ✅ |
### Skills

| Skill | Description | Status |
|---|---|---|
| GraspSkill | Grasp skill | ✅ |
| PlaceSkill | Place skill | ✅ |
| ReachSkill | Reach skill | ✅ |
| MoveSkill | Joint motion skill | ✅ |
| InspectSkill | Inspect/recognize skill | ✅ |
| AssemblySkill | Assembly skill | ✅ |
| Perception3DSkill | 3D perception skill | ✅ |
### Components

| Component | Description | Status |
|---|---|---|
| VoiceCommand | Voice command understanding | ✅ |
| SemanticParser | Semantic parser (LLM-enhanced) | ✅ |
| TaskPlanner | Task planner (with execution memory) | ✅ |
| EventBus | Event bus | ✅ |
| DistributedEventBus | Distributed event bus | ✅ |
| SkillGenerator | Skill code generator | ✅ |
### Utilities

| Tool | Description | Status |
|---|---|---|
| AsyncCache | Async cache | ✅ |
| BatchProcessor | Batch processor | ✅ |
| RateLimiter | Rate limiter | ✅ |
| ForceController | Force controller | ✅ |
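The table lists a RateLimiter but the guide shows no example for it. As a rough sketch of what such a utility does (a hypothetical token-bucket, not the actual `agents.utils` API):

```python
import asyncio
import time

class TokenBucketLimiter:
    """Hypothetical token-bucket limiter; illustrative only."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # tokens refilled per second
        self.capacity = capacity  # burst size
        self.tokens = float(capacity)
        self.last = time.monotonic()

    async def acquire(self):
        # Refill from elapsed time, then wait until a whole token is available
        while True:
            now = time.monotonic()
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            await asyncio.sleep((1 - self.tokens) / self.rate)

async def demo():
    limiter = TokenBucketLimiter(rate=100.0, capacity=2)
    start = time.monotonic()
    for _ in range(5):
        await limiter.acquire()  # first 2 pass as a burst, rest are throttled
    return time.monotonic() - start

elapsed = asyncio.run(demo())
print(f"5 acquisitions took {elapsed:.3f}s")
```

A limiter like this is typically placed in front of LLM or hardware calls so bursts of skill executions cannot overwhelm a backend.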
### Hardware Abstraction Layer (Phase 1)

| Module | Description | Status |
|---|---|---|
| ArmAdapter | Arm abstraction base class (ABC); defines unified interfaces such as move_to_pose / move_joints / set_gripper | ✅ |
| AGXArmAdapter | AGX arm adapter (async, supports mock mode) | ✅ |
| LeRobotArmAdapter | LeRobot arm adapter (reuses LeRobotClient) | ✅ |
| RobotCapabilityRegistry | YAML-driven skills registry; supports querying capabilities by robot_type, returns GapType enum | ✅ |
| GapDetectionEngine | Classifies execution-plan steps with hard-gap annotations; outputs GapReport | ✅ |
### Scene Specification & Planning (Phase 1)

| Module | Description | Status |
|---|---|---|
| SceneSpec | Structured scene-description dataclass; supports YAML serialization/deserialization | ✅ |
| PlanGenerator | Wraps TaskPlanner; maps flat actions to dot-notation skill names; outputs YAML + Markdown dual-format execution plans | ✅ |
| VoiceTemplateAgent | Guided voice Q&A; progressively fills SceneSpec fields | ✅ |
### Data & Training (Phase 1)

| Module | Description | Status |
|---|---|---|
| FailureDataRecorder | Auto-saves metadata.json + scene_spec.yaml + plan.yaml on failure | ✅ |
| TrainingScriptGenerator | Generates dataset requirements report and bash training scripts based on capability gaps | ✅ |
## Installation

Install ROS2 Humble:

```bash
sudo apt install ros-humble-desktop
```

Install ros-sugar:

```bash
sudo apt install ros-humble-automatika-ros-sugar
```

Or build from source:

```bash
git clone https://github.com/automatika-robotics/sugarcoat
cd sugarcoat
pip install -e .
```

Then install EmbodiedAgentsSys:

```bash
pip install -e .
```

## Quick Start

Create a VLA adapter:

```python
from agents.clients.vla_adapters import LeRobotVLAAdapter

# Create LeRobot adapter
adapter = LeRobotVLAAdapter(config={
    "policy_name": "panda_policy",
    "checkpoint": "lerobot/act_...",
    "host": "127.0.0.1",
    "port": 8080,
    "action_dim": 7
})
adapter.reset()
```

Execute a skill:

```python
import asyncio
from agents.skills.manipulation import GraspSkill

# Create grasp skill
skill = GraspSkill(
    object_name="cube",
    vla_adapter=adapter
)

# Prepare observation data
observation = {
    "object_detected": True,
    "grasp_success": False
}

# Execute skill
result = asyncio.run(skill.execute(observation))
print(f"Status: {result.status}")
print(f"Output: {result.output}")
```

## VLA Adapter Usage

### LeRobot Adapter

```python
from agents.clients.vla_adapters import LeRobotVLAAdapter

adapter = LeRobotVLAAdapter(config={
    "policy_name": "panda_policy",
    "checkpoint": "lerobot/act_sim_transfer_cube_human",
    "host": "127.0.0.1",
    "port": 8080,
    "action_dim": 7
})
adapter.reset()

# Generate action
observation = {
    "image": image_data,
    "joint_positions": joints
}
action = adapter.act(observation, "grasp(object=cube)")

# Execute action
result = adapter.execute(action)
```

### ACT Adapter

```python
from agents.clients.vla_adapters import ACTVLAAdapter

adapter = ACTVLAAdapter(config={
    "model_path": "/models/act",
    "chunk_size": 100,
    "horizon": 1,
    "action_dim": 7
})
```

### GR00T Adapter

```python
from agents.clients.vla_adapters import GR00TVLAAdapter

adapter = GR00TVLAAdapter(config={
    "model_path": "/models/gr00t",
    "inference_steps": 10,
    "action_dim": 7,
    "action_horizon": 8
})
```

## Skill Usage

### GraspSkill

```python
from agents.skills.manipulation import GraspSkill

skill = GraspSkill(
    object_name="cube",
    vla_adapter=adapter
)

# Check preconditions
observation = {"object_detected": True}
if skill.check_preconditions(observation):
    result = asyncio.run(skill.execute(observation))
```

### PlaceSkill

```python
from agents.skills.manipulation import PlaceSkill

skill = PlaceSkill(
    target_position=[0.5, 0.0, 0.1],  # x, y, z
    vla_adapter=adapter
)
```

### ReachSkill

```python
from agents.skills.manipulation import ReachSkill

skill = ReachSkill(
    target_position=[0.3, 0.0, 0.2],
    vla_adapter=adapter
)
```

### MoveSkill

```python
from agents.skills.manipulation import MoveSkill

# Joint mode
skill = MoveSkill(
    target_joints=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
    vla_adapter=adapter
)

# End-effector pose mode
skill = MoveSkill(
    target_pose=[0.3, 0.0, 0.2, 0.0, 0.0, 0.0],  # x, y, z, roll, pitch, yaw
    vla_adapter=adapter
)
```

### InspectSkill

```python
from agents.skills.manipulation import InspectSkill

skill = InspectSkill(
    target_object="cup",
    inspection_type="detect",  # detect/verify/quality
    vla_adapter=adapter
)
```

### Skill Chain: Pick and Place

```python
import asyncio
from agents.skills.manipulation import ReachSkill, GraspSkill, PlaceSkill

async def pick_and_place():
    adapter = LeRobotVLAAdapter(config={"action_dim": 7})

    # Create skill chain
    reach = ReachSkill(target_position=[0.3, 0.0, 0.2], vla_adapter=adapter)
    grasp = GraspSkill(object_name="cube", vla_adapter=adapter)
    place = PlaceSkill(target_position=[0.5, 0.0, 0.1], vla_adapter=adapter)

    # Execute in sequence
    observation = await get_observation()
    await reach.execute(observation)
    await grasp.execute(observation)
    await place.execute(observation)

asyncio.run(pick_and_place())
```

## Event System

### EventBus

```python
from agents.events.bus import EventBus, Event

bus = EventBus()

async def on_skill_started(event: Event):
    print(f"Skill started: {event.data}")

# Subscribe to event
bus.subscribe("skill.started", on_skill_started)

# Publish event
await bus.publish(Event(
    type="skill.started",
    source="agent",
    data={"skill": "grasp", "object": "cube"}
))
```

## Task Planning

### TaskPlanner

```python
from agents.components.task_planner import TaskPlanner, PlanningStrategy

# Create planner (rule-based)
planner = TaskPlanner(strategy=PlanningStrategy.RULE_BASED)

# Plan task
task = planner.plan("Grasp the cup and place it on the table")
print(f"Task: {task.name}")
print(f"Skills: {task.skills}")
# Output: ['reach', 'grasp', 'reach', 'place']
```

### SemanticParser

```python
from agents.components.semantic_parser import SemanticParser

# Use LLM-enhanced parsing
parser = SemanticParser(use_llm=True, ollama_model="qwen2.5:3b")

# Sync parsing (rule mode)
result = parser.parse("forward 20cm")
# {'intent': 'motion', 'direction': 'forward', 'distance': 0.2}

# Async parsing (LLM mode)
result = await parser.parse_async("move that round part over there")
# {'intent': 'motion', 'params': {'direction': 'forward', ...}}
```

## Force Control

```python
import numpy as np
from skills.force_control import ForceController, ForceControlMode

controller = ForceController(
    max_force=10.0,
    contact_threshold=0.5
)

# Set force control mode
controller.set_mode(ForceControlMode.FORCE)

# Apply force
target_force = np.array([0.0, 0.0, -5.0])
result = await controller.execute(target_force)
```

## Performance Utilities

### AsyncCache

```python
from agents.utils.performance import AsyncCache, get_cache

cache = get_cache(ttl_seconds=60)

@cache.cached
async def expensive_operation(data):
    # Time-consuming operation
    return result
```

### BatchProcessor

```python
import asyncio
from agents.utils.performance import BatchProcessor

processor = BatchProcessor(batch_size=10, timeout=0.1)

async def handler(items):
    # Batch processing
    return [process(item) for item in items]

# Start processing
asyncio.create_task(processor.process(handler))

# Add task
result = await processor.add(item)
```

## Skill Generation from Teaching

```python
from skills.teaching.skill_generator import SkillGenerator

generator = SkillGenerator(output_dir="./generated_skills", _simulated=False)

# Generate skill from a teaching action
teaching_action = {
    "action_id": "demo_001",
    "name": "pick_and_place",
    "frames": [
        {"joint_positions": [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]},
        {"joint_positions": [0.5, 0.2, 0.1, 0.0, 0.0, 0.0, 0.0]},
    ]
}
result = await generator.generate_skill(
    teaching_action=teaching_action,
    skill_name="demo_pick_place"
)

# Export to file (generates an executable Python file)
export_result = await generator.export_skill(result["skill_id"])
```

## Phase 1 Workflow

### Scene Specification

```python
import asyncio
from agents.components.scene_spec import SceneSpec
from agents.components.voice_template_agent import VoiceTemplateAgent

# Method 1: Direct SceneSpec construction
scene = SceneSpec(
    task_description="Move red part from area A to area B",
    robot_type="arm",
    objects=["red_part"],
    target_positions={"red_part": [0.5, 0.2, 0.1]},
)

# Method 2: Guided voice interaction filling
agent = VoiceTemplateAgent()
scene = asyncio.run(agent.interactive_fill())
```

### Execution Plan Generation

```python
from agents.components.plan_generator import PlanGenerator

generator = PlanGenerator(backend="mock")  # backend="ollama" uses LLM
plan = asyncio.run(generator.generate(scene))

print(plan.yaml_content)     # YAML execution plan (machine-readable)
print(plan.markdown_report)  # Markdown report (human-readable)
print(plan.steps)            # Step list, each with a dot-notation skill name
# e.g. [{'action': 'manipulation.grasp', 'object': 'red_part', ...}]
```

### Capability Gap Detection

```python
from agents.hardware.capability_registry import RobotCapabilityRegistry, GapType
from agents.hardware.gap_detector import GapDetectionEngine

registry = RobotCapabilityRegistry()

# Query a single skill
result = registry.query("manipulation.grasp", robot_type="arm")
print(result.gap_type)  # GapType.NONE - supported

result = registry.query("navigation.goto", robot_type="arm")
print(result.gap_type)  # GapType.HARD - not supported

# Batch-detect gaps for plan steps
engine = GapDetectionEngine(registry)
report = engine.detect(plan.steps, robot_type="arm")
print(report.has_gaps)   # True/False
print(report.gap_steps)  # List of steps with gaps

annotated = engine.annotate_steps(plan.steps, robot_type="arm")
# Each step gets a new status: "pending" or "gap"
```

### Failure Recording and Training Script Generation

```python
from agents.data.failure_recorder import FailureDataRecorder
from agents.training.script_generator import TrainingScriptGenerator

# Save scene data on execution failure
recorder = FailureDataRecorder(base_dir="./failure_data")
record_path = asyncio.run(recorder.record(
    scene=scene,
    plan=plan,
    error="manipulation.grasp execution timeout",
))
# Saves: failure_data/<timestamp>/metadata.json + scene_spec.yaml + plan.yaml

# Generate training script based on capability gaps
generator = TrainingScriptGenerator()
config = generator.generate_config(gap_report=report, scene=scene)
script = generator.generate_script(config)
print(script)  # bash training script content

req_report = generator.generate_requirements_report(config)
print(req_report)  # Dataset requirements report (Markdown)
```

### Hardware Abstraction Layer

```python
import asyncio
from agents.hardware.agx_arm_adapter import AGXArmAdapter
from agents.hardware.arm_adapter import Pose6D

# Create adapter (mock=True for testing, no real hardware needed)
arm = AGXArmAdapter(host="192.168.1.100", mock=True)
asyncio.run(arm.connect())

# Check ready
ready = asyncio.run(arm.is_ready())

# Move to target pose
pose = Pose6D(x=0.3, y=0.0, z=0.2, roll=0.0, pitch=0.0, yaw=0.0)
success = asyncio.run(arm.move_to_pose(pose, speed=0.1))

# Control gripper
asyncio.run(arm.set_gripper(opening=0.8, force=5.0))

# Query capabilities
caps = arm.get_capabilities()
print(caps.robot_type)  # "arm"
print(caps.skill_ids)   # ["manipulation.grasp", "manipulation.place", ...]
```

### DistributedEventBus

```python
from agents.events.bus import DistributedEventBus, Event

# Create distributed event bus (requires a ROS2 node)
bus = DistributedEventBus(ros_node=my_ros_node, namespace="/robots/events")

# Subscribe to event
async def on_robot_status(event):
    print(f"Robot status: {event.data}")

bus.subscribe("robot.status", on_robot_status)

# Publish event (automatically broadcast to other ROS2 nodes)
await bus.publish(Event(
    type="robot.status",
    source="robot_1",
    data={"status": "working", "battery": 85}
))
```

## Configuration

```yaml
lerobot:
  policy_name: "default_policy"
  checkpoint: null
  host: "127.0.0.1"
  port: 8080
  action_dim: 7
  vla_type: "lerobot"

skills:
  max_retries: 3
  observation_timeout: 5.0
```

## Project Structure

```
agents/
├── clients/
│   ├── vla_adapters/           # VLA adapters
│   │   ├── base.py
│   │   ├── lerobot.py
│   │   ├── act.py
│   │   └── gr00t.py
│   └── ollama.py               # Ollama LLM client
├── components/                 # Components
│   ├── voice_command.py
│   ├── semantic_parser.py
│   ├── task_planner.py         # Contains _SKILL_NAMESPACE_MAP
│   ├── scene_spec.py           # [Phase 1] Scene specification dataclass
│   ├── plan_generator.py       # [Phase 1] Dual-format execution plan generator
│   └── voice_template_agent.py # [Phase 1] Guided voice interaction filling
├── hardware/                   # [Phase 1] Hardware abstraction layer
│   ├── arm_adapter.py          # ArmAdapter ABC + Pose6D / RobotState / RobotCapabilities
│   ├── agx_arm_adapter.py      # AGX arm adapter
│   ├── lerobot_arm_adapter.py  # LeRobot arm adapter
│   ├── capability_registry.py  # RobotCapabilityRegistry + GapType enum
│   ├── gap_detector.py         # GapDetectionEngine
│   └── skills_registry.yaml    # Skills registry (9 skills)
├── data/                       # [Phase 1] Data layer
│   └── failure_recorder.py     # Automatic failure data recording
├── training/                   # [Phase 1] Training layer
│   └── script_generator.py     # Training script + dataset requirements report generation
├── skills/
│   ├── vla_skill.py            # Skill base class
│   └── manipulation/           # Manipulation skills
│       ├── grasp.py
│       ├── place.py
│       ├── reach.py
│       ├── move.py
│       └── inspect.py
├── events/                     # Event system
│   └── bus.py                  # EventBus + DistributedEventBus
└── utils/                      # Utilities
    └── performance.py

skills/
├── force_control/              # Force control module
│   └── force_control.py
├── vision/                     # Vision skills
│   └── perception_3d_skill.py
└── teaching/                   # Teaching module
    └── skill_generator.py

tests/                          # Tests (57 test cases)

docs/
├── api/                        # API documentation
├── guides/                     # Usage guides
└── plans/                      # Development plans
```
## Agent Dashboard

The Agent Dashboard provides real-time camera preview, scene description, and object detection. It is built with React + FastAPI and uses the local Ollama qwen2.5vl vision model for inference.

- **Scene Analysis Panel**: real-time preview + qwen2.5vl scene description + object detection confidence
- **Detection Results**: automatically identifies monitor, folder, computer, and other objects on an office desk
### Prerequisites

- USB camera connected (default `/dev/video0`)
- Ollama installed with the vision model pulled:

  ```bash
  ollama pull qwen2.5vl
  ```

- Python dependencies:

  ```bash
  pip install fastapi uvicorn opencv-python ollama
  ```

- Node.js dependencies (first run):

  ```bash
  cd web-dashboard && npm install
  ```
### Running

Terminal 1 — Backend (USB camera + qwen2.5vl inference):

```bash
cd /path/to/EmbodiedAgentsSys
python examples/agent_dashboard_backend.py
# Backend runs on http://localhost:8000
```

Terminal 2 — Frontend (React dev server):

```bash
cd web-dashboard
npx vite
# Frontend runs on http://localhost:5173
```

Then open a browser at http://localhost:5173.
| Sidebar | Feature |
|---|---|
| Camera | Real-time camera preview (~10 fps), start/stop buttons |
| Scene Analysis | Real-time preview + click "Scene Analysis" to call qwen2.5vl, returns scene description and object list |
| Detection | Table showing detected objects and confidence scores |
| Chat | Text interaction with backend Agent |
The backend provides the following REST endpoints (port 8000):

| Method | Path | Description |
|---|---|---|
| GET | `/api/camera/frame` | Get current frame (base64 JPEG) |
| POST | `/api/scene/describe` | Trigger qwen2.5vl scene understanding; returns description and object list |
| GET | `/api/detection/result` | Get latest object detection results |
| GET | `/healthz` | Health check |
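As a sketch of consuming these endpoints from a script (hedged: the JSON field name `"frame"` is an assumption; check `examples/agent_dashboard_backend.py` for the actual response schema):

```python
import base64
import json
import urllib.request

BACKEND = "http://localhost:8000"  # default backend port

def decode_frame(payload: dict) -> bytes:
    """Decode the base64 JPEG carried in a /api/camera/frame response.

    Assumes the body looks like {"frame": "<base64>"}; the actual field
    name may differ -- see the backend source.
    """
    return base64.b64decode(payload["frame"])

def fetch_frame() -> bytes:
    """GET the current camera frame from a running backend."""
    with urllib.request.urlopen(f"{BACKEND}/api/camera/frame") as resp:
        return decode_frame(json.load(resp))

# Offline demonstration of the decode step with a stand-in payload:
fake_payload = {"frame": base64.b64encode(b"\xff\xd8\xff\xe0").decode()}
print(len(decode_frame(fake_payload)))  # 4
```

`fetch_frame()` requires the backend from the Running section to be up; the decode step works standalone, as shown with the stand-in payload.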
## License

MIT License - Copyright (c) 2024-2026