# High-Level Reinforcement Learning for Unitree G1

This notebook implements a **high-level RL framework** for the Unitree G1 humanoid robot, focusing on:

1. **State Space:** Robot observations + USB controller commands
2. **Action Space:** Abstract high-level actions (path planning, gestures, audio)
3. **Reward Design:** Task-oriented rewards for real-world applications
4. **Training Pipeline:** From simulation to real robot deployment

## Key Concepts

**High-Level RL** operates at the **task planning level**, abstracting away low-level motor control:

```
Traditional RL:        State ‚Üí Neural Network ‚Üí Joint Torques
                       (Low-level, 29-DOF control)

High-Level RL:         State ‚Üí Neural Network ‚Üí Abstract Actions
                       (Task-level: "walk to X", "wave hand", "say hello")
                       
                       Abstract Actions ‚Üí Motion Primitives ‚Üí Joint Commands
```

**Benefits:**
- Faster learning (smaller action space)
- Better generalization
- Safer (primitives are pre-validated)
- Human-interpretable actions

---

## Table of Contents

1. [State Space Design](#state-space)
2. [USB Controller Integration](#controller)
3. [High-Level Action Space](#actions)
4. [Motion Primitives Library](#primitives)
5. [Reward Function Design](#rewards)
6. [Environment Implementation](#environment)
7. [Training with PPO](#training)
8. [Deployment to Real Robot](#deployment)
9. [Example Applications](#applications)

---

## Setup & Dependencies

In [None]:
# Install dependencies
!pip install gymnasium stable-baselines3 torch numpy matplotlib
!pip install pygame  # For USB controller support
!pip install pynput  # Alternative: keyboard/mouse input

In [None]:
# Standard imports
import numpy as np
import matplotlib.pyplot as plt
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
from enum import Enum
import time
import json

# RL frameworks
import gymnasium as gym
from gymnasium import spaces
import torch
import torch.nn as nn

try:
    from stable_baselines3 import PPO
    from stable_baselines3.common.vec_env import DummyVecEnv
    from stable_baselines3.common.callbacks import BaseCallback
    SB3_AVAILABLE = True
except ImportError:
    print("Warning: stable-baselines3 not available")
    SB3_AVAILABLE = False

# USB Controller
try:
    import pygame
    PYGAME_AVAILABLE = True
except ImportError:
    print("Warning: pygame not available (USB controller input disabled)")
    PYGAME_AVAILABLE = False

# Unitree SDK
try:
    from unitree_sdk2py.core.channel import ChannelSubscriber, ChannelPublisher, ChannelFactoryInitialize
    from unitree_sdk2py.idl.unitree_hg.msg.dds_ import LowCmd_, LowState_
    from unitree_sdk2py.g1.loco.g1_loco_client import LocoClient
    from unitree_sdk2py.g1.audio.g1_audio_client import AudioClient
    from unitree_sdk2py.g1.arm.g1_arm_action_client import G1ArmActionClient, action_map
    UNITREE_SDK_AVAILABLE = True
except ImportError:
    print("Warning: Unitree SDK not available (simulation only)")
    UNITREE_SDK_AVAILABLE = False

print("‚úì Imports complete")
print(f"  - PyGame (Controller): {PYGAME_AVAILABLE}")
print(f"  - Stable-Baselines3: {SB3_AVAILABLE}")
print(f"  - Unitree SDK: {UNITREE_SDK_AVAILABLE}")

---
<a id='state-space'></a>
## 1. State Space Design

The state space combines **robot observations** and **human commands** from a USB controller.

### State Components

#### A. Robot Observations (Proprioceptive)
- **Base Orientation:** Roll, pitch, yaw (3D)
- **Base Angular Velocity:** œâx, œây, œâz (3D)
- **Base Linear Velocity:** vx, vy, vz (3D)
- **Joint Positions:** 29 joint angles (29D)
- **Joint Velocities:** 29 joint velocities (29D)
- **Contact State:** Foot contact sensors (2D - left/right)

#### B. Robot Observations (Exteroceptive)
- **Goal Direction:** Relative vector to target (3D)
- **Goal Distance:** Scalar distance to target (1D)
- **Obstacle Proximity:** Nearest obstacle distance in 8 directions (8D)

#### C. Human Commands (USB Controller)
- **Velocity Command:** Desired (vx, vy, œâ) from joystick (3D)
- **Mode Selector:** Discrete mode (walk/run/idle) (1D)
- **Gesture Trigger:** Which gesture to perform (1D)

**Total State Dimension:** 3 + 3 + 3 + 29 + 29 + 2 + 3 + 1 + 8 + 3 + 1 + 1 = **86 dimensions**

In [None]:
@dataclass
class RobotState:
    """Complete state representation for high-level RL"""
    
    # Proprioceptive (internal sensors)
    base_orientation: np.ndarray  # [roll, pitch, yaw] in radians
    base_angular_velocity: np.ndarray  # [wx, wy, wz] in rad/s
    base_linear_velocity: np.ndarray  # [vx, vy, vz] in m/s
    joint_positions: np.ndarray  # 29 joint angles in radians
    joint_velocities: np.ndarray  # 29 joint velocities in rad/s
    foot_contacts: np.ndarray  # [left_contact, right_contact] (0 or 1)
    
    # Exteroceptive (external sensors)
    goal_direction: np.ndarray  # Unit vector pointing to goal [x, y, z]
    goal_distance: float  # Euclidean distance to goal in meters
    obstacle_proximity: np.ndarray  # Distance to nearest obstacle in 8 directions
    
    # Human commands (from USB controller)
    cmd_velocity: np.ndarray  # Desired [vx, vy, omega] from joystick
    cmd_mode: int  # 0=idle, 1=walk, 2=run
    cmd_gesture: int  # Gesture ID (0=none, 1=wave, 2=handshake, etc.)
    
    def to_array(self) -> np.ndarray:
        """Convert state to flat numpy array for RL"""
        return np.concatenate([
            self.base_orientation,
            self.base_angular_velocity,
            self.base_linear_velocity,
            self.joint_positions,
            self.joint_velocities,
            self.foot_contacts,
            self.goal_direction,
            [self.goal_distance],
            self.obstacle_proximity,
            self.cmd_velocity,
            [self.cmd_mode],
            [self.cmd_gesture]
        ])
    
    @property
    def dimension(self) -> int:
        """Total state dimension"""
        return len(self.to_array())

# Example state initialization
example_state = RobotState(
    base_orientation=np.zeros(3),
    base_angular_velocity=np.zeros(3),
    base_linear_velocity=np.zeros(3),
    joint_positions=np.zeros(29),
    joint_velocities=np.zeros(29),
    foot_contacts=np.array([1, 1]),  # Both feet on ground
    goal_direction=np.array([1, 0, 0]),  # Goal straight ahead
    goal_distance=5.0,  # 5 meters away
    obstacle_proximity=np.ones(8) * 10.0,  # No nearby obstacles
    cmd_velocity=np.zeros(3),
    cmd_mode=1,  # Walk mode
    cmd_gesture=0  # No gesture
)

print(f"State dimension: {example_state.dimension}")
print(f"State vector shape: {example_state.to_array().shape}")

---
<a id='controller'></a>
## 2. USB Controller Integration

We'll use a standard USB game controller (e.g., Xbox, PlayStation) to provide human commands.

### Controller Mapping

```
‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ        USB Game Controller              ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ                                         ‚îÇ
‚îÇ  Left Stick:  ‚Üí Forward/Backward (vx)  ‚îÇ
‚îÇ               ‚Üí Strafe Left/Right (vy)  ‚îÇ
‚îÇ                                         ‚îÇ
‚îÇ  Right Stick: ‚Üí Rotate Left/Right (œâ)  ‚îÇ
‚îÇ                                         ‚îÇ
‚îÇ  Buttons:                               ‚îÇ
‚îÇ    A/X     ‚Üí Walk Mode                  ‚îÇ
‚îÇ    B/Circle ‚Üí Run Mode                   ‚îÇ
‚îÇ    Y/Triangle ‚Üí Idle/Stop                ‚îÇ
‚îÇ    LB      ‚Üí Wave Hand                  ‚îÇ
‚îÇ    RB      ‚Üí Handshake                  ‚îÇ
‚îÇ    LT      ‚Üí Clap                       ‚îÇ
‚îÇ    Start   ‚Üí Emergency Stop             ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò
```

In [None]:
class USBControllerInput:
    """Read commands from USB game controller (Xbox/PlayStation compatible)"""
    
    def __init__(self):
        self.available = False
        self.joystick = None
        
        if PYGAME_AVAILABLE:
            try:
                pygame.init()
                pygame.joystick.init()
                
                if pygame.joystick.get_count() > 0:
                    self.joystick = pygame.joystick.Joystick(0)
                    self.joystick.init()
                    self.available = True
                    print(f"‚úì Controller connected: {self.joystick.get_name()}")
                    print(f"  Axes: {self.joystick.get_numaxes()}")
                    print(f"  Buttons: {self.joystick.get_numbuttons()}")
                else:
                    print("‚ö†Ô∏è  No USB controller detected")
            except Exception as e:
                print(f"‚ö†Ô∏è  Controller initialization failed: {e}")
        
        # Default command values
        self.cmd_velocity = np.zeros(3)
        self.cmd_mode = 0
        self.cmd_gesture = 0
        self.emergency_stop = False
    
    def read_commands(self) -> Dict[str, Any]:
        """
        Read current controller state
        
        Returns:
            commands: Dictionary with velocity, mode, gesture commands
        """
        if not self.available:
            # Return default commands if no controller
            return {
                'velocity': self.cmd_velocity,
                'mode': self.cmd_mode,
                'gesture': self.cmd_gesture,
                'emergency_stop': False
            }
        
        # Process pygame events
        pygame.event.pump()
        
        # Read axes (joysticks)
        # Left stick: axis 0 (left/right), axis 1 (up/down)
        # Right stick: axis 3 (left/right)
        left_x = self.joystick.get_axis(0) if self.joystick.get_numaxes() > 0 else 0.0
        left_y = -self.joystick.get_axis(1) if self.joystick.get_numaxes() > 1 else 0.0  # Invert Y
        right_x = self.joystick.get_axis(3) if self.joystick.get_numaxes() > 3 else 0.0
        
        # Apply deadzone (ignore small movements)
        deadzone = 0.15
        left_x = left_x if abs(left_x) > deadzone else 0.0
        left_y = left_y if abs(left_y) > deadzone else 0.0
        right_x = right_x if abs(right_x) > deadzone else 0.0
        
        # Map to velocity commands
        # Scale: joystick [-1, 1] ‚Üí velocity [max_speed]
        max_linear = 0.5  # m/s
        max_angular = 0.8  # rad/s
        
        self.cmd_velocity = np.array([
            left_y * max_linear,   # Forward/backward
            left_x * max_linear,   # Strafe left/right
            right_x * max_angular  # Rotate
        ])
        
        # Read buttons
        button_a = self.joystick.get_button(0) if self.joystick.get_numbuttons() > 0 else 0
        button_b = self.joystick.get_button(1) if self.joystick.get_numbuttons() > 1 else 0
        button_y = self.joystick.get_button(3) if self.joystick.get_numbuttons() > 3 else 0
        button_lb = self.joystick.get_button(4) if self.joystick.get_numbuttons() > 4 else 0
        button_rb = self.joystick.get_button(5) if self.joystick.get_numbuttons() > 5 else 0
        button_lt = self.joystick.get_button(6) if self.joystick.get_numbuttons() > 6 else 0
        button_start = self.joystick.get_button(7) if self.joystick.get_numbuttons() > 7 else 0
        
        # Mode selection
        if button_a:
            self.cmd_mode = 1  # Walk
        elif button_b:
            self.cmd_mode = 2  # Run
        elif button_y:
            self.cmd_mode = 0  # Idle
        
        # Gesture selection
        if button_lb:
            self.cmd_gesture = 1  # Wave
        elif button_rb:
            self.cmd_gesture = 2  # Handshake
        elif button_lt:
            self.cmd_gesture = 3  # Clap
        else:
            self.cmd_gesture = 0  # None
        
        # Emergency stop
        self.emergency_stop = bool(button_start)
        
        return {
            'velocity': self.cmd_velocity.copy(),
            'mode': self.cmd_mode,
            'gesture': self.cmd_gesture,
            'emergency_stop': self.emergency_stop
        }
    
    def get_command_string(self) -> str:
        """Human-readable command description"""
        mode_names = ['IDLE', 'WALK', 'RUN']
        gesture_names = ['None', 'Wave', 'Handshake', 'Clap']
        
        return (
            f"Mode: {mode_names[self.cmd_mode]} | "
            f"Vel: [{self.cmd_velocity[0]:.2f}, {self.cmd_velocity[1]:.2f}, {self.cmd_velocity[2]:.2f}] | "
            f"Gesture: {gesture_names[self.cmd_gesture]}"
        )

# Test controller
controller = USBControllerInput()

if controller.available:
    print("\nTesting controller (move sticks and press buttons)...")
    for i in range(50):  # Read for 5 seconds
        commands = controller.read_commands()
        print(f"\r{controller.get_command_string()}", end='', flush=True)
        time.sleep(0.1)
    print("\n‚úì Controller test complete")
else:
    print("\n‚ö†Ô∏è  Controller not available - will use simulated commands")

---
<a id='actions'></a>
## 3. High-Level Action Space

Instead of directly controlling 29 joints, we define **high-level abstract actions**:

### Action Space Design

#### Discrete Actions (10 total)

| Action ID | Name | Description |
|-----------|------|-------------|
| 0 | `IDLE` | Stand still, damping mode |
| 1 | `WALK_FORWARD` | Walk forward at commanded velocity |
| 2 | `WALK_BACKWARD` | Walk backward |
| 3 | `STRAFE_LEFT` | Sidestep left |
| 4 | `STRAFE_RIGHT` | Sidestep right |
| 5 | `ROTATE_LEFT` | Turn left in place |
| 6 | `ROTATE_RIGHT` | Turn right in place |
| 7 | `GESTURE_WAVE` | Perform wave gesture |
| 8 | `GESTURE_HANDSHAKE` | Perform handshake gesture |
| 9 | `GESTURE_CLAP` | Perform clap gesture |

#### Continuous Parameters (3D)

- **Velocity Scale:** [0, 1] - How fast to execute action
- **Direction Bias:** [-1, 1] - Fine-tune direction
- **Audio Volume:** [0, 1] - Volume for audio feedback

**Total Action Space:** Discrete(10) + Box(3)

In [None]:
class HighLevelAction(Enum):
    """Enumeration of high-level actions"""
    IDLE = 0
    WALK_FORWARD = 1
    WALK_BACKWARD = 2
    STRAFE_LEFT = 3
    STRAFE_RIGHT = 4
    ROTATE_LEFT = 5
    ROTATE_RIGHT = 6
    GESTURE_WAVE = 7
    GESTURE_HANDSHAKE = 8
    GESTURE_CLAP = 9

@dataclass
class ActionCommand:
    """Complete action specification"""
    action_type: HighLevelAction
    velocity_scale: float  # [0, 1]
    direction_bias: float  # [-1, 1]
    audio_volume: float  # [0, 1]
    
    def to_motion_command(self) -> Dict[str, Any]:
        """
        Convert high-level action to low-level motion command
        
        Returns:
            motion_cmd: Dictionary with target velocities and gesture flags
        """
        # Base velocities for each action type
        action_velocities = {
            HighLevelAction.IDLE: (0.0, 0.0, 0.0),
            HighLevelAction.WALK_FORWARD: (0.3, 0.0, 0.0),
            HighLevelAction.WALK_BACKWARD: (-0.2, 0.0, 0.0),
            HighLevelAction.STRAFE_LEFT: (0.0, 0.2, 0.0),
            HighLevelAction.STRAFE_RIGHT: (0.0, -0.2, 0.0),
            HighLevelAction.ROTATE_LEFT: (0.0, 0.0, 0.5),
            HighLevelAction.ROTATE_RIGHT: (0.0, 0.0, -0.5),
            HighLevelAction.GESTURE_WAVE: (0.0, 0.0, 0.0),
            HighLevelAction.GESTURE_HANDSHAKE: (0.0, 0.0, 0.0),
            HighLevelAction.GESTURE_CLAP: (0.0, 0.0, 0.0),
        }
        
        vx, vy, vw = action_velocities[self.action_type]
        
        # Apply velocity scaling
        vx *= self.velocity_scale
        vy *= self.velocity_scale
        vw *= self.velocity_scale
        
        # Apply direction bias (adds lateral component)
        vy += self.direction_bias * 0.1
        
        # Determine if gesture should be executed
        is_gesture = self.action_type.value >= 7
        gesture_id = self.action_type.value - 7 if is_gesture else None
        
        return {
            'velocity': np.array([vx, vy, vw]),
            'is_gesture': is_gesture,
            'gesture_id': gesture_id,
            'audio_volume': self.audio_volume
        }
    
    def get_description(self) -> str:
        """Human-readable action description"""
        return (
            f"{self.action_type.name} "
            f"(speed={self.velocity_scale:.2f}, "
            f"bias={self.direction_bias:.2f}, "
            f"volume={self.audio_volume:.2f})"
        )

# Example actions
example_actions = [
    ActionCommand(HighLevelAction.WALK_FORWARD, 0.8, 0.0, 0.5),
    ActionCommand(HighLevelAction.ROTATE_LEFT, 0.5, 0.0, 0.3),
    ActionCommand(HighLevelAction.GESTURE_WAVE, 1.0, 0.0, 0.8),
]

print("Example high-level actions:\n")
for action in example_actions:
    print(f"  {action.get_description()}")
    motion_cmd = action.to_motion_command()
    print(f"    ‚Üí Velocity: {motion_cmd['velocity']}")
    print(f"    ‚Üí Gesture: {motion_cmd['is_gesture']}\n")

---
<a id='primitives'></a>
## 4. Motion Primitives Library

Motion primitives are **pre-validated movement sequences** that abstract low-level control.

In [None]:
class MotionPrimitive:
    """Base class for motion primitives"""
    
    def __init__(self, name: str, duration: float):
        self.name = name
        self.duration = duration  # seconds
        self.is_executing = False
        self.start_time = None
    
    def execute(self, robot_interface) -> bool:
        """
        Execute the motion primitive
        
        Args:
            robot_interface: Connection to real or simulated robot
        
        Returns:
            completed: True if primitive finished
        """
        raise NotImplementedError
    
    def reset(self):
        """Reset primitive state"""
        self.is_executing = False
        self.start_time = None

class LocomotionPrimitive(MotionPrimitive):
    """Locomotion primitive: walk with specified velocity"""
    
    def __init__(self, name: str, target_velocity: np.ndarray, duration: float = 1.0):
        super().__init__(name, duration)
        self.target_velocity = target_velocity  # [vx, vy, vw]
    
    def execute(self, robot_interface) -> bool:
        if not self.is_executing:
            self.is_executing = True
            self.start_time = time.time()
        
        elapsed = time.time() - self.start_time
        
        if elapsed < self.duration:
            # Send velocity command
            if hasattr(robot_interface, 'send_velocity'):
                robot_interface.send_velocity(self.target_velocity)
            return False  # Still executing
        else:
            # Completed
            self.reset()
            return True

class GesturePrimitive(MotionPrimitive):
    """Gesture primitive: pre-defined arm motion"""
    
    def __init__(self, name: str, gesture_id: int, duration: float = 3.0):
        super().__init__(name, duration)
        self.gesture_id = gesture_id
    
    def execute(self, robot_interface) -> bool:
        if not self.is_executing:
            self.is_executing = True
            self.start_time = time.time()
            
            # Trigger gesture
            if hasattr(robot_interface, 'execute_gesture'):
                robot_interface.execute_gesture(self.gesture_id)
        
        elapsed = time.time() - self.start_time
        
        if elapsed >= self.duration:
            self.reset()
            return True
        return False

class AudioPrimitive(MotionPrimitive):
    """Audio primitive: play sound or TTS"""
    
    def __init__(self, name: str, message: str, volume: float = 0.5):
        super().__init__(name, duration=2.0)
        self.message = message
        self.volume = volume
    
    def execute(self, robot_interface) -> bool:
        if not self.is_executing:
            self.is_executing = True
            self.start_time = time.time()
            
            # Play audio
            if hasattr(robot_interface, 'speak'):
                robot_interface.speak(self.message, self.volume)
        
        elapsed = time.time() - self.start_time
        
        if elapsed >= self.duration:
            self.reset()
            return True
        return False

class MotionPrimitiveLibrary:
    """Library of pre-defined motion primitives"""
    
    def __init__(self):
        self.primitives = {}
        self._initialize_primitives()
    
    def _initialize_primitives(self):
        """Create standard motion primitives"""
        # Locomotion primitives
        self.primitives['walk_forward'] = LocomotionPrimitive(
            'walk_forward', np.array([0.3, 0, 0]), duration=1.0
        )
        self.primitives['walk_backward'] = LocomotionPrimitive(
            'walk_backward', np.array([-0.2, 0, 0]), duration=1.0
        )
        self.primitives['strafe_left'] = LocomotionPrimitive(
            'strafe_left', np.array([0, 0.2, 0]), duration=1.0
        )
        self.primitives['strafe_right'] = LocomotionPrimitive(
            'strafe_right', np.array([0, -0.2, 0]), duration=1.0
        )
        self.primitives['rotate_left'] = LocomotionPrimitive(
            'rotate_left', np.array([0, 0, 0.5]), duration=1.0
        )
        self.primitives['rotate_right'] = LocomotionPrimitive(
            'rotate_right', np.array([0, 0, -0.5]), duration=1.0
        )
        
        # Gesture primitives
        self.primitives['wave'] = GesturePrimitive('wave', gesture_id=0, duration=3.0)
        self.primitives['handshake'] = GesturePrimitive('handshake', gesture_id=1, duration=4.0)
        self.primitives['clap'] = GesturePrimitive('clap', gesture_id=2, duration=3.0)
        
        # Audio primitives
        self.primitives['greet'] = AudioPrimitive('greet', "Hello, nice to meet you!")
        self.primitives['acknowledge'] = AudioPrimitive('acknowledge', "Understood")
    
    def get_primitive(self, name: str) -> Optional[MotionPrimitive]:
        """Retrieve primitive by name"""
        return self.primitives.get(name)
    
    def list_primitives(self) -> List[str]:
        """List all available primitives"""
        return list(self.primitives.keys())

# Initialize library
motion_lib = MotionPrimitiveLibrary()
print("Motion Primitive Library initialized\n")
print("Available primitives:")
for name in motion_lib.list_primitives():
    primitive = motion_lib.get_primitive(name)
    print(f"  - {name:20s} (duration: {primitive.duration:.1f}s)")

---
<a id='rewards'></a>
## 5. Reward Function Design

A well-designed reward function is critical for RL success. We'll use a **simple, sparse reward** approach.

### Reward Components

#### 1. Goal Reaching Reward (+100)
- Large positive reward when robot reaches goal
- Encourages task completion

#### 2. Progress Reward (+0.1 per step)
- Small reward for moving closer to goal
- Provides learning signal before goal is reached

#### 3. Command Following Reward (+0.5)
- Reward for following human controller commands
- Encourages responsive behavior

#### 4. Stability Penalty (-1.0 per fall)
- Negative reward if robot falls (large tilt)
- Ensures safety

#### 5. Collision Penalty (-0.5)
- Penalty for hitting obstacles
- Encourages obstacle avoidance

#### 6. Energy Penalty (-0.001 per action)
- Small penalty for movement
- Encourages efficiency

**Total Reward:**
```
r = r_goal + r_progress + r_command + r_stability + r_collision + r_energy
```

In [None]:
class RewardFunction:
    """Compute reward for high-level RL"""
    
    def __init__(self):
        # Reward weights
        self.w_goal = 100.0
        self.w_progress = 0.1
        self.w_command = 0.5
        self.w_stability = -1.0
        self.w_collision = -0.5
        self.w_energy = -0.001
        
        # Thresholds
        self.goal_radius = 0.5  # meters
        self.fall_angle = 0.5  # radians (~30 degrees)
        self.collision_distance = 0.3  # meters
        
        # Tracking
        self.previous_goal_distance = None
    
    def compute_reward(
        self,
        state: RobotState,
        action: ActionCommand,
        next_state: RobotState
    ) -> Tuple[float, Dict[str, float]]:
        """
        Compute reward for state-action-next_state transition
        
        Returns:
            total_reward: Scalar reward
            reward_breakdown: Dictionary with individual components
        """
        rewards = {}
        
        # 1. Goal reaching
        reached_goal = next_state.goal_distance < self.goal_radius
        rewards['goal'] = self.w_goal if reached_goal else 0.0
        
        # 2. Progress toward goal
        if self.previous_goal_distance is not None:
            progress = self.previous_goal_distance - next_state.goal_distance
            rewards['progress'] = self.w_progress * progress
        else:
            rewards['progress'] = 0.0
        
        self.previous_goal_distance = next_state.goal_distance
        
        # 3. Command following (alignment with human commands)
        # Measure how well action aligns with commanded velocity
        motion_cmd = action.to_motion_command()
        commanded_vel = state.cmd_velocity
        actual_vel = motion_cmd['velocity']
        
        # Cosine similarity (if non-zero commands)
        if np.linalg.norm(commanded_vel) > 0.01:
            alignment = np.dot(commanded_vel, actual_vel) / (
                np.linalg.norm(commanded_vel) * (np.linalg.norm(actual_vel) + 1e-6)
            )
            rewards['command'] = self.w_command * max(0, alignment)
        else:
            rewards['command'] = 0.0
        
        # 4. Stability penalty
        roll, pitch, _ = next_state.base_orientation
        fell = abs(roll) > self.fall_angle or abs(pitch) > self.fall_angle
        rewards['stability'] = self.w_stability if fell else 0.0
        
        # 5. Collision penalty
        min_obstacle_dist = np.min(next_state.obstacle_proximity)
        collided = min_obstacle_dist < self.collision_distance
        rewards['collision'] = self.w_collision if collided else 0.0
        
        # 6. Energy penalty (encourage efficiency)
        action_magnitude = np.linalg.norm(motion_cmd['velocity'])
        rewards['energy'] = self.w_energy * action_magnitude
        
        # Total reward
        total_reward = sum(rewards.values())
        
        return total_reward, rewards
    
    def reset(self):
        """Reset tracking variables"""
        self.previous_goal_distance = None

# Example reward computation
reward_fn = RewardFunction()

# Simulate scenario: robot moving toward goal
state = RobotState(
    base_orientation=np.array([0.05, 0.02, 0.0]),
    base_angular_velocity=np.zeros(3),
    base_linear_velocity=np.array([0.3, 0, 0]),
    joint_positions=np.zeros(29),
    joint_velocities=np.zeros(29),
    foot_contacts=np.array([1, 1]),
    goal_direction=np.array([1, 0, 0]),
    goal_distance=5.0,
    obstacle_proximity=np.ones(8) * 10.0,
    cmd_velocity=np.array([0.3, 0, 0]),
    cmd_mode=1,
    cmd_gesture=0
)

action = ActionCommand(HighLevelAction.WALK_FORWARD, 0.8, 0.0, 0.5)

next_state = RobotState(
    base_orientation=np.array([0.03, 0.01, 0.0]),
    base_angular_velocity=np.zeros(3),
    base_linear_velocity=np.array([0.3, 0, 0]),
    joint_positions=np.zeros(29),
    joint_velocities=np.zeros(29),
    foot_contacts=np.array([1, 1]),
    goal_direction=np.array([1, 0, 0]),
    goal_distance=4.7,  # Moved 0.3m closer
    obstacle_proximity=np.ones(8) * 10.0,
    cmd_velocity=np.array([0.3, 0, 0]),
    cmd_mode=1,
    cmd_gesture=0
)

reward, breakdown = reward_fn.compute_reward(state, action, next_state)

print("Reward Computation Example:\n")
print(f"Action: {action.get_description()}")
print(f"\nReward Breakdown:")
for component, value in breakdown.items():
    print(f"  {component:15s}: {value:+7.4f}")
print(f"\nTotal Reward: {reward:+7.4f}")

---
<a id='environment'></a>
## 6. Gymnasium Environment Implementation

We'll implement a Gymnasium environment that wraps everything together.

In [None]:
class G1HighLevelEnv(gym.Env):
    """
    High-level RL environment for Unitree G1
    
    State: 86D (robot obs + controller commands)
    Action: Discrete(10) + Box(3) (high-level actions + parameters)
    """
    
    def __init__(self, use_controller: bool = True, simulation: bool = True):
        super().__init__()
        
        self.use_controller = use_controller
        self.simulation = simulation
        
        # Initialize components
        self.controller = USBControllerInput() if use_controller else None
        self.motion_lib = MotionPrimitiveLibrary()
        self.reward_fn = RewardFunction()
        
        # Define spaces
        self.observation_space = spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(86,),
            dtype=np.float32
        )
        
        # Action space: discrete action + continuous parameters
        self.action_space = spaces.Dict({
            'action_type': spaces.Discrete(10),
            'parameters': spaces.Box(
                low=np.array([0.0, -1.0, 0.0]),
                high=np.array([1.0, 1.0, 1.0]),
                dtype=np.float32
            )
        })
        
        # Episode parameters
        self.max_steps = 1000
        self.current_step = 0
        
        # Robot state (initialized in reset)
        self.state = None
        self.goal_position = np.array([5.0, 0.0, 0.0])  # 5m ahead
    
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        
        # Reset step counter
        self.current_step = 0
        
        # Reset reward function
        self.reward_fn.reset()
        
        # Initialize robot state
        self.state = RobotState(
            base_orientation=np.zeros(3),
            base_angular_velocity=np.zeros(3),
            base_linear_velocity=np.zeros(3),
            joint_positions=np.zeros(29),
            joint_velocities=np.zeros(29),
            foot_contacts=np.array([1, 1]),
            goal_direction=self.goal_position / np.linalg.norm(self.goal_position),
            goal_distance=np.linalg.norm(self.goal_position),
            obstacle_proximity=np.ones(8) * 10.0,
            cmd_velocity=np.zeros(3),
            cmd_mode=1,
            cmd_gesture=0
        )
        
        # Read controller if available
        if self.controller and self.controller.available:
            commands = self.controller.read_commands()
            self.state.cmd_velocity = commands['velocity']
            self.state.cmd_mode = commands['mode']
            self.state.cmd_gesture = commands['gesture']
        
        return self.state.to_array(), {}
    
    def step(self, action):
        # Parse action
        action_type = HighLevelAction(action['action_type'])
        velocity_scale, direction_bias, audio_volume = action['parameters']
        
        action_cmd = ActionCommand(
            action_type=action_type,
            velocity_scale=float(velocity_scale),
            direction_bias=float(direction_bias),
            audio_volume=float(audio_volume)
        )
        
        # Execute action (simulate robot dynamics)
        next_state = self._simulate_step(action_cmd)
        
        # Read new controller commands
        if self.controller and self.controller.available:
            commands = self.controller.read_commands()
            next_state.cmd_velocity = commands['velocity']
            next_state.cmd_mode = commands['mode']
            next_state.cmd_gesture = commands['gesture']
        
        # Compute reward
        reward, reward_breakdown = self.reward_fn.compute_reward(
            self.state, action_cmd, next_state
        )
        
        # Check termination
        self.current_step += 1
        
        goal_reached = next_state.goal_distance < self.reward_fn.goal_radius
        fell = abs(next_state.base_orientation[0]) > self.reward_fn.fall_angle or \
               abs(next_state.base_orientation[1]) > self.reward_fn.fall_angle
        
        terminated = goal_reached or fell
        truncated = self.current_step >= self.max_steps
        
        # Update state
        self.state = next_state
        
        info = {
            'goal_reached': goal_reached,
            'fell': fell,
            'goal_distance': next_state.goal_distance,
            'reward_breakdown': reward_breakdown
        }
        
        return next_state.to_array(), reward, terminated, truncated, info
    
    def _simulate_step(self, action: ActionCommand) -> RobotState:
        """
        Simulate robot dynamics for one timestep
        
        In real deployment, this would be replaced with actual robot interface
        """
        dt = 0.1  # 10 Hz control
        
        # Get motion command
        motion_cmd = action.to_motion_command()
        velocity = motion_cmd['velocity']
        
        # Simple kinematic update (placeholder physics)
        current_pos = self.goal_position - self.state.goal_direction * self.state.goal_distance
        
        # Update position based on velocity
        new_pos = current_pos + velocity[:2] * dt  # Only x, y (2D navigation)
        
        # Update goal distance
        new_goal_vector = self.goal_position[:2] - new_pos
        new_goal_distance = np.linalg.norm(new_goal_vector)
        new_goal_direction = np.append(
            new_goal_vector / (new_goal_distance + 1e-6), [0]
        )  # Normalize + add z=0
        
        # Add small noise to orientation (simulate walking dynamics)
        orientation_noise = np.random.normal(0, 0.02, size=3)
        new_orientation = self.state.base_orientation + orientation_noise
        
        # Create next state
        next_state = RobotState(
            base_orientation=new_orientation,
            base_angular_velocity=self.state.base_angular_velocity * 0.9,  # Damping
            base_linear_velocity=velocity,
            joint_positions=self.state.joint_positions,  # Simplified
            joint_velocities=self.state.joint_velocities * 0.9,
            foot_contacts=self.state.foot_contacts,
            goal_direction=new_goal_direction,
            goal_distance=new_goal_distance,
            obstacle_proximity=self.state.obstacle_proximity,
            cmd_velocity=self.state.cmd_velocity,
            cmd_mode=self.state.cmd_mode,
            cmd_gesture=self.state.cmd_gesture
        )
        
        return next_state
    
    def render(self):
        """Optional rendering (not implemented)"""
        pass
    
    def close(self):
        """Cleanup"""
        if self.controller and PYGAME_AVAILABLE:
            pygame.quit()

# Test environment
print("Creating high-level RL environment...")
env = G1HighLevelEnv(use_controller=False, simulation=True)

print(f"\nObservation space: {env.observation_space}")
print(f"Action space: {env.action_space}")

# Test episode
obs, info = env.reset()
print(f"\nInitial observation shape: {obs.shape}")

for i in range(5):
    action = {
        'action_type': 1,  # WALK_FORWARD
        'parameters': np.array([0.8, 0.0, 0.5])
    }
    obs, reward, terminated, truncated, info = env.step(action)
    print(f"Step {i+1}: reward={reward:.4f}, goal_dist={info['goal_distance']:.2f}m")
    
    if terminated or truncated:
        break

print("\n‚úì Environment test complete")

---
<a id='training'></a>
## 7. Training with PPO

Train the policy using Proximal Policy Optimization (PPO).

In [None]:
# Custom wrapper to flatten Dict action space for SB3
class FlattenedActionWrapper(gym.Wrapper):
    """Flatten Dict action space to Box for compatibility"""
    
    def __init__(self, env):
        super().__init__(env)
        
        # Flattened action: [action_type (1), parameters (3)] = 4D
        self.action_space = spaces.Box(
            low=np.array([0.0, 0.0, -1.0, 0.0]),
            high=np.array([9.0, 1.0, 1.0, 1.0]),
            dtype=np.float32
        )
    
    def step(self, action):
        # Convert flattened action back to dict
        action_type = int(np.clip(action[0], 0, 9))
        parameters = action[1:]
        
        dict_action = {
            'action_type': action_type,
            'parameters': parameters
        }
        
        return self.env.step(dict_action)

# Training callback to log progress
class TrainingCallback(BaseCallback):
    def __init__(self, verbose=0):
        super().__init__(verbose)
        self.episode_rewards = []
        self.episode_lengths = []
        self.current_episode_reward = 0
        self.current_episode_length = 0
    
    def _on_step(self) -> bool:
        self.current_episode_reward += self.locals['rewards'][0]
        self.current_episode_length += 1
        
        if self.locals['dones'][0]:
            self.episode_rewards.append(self.current_episode_reward)
            self.episode_lengths.append(self.current_episode_length)
            
            if len(self.episode_rewards) % 10 == 0:
                avg_reward = np.mean(self.episode_rewards[-10:])
                avg_length = np.mean(self.episode_lengths[-10:])
                print(f"Episode {len(self.episode_rewards)}: "
                      f"avg_reward={avg_reward:.2f}, avg_length={avg_length:.1f}")
            
            self.current_episode_reward = 0
            self.current_episode_length = 0
        
        return True

if SB3_AVAILABLE:
    print("üéì Starting PPO training...\n")
    
    # Create environment with wrapper
    train_env = FlattenedActionWrapper(G1HighLevelEnv(use_controller=False, simulation=True))
    
    # Create PPO model
    model = PPO(
        policy="MlpPolicy",
        env=train_env,
        learning_rate=3e-4,
        n_steps=2048,
        batch_size=64,
        n_epochs=10,
        gamma=0.99,
        gae_lambda=0.95,
        clip_range=0.2,
        verbose=1,
        device='cpu'  # Change to 'cuda' if GPU available
    )
    
    # Train
    callback = TrainingCallback()
    
    print("Training for 50,000 timesteps (increase for better performance)...\n")
    model.learn(
        total_timesteps=50000,
        callback=callback,
        progress_bar=True
    )
    
    # Save model
    model.save("g1_high_level_ppo")
    print("\n‚úì Model saved to g1_high_level_ppo.zip")
    
    # Plot training progress
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
    
    # Episode rewards
    ax1.plot(callback.episode_rewards, alpha=0.3, label='Episode Reward')
    window = 10
    if len(callback.episode_rewards) > window:
        moving_avg = np.convolve(
            callback.episode_rewards, 
            np.ones(window)/window, 
            mode='valid'
        )
        ax1.plot(range(window-1, len(callback.episode_rewards)), moving_avg, 
                'r-', linewidth=2, label=f'{window}-Episode Moving Average')
    ax1.set_xlabel('Episode')
    ax1.set_ylabel('Total Reward')
    ax1.set_title('Training Progress: Rewards')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # Episode lengths
    ax2.plot(callback.episode_lengths, alpha=0.3, label='Episode Length')
    if len(callback.episode_lengths) > window:
        moving_avg = np.convolve(
            callback.episode_lengths, 
            np.ones(window)/window, 
            mode='valid'
        )
        ax2.plot(range(window-1, len(callback.episode_lengths)), moving_avg, 
                'g-', linewidth=2, label=f'{window}-Episode Moving Average')
    ax2.set_xlabel('Episode')
    ax2.set_ylabel('Steps')
    ax2.set_title('Training Progress: Episode Length')
    ax2.legend()
    ax2.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('training_progress.png', dpi=150)
    plt.show()
    
    print("\nüìä Training visualization saved to training_progress.png")
    
else:
    print("‚ö†Ô∏è  Stable-Baselines3 not available - skipping training")

---
<a id='deployment'></a>
## 8. Deployment to Real Robot

Deploy the trained policy to the physical Unitree G1.

In [None]:
class G1RobotInterface:
    """Interface to real Unitree G1 hardware"""
    
    def __init__(self, network_interface: str = "eth0"):
        self.network_interface = network_interface
        self.initialized = False
        
        if UNITREE_SDK_AVAILABLE:
            try:
                # Initialize DDS
                ChannelFactoryInitialize(0, network_interface)
                
                # Create clients
                self.loco_client = LocoClient()
                self.loco_client.SetTimeout(10.0)
                self.loco_client.Init()
                
                self.audio_client = AudioClient()
                self.audio_client.SetTimeout(10.0)
                self.audio_client.Init()
                
                self.arm_client = G1ArmActionClient()
                self.arm_client.SetTimeout(10.0)
                self.arm_client.Init()
                
                # Subscribe to state
                self.state_subscriber = ChannelSubscriber("rt/lowstate", LowState_)
                self.state_subscriber.Init(self._state_callback, 10)
                
                self.current_state = None
                self.initialized = True
                
                print(f"‚úì Connected to G1 on {network_interface}")
                
            except Exception as e:
                print(f"‚ö†Ô∏è  Failed to connect to robot: {e}")
        else:
            print("‚ö†Ô∏è  Unitree SDK not available")
    
    def _state_callback(self, msg: LowState_):
        """Store latest robot state"""
        self.current_state = msg
    
    def get_state(self) -> Optional[RobotState]:
        """
        Read current robot state
        
        Returns:
            state: RobotState object or None if unavailable
        """
        if not self.initialized or self.current_state is None:
            return None
        
        # Extract data from low-level state
        imu = self.current_state.imu_state
        motors = self.current_state.motor_state
        
        # Build RobotState
        state = RobotState(
            base_orientation=np.array(imu.rpy),
            base_angular_velocity=np.array(imu.gyroscope),
            base_linear_velocity=np.array([0, 0, 0]),  # Not directly available
            joint_positions=np.array([m.q for m in motors]),
            joint_velocities=np.array([m.dq for m in motors]),
            foot_contacts=np.array([1, 1]),  # Placeholder (need force sensors)
            goal_direction=np.array([1, 0, 0]),  # Set externally
            goal_distance=5.0,  # Set externally
            obstacle_proximity=np.ones(8) * 10.0,  # From depth camera
            cmd_velocity=np.zeros(3),  # From controller
            cmd_mode=1,
            cmd_gesture=0
        )
        
        return state
    
    def send_velocity(self, velocity: np.ndarray):
        """Send velocity command to robot"""
        if not self.initialized:
            return
        
        vx, vy, vw = velocity
        self.loco_client.Move(float(vx), float(vy), float(vw))
    
    def execute_gesture(self, gesture_id: int):
        """Execute pre-defined gesture"""
        if not self.initialized:
            return
        
        gesture_map = {
            0: "high wave",
            1: "shake hand",
            2: "clap"
        }
        
        gesture_name = gesture_map.get(gesture_id)
        if gesture_name and gesture_name in action_map:
            self.arm_client.ExecuteAction(action_map[gesture_name])
            time.sleep(0.1)
            self.arm_client.ExecuteAction(action_map["release arm"])
    
    def speak(self, message: str, volume: float = 0.5):
        """Text-to-speech output"""
        if not self.initialized:
            return
        
        # Set volume (0-100)
        self.audio_client.SetVolume(int(volume * 100))
        
        # Chinese TTS (G1 default)
        self.audio_client.TtsMaker(message, 0)
    
    def emergency_stop(self):
        """Emergency stop"""
        if not self.initialized:
            return
        
        self.loco_client.Damp()
        print("‚ö†Ô∏è  EMERGENCY STOP ACTIVATED")

class DeployedPolicy:
    """Deployed RL policy running on real robot"""
    
    def __init__(self, model_path: str, network_interface: str = "eth0"):
        # Load trained model
        if SB3_AVAILABLE:
            self.model = PPO.load(model_path)
            print(f"‚úì Loaded policy from {model_path}")
        else:
            self.model = None
            print("‚ö†Ô∏è  SB3 not available")
        
        # Connect to robot
        self.robot = G1RobotInterface(network_interface)
        
        # Controller for human input
        self.controller = USBControllerInput()
        
        # Goal tracking
        self.goal_position = np.array([5.0, 0.0, 0.0])
    
    def run(self, duration: float = 60.0):
        """
        Run policy on real robot
        
        Args:
            duration: How long to run (seconds)
        """
        if not self.robot.initialized or self.model is None:
            print("‚ö†Ô∏è  Cannot run: robot or model not initialized")
            return
        
        print(f"ü§ñ Running policy for {duration:.0f} seconds...")
        print("   Press START button on controller for emergency stop\n")
        
        # Stand up
        self.robot.loco_client.Squat2StandUp()
        time.sleep(2)
        
        start_time = time.time()
        step_count = 0
        
        while time.time() - start_time < duration:
            # Get robot state
            state = self.robot.get_state()
            if state is None:
                time.sleep(0.1)
                continue
            
            # Get controller commands
            if self.controller.available:
                commands = self.controller.read_commands()
                state.cmd_velocity = commands['velocity']
                state.cmd_mode = commands['mode']
                state.cmd_gesture = commands['gesture']
                
                # Check emergency stop
                if commands['emergency_stop']:
                    self.robot.emergency_stop()
                    break
            
            # Update goal tracking (would come from navigation system)
            current_pos = np.array([0, 0, 0])  # Placeholder
            goal_vec = self.goal_position - current_pos
            state.goal_distance = np.linalg.norm(goal_vec)
            state.goal_direction = goal_vec / (state.goal_distance + 1e-6)
            
            # Get action from policy
            obs = state.to_array()
            action_flat, _ = self.model.predict(obs, deterministic=True)
            
            # Parse action
            action_type = HighLevelAction(int(np.clip(action_flat[0], 0, 9)))
            velocity_scale, direction_bias, audio_volume = action_flat[1:]
            
            action_cmd = ActionCommand(
                action_type=action_type,
                velocity_scale=float(velocity_scale),
                direction_bias=float(direction_bias),
                audio_volume=float(audio_volume)
            )
            
            # Execute action
            motion_cmd = action_cmd.to_motion_command()
            
            if motion_cmd['is_gesture']:
                self.robot.execute_gesture(motion_cmd['gesture_id'])
            else:
                self.robot.send_velocity(motion_cmd['velocity'])
            
            # Log progress
            if step_count % 10 == 0:
                print(f"\rStep {step_count}: {action_cmd.get_description()}", 
                      end='', flush=True)
            
            step_count += 1
            time.sleep(0.1)  # 10 Hz control loop
        
        # Stop robot
        self.robot.loco_client.Move(0, 0, 0)
        print("\n\n‚úì Policy execution complete")

# Example deployment (requires real robot)
print("\nDeployment Example:\n")
print("To deploy trained policy to real G1:")
print("""\n```python
# Load and run policy
policy = DeployedPolicy(
    model_path='g1_high_level_ppo.zip',
    network_interface='eth0'
)

# Run for 60 seconds
policy.run(duration=60.0)
```""")
print("\n‚ö†Ô∏è  Ensure robot is in safe environment before running!")

---
<a id='applications'></a>
## 9. Example Applications

### Application 1: Interactive Tour Guide

Robot follows human operator and responds to gesture commands.

In [None]:
class TourGuideApplication:
    """
    Tour guide robot that follows human and performs gestures
    """
    
    def __init__(self, policy_path: str):
        self.deployed_policy = DeployedPolicy(policy_path)
        self.waypoints = [
            ("Entrance", np.array([0, 0, 0])),
            ("Exhibit A", np.array([5, 0, 0])),
            ("Exhibit B", np.array([5, 5, 0])),
            ("Exit", np.array([0, 5, 0]))
        ]
        self.current_waypoint = 0
    
    def start_tour(self):
        """
        Execute guided tour with narration
        """
        print("üé≠ Starting tour guide mode...")
        
        for name, position in self.waypoints:
            print(f"\nNavigating to: {name}")
            
            # Update goal
            self.deployed_policy.goal_position = position
            
            # Wave when arriving
            self.deployed_policy.robot.execute_gesture(0)  # Wave
            
            # Narration (would be location-specific)
            self.deployed_policy.robot.speak(f"Welcome to {name}")
            
            # Wait at location
            time.sleep(5)
        
        print("\n‚úì Tour complete!")

print("\nüìù Example: Tour Guide Application")
print("   - Robot follows pre-defined waypoints")
print("   - Waves and provides audio narration at each location")
print("   - Responds to human controller for manual override")

### Application 2: Warehouse Assistant

Robot navigates warehouse and performs pick-and-place tasks.

In [None]:
class WarehouseAssistant:
    """
    Warehouse robot for logistics tasks
    """
    
    def __init__(self, policy_path: str):
        self.deployed_policy = DeployedPolicy(policy_path)
        self.inventory = {
            'A1': 'Computer Monitor',
            'A2': 'Keyboard',
            'B1': 'Mouse',
            'B2': 'Headphones'
        }
    
    def pick_item(self, location: str) -> bool:
        """
        Navigate to location and pick item
        
        Returns:
            success: True if item retrieved
        """
        item_name = self.inventory.get(location, "Unknown")
        print(f"\nüì¶ Retrieving: {item_name} from {location}")
        
        # Navigate to location (would use real coordinates)
        # For demo, use placeholder
        self.deployed_policy.goal_position = np.array([5, 0, 0])
        
        # Audio feedback
        self.deployed_policy.robot.speak(f"Retrieving {item_name}")
        
        # Simulate pick (would use manipulation skills)
        time.sleep(3)
        
        print(f"‚úì Item retrieved: {item_name}")
        return True
    
    def deliver_item(self, destination: str) -> bool:
        """
        Deliver held item to destination
        """
        print(f"\nüöö Delivering to: {destination}")
        
        # Navigate to destination
        # ...
        
        # Place item
        self.deployed_policy.robot.speak("Delivery complete")
        
        return True

print("\nüìù Example: Warehouse Assistant")
print("   - Autonomous navigation to storage locations")
print("   - Audio confirmation of tasks")
print("   - Human operator can override via controller")

---
## Summary & Next Steps

This notebook demonstrated a **complete high-level RL framework** for the Unitree G1:

‚úÖ **State Space:** Combined robot sensors + human controller input (86D)  
‚úÖ **Action Space:** Abstract high-level actions (locomotion, gestures, audio)  
‚úÖ **Motion Primitives:** Pre-validated movement sequences  
‚úÖ **Reward Function:** Simple sparse rewards for goal reaching and command following  
‚úÖ **Training:** PPO implementation with Stable-Baselines3  
‚úÖ **Deployment:** Real robot interface with safety features  
‚úÖ **Applications:** Tour guide and warehouse assistant examples

### Key Advantages of High-Level RL

1. **Faster Learning:** Smaller action space than low-level joint control
2. **Better Generalization:** Abstractions transfer across tasks
3. **Safer:** Primitives are pre-tested and bounded
4. **Human-Interpretable:** Actions have semantic meaning
5. **Controller Integration:** Human can guide learning and provide real-time input

### Next Steps

1. **Expand Motion Library:** Add more primitives (stairs, doors, obstacles)
2. **Vision Integration:** Add camera observations to state space
3. **Multi-Task Learning:** Train single policy for multiple applications
4. **Hierarchical RL:** Add meta-controller for task planning
5. **Human Preference Learning:** Fine-tune policy using human feedback

### References

- **Hierarchical RL:** "Options Framework" (Sutton, Precup, Singh)
- **Human-in-the-Loop:** "Deep TAMER" (Warnell et al.)
- **PPO Algorithm:** "Proximal Policy Optimization" (Schulman et al.)
- **Motion Primitives:** "Dynamic Movement Primitives" (Schaal et al.)

---

*High-Level RL Framework for Unitree G1*  
*Enables rapid task learning with human guidance*