In [None]:
import sys
import os

# Activate virtual environment
venv_path = os.path.expanduser("~/mujoco_ws/venv")
if os.path.exists(venv_path):
    sys.path.insert(0, os.path.join(venv_path, "lib", "python3.11", "site-packages"))
    print(f"✓ Virtual environment activated: {venv_path}")
    print(f"✓ Python executable: {sys.executable}")
else:
    print(f"⚠ Virtual environment not found at {venv_path}")
    print(f"Currently using Python: {sys.executable}")

## Setup and Imports

In [None]:
import numpy as np
import mujoco
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
import time
import torch
from datetime import timedelta
import os

print(f"PyTorch using: {'cuda' if torch.cuda.is_available() else 'cpu'} device")
print(f"NumPy version: {np.__version__}")
print(f"MuJoCo version: {mujoco.__version__}")

## Environment: Touch the Box

In [None]:
class PickPlaceEnv(gym.Env):
    """Franka Panda gripper environment (inspired by panda-gym).
    
    Key improvements:
    - Uses joint position control with incremental targets (like panda-gym)
    - Limits action to 0.05 rad/step for smooth, learnable control
    - Observation includes gripper width (not individual fingers)
    - Uses proper neutral pose and joint limit awareness
    """
    
    def __init__(self, model_path='model/pick_place_scene.xml', render_mode=None):
        super().__init__()
        self.render_mode = render_mode
        self.viewer = None
        
        # Load model
        try:
            self.model = mujoco.MjModel.from_xml_path(model_path)
        except FileNotFoundError:
            raise FileNotFoundError(f"Model not found at {model_path}")
        
        self.data = mujoco.MjData(self.model)
        self.dt = self.model.opt.timestep
        
        # Franka Panda configuration (7 arm joints + 2 gripper fingers)
        self.arm_joint_indices = np.array([0, 1, 2, 3, 4, 5, 6])
        self.gripper_joint_indices = np.array([7, 8])
        
        # Joint forces (from panda-gym reference)
        self.joint_forces = np.array([87.0, 87.0, 87.0, 87.0, 12.0, 120.0, 120.0, 170.0, 170.0])
        
        # Neutral pose (from panda-gym)
        self.neutral_joint_values = np.array([0.00, 0.41, 0.00, -1.85, 0.00, 2.26, 0.79, 0.04, 0.04])
        
        # Action space: 7 arm joints + 1 gripper (incremental control [-1, 1])
        self.action_space = spaces.Box(
            low=np.array([-1.0]*8),
            high=np.array([1.0]*8),
            dtype=np.float32
        )
        
        # Observation space: EE position (3) + EE velocity (3) + gripper width (1) + box position (3) = 10D
        # Simplified observation like panda-gym
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(10,), dtype=np.float32
        )
        
        self.max_episode_steps = 500
        self.step_count = 0
        self.active_box = None
        
        # Hand/gripper link index
        self.hand_id = self.model.body('hand').id
        self.max_action_step = 0.05  # Limit control to 0.05 rad per step (like panda-gym)
    
    def _get_obs(self):
        """Get observation: EE position/velocity + gripper width + box position."""
        # End effector (hand body) position and velocity
        ee_pos = self.data.body(self.hand_id).xpos.copy()
        ee_vel = self.data.body(self.hand_id).cvel[:3].copy()  # Linear velocity
        
        # Gripper width (sum of both finger angles)
        gripper_width = self.data.qpos[7] + self.data.qpos[8]
        
        # Target box position
        box_pos = self.data.body(self.active_box).xpos.copy()
        
        obs = np.concatenate([
            ee_pos, ee_vel, [gripper_width], box_pos
        ]).astype(np.float32)
        
        return obs
    
    def reset(self, seed=None, options=None):
        """Reset environment to neutral pose with random box."""
        super().reset(seed=seed)
        mujoco.mj_resetData(self.model, self.data)
        
        # Set to neutral pose
        self.data.qpos[:len(self.neutral_joint_values)] = self.neutral_joint_values
        
        # Randomly select target box
        all_boxes = [
            'red_box_0', 'red_box_1', 'red_box_2',
            'blue_box_0', 'blue_box_1', 'blue_box_2',
            'green_box_0', 'green_box_1', 'green_box_2',
            'yellow_box_0', 'yellow_box_1', 'yellow_box_2'
        ]
        self.active_box = np.random.choice(all_boxes)
        self.step_count = 0
        
        # Settle physics
        for _ in range(10):
            mujoco.mj_step(self.model, self.data)
        
        obs = self._get_obs()
        return obs, {}
    
    def step(self, action):
        """Execute action: controlled descent from above with limited velocity."""
        action = np.clip(action, -1.0, 1.0)
        
        # Get current state
        ee_pos = self.data.body(self.hand_id).xpos.copy()
        box_pos = self.data.body(self.active_box).xpos.copy()
        
        # Current arm angles
        current_arm_angles = self.data.qpos[self.arm_joint_indices].copy()
        
        # Target: directly above box at safe height
        approach_height = box_pos[2] + 0.10  # 10cm above box to start approach
        target_xyz = np.array([box_pos[0], box_pos[1], approach_height])
        
        # Calculate errors
        pos_error = target_xyz - ee_pos
        horiz_error = np.linalg.norm(pos_error[:2])
        vert_error = pos_error[2]
        
        # Arm control based on distance to approach position
        if horiz_error > 0.02 or vert_error > 0.02:  # Still far from target
            # Scale toward target with action influence
            arm_ctrl = action[:7] * self.max_action_step * 0.4  # Reduced speed toward target
        else:
            # Close to target, very fine control
            arm_ctrl = action[:7] * self.max_action_step * 0.2  # Very slow fine motion
        
        target_arm_angles = current_arm_angles + arm_ctrl
        
        # Gripper control: slower, more controlled
        gripper_ctrl = action[7] * 0.01  # Very slow gripper (1cm per step max)
        current_gripper_width = self.data.qpos[7] + self.data.qpos[8]
        target_gripper_width = current_gripper_width + gripper_ctrl
        target_gripper_width = np.clip(target_gripper_width, 0, 0.04)
        
        # Combine target angles
        target_angles = np.concatenate([
            target_arm_angles,
            [target_gripper_width / 2, target_gripper_width / 2]
        ])
        
        # Control with adaptive damping based on proximity to target
        is_near_target = horiz_error < 0.05 and vert_error < 0.05
        self._control_joints(target_angles, slow_descent=is_near_target)
        
        # More substeps for smoother motion
        num_substeps = 15 if is_near_target else 10
        for _ in range(num_substeps):
            mujoco.mj_step(self.model, self.data)
        
        self.step_count += 1
        obs = self._get_obs()
        
        # Compute reward
        ee_pos_new = self.data.body(self.hand_id).xpos.copy()
        box_pos = self.data.body(self.active_box).xpos.copy()
        
        # Reward for approaching from above (xy alignment)
        horiz_dist = np.linalg.norm(ee_pos_new[:2] - box_pos[:2])
        vert_dist = ee_pos_new[2] - box_pos[2]
        
        reward = 0.0
        
        # Phase 1: Horizontal alignment (xy plane)
        if horiz_dist < 0.15:
            reward += 30.0 * max(0, 1.0 - horiz_dist / 0.15)
        
        # Phase 2: Vertical approach from above
        if horiz_dist < 0.08 and vert_dist > -0.02:  # Close horizontally and above box
            reward += 40.0 * max(0, 1.0 - (vert_dist - 0.06) / 0.06)
        
        # Touch reward: gentle contact with box top
        touch_dist = np.linalg.norm(ee_pos_new - box_pos)
        if touch_dist < 0.10:  # Close enough to touch
            reward += 100.0 * max(0, 1.0 - touch_dist / 0.10)
            # Bonus for coming from above (positive z trajectory)
            if vert_dist < 0.02:
                reward += 50.0
        
        # Penalize high joint velocities to discourage slamming
        joint_vel_penalty = -np.linalg.norm(self.data.qvel[:7]) * 0.005
        reward += joint_vel_penalty
        
        # Time penalty
        reward -= 0.0003
        
        terminated = self.step_count >= self.max_episode_steps
        
        return obs, reward, terminated, False, {}
    
    def _control_joints(self, target_angles, slow_descent=False):
        """Control joints using PD control with adaptive gains for smooth motion."""
        # Proportional and derivative gains (reduced during descent for smoother control)
        kp = 80.0 if slow_descent else 100.0  # Lower gain during descent
        kd = 15.0 if slow_descent else 10.0   # Higher damping for smoother descent
        
        # For arm joints (0-6): use PD control
        for i in range(7):
            current_angle = self.data.qpos[i]
            error = target_angles[i] - current_angle
            
            ctrl = kp * error
            
            vel = self.data.qvel[i]
            ctrl -= kd * vel
            
            # Clip to force limit
            force_limit = self.joint_forces[i]
            ctrl = np.clip(ctrl, -force_limit, force_limit)
            
            self.data.ctrl[i] = ctrl
        
        # For gripper (control index 7 maps to gripper tendon)
        gripper_width_cmd = target_angles[7] * target_angles[8]
        gripper_ctrl = int((gripper_width_cmd / 0.04) * 255)
        gripper_ctrl = np.clip(gripper_ctrl, 0, 255)
        self.data.ctrl[7] = gripper_ctrl


In [None]:
def train_pick_place(total_timesteps=100000):
    """Train Franka Panda gripper to touch randomly placed boxes (improved control)."""
    print("=" * 70)
    print("Training: Touch the randomly placed box (panda-gym style control)")
    print("=" * 70)
    print(f"Task: Train gripper to reach and touch one randomly placed box")
    print(f"Improvement: Incremental joint position control (0.05 rad/step max)")
    print(f"Device: {('CUDA' if torch.cuda.is_available() else 'CPU')}")
    print()
    
    # Create environment
    env = make_vec_env(
        lambda: PickPlaceEnv(model_path='model/pick_place_scene.xml'),
        n_envs=4
    )
    
    # Train with PPO
    model = PPO(
        "MlpPolicy",
        env,
        learning_rate=3e-4,
        n_steps=1024,
        batch_size=64,
        n_epochs=20,
        gamma=0.99,
        gae_lambda=0.95,
        clip_range=0.2,
        verbose=1,
        policy_kwargs={"net_arch": [256, 256]}
    )
    
    print(f"Training for {total_timesteps:,} timesteps...\n")
    start_time = time.time()
    model.learn(total_timesteps=total_timesteps)
    elapsed = time.time() - start_time
    
    model.save("touch_box_ppo")
    print("\n" + "=" * 70)
    print(f"✓ Training complete! ({elapsed/60:.1f} minutes)")
    print(f"✓ Model saved as 'touch_box_ppo'")
    print("=" * 70)
    
    env.close()
    return model

print("✓ train_pick_place() function defined")


In [None]:
model_pp = train_pick_place(total_timesteps=400000)  # Train with better approach guidance

In [None]:
def evaluate_model(num_episodes=5, deterministic=True):
    """Evaluate the model - check how often it touches the box."""
    from stable_baselines3 import PPO
    
    print("=" * 70)
    print("Evaluating Touch Box Model (Improved Control)")
    print("=" * 70)
    
    model = PPO.load("touch_box_ppo")
    
    touches = 0
    
    for ep in range(num_episodes):
        print(f"\nEpisode {ep + 1}/{num_episodes}:")
        
        env = PickPlaceEnv(model_path='model/pick_place_scene.xml')
        obs, _ = env.reset()
        box_name = env.active_box
        
        episode_reward = 0.0
        touched = False
        
        while env.step_count < env.max_episode_steps:
            action, _ = model.predict(obs, deterministic=deterministic)
            obs, reward, terminated, truncated, _ = env.step(action)
            episode_reward += reward
            
            # Get current distance from obs (indices 0:3 are EE pos, 7:10 are box pos)
            ee_pos = obs[:3]
            box_pos = obs[7:10]
            distance = np.linalg.norm(ee_pos - box_pos)
            
            if distance < 0.20:  # Increased from 0.15m
                touched = True
            
            if terminated or truncated:
                break
        
        if touched:
            touches += 1
        
        print(f"  Box: {box_name}")
        print(f"  Reward: {episode_reward:.2f}")
        print(f"  Touched: {'✓ YES' if touched else '✗ NO'}")
        
        env.close()
    
    print("\n" + "=" * 70)
    print(f"Success Rate: {touches}/{num_episodes} ({100*touches/num_episodes:.0f}%)")
    print("=" * 70)

print("✓ evaluate_model() function defined")


In [None]:
evaluate_model(num_episodes=10)

## Summary

**Current Task**: Train Franka Panda arm to touch a randomly-placed box
- Single box per episode (randomly selected from 12 available)
- Dense reaching reward: 50.0 × (1 - distance/0.8)
- Touch bonus: +100.0 when distance < 0.15m
- Time penalty: -0.001 per step
- 500 steps per episode
- Observation: 22D (arm state + gripper + end-effector + box position)
- Action: 8D continuous (7 arm joints + 1 gripper)

## Visualization

In [None]:
def visualize_model(num_episodes=3, deterministic=True):
    """Visualize the trained model."""
    from stable_baselines3 import PPO
    import mujoco.viewer
    
    print("=" * 70)
    print("Visualizing Touch Box Model")
    print("=" * 70)
    print(f"Episodes: {num_episodes}\n")
    
    model = PPO.load("touch_box_ppo")
    
    for ep in range(num_episodes):
        print(f"Episode {ep + 1}/{num_episodes}")
        
        env = PickPlaceEnv(model_path='model/pick_place_scene.xml')
        obs, _ = env.reset()
        
        viewer = mujoco.viewer.launch_passive(env.model, env.data)
        
        try:
            while env.step_count < env.max_episode_steps:
                action, _ = model.predict(obs, deterministic=deterministic)
                obs, reward, terminated, truncated, _ = env.step(action)
                
                if terminated or truncated:
                    break
                
                viewer.sync()
                
        finally:
            viewer.close()
            env.close()
    
    print("=" * 70)
    print("Done!")
    print("=" * 70)

print("✓ visualize_model() function defined")


In [None]:
visualize_model(num_episodes=5)

## Debugging and Testing

In [None]:
def debug_environment():
    """Debug the environment to understand observations and actions (improved control)."""
    print("=" * 70)
    print("Environment Debugging Information")
    print("=" * 70)
    
    env = PickPlaceEnv(model_path='model/pick_place_scene.xml')
    obs, _ = env.reset()
    
    print(f"\n✓ Environment initialized successfully (panda-gym style)")
    print(f"  Model: Franka Panda with parallel gripper")
    print(f"  Action space: {env.action_space.shape} (7 arm joints + 1 gripper)")
    print(f"  Observation space: {env.observation_space.shape} (simplified)")
    
    print(f"\nObservation breakdown (10D total):")
    print(f"  [0:3]    = End-effector (hand) position (m)")
    print(f"  [3:6]    = End-effector velocity (m/s)")
    print(f"  [6]      = Gripper width (m, 0 to 0.04)")
    print(f"  [7:10]   = Target box position (m)")
    
    print(f"\nInitial observation sample:")
    print(f"  EE position: {obs[:3]}")
    print(f"  EE velocity: {obs[3:6]}")
    print(f"  Gripper width: {obs[6]:.4f}")
    print(f"  Box position: {obs[7:10]}")
    
    # Test an action with incremental control
    action = np.zeros(8)
    action[0] = 0.5   # Move joint 1 (incremental, 0.025 rad max)
    action[7] = -0.5  # Close gripper (incremental, 0.025m max)
    
    print(f"\nTest action (move joint 1 incrementally, close gripper):")
    print(f"  Action: {action}")
    print(f"  Max step: 0.05 rad/step for arm, 0.05m/step for gripper")
    
    obs, reward, terminated, truncated, _ = env.step(action)
    print(f"  Reward: {reward:.4f}")
    print(f"  New EE position: {obs[:3]}")
    print(f"  New gripper width: {obs[6]:.4f}")
    
    print(f"\n" + "=" * 70)
    print("✓ Environment is working correctly!")
    print("=" * 70)
    
    env.close()

print("✓ debug_environment() function defined")


In [None]:
debug_environment()

In [None]:
# Quick test to verify new observation format
env = PickPlaceEnv(model_path='model/pick_place_scene.xml')
obs, _ = env.reset()

print("Observation Space Test")
print("=" * 50)
print(f"Observation shape: {obs.shape} (expected (10,))")
print(f"Observation dtype: {obs.dtype}")
print(f"\nObservation values:")
print(f"  EE position (obs[0:3]): {obs[:3]}")
print(f"  EE velocity (obs[3:6]): {obs[3:6]}")
print(f"  Gripper width (obs[6]): {obs[6]:.4f}")
print(f"  Box position (obs[7:10]): {obs[7:10]}")
print(f"\nActive box: {env.active_box}")

# Test an action
action = np.array([0.1, 0, 0, 0, 0, 0, 0, -0.5])  # Move joint 1, close gripper
obs_new, reward, terminated, truncated, _ = env.step(action)
print(f"\nAfter action:")
print(f"  EE position changed: {obs_new[:3]}")
print(f"  Gripper width changed: {obs_new[6]:.4f}")

env.close()
print("\n✓ Observation format is correct!")


In [None]:
def analyze_distances():
    """Analyze minimum distances achieved during episodes."""
    from stable_baselines3 import PPO
    
    try:
        model = PPO.load("touch_box_ppo")
    except:
        print("Model not found. Train first with: train_pick_place()")
        return
    
    print("Distance Analysis")
    print("=" * 70)
    
    min_distances = []
    
    for ep in range(10):
        env = PickPlaceEnv(model_path='model/pick_place_scene.xml')
        obs, _ = env.reset()
        
        ep_min_distance = float('inf')
        
        while env.step_count < env.max_episode_steps:
            action, _ = model.predict(obs, deterministic=True)
            obs, _, terminated, truncated, _ = env.step(action)
            
            ee_pos = obs[:3]
            box_pos = obs[7:10]
            distance = np.linalg.norm(ee_pos - box_pos)
            
            ep_min_distance = min(ep_min_distance, distance)
            
            if terminated or truncated:
                break
        
        min_distances.append(ep_min_distance)
        print(f"Episode {ep + 1}: min distance = {ep_min_distance:.4f}m")
        env.close()
    
    min_distances = np.array(min_distances)
    print("\n" + "=" * 70)
    print(f"Minimum distance statistics:")
    print(f"  Mean: {min_distances.mean():.4f}m")
    print(f"  Std: {min_distances.std():.4f}m")
    print(f"  Min: {min_distances.min():.4f}m")
    print(f"  Max: {min_distances.max():.4f}m")
    print(f"\nCurrent touch threshold: 0.15m")
    print(f"Episodes below 0.15m: {(min_distances < 0.15).sum()}/10")
    print(f"Episodes below 0.20m: {(min_distances < 0.20).sum()}/10")
    print(f"Episodes below 0.25m: {(min_distances < 0.25).sum()}/10")
    print("=" * 70)

analyze_distances()

In [None]:
def inspect_policy_actions():
    """Inspect what actions the trained policy produces (for debugging)."""
    from stable_baselines3 import PPO
    
    try:
        model = PPO.load("touch_box_ppo")
    except:
        print("Model not found. Train first with: train_pick_place()")
        return
    
    env = PickPlaceEnv(model_path='model/pick_place_scene.xml')
    obs, _ = env.reset()
    
    print("Policy Action Analysis")
    print("=" * 70)
    
    gripper_actions = []
    arm_joint_actions = []
    distances = []
    
    for i in range(100):
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, _, _, _ = env.step(action)
        
        gripper_actions.append(action[7])
        arm_joint_actions.append(np.mean(np.abs(action[:7])))  # Mean absolute arm movement
        
        # Extract distance from observation (obs[0:3] is EE, obs[7:10] is box)
        ee_pos = obs[:3]
        box_pos = obs[7:10]
        distance = np.linalg.norm(ee_pos - box_pos)
        distances.append(distance)
    
    gripper_actions = np.array(gripper_actions)
    arm_actions = np.array(arm_joint_actions)
    distances = np.array(distances)
    
    print("Gripper Action Statistics:")
    print(f"  Mean: {gripper_actions.mean():.3f}")
    print(f"  Std: {gripper_actions.std():.3f}")
    print(f"  Min: {gripper_actions.min():.3f}")
    print(f"  Max: {gripper_actions.max():.3f}")
    print(f"  % closing (< -0.1): {100 * (gripper_actions < -0.1).mean():.1f}%")
    print(f"  % opening (> +0.1): {100 * (gripper_actions > 0.1).mean():.1f}%")
    print()
    print("Arm Movement Statistics:")
    print(f"  Mean absolute movement: {arm_actions.mean():.3f}")
    print(f"  Max movement: {arm_actions.max():.3f}")
    print()
    print("Distance to Box:")
    print(f"  Initial: {distances[0]:.3f}m")
    print(f"  Final: {distances[-1]:.3f}m")
    print(f"  Min: {distances.min():.3f}m")
    print(f"  Mean: {distances.mean():.3f}m")
    print("=" * 70)
    
    env.close()

inspect_policy_actions()

## Visualize the Scene

In [None]:
# Visualize the empty scene without training
import mujoco.viewer

model_path = 'model/pick_place_scene.xml'
model = mujoco.MjModel.from_xml_path(model_path)
data = mujoco.MjData(model)

# Run simulation for a few seconds to see the scene
with mujoco.viewer.launch_passive(model, data) as viewer:
    # Run for 2 seconds
    start = time.time()
    while viewer.is_running() and time.time() - start < 10.0:
        step_start = data.time
        while (data.time - step_start) < model.opt.timestep * 10:
            mujoco.mj_step(model, data)
        
        viewer.sync()
        time.sleep(0.001)