## CMA-ES for CartPole Torch (assignment version)
### Christian Igel, 2024

If you have suggestions for improvement, [let me know](mailto:igel@diku.dk).

You may need the following packages:

``pip install gymnasium[classic-control]``

``python -m pip install cma``

In [1]:
import gymnasium as gym  # Defines RL environments

import numpy as np
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (4,4)  # Set size of visualization
from IPython.display import clear_output  # For inline visualization

import torch
import torch.nn as nn
import torch.nn.functional as F
# Import Env from gymnasium
from gymnasium.core import Env
import cma

# Define task
env = gym.make('CartPole-v1')
state_space_dimension = env.observation_space.shape[0]
action_space_dimension = 1  # env.action_space.n - 1

Define the policy network:

In [2]:
# Model definition
class CartPolePolicyNet(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, use_bias=True):
        super(CartPolePolicyNet, self).__init__()
        # Define the hidden layer with tanh activation
        self.hidden = nn.Linear(input_size, hidden_size, bias=use_bias)
        self.tanh = nn.Tanh()
        
        # Define the output layer with linear activation
        self.output = nn.Linear(hidden_size, output_size, bias=use_bias)
        
    def forward(self, x):
        x = self.tanh(self.hidden(x))
        x = self.output(x)
        return x
    
input_size = 4  # Cart Position, Cart Velocity, Pole Angle, Pole Angular Velocity
hidden_size = 5  # Number of neurons in the hidden layer
output_size = 1  # Single output for the action spacve {0, 1} / {left, right}

# Create the policy network without bias
policy_net = CartPolePolicyNet(input_size, hidden_size, output_size, use_bias=False)


Compute number of parameters:

In [3]:
d = sum(
	param.numel() for param in policy_net.parameters()
)
print("Number of parameters:", d)

Number of parameters: 25


Helper function for visualization:

In [4]:
def visualize_policy(policy_net):
    env_render = gym.make('CartPole-v1', render_mode='rgb_array')
    state = env_render.reset()  # Forget about previous episode
    state_tensor = torch.Tensor( state[0].reshape((1, state_space_dimension)) )
    steps = 0
    while True:
        out = policy_net(state_tensor)
        a = int(out > 0)
        state, reward, terminated, truncated, _ = env_render.step(a)  # Simulate pole
        steps+=1
        state_tensor = torch.Tensor( state.reshape((1, state_space_dimension)) )
        clear_output(wait=True)
        plt.imshow(env_render.render())
        plt.show()
        print("step:", steps)
        if(terminated or truncated): 
            break
    env_render.close()
    return

In [5]:
# visualize_policy(policy_net)

Now we define the objective/reward function. 
When the task is solved the functions returns -1000.
One successful trial is sufficient.

In [6]:
def fitness_cart_pole(x, nn, env: Env):
    '''
    Returns negative accumulated reward for single pole, fully environment.

    Parameters:
        x: Parameter vector encoding the weights.
        nn: Parameterized model.
        env: Environment ('CartPole-v?').
    '''
    torch.nn.utils.vector_to_parameters(torch.Tensor(x), nn.parameters())  # Set the policy parameters
    
    state = env.reset()  # Forget about previous episode
    state_tensor = torch.Tensor( state[0].reshape((1, state_space_dimension)) )
          
    R = 0  # Accumulated reward
    while True:
        out = nn(state_tensor)
        a = int(out > 0)
        state, reward, terminated, truncated, _ = env.step(a)  # Simulate pole
        state_tensor = torch.Tensor( state.reshape((1, state_space_dimension)) )
        R += reward  # Accumulate 
        if truncated:
            return -1000  # Episode ended, final goal reached, we consider minimization
        if terminated:
            return -R  # Episode ended, we consider minimization
    return -R  # Never reached  

Do the learning:

In [7]:
# Generate initial search point and initial hidden RNN states
initial_weights = np.random.normal(0, 0.01, d)  # Random parameters for initial policy, d denotes the number of weights
initial_sigma = .01 # Initial global step-size sigma

# Do the optimization
res = cma.fmin(fitness_cart_pole,  # Objective function
               initial_weights,  # Initial search point
               initial_sigma,  # Initial global step-size sigma
               args=([policy_net, env]),  # Arguments passed to the fitness function
               options={'ftarget': -999.9, 'tolflatfitness':1000, 'eval_final_mean':False})
# env.close()

# Set the policy parameters to the final solution
torch.nn.utils.vector_to_parameters(torch.Tensor(res[0]), policy_net.parameters())  

print("best solution found after", res[2], "evaluations")

(6_w,13)-aCMA-ES (mu_w=4.0,w_1=38%) in dimension 25 (seed=843498, Wed Mar  6 09:18:52 2024)
Iterat #Fevals   function value  axis ratio  sigma  min&max std  t[m:s]
    1     13 -1.000000000000000e+03 1.0e+00 9.29e-03  9e-03  9e-03 0:00.1
termination on ftarget=-999.9 (Wed Mar  6 09:18:52 2024)
final/bestever f-value = -1.000000e+03 -1.000000e+03 after 13/7 evaluations
incumbent solution: [-0.00580179 -0.00733487  0.02178249  0.00094471  0.00414907  0.00630938
 -0.00619232 -0.00623195 ...]
std deviations: [0.0093299  0.00926849 0.00937624 0.00923554 0.0092459  0.00929343
 0.0092678  0.00934622 ...]
best solution found after 7 evaluations


Render solution:

In [9]:
# visualize_policy(policy_net)

Learn more about CMA-ES optimization:

In [None]:
# Visualize the optimization 
# cma.plot();  

In [None]:
# Learn even more on CMA-ES
# cma.CMAOptions() 

In [23]:
def test_balance(policy_net, env, steps_to_balance=500):   
    state = env.reset(options={'low':-0.2, 'high': 0.2}) # Forget about previous episode
    state_tensor = torch.Tensor(state[0].reshape((1, state_space_dimension)))
    for t in range(steps_to_balance):  # Test for 500 steps        
        action_scores = policy_net(state_tensor)
        action = int(action_scores > 0)
        
        state, _, done, truncated, _ = env.step(action)
        state_tensor = torch.Tensor( state.reshape((1, state_space_dimension)))
        
        if done or truncated:
            return t+1
        
    return steps_to_balance  # Successfully balanced for all 500 steps

In [24]:
# Perform the experiment in a single cell

# Parameters
num_runs = 100
steps_to_balance = 500
results = {'with_bias': {'balanced_for': [], 'trained_for': []}, 'without_bias':  {'balanced_for': [], 'trained_for': []}}

for use_bias in [False, True]:
    for run in range(num_runs):
        policy_net = CartPolePolicyNet(input_size, hidden_size, output_size, use_bias=use_bias)
        
        num_params = sum(p.numel() for p in policy_net.parameters())
        
        initial_weights = np.random.normal(0, 0.01, num_params)
        initial_sigma = .01 
        # Do the optimization
        res = cma.fmin(fitness_cart_pole,  # Objective function
                    initial_weights,  # Initial search point
                    initial_sigma,  # Initial global step-size sigma
                    args=([policy_net, env]),  # Arguments passed to the fitness function
                    options={'ftarget': -999.9, 'tolflatfitness':1000, 'eval_final_mean':False})
        env.close()

        print("best solution found after", res[2], "evaluations")

        # balance_steps = visualize_policy(policy_net)
        # visualize_policy(policy_net)
        balance_steps = test_balance(policy_net, env, steps_to_balance)
        results['with_bias' if use_bias else 'without_bias']['balanced_for'].append(balance_steps)
        results['with_bias' if use_bias else 'without_bias']['trained_for'].append(res[2])
        print("Run:", run, "Use bias:", use_bias, "Balance steps:", balance_steps)

(6_w,13)-aCMA-ES (mu_w=4.0,w_1=38%) in dimension 25 (seed=869336, Wed Mar  6 10:05:20 2024)
Iterat #Fevals   function value  axis ratio  sigma  min&max std  t[m:s]
    1     13 -1.000000000000000e+03 1.0e+00 9.37e-03  9e-03  9e-03 0:00.1
termination on ftarget=-999.9 (Wed Mar  6 10:05:20 2024)
final/bestever f-value = -1.000000e+03 -1.000000e+03 after 13/6 evaluations
incumbent solution: [-0.00333103 -0.01300842  0.00977606 -0.00966793 -0.00064404 -0.01057781
 -0.01840609 -0.00592109 ...]
std deviations: [0.00937501 0.00934974 0.00929524 0.00936292 0.00932926 0.00934508
 0.00935556 0.00932207 ...]
best solution found after 6 evaluations
Run: 0 Use bias: False Balance steps: 12
(6_w,13)-aCMA-ES (mu_w=4.0,w_1=38%) in dimension 25 (seed=856429, Wed Mar  6 10:05:20 2024)
Iterat #Fevals   function value  axis ratio  sigma  min&max std  t[m:s]
    1     13 -1.230000000000000e+02 1.0e+00 9.37e-03  9e-03  9e-03 0:00.1
    2     26 -1.070000000000000e+02 1.1e+00 9.19e-03  9e-03  9e-03 0:00.2
  

In [25]:
# Calculate and print average performance
for k, v in results.items():
    average_performance = {k: np.mean(v) for k, v in v.items()}
    for key, val in average_performance.items():
        print(f"{k}, {key}: {val}")

with_bias, balanced_for: 61.6
with_bias, trained_for: 1969.87
without_bias, balanced_for: 61.23
without_bias, trained_for: 24.65
