In [2]:
import gymnasium as gym
gym.__version__

'0.29.0'

In [58]:
env = gym.make("LunarLander-v2")

From [docs](https://gymnasium.farama.org/environments/box2d/lunar_lander/)

## Action Space
There are four discrete actions available:

0: do nothing

1: fire left orientation engine

2: fire main engine

3: fire right orientation engine


In [4]:
env.action_space

Discrete(4)


## Observation Space
The state is an 8-dimensional vector: the coordinates of the lander in x & y, its linear velocities in x & y, its angle, its angular velocity, and two booleans that represent whether each leg is in contact with the ground or not.


In [5]:
print(env.observation_space.shape)
env.observation_space

(8,)


Box([-1.5       -1.5       -5.        -5.        -3.1415927 -5.
 -0.        -0.       ], [1.5       1.5       5.        5.        3.1415927 5.        1.
 1.       ], (8,), float32)

## Taking a step

In [6]:
env.reset()
observation, reward, terminated, truncated, info = env.step(0)

## Reinforcement Learning Objective

Our performance measure: 

$$
J(\theta) = \mathbb{E} \left[ \sum_{t=0}^{T-1}r_{t+1} \right]
$$

and our update rule:

$$
\theta \leftarrow \theta + \frac{\partial}{\partial \theta} J(\theta)
$$

The gradient
$$
\nabla J (\theta) = \mathbb{E}_\pi \left[ \sum_a q_\pi(S_t, a) \nabla \pi (a | S_t, \theta) \right]
$$
$$
\nabla J (\theta) = \mathbb{E}_\pi \left[ \sum_a \pi (a | S_t, \theta) q_\pi(S_t, a) \frac{\nabla \pi (a | S_t, \theta)}{\pi (a | S_t, \theta)} \right]
$$

If we sample $A_t \sim \pi$, then we just replace the expectation over $a$ with the sample $A_t$. So we're doing this swap from expectation to sample:
$$
\sum_a \pi (a | S_t, \theta) q_\pi(S_t, a) \rightarrow q_\pi(S_t, A_t)
$$
which then simplifies $\nabla J(\theta)$ to

$$
\nabla J (\theta) = \mathbb{E}_\pi \left[ q_\pi(S_t, A_t) \frac{\nabla \pi (A_t | S_t, \theta)}{\pi (A_t | S_t, \theta)} \right]
$$

and by $\mathbb{E}_\pi [G_t | S_t, A_t] = q_\pi (S_t, A_t)$,

$$
\nabla J (\theta) = \mathbb{E}_\pi \left[ G_t \frac{\nabla \pi (a | S_t, \theta)}{\pi (a | S_t, \theta)} \right]
$$

We're going to make one more simplification: note that $\nabla \ln x = \frac{\nabla x}{x}$, so
$$
\nabla J (\theta) = \mathbb{E}_\pi \left[ G_t \nabla \ln \pi (a | S_t, \theta) \right]
$$

Now, we can actually calculate the value in brackets at each time step, and can then use it to update $\theta$:

$$
\theta_{t+1} = \theta_t + \alpha G_t \nabla \ln \pi(A_t | S_t, \theta_t)
$$

We're now going to generate an episode $S_0, A_0, R_1,...,S_{T-1}, A_{T-1}, R_T$.

In [86]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# initialize policy network
# takes in a state, determines the next action
class PolicyNetwork(nn.Module):
    def __init__(self, shape):
        super().__init__()
        self.w = nn.Linear(shape[0], shape[1])
        self.layer = nn.Sequential(
            self.w,
            nn.ReLU()
        )
        self.optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)

    def forward(self, x):
        logits = self.layer(x)
        return logits
    
    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        logits = self.forward(state).squeeze(0)
        probs = F.softmax(logits, dim=-1)
        action = torch.multinomial(probs, 1)
        return action.item(), logits
    
network_shape = env.observation_space.shape[0], (env.action_space.n - env.action_space.start)
model = PolicyNetwork(network_shape)

Here is a full episode of the moon landing.

In [89]:
import numpy as np

observation, _ = env.reset()
step = 0

observations = []
actions = []
rewards = []
logits = []

while True:
    action, _logits = model.act(observation)
    observation, reward, terminated, truncated, info = env.step(action)

    observations.append(observation)
    actions.append(action)
    rewards.append(reward)
    logits.append(_logits.detach().numpy())

    if terminated:
        observation = env.reset()
        break
    step += 1

observations = torch.tensor(observations, dtype=torch.float)
actions = torch.tensor(actions, dtype=torch.float)
rewards = torch.tensor(rewards, dtype=torch.float)
probs = torch.tensor(np.array(probs).flatten(), dtype=torch.float).reshape(-1, env.action_space.n)

print("Total steps: {}".format(step))

Total steps: 89


Now, at the end of the episode, we need to determine $G$, the rewards after the end of each episode.

In [101]:
def discount_rewards(rewards, gamma):
    discounted_rewards = torch.zeros_like(rewards, requires_grad=True)
    R = 0
    for t in reversed(range(len(rewards))):
        R = R * gamma + rewards[t]
        discounted_rewards[t] = R
    return discounted_rewards

gamma = 0.99
discounted_rewards = discount_rewards(rewards, gamma)

RuntimeError: a view of a leaf Variable that requires grad is being used in an in-place operation.

In [100]:
discounted_rewards.backward()

RuntimeError: element 0 of tensors does not require grad and does not have a grad_fn