In [2]:
pip install box2d

Collecting box2d
  Using cached Box2D-2.3.2.tar.gz (427 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: box2d
  Building wheel for box2d (setup.py): started
  Building wheel for box2d (setup.py): finished with status 'error'
  Running setup.py clean for box2d
Failed to build box2d
Installing collected packages: box2d
  Running setup.py install for box2d: started
  Running setup.py install for box2d: finished with status 'error'
Note: you may need to restart the kernel to use updated packages.


  error: subprocess-exited-with-error
  
  × python setup.py bdist_wheel did not run successfully.
  │ exit code: 1
  ╰─> [28 lines of output]
      Using setuptools (version 67.6.0).
      running bdist_wheel
      running build
      running build_py
      creating build
      creating build\lib.win-amd64-cpython-310
      creating build\lib.win-amd64-cpython-310\Box2D
      copying library\Box2D\Box2D.py -> build\lib.win-amd64-cpython-310\Box2D
      copying library\Box2D\__init__.py -> build\lib.win-amd64-cpython-310\Box2D
      creating build\lib.win-amd64-cpython-310\Box2D\b2
      copying library\Box2D\b2\__init__.py -> build\lib.win-amd64-cpython-310\Box2D\b2
      running build_ext
      building 'Box2D._Box2D' extension
      swigging Box2D\Box2D.i to Box2D\Box2D_wrap.cpp
      swig.exe -python -c++ -IBox2D -small -O -includeall -ignoremissing -w201 -globals b2Globals -outdir library\Box2D -keyword -w511 -D_SWIG_KWARGS -o Box2D\Box2D_wrap.cpp Box2D\Box2D.i
      error: Micros

In [4]:
pip install gymnasium[box2d]

Collecting pygame==2.1.3.dev8
  Using cached pygame-2.1.3.dev8-cp310-cp310-win_amd64.whl (10.6 MB)
Collecting box2d-py==2.3.5
  Using cached box2d-py-2.3.5.tar.gz (374 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: box2d-py
  Building wheel for box2d-py (setup.py): started
  Building wheel for box2d-py (setup.py): finished with status 'error'
  Running setup.py clean for box2d-py
Failed to build box2d-py
Installing collected packages: box2d-py, pygame
  Running setup.py install for box2d-py: started
  Running setup.py install for box2d-py: finished with status 'error'
Note: you may need to restart the kernel to use updated packages.


  error: subprocess-exited-with-error
  
  × python setup.py bdist_wheel did not run successfully.
  │ exit code: 1
  ╰─> [28 lines of output]
      Using setuptools (version 67.6.0).
      running bdist_wheel
      running build
      running build_py
      creating build
      creating build\lib.win-amd64-cpython-310
      creating build\lib.win-amd64-cpython-310\Box2D
      copying library\Box2D\Box2D.py -> build\lib.win-amd64-cpython-310\Box2D
      copying library\Box2D\__init__.py -> build\lib.win-amd64-cpython-310\Box2D
      creating build\lib.win-amd64-cpython-310\Box2D\b2
      copying library\Box2D\b2\__init__.py -> build\lib.win-amd64-cpython-310\Box2D\b2
      running build_ext
      building 'Box2D._Box2D' extension
      swigging Box2D\Box2D.i to Box2D\Box2D_wrap.cpp
      swig.exe -python -c++ -IBox2D -small -O -includeall -ignoremissing -w201 -globals b2Globals -outdir library\Box2D -keyword -w511 -D_SWIG_KWARGS -o Box2D\Box2D_wrap.cpp Box2D\Box2D.i
      error: Micros

## Lunar Lander with REINFORCE
### Christian Igel, 2023

If you have suggestions for improvements, [let me know](mailto:igel@diku.dk).

Imports:

In [9]:
import gymnasium as gym

from tqdm.notebook import tqdm, trange  # Progress bar

import numpy as np
import matplotlib.pyplot as plt

We need [the `gymnasium` package](https://gymnasium.farama.org/).
From this package, we create the Lunar Lander environment:

In [6]:
# Environment instance that opens a window, used for watching episodes
env_visual = gym.make('LunarLander-v2', render_mode="human")
# Read the problem dimensions from the environment instead of hard-coding them
action_size = int(env_visual.action_space.n)  # Number of discrete actions
state_size = int(env_visual.observation_space.shape[0])  # Length of the observation vector

DependencyNotInstalled: Box2D is not installed, run `pip install gymnasium[box2d]`

Let's just test the environment first:

In [None]:
test_episodes = 5  # Number of episodes played with random actions
for _ in range(test_episodes):
    R = 0  # Return (sum of rewards) of the current episode
    state, _ = env_visual.reset()  # Start a new episode and get the initial observation
    print("initial state:", state)
    while True:  # Run until the environment reports termination or truncation
        # env_visual was created with render_mode="human", in which the environment
        # draws itself during step(), so no explicit render() call is needed
        state, reward, terminated, truncated, _ = env_visual.step(env_visual.action_space.sample())  # Take a random action
        R += reward  # Accumulate reward
        if terminated or truncated:
            print("return: ", R)
            break  # The next iteration of the outer loop resets the environment

## REINFORCE

Let's define a policy class for a simple softmax policy for real-valued feature vectors and discrete actions.
The preference for an action is just a linear function of the input features.
It is not obvious that this simple policy is powerful enough to solve the task without additional processing of the input features. However, it is indeed possible to obtain reasonable policies in this setting.

In [None]:
class Softmax_policy:
    """
    Softmax policy for discrete actions with linear action preferences.

    The preference of action i in a state with feature vector s is the dot
    product theta[i] . s; action probabilities are the softmax of the preferences.
    """
    def __init__(self, no_actions, no_features):
        """
        Initialize softmax policy for discrete actions
        :param no_actions: number of actions
        :param no_features: dimensionality of feature vector representing a state
        """
        self.no_actions = no_actions
        self.no_features = no_features

        # Initialize policy parameters to zero (i.e., uniform action probabilities)
        self.theta = np.zeros([no_actions, no_features])

    def pi(self, s):
        """
        Compute action probabilities in a given state
        :param s: state feature vector
        :return: an array of action probabilities
        """
        # Compute action preferences for the given feature vector
        preferences = self.theta.dot(s)
        # Shift so that the largest preference is zero: exp() can then only
        # underflow, never overflow; the softmax is invariant to this shift
        preferences = preferences - preferences.max()
        # Convert the preferences into probabilities
        exp_prefs = np.exp(preferences)
        return exp_prefs / np.sum(exp_prefs)

    def inc(self, delta):
        """
        Change the parameters by addition, e.g. for initialization or parameter updates
        :param delta: values to be added to parameters
        """
        self.theta += delta

    def sample_action(self, s):
        """
        Sample an action in a given state
        :param s: state feature vector
        :return: action
        """
        return np.random.choice(self.no_actions, p=self.pi(s))

    def gradient_log_pi(self, s, a):
        """
        Computes the gradient of the logarithm of the policy
        :param s: state feature vector
        :param a: action
        :return: gradient of the logarithm of the policy with respect to theta,
                 an array with the same shape as theta
        """
        # For linear preferences h_i = theta[i] . s and softmax probabilities:
        #   d log pi(a|s) / d theta[i, j] = (1[i == a] - pi(i|s)) * s[j]
        coefficients = -self.pi(s)  # pi() returns a fresh array, safe to modify
        coefficients[a] += 1.0
        return np.outer(coefficients, s)

    def gradient_log_pi_test(self, s, a, eps=0.1):
        """
        Numerically approximates the gradient of the logarithm of the policy
        using forward differences
        :param s: state feature vector
        :param a: action
        :param eps: step size added to one parameter at a time
        :return: approximate gradient of the logarithm of the policy
        """
        log_pi = np.log(self.pi(s)[a])
        d = np.zeros([self.no_actions, self.no_features])
        for i in range(self.no_actions):
            for j in range(self.no_features):
                original_value = self.theta[i, j]
                self.theta[i, j] = original_value + eps
                try:
                    log_pi_eps = np.log(self.pi(s)[a])
                    d[i, j] = (log_pi_eps - log_pi) / eps
                finally:
                    # Restore the exact stored value (not value + eps - eps)
                    self.theta[i, j] = original_value
        return d
  
    
    

Verify gradient implementation:

In [None]:
env = gym.make('LunarLander-v2')  # Environment without rendering, also used for learning below
s = env.reset()[0]  # Initial observation, used as the test input
pi = Softmax_policy(action_size, state_size)
tolerance = 0.001  # Absolute tolerance for difference in each gradient component
epsilon = 0.0001  # Step size of the finite-difference approximation
for _ in range(10):
    # Move the parameters by a random amount so the check runs at several parameter values
    pi.inc(10.*np.random.rand(action_size, state_size))
    for a in range(action_size):
        analytic = pi.gradient_log_pi(s, a)
        numeric = pi.gradient_log_pi_test(s, a, epsilon)
        if not np.isclose(analytic, numeric, atol=tolerance).all():
            # Report exactly the two gradients that were compared (same action, same epsilon)
            print("derivative test for action", a)
            print(analytic)
            print(numeric)

Do the learning:

In [None]:
alpha = 0.00005  # Learning rate

no_episodes = 20000  # Number of episodes
total_reward_list = []  # (episode index, return) for the individual episodes
pi = Softmax_policy(action_size, state_size)  # Policy

# Do the learning
for e in trange(no_episodes):  # Loop over episodes
    R = []  # Store rewards r_1, ..., r_T
    S = []  # Store states s_0, ..., s_{T-1}
    A = []  # Store actions a_0, ..., a_{T-1}
    state = env.reset()[0]  # Start a new episode and get the initial observation
    while True:  # Run until the environment reports termination or truncation
        S.append(state)

        action = pi.sample_action(state)  # Take an action following pi
        A.append(action)

        state, reward, terminated, truncated, _ = env.step(action)  # Observe reward and new state
        R.append(reward)

        if terminated or truncated:  # Episode is over
            break

    R = np.array(R)
    total_reward_list.append((e, R.sum()))

    # Undiscounted reward-to-go r_{t+1} + ... + r_T for every t, computed in a
    # single pass instead of re-summing the tail of R at every step
    rewards_to_go = np.cumsum(R[::-1])[::-1]
    for t in range(R.size):
        R_t = rewards_to_go[t]  # Accumulated future reward
        Delta = alpha * R_t * pi.gradient_log_pi(S[t], A[t])  # REINFORCE update
        pi.inc(Delta)  # Apply update
    

Plot learning process:

In [None]:
def running_mean(x, N):
    """
    Trailing moving average used to smooth the learning curve.

    The sequence is padded at the front with N copies of its first element,
    so the result has as many entries as x.
    :param x: sequence of values
    :param N: window length
    :return: array of window means
    """
    front_padding = np.repeat(x[0], N)
    padded = np.insert(x, 0, front_padding)
    totals = np.cumsum(padded)
    # Difference of cumulative sums N positions apart = sum over one window
    window_sums = totals[N:] - totals[:-N]
    return window_sums / N

# Split the (episode, return) records into two arrays
episode_index, episode_return = np.asarray(total_reward_list).T
# Smooth the returns with a window of 10 episodes
smoothed_return = running_mean(episode_return, 10)
# Smoothed curve first, raw per-episode returns as a faint grey overlay
plt.plot(episode_index, smoothed_return)
plt.plot(episode_index, episode_return, color='grey', alpha=0.3)
plt.xlabel('Episode')
plt.ylabel('Accumulated Reward');

Visualize policy:

In [None]:
state = env_visual.reset()[0]  # Start a new episode and get the initial observation
R = 0  # Return (sum of rewards) of the episode
while True:  # Run until the environment reports termination or truncation
    # env_visual was created with render_mode="human", in which the environment
    # draws itself during step(), so no explicit render() call is needed
    state, reward, terminated, truncated, _ = env_visual.step(pi.sample_action(state))  # Take an action sampled from the policy
    R += reward  # Accumulate reward
    if terminated or truncated:
        print("return: ", R)
        break