# Policy Gradient Controller for Robotics

This notebook demonstrates how to implement a policy gradient controller for robotic systems, as discussed in Chapter 9 of the Physical AI & Humanoid Robotics book.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import seaborn as sns

%matplotlib inline
sns.set_style("whitegrid")

## 1. Policy and Value Networks

First, let's define the neural network architectures for our policy gradient method.

In [None]:
class PolicyNetwork(nn.Module):
    """
    Policy network that maps states to action probabilities.
    For continuous action spaces, we output the parameters of a distribution.
    """
    def __init__(self, state_size, action_size, hidden_size=64):
        super(PolicyNetwork, self).__init__()
        self.fc_layers = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
        )
        
        # Output mean for each action dimension
        self.action_mean = nn.Linear(hidden_size, action_size)
        
        # Log standard deviation for each action dimension (learnable)
        self.action_log_std = nn.Parameter(torch.zeros(action_size))
    
    def forward(self, state):
        features = self.fc_layers(state)
        action_mean = torch.tanh(self.action_mean(features))  # Bound mean to [-1, 1]
        action_std = torch.exp(self.action_log_std)
        return action_mean, action_std


class ValueNetwork(nn.Module):
    """
    Value network that estimates the value of a state.
    """
    def __init__(self, state_size, hidden_size=64):
        super(ValueNetwork, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )
    
    def forward(self, state):
        return self.network(state)

## 2. Policy Gradient Agent

Now let's implement the policy gradient agent that will learn to control our robotic system.

In [None]:
class PolicyGradientAgent:
    """
    Policy Gradient Agent that learns to control a robotic system.
    """
    def __init__(self, state_size, action_size, learning_rate=3e-4, gamma=0.99, entropy_coef=0.01):
        self.state_size = state_size
        self.action_size = action_size
        self.gamma = gamma
        self.entropy_coef = entropy_coef
        
        # Initialize policy and value networks
        self.policy = PolicyNetwork(state_size, action_size)
        self.value = ValueNetwork(state_size)
        
        # Optimizers
        self.policy_optimizer = optim.Adam(self.policy.parameters(), lr=learning_rate)
        self.value_optimizer = optim.Adam(self.value.parameters(), lr=learning_rate)
        
        # Storage for episode data
        self.log_probs = []
        self.values = []
        self.rewards = []
        self.entropies = []
    
    def select_action(self, state):
        """
        Select an action based on the current policy.
        
        Args:
            state: Current state of the environment
            
        Returns:
            action: Selected action
            log_prob: Log probability of the selected action
            value: Estimated value of the state
        """
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        
        # Get action distribution parameters and state value
        action_mean, action_std = self.policy(state_tensor)
        value = self.value(state_tensor)
        
        # Create distribution and sample action
        dist = torch.distributions.Normal(action_mean, action_std)
        action = dist.sample()
        log_prob = dist.log_prob(action).sum(dim=-1)
        entropy = dist.entropy().sum(dim=-1)
        
        return action.detach().cpu().numpy()[0], log_prob, value, entropy
    
    def store_transition(self, log_prob, value, entropy, reward):
        """
        Store transition information for later update.
        
        Args:
            log_prob: Log probability of the action taken
            value: Estimated value of the state
            entropy: Entropy of the action distribution
            reward: Reward received
        """
        self.log_probs.append(log_prob)
        self.values.append(value)
        self.entropies.append(entropy)
        self.rewards.append(torch.tensor([reward], dtype=torch.float32))
    
    def update_policy(self):
        """
        Update the policy based on the collected episode data.
        """
        if len(self.rewards) == 0:
            return
        
        # Compute discounted returns
        returns = deque(maxlen=len(self.rewards))
        R = torch.tensor([0.0], dtype=torch.float32)
        
        for i in reversed(range(len(self.rewards))):
            R = self.rewards[i] + self.gamma * R
            returns.appendleft(R)
        
        # Convert to tensors
        returns = torch.cat(list(returns))
        values = torch.cat(self.values)
        log_probs = torch.cat(self.log_probs)
        entropies = torch.cat(self.entropies)
        
        # Calculate advantages
        advantages = returns - values.detach()
        
        # Calculate policy loss
        policy_loss = -(log_probs * advantages.detach()).mean() - self.entropy_coef * entropies.mean()
        
        # Calculate value loss
        value_loss = nn.MSELoss()(values, returns)
        
        # Update networks
        self.policy_optimizer.zero_grad()
        policy_loss.backward()
        # Clip gradients to prevent large updates
        torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 40)
        self.policy_optimizer.step()
        
        self.value_optimizer.zero_grad()
        value_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.value.parameters(), 40)
        self.value_optimizer.step()
        
        # Reset storage
        self.log_probs = []
        self.values = []
        self.entropies = []
        self.rewards = []

## 3. Environment: Simple Robotic Arm

Now let's create a simple robotic environment to test our policy gradient controller.

In [None]:
class SimpleRoboticArmEnv:
    """
    Simplified environment for a 2-DOF robotic arm.
    The goal is to move the end-effector to a target position.
    """
    def __init__(self, max_steps=100):
        self.arm_length = [0.5, 0.4]  # Length of each arm segment
        self.state_size = 4  # [joint1_angle, joint2_angle, target_x, target_y]
        self.action_size = 2  # [change_in_joint1, change_in_joint2]
        self.max_steps = max_steps
        self.step_count = 0
        
        # Reset environment
        self.reset()
    
    def reset(self):
        """Reset the environment to a random state."""
        # Random starting joint angles
        self.joint1 = np.random.uniform(-np.pi/2, np.pi/2)
        self.joint2 = np.random.uniform(-np.pi/2, np.pi/2)
        
        # Random target position (within reachable range)
        self.target_x = np.random.uniform(-0.8, 0.8)
        self.target_y = np.random.uniform(-0.2, 0.8)  # Mostly upper half-plane
        
        self.step_count = 0
        return self._get_state()
    
    def _get_state(self):
        """Get current state representation."""
        return np.array([self.joint1, self.joint2, self.target_x, self.target_y])
    
    def _get_end_effector_position(self):
        """Calculate end-effector position based on joint angles."""
        # Forward kinematics for 2-DOF arm
        x = self.arm_length[0] * np.cos(self.joint1) + \
            self.arm_length[1] * np.cos(self.joint1 + self.joint2)
        y = self.arm_length[0] * np.sin(self.joint1) + \
            self.arm_length[1] * np.sin(self.joint1 + self.joint2)
        return x, y
    
    def step(self, action):
        """
        Execute an action in the environment.
        
        Args:
            action: [delta_joint1, delta_joint2]
            
        Returns:
            next_state: New state after executing action
            reward: Scalar reward
            done: Whether the episode is finished
        """
        # Apply action (with clamping to prevent excessive movements)
        self.joint1 += np.clip(action[0], -0.3, 0.3)
        self.joint2 += np.clip(action[1], -0.3, 0.3)
        
        # Clamp joint angles to reasonable ranges
        self.joint1 = np.clip(self.joint1, -np.pi, np.pi)
        self.joint2 = np.clip(self.joint2, -np.pi, np.pi)
        
        # Get end-effector position
        ee_x, ee_y = self._get_end_effector_position()
        
        # Calculate distance to target
        dist_to_target = np.sqrt((ee_x - self.target_x)**2 + (ee_y - self.target_y)**2)
        
        # Reward is negative distance (higher reward = closer to target)
        reward = -dist_to_target
        
        # Additional shaping reward to encourage movement toward target
        if hasattr(self, 'prev_dist') and self.prev_dist is not None:
            if dist_to_target < self.prev_dist:
                reward += 0.1  # Small bonus for improvement
            else:
                reward -= 0.05  # Small penalty for getting farther
        
        self.prev_dist = dist_to_target
        
        # Check if target is reached
        done = dist_to_target < 0.1 or self.step_count >= self.max_steps
        self.step_count += 1
        
        return self._get_state(), reward, done, {}

## 4. Training Loop

Now let's create the training loop for our policy gradient agent.

In [None]:
def train_policy_gradient_agent(episodes=500):
    """
    Train the policy gradient agent on the robotic arm environment.
    
    Args:
        episodes: Number of training episodes
        
    Returns:
        agent: Trained agent
        episode_rewards: List of rewards for each episode
    """
    env = SimpleRoboticArmEnv(max_steps=100)
    agent = PolicyGradientAgent(
        state_size=env.state_size,
        action_size=env.action_size,
        learning_rate=3e-4,
        gamma=0.99,
        entropy_coef=0.01
    )
    
    episode_rewards = []
    episode_lengths = []
    
    for episode in range(episodes):
        state = env.reset()
        episode_reward = 0
        done = False
        
        while not done:
            # Select action based on current policy
            action, log_prob, value, entropy = agent.select_action(state)
            
            # Execute action in environment
            next_state, reward, done, _ = env.step(action)
            
            # Store transition
            agent.store_transition(log_prob, value, entropy, reward)
            
            state = next_state
            episode_reward += reward
        
        # Update policy after episode
        agent.update_policy()
        episode_rewards.append(episode_reward)
        episode_lengths.append(env.step_count)
        
        if episode % 50 == 0:
            avg_reward = np.mean(episode_rewards[-50:]) if episode > 0 else episode_reward
            avg_length = np.mean(episode_lengths[-50:]) if episode > 0 else env.step_count
            print(f"Episode {episode}, Average Reward: {avg_reward:.2f}, Average Length: {avg_length:.1f}")
    
    return agent, episode_rewards, episode_lengths

# Train the agent
print("Training the Policy Gradient agent...")
trained_agent, rewards, lengths = train_policy_gradient_agent(episodes=300)
print("Training completed!")

## 5. Visualization of Training Progress

Let's visualize how the agent learned over time.

In [None]:
# Plot training progress
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Plot 1: Episode rewards over time
axes[0, 0].plot(rewards)
axes[0, 0].set_title('Episode Rewards Over Time')
axes[0, 0].set_xlabel('Episode')
axes[0, 0].set_ylabel('Total Reward')
axes[0, 0].grid(True, alpha=0.3)

# Plot 2: Smoothed rewards
window_size = 20
if len(rewards) > window_size:
    smoothed_rewards = [np.mean(rewards[i:i+window_size]) 
                       for i in range(len(rewards) - window_size + 1)]
    axes[0, 1].plot(smoothed_rewards, label='Smoothed')
    axes[0, 1].set_title('Smoothed Episode Rewards (Window {})'.format(window_size))
    axes[0, 1].set_xlabel('Episode')
    axes[0, 1].set_ylabel('Average Reward')
    axes[0, 1].grid(True, alpha=0.3)
else:
    axes[0, 1].plot(rewards)
    axes[0, 1].set_title('Episode Rewards')
    axes[0, 1].set_xlabel('Episode')
    axes[0, 1].set_ylabel('Total Reward')
    axes[0, 1].grid(True, alpha=0.3)

# Plot 3: Episode lengths
axes[1, 0].plot(lengths)
axes[1, 0].set_title('Episode Lengths Over Time')
axes[1, 0].set_xlabel('Episode')
axes[1, 0].set_ylabel('Episode Length')
axes[1, 0].grid(True, alpha=0.3)

# Plot 4: Final performance histogram
final_rewards = rewards[-50:]  # Last 50 episodes
axes[1, 1].hist(final_rewards, bins=15, edgecolor='black', alpha=0.7)
axes[1, 1].set_title('Distribution of Rewards (Last 50 Episodes)')
axes[1, 1].set_xlabel('Reward')
axes[1, 1].set_ylabel('Frequency')
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Print final statistics
print(f"\nTraining Summary:")
print(f"Final 50 episodes - Average reward: {np.mean(rewards[-50:]):.2f}")
print(f"Final 50 episodes - Average length: {np.mean(lengths[-50:]):.1f}")
print(f"Overall average reward: {np.mean(rewards):.2f}")

## 6. Testing the Trained Agent

Now let's test the trained agent to see how well it performs.

In [None]:
def test_trained_agent(agent, episodes=5):
    """
    Test the trained agent on the environment.
    
    Args:
        agent: Trained PolicyGradientAgent
        episodes: Number of test episodes
    """
    env = SimpleRoboticArmEnv(max_steps=150)  # Slightly longer max steps for testing
    
    success_count = 0
    
    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        steps = 0
        done = False
        trajectory = []  # Store trajectory for visualization
        
        # Store initial position
        ee_x, ee_y = env._get_end_effector_position()
        trajectory.append((ee_x, ee_y))
        
        print(f"\nTest Episode {episode + 1}")
        print(f"Target: ({env.target_x:.2f}, {env.target_y:.2f})")
        print(f"Initial EE: ({ee_x:.2f}, {ee_y:.2f})")
        
        while not done and steps < 150:  # Limit steps for testing
            action, _, _, _ = agent.select_action(state)
            
            state, reward, done, _ = env.step(action)
            
            # Store trajectory position
            ee_x, ee_y = env._get_end_effector_position()
            trajectory.append((ee_x, ee_y))
            
            total_reward += reward
            steps += 1
            
            if steps % 20 == 0:  # Print progress every 20 steps
                dist_to_target = np.sqrt((ee_x - env.target_x)**2 + (ee_y - env.target_y)**2)
                print(f"  Step {steps}: EE at ({ee_x:.2f}, {ee_y:.2f}), "
                      f"Dist to target: {dist_to_target:.3f}, "
                      f"Reward: {reward:.2f}")
        
        final_dist = np.sqrt((ee_x - env.target_x)**2 + (ee_y - env.target_y)**2)
        success = final_dist < 0.1
        if success:
            success_count += 1
        
        print(f"  Final: EE at ({ee_x:.2f}, {ee_y:.2f}), "
              f"Final distance: {final_dist:.3f}, "
              f"Total reward: {total_reward:.2f}")
        print(f"  Success: {'Yes' if success else 'No'}")
        
        # Plot the trajectory
        traj_x, traj_y = zip(*trajectory)
        plt.figure(figsize=(8, 8))
        plt.plot(traj_x, traj_y, 'b-', linewidth=2, label='Robot Path')
        plt.plot(env.target_x, env.target_y, 'go', markersize=15, label='Target')
        plt.plot(trajectory[0][0], trajectory[0][1], 'ro', markersize=10, label='Start')
        plt.plot(trajectory[-1][0], trajectory[-1][1], 'mo', markersize=10, label='End')
        plt.axis('equal')
        plt.grid(True, alpha=0.3)
        plt.title(f'Test Episode {episode + 1} - Success: {"Yes" if success else "No"}')
        plt.xlabel('X Position')
        plt.ylabel('Y Position')
        plt.legend()
        plt.show()
    
    success_rate = success_count / episodes
    print(f"\nOverall Success Rate: {success_rate:.1%} ({success_count}/{episodes})")

# Test the trained agent
print("Testing the trained agent...")
test_trained_agent(trained_agent, episodes=3)

## 7. Summary and Conclusions

In this notebook, we implemented and demonstrated a Policy Gradient controller for robotic systems:

1. We created policy and value networks using PyTorch
2. We implemented a Policy Gradient Agent that learns to control a robotic arm
3. We created a simple robotic arm environment for training and testing
4. We trained the agent and visualized the learning progress
5. We tested the trained agent to see how well it performs

Policy gradient methods are a key approach in reinforcement learning for robotics, allowing robots to learn complex behaviors through trial and error. The approach learns a direct mapping from states to actions (or action probabilities) without needing to model the environment explicitly.