# A3C Implementation

## A3C Algorithm Demo with CartPole in Gymnasium

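An implementation of the advantage actor-critic update at the core of the Asynchronous Advantage Actor-Critic (A3C) algorithm, applied to the CartPole-v1 environment. The code in this notebook trains a single worker; it does not run the parallel asynchronous workers that give A3C its name.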

**Objective**: Balance the pole as long as possible while keeping the cart within boundaries.
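
A3C's defining feature is that several workers update one shared set of parameters asynchronously. The toy sketch below illustrates only that update pattern, not reinforcement learning itself: worker threads copy a shared parameter, compute a gradient on a simple quadratic objective, and write the update back. It is not part of this notebook's training code. The names `shared`, `worker`, `TARGET`, and `LEARNING_RATE` are hypothetical, and the lock is an assumption made for simplicity.

```python
import threading

# Toy objective: minimise (w - TARGET)^2, whose gradient is 2 * (w - TARGET).
TARGET = 3.0
LEARNING_RATE = 0.05

shared = {"w": 0.0}      # stands in for the global network's parameters
lock = threading.Lock()  # serialises writes to the shared parameter


def worker(num_updates):
    for _ in range(num_updates):
        local_w = shared["w"]                # 1. copy the global parameter
        grad = 2.0 * (local_w - TARGET)      # 2. compute a gradient locally
        with lock:                           # 3. apply it to the global copy
            shared["w"] -= LEARNING_RATE * grad


threads = [threading.Thread(target=worker, args=(200,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

final_w = shared["w"]
print(f"shared parameter after training: {final_w:.3f}")
```

A gradient may be computed from a slightly stale copy of the parameter, because other workers can update it between steps 1 and 3. In this toy problem the updates still converge towards `TARGET`.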

### Lib imports

In [10]:
%pip install gymnasium torch numpy matplotlib pygame

In [3]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output

#### Seed

In [None]:
torch.manual_seed(42)
np.random.seed(42)

### Neural Network Architecture
Actor-Critic network with:
- Shared layers for feature extraction
- Actor head for action policy
- Critic head for state value estimation

In [4]:
class PolicyNetwork(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_size=128):
        super().__init__()
        
        self.shared = nn.Sequential(
            nn.Linear(state_dim, hidden_size),
            nn.ReLU()
        )
        
        self.actor = nn.Linear(hidden_size, action_dim)
        self.critic = nn.Linear(hidden_size, 1)
        
        # Xavier initialization for stability
        for layer in self.modules():
            if isinstance(layer, nn.Linear):
                nn.init.xavier_normal_(layer.weight)

    def forward(self, state):
        shared_out = self.shared(state)
        action_probs = torch.softmax(self.actor(shared_out), dim=-1)
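        # squeeze(-1) drops only the trailing size-1 dimension, so a batch
        # containing a single state keeps its batch dimension
        state_value = self.critic(shared_out).squeeze(-1)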
        return action_probs, state_value
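
As a quick shape check, the sketch below rebuilds the same two-headed layout from standalone layers and passes a random batch through it. The sizes 4 and 2 are CartPole-v1's observation and action dimensions. The variable names and the batch size of 5 are arbitrary choices for illustration.

```python
import torch
import torch.nn as nn

state_dim, action_dim, hidden = 4, 2, 128  # CartPole-v1 sizes, default hidden width

shared = nn.Sequential(nn.Linear(state_dim, hidden), nn.ReLU())
actor = nn.Linear(hidden, action_dim)   # one logit per action
critic = nn.Linear(hidden, 1)           # one scalar value per state

batch = torch.randn(5, state_dim)       # 5 random states
features = shared(batch)
probs = torch.softmax(actor(features), dim=-1)
values = critic(features).squeeze(-1)

print(probs.shape, values.shape)
```

Each row of `probs` sums to 1, and `values` holds one estimate per state in the batch.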

### Setup

In [5]:
env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

In [6]:
# Hyperparameters
LR = 0.001
GAMMA = 0.99
ENTROPY_COEFF = 0.01
EPISODES = 300

# Initialize components
policy = PolicyNetwork(state_dim, action_dim)
optimizer = optim.Adam(policy.parameters(), lr=LR)

# Reward tracking
episode_rewards = []
losses = []

### Training Function
Episode training process including:
1. Experience collection
2. Advantage and return calculations
3. Model parameter updates
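
The return and advantage step can be checked by hand on a tiny episode. The sketch below uses three rewards of 1, a discount of 0.5, and made-up critic values. The helper name `discounted_returns` and all the numbers are illustrative only.

```python
def discounted_returns(rewards, gamma):
    """Return G_t = r_t + gamma * G_{t+1} for every step, computed backwards."""
    returns = []
    running = 0.0
    for r in reversed(rewards):
        running = r + gamma * running
        returns.append(running)
    returns.reverse()
    return returns


rewards = [1.0, 1.0, 1.0]
values = [1.0, 1.0, 1.0]  # made-up critic estimates

returns = discounted_returns(rewards, gamma=0.5)
advantages = [g - v for g, v in zip(returns, values)]

print(returns)     # [1.75, 1.5, 1.0]
print(advantages)  # [0.75, 0.5, 0.0]
```

A positive advantage means the episode went better from that step than the critic predicted, so the action taken there is made more likely.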

In [7]:
def train_episode():
    state, _ = env.reset()
    transitions = []
    
    # Experience collection
    while True:
        state_tensor = torch.FloatTensor(state)
        
        with torch.no_grad():
            action_probs, value = policy(state_tensor)
            
        action = torch.multinomial(action_probs, 1).item()
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        
        transitions.append({
            'state': state,
            'action': action,
            'reward': reward,
            'done': done,
            'value': value
        })
        
        state = next_state
        if done:
            break
    
    # Calculate discounted returns
    rewards = [t['reward'] for t in transitions]
    dones = [t['done'] for t in transitions]
    
    returns = []
    R = 0
    for r, done in zip(reversed(rewards), reversed(dones)):
        R = r + GAMMA * R * (not done)
        returns.insert(0, R)
    
    # Prepare training data
    states = torch.FloatTensor(np.array([t['state'] for t in transitions]))
    actions = torch.LongTensor([t['action'] for t in transitions])
    returns = torch.FloatTensor(returns)
    
    # Calculate losses
    optimizer.zero_grad()
    
    action_probs, values = policy(states)
    advantages = returns - values.detach()
    
    # Policy loss (clamp probabilities so log() never sees an exact zero)
    log_action_probs = torch.log(action_probs.clamp(min=1e-8))
    log_probs = log_action_probs.gather(1, actions.unsqueeze(1)).squeeze(1)
    policy_loss = -(log_probs * advantages).mean()
    
    # Value Loss
    value_loss = 0.5 * (returns - values).pow(2).mean()
    
    # Entropy regularization
    entropy = -(action_probs * log_action_probs).sum(dim=1).mean()
    
    # Total loss
    total_loss = policy_loss + value_loss - ENTROPY_COEFF * entropy
    
    # Backpropagation
    total_loss.backward()
    optimizer.step()
    
    return sum(rewards), total_loss.item()
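
The three loss terms can also be written with `torch.distributions.Categorical`, which computes log-probabilities and entropy directly from logits. This is an alternative sketch on hand-written toy tensors, not the notebook's training code. The tensor values and the coefficient name `entropy_coeff` are assumptions for illustration.

```python
import torch
from torch.distributions import Categorical

# Toy batch of 3 steps with 2 actions (values chosen by hand)
logits = torch.tensor([[2.0, 0.0], [0.0, 0.0], [-1.0, 1.0]], requires_grad=True)
values = torch.tensor([0.5, 1.0, 0.2], requires_grad=True)
actions = torch.tensor([0, 1, 1])
returns = torch.tensor([1.75, 1.5, 1.0])
entropy_coeff = 0.01

dist = Categorical(logits=logits)

# detach() stops the policy loss from pushing gradients into the critic
advantages = returns - values.detach()
policy_loss = -(dist.log_prob(actions) * advantages).mean()
value_loss = 0.5 * (returns - values).pow(2).mean()
entropy = dist.entropy().mean()

total_loss = policy_loss + value_loss - entropy_coeff * entropy
total_loss.backward()

print(f"total loss: {total_loss.item():.4f}")
```

Because the advantages are detached, the gradient reaching `values` comes from the value loss alone.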

### Model Training
Training process with live progress visualization

In [8]:
# Live plotting: the figure is rebuilt on every refresh
WINDOW = 10  # moving-average window, in episodes

for episode in range(EPISODES):
    reward, loss = train_episode()
    episode_rewards.append(reward)
    losses.append(loss)
    
    # Update plots every 10 episodes
    if (episode + 1) % 10 == 0:
        clear_output(wait=True)
        
        # Create a fresh figure each time: the inline backend closes a figure
        # once plt.show() has displayed it, so one created before the loop
        # would only be drawn on the first refresh
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))
        
        # Reward plot
        ax1.plot(episode_rewards, label='Episode reward')
        moving_avg = np.convolve(episode_rewards, np.ones(WINDOW) / WINDOW, mode='valid')
        # Align each average with the last episode of its window
        ax1.plot(np.arange(WINDOW - 1, len(episode_rewards)), moving_avg,
                 label=f'Moving average ({WINDOW})')
        ax1.set_title("Training Progress")
        ax1.set_xlabel("Episode")
        ax1.set_ylabel("Reward")
        ax1.legend()
        ax1.grid(True)
        
        # Loss plot (one update per episode)
        ax2.plot(losses)
        ax2.set_title("Loss Evolution")
        ax2.set_xlabel("Update step")
        ax2.set_ylabel("Total loss")
        ax2.grid(True)
        
        fig.tight_layout()
        plt.show()
        
        print(f"Episode {episode+1}/{EPISODES}")
        print(f"Last reward: {reward}")
        print(f"Last 10 avg: {np.mean(episode_rewards[-10:])}")

Episode 300/300
Last reward: 63.0
Last 10 avg: 56.0


### Trained Model Demonstration
Visualization of learned behavior

In [13]:
import time

def run_demonstration(num_episodes=3, fps=30):
    """Run demonstration with adjustable speed and multiple episodes
    
    Args:
        num_episodes (int): Number of episodes to demonstrate
        fps (int): Frames per second (controls animation speed)
    """
    test_env = gym.make("CartPole-v1", render_mode="human")
    episode_counter = 0
    frame_delay = 1.0 / fps  # Calculate delay between frames
    
    try:
        while episode_counter < num_episodes:
            state, _ = test_env.reset()
            episode_reward = 0
            terminated = truncated = False
            
            while not (terminated or truncated):
                start_time = time.time()
                
                # Get action from policy
                with torch.no_grad():
                    action_probs, _ = policy(torch.FloatTensor(state))
                action = torch.argmax(action_probs).item()
                
                # Step environment
                state, reward, terminated, truncated, _ = test_env.step(action)
                episode_reward += reward
                
                # Maintain frame rate
                elapsed = time.time() - start_time
                remaining_delay = frame_delay - elapsed
                if remaining_delay > 0:
                    time.sleep(remaining_delay)
            
            print(f"Episode {episode_counter+1}:")
            print(f"  Duration: {episode_reward} frames")
            # CartPole-v1 is considered solved at a reward of 475
            # (195 is the CartPole-v0 threshold)
            print(f"  {'Success' if episode_reward >= 475 else 'Failure'}")
            print("-" * 40)
            
            episode_counter += 1
            
            # Pause between episodes
            if episode_counter < num_episodes:
                print("Next episode starting in 2 seconds...")
                time.sleep(2)
    
    finally:
        test_env.close()
        print("Demonstration completed. Environment closed.")

# Run demonstration with parameters
run_demonstration(
    num_episodes=5,  # Number of episodes to show
    fps=25           # Animation speed in frames per second
)

Episode 1:
  Duration: 69.0 frames
  Failure
----------------------------------------
Next episode starting in 2 seconds...
Episode 2:
  Duration: 57.0 frames
  Failure
----------------------------------------
Next episode starting in 2 seconds...
Episode 3:
  Duration: 39.0 frames
  Failure
----------------------------------------
Next episode starting in 2 seconds...
Episode 4:
  Duration: 51.0 frames
  Failure
----------------------------------------
Next episode starting in 2 seconds...
Episode 5:
  Duration: 51.0 frames
  Failure
----------------------------------------
Demonstration completed. Environment closed.


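### Results Analysis
- Maximum possible reward: 500 (CartPole-v1 truncates an episode after 500 steps)
- Target: rewards above 400 after roughly 200 episodes. The run recorded above did not reach this: the last-10 average after 300 episodes was 56.0, and the demonstration episodes lasted 39 to 69 steps
- Loss should stabilize when the model converges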

**Key Components:**
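- Advantage calculation: subtracting the critic's value estimate from the return reduces the variance of the policy gradient
- Entropy regularization: the entropy bonus penalizes near-deterministic policies, which maintains exploration
- Shared network features: the actor and critic reuse one hidden layer, which can improve sample efficiency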
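
To make the entropy term concrete, the sketch below compares the entropy of a uniform two-action policy with that of a nearly deterministic one. The helper name `entropy` and the probability values are chosen for illustration only.

```python
import math


def entropy(probs):
    """Shannon entropy in nats; zero-probability actions contribute nothing."""
    return -sum(p * math.log(p) for p in probs if p > 0)


uniform = entropy([0.5, 0.5])    # maximally exploratory two-action policy
peaked = entropy([0.99, 0.01])   # almost always picks the first action

print(f"uniform: {uniform:.3f}")  # uniform: 0.693
print(f"peaked:  {peaked:.3f}")   # peaked:  0.056
```

Since the total loss subtracts the entropy scaled by `ENTROPY_COEFF`, the peaked policy receives a smaller bonus than the uniform one, which discourages the policy from collapsing too early.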

**References:**
1. [Policy Gradient Algorithms - Lilian Weng](https://lilianweng.github.io/posts/2018-04-08-policy-gradient/)
2. Mnih et al. (2016). Asynchronous Methods for Deep Reinforcement Learning
3. Schulman et al. (2015). Trust Region Policy Optimization