# Training RL to do Cartpole Balancing

This notebook is part of the [AI for Beginners Curriculum](http://aka.ms/ai-beginners). It has been inspired by the [official PyTorch tutorial](https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html) and [this Cartpole PyTorch implementation](https://github.com/yc930401/Actor-Critic-pytorch).

In this example, we will use RL to train a model to balance a pole on a cart that can move left and right along a horizontal axis. We will use the [OpenAI Gym](https://www.gymlibrary.ml/) environment to simulate the pole.

> **Note**: You can run the code for this lesson locally (e.g., from Visual Studio Code), in which case the simulation will open in a new window. If you are running the code online, you may need to make some adjustments to the code, as described [here](https://towardsdatascience.com/rendering-openai-gym-envs-on-binder-and-google-colab-536f99391cc7).

We will start by ensuring that Gym is installed:


In [None]:
import sys
!{sys.executable} -m pip install gym

Now let's create the CartPole environment and see how to interact with it. An environment has the following properties:

* **Action space** is the set of possible actions we can take at each step of the simulation.
* **Observation space** is the range of observations we can perceive.


In [None]:
import gym

env = gym.make("CartPole-v1")

print(f"Action space: {env.action_space}")
print(f"Observation space: {env.observation_space}")

Let's see how the simulation works. The following loop runs the simulation until `env.step` returns the termination flag `done`. We will select actions at random using `env.action_space.sample()`, so the experiment will likely fail very quickly (a CartPole episode ends when the cart position or the pole angle exceeds certain limits).

> The simulation will open in a new window. You can run the code multiple times to observe its behavior.


In [None]:
env.reset()

done = False
total_reward = 0
while not done:
    env.render()
    obs, rew, done, info = env.step(env.action_space.sample())
    total_reward += rew
    print(f"{obs} -> {rew}")
print(f"Total reward: {total_reward}")

You can observe that the observations consist of 4 numbers. They are:
- Position of the cart
- Velocity of the cart
- Angle of the pole
- Rotation rate of the pole
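
As a small illustration, the four values can be unpacked into named fields. The sketch below uses only the standard library. The `CartPoleObservation` name and the `is_out_of_bounds` helper are hypothetical, and the limits (cart position beyond ±2.4, pole angle beyond ±12°) are assumptions taken from the CartPole environment's documentation rather than from this notebook.

```python
import math
from typing import NamedTuple

class CartPoleObservation(NamedTuple):
    """Hypothetical container naming the four observation values."""
    cart_position: float
    cart_velocity: float
    pole_angle: float              # radians
    pole_angular_velocity: float

# Assumed termination limits, per the CartPole environment documentation.
X_LIMIT = 2.4
ANGLE_LIMIT = math.radians(12)

def is_out_of_bounds(obs: CartPoleObservation) -> bool:
    """Return True if the cart or the pole has left the allowed range."""
    return abs(obs.cart_position) > X_LIMIT or abs(obs.pole_angle) > ANGLE_LIMIT

obs = CartPoleObservation(0.03, -0.2, 0.05, 0.3)
print(obs.pole_angle, is_out_of_bounds(obs))
```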

`rew` represents the reward received at each step. In the CartPole environment, you earn 1 point for every simulation step, and the objective is to maximize the total reward, which means keeping the CartPole balanced for as long as possible without it falling.

In reinforcement learning, the goal is to train a **policy** $\pi$, which determines the action $a$ to take for each state $s$. Essentially, $a = \pi(s)$.

If you prefer a probabilistic approach, you can think of the policy as providing a set of probabilities for each action. For example, $\pi(a|s)$ would represent the probability of taking action $a$ in state $s$.
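
To illustrate the probabilistic view, the following sketch samples actions from a fixed distribution $\pi(a|s)$ using only the standard library. The probabilities and the `sample_action` helper are made up for illustration; in the notebook the probabilities come from the neural network.

```python
import random

def sample_action(action_probs, rng):
    """Draw one action index according to the given probabilities."""
    return rng.choices(range(len(action_probs)), weights=action_probs, k=1)[0]

rng = random.Random(0)       # fixed seed so the sketch is repeatable
pi_given_s = [0.2, 0.8]      # hypothetical pi(a|s) for actions 0 and 1

samples = [sample_action(pi_given_s, rng) for _ in range(10_000)]
frequency_of_1 = samples.count(1) / len(samples)
print(f"Empirical frequency of action 1: {frequency_of_1:.3f}")
```

Over many draws, the empirical frequency of each action approaches its probability under the policy.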

## Policy Gradient Method

In the simplest reinforcement learning algorithm, known as **Policy Gradient**, we train a neural network to predict the next action.


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch

num_inputs = 4
num_actions = 2

model = torch.nn.Sequential(
    torch.nn.Linear(num_inputs, 128, bias=False, dtype=torch.float32),
    torch.nn.ReLU(),
    torch.nn.Linear(128, num_actions, bias = False, dtype=torch.float32),
    torch.nn.Softmax(dim=1)
)
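
The final `Softmax` layer turns the two raw network outputs into action probabilities. As a rough sketch of what that layer computes, here is a hand-written softmax in plain Python. It is an illustration only, not the PyTorch implementation, and the input scores are made up.

```python
import math

def softmax(logits):
    """Convert raw scores into probabilities that sum to 1."""
    largest = max(logits)
    # Subtracting the maximum keeps the exponentials numerically stable.
    exps = [math.exp(x - largest) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

probs = softmax([1.0, 3.0])   # hypothetical raw outputs for actions 0 and 1
print(probs)
```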

We will train the network by running many experiments and updating it after each run. Let's define a function that runs one experiment and returns the results (the so-called **trace**): all states, actions (and their predicted probabilities), and rewards:


In [None]:
def run_episode(max_steps_per_episode = 10000,render=False):    
    states, actions, probs, rewards = [],[],[],[]
    state = env.reset()
    for _ in range(max_steps_per_episode):
        if render:
            env.render()
        action_probs = model(torch.from_numpy(np.expand_dims(state,0)))[0]
        action = np.random.choice(num_actions, p=np.squeeze(action_probs.detach().numpy()))
        nstate, reward, done, info = env.step(action)
        # Record the step before checking `done`, so the final transition is kept
        states.append(state)
        actions.append(action)
        probs.append(action_probs.detach().numpy())
        rewards.append(reward)
        if done:
            break
        state = nstate
    return np.vstack(states), np.vstack(actions), np.vstack(probs), np.vstack(rewards)

You can run one episode with the untrained network and observe that the total reward (that is, the length of the episode) is very low:


In [None]:
s, a, p, r = run_episode()
print(f"Total reward: {np.sum(r)}")

One of the tricky aspects of the policy gradient algorithm is the use of **discounted rewards**. The idea is to compute, for each step of the game, the total reward collected from that step onward, with rewards further in the future discounted by a coefficient $\gamma$. We also normalize the resulting vector, because we will use it as a weight during training:


In [None]:
eps = 0.0001

def discounted_rewards(rewards,gamma=0.99,normalize=True):
    ret = []
    s = 0
    for r in rewards[::-1]:
        s = r + gamma * s
        ret.insert(0, s)
    if normalize:
        ret = (ret-np.mean(ret))/(np.std(ret)+eps)
    return ret
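
To make the recursion concrete: each entry is $G_t = r_t + \gamma G_{t+1}$, computed from the last step backwards. The following sketch repeats the same computation in plain Python, without normalization. The function name `discounted_returns` and the choice of $\gamma = 0.5$ are for illustration only.

```python
def discounted_returns(rewards, gamma):
    """Return G_t = r_t + gamma * G_{t+1} for every step t."""
    returns = []
    running = 0.0
    for r in reversed(rewards):
        running = r + gamma * running
        returns.append(running)
    returns.reverse()
    return returns

print(discounted_returns([1.0, 1.0, 1.0], gamma=0.5))   # [1.75, 1.5, 1.0]
```

Earlier steps receive larger values because more reward still follows them.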

Now let's do the actual training! We will run 300 episodes, and in each episode, we will do the following:

1. Execute the experiment and gather the trace.
2. Compute the difference (`gradients`) between the actions taken and the predicted probabilities. The smaller the difference, the more confident the network already was in the action that was chosen.
3. Calculate discounted rewards and multiply the gradients by these discounted rewards. This ensures that steps with higher rewards have a greater impact on the final result compared to those with lower rewards.
4. The expected target actions for our neural network will be derived partly from the predicted probabilities during the run and partly from the calculated gradients. The `alpha` parameter determines how strongly the gradients and rewards are taken into account; it acts as the *learning rate* of the reinforcement algorithm.
5. Finally, we train our network using the states and expected actions, and then repeat the process.
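
Steps 2 to 4 can be sketched for a single time step in plain Python. The `soft_target` helper and all the numbers are hypothetical. The notebook performs the same arithmetic on whole NumPy arrays.

```python
def soft_target(probs, action, weight, alpha):
    """Nudge the predicted probabilities toward (or away from) the taken action."""
    one_hot = [1.0 if i == action else 0.0 for i in range(len(probs))]
    # Step 2 and 3: difference to the taken action, scaled by the discounted reward.
    gradient = [(o - p) * weight for o, p in zip(one_hot, probs)]
    # Step 4: blend the scaled gradient into the original prediction.
    return [p + alpha * g for p, g in zip(probs, gradient)]

probs = [0.6, 0.4]   # hypothetical network output for one state
# Action 1 was taken and its normalized discounted reward is positive,
# so the target shifts probability mass toward action 1.
target = soft_target(probs, action=1, weight=2.0, alpha=0.1)
print(target)
```

Because the one-hot vector and the probabilities both sum to 1, the gradient sums to 0 and the target remains a valid probability distribution as long as `alpha` is small.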


In [None]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

def train_on_batch(x, y):
    x = torch.from_numpy(x)
    y = torch.from_numpy(y)
    optimizer.zero_grad()
    predictions = model(x)
    loss = -torch.mean(torch.log(predictions) * y)
    loss.backward()
    optimizer.step()
    return loss
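
The loss in `train_on_batch` is a cross-entropy between the target distribution and the predictions, averaged over all elements. The following plain-Python sketch mirrors that formula. The `soft_cross_entropy` name and the sample values are assumptions made for illustration.

```python
import math

def soft_cross_entropy(predictions, targets):
    """Compute -mean(log(prediction) * target) over every element of the batch."""
    terms = [
        math.log(p) * t
        for pred_row, target_row in zip(predictions, targets)
        for p, t in zip(pred_row, target_row)
    ]
    return -sum(terms) / len(terms)

targets = [[0.0, 1.0]]
confident = soft_cross_entropy([[0.1, 0.9]], targets)
unsure = soft_cross_entropy([[0.5, 0.5]], targets)
print(confident, unsure)   # the confident, correct prediction has the lower loss
```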

In [None]:
alpha = 1e-4

history = []
for epoch in range(300):
    states, actions, probs, rewards = run_episode()
    one_hot_actions = np.eye(2)[actions.T][0]
    gradients = one_hot_actions-probs
    dr = discounted_rewards(rewards)
    gradients *= dr
    target = alpha*np.vstack([gradients])+probs
    train_on_batch(states,target)
    history.append(np.sum(rewards))
    if epoch%100==0:
        print(f"{epoch} -> {np.sum(rewards)}")

plt.plot(history)

Now let's run the episode with rendering to see the result:


In [None]:
_ = run_episode(render=True)

Hopefully, you can see that the pole can now balance quite well!

## Actor-Critic Model

The Actor-Critic model is an advancement of policy gradients, where we design a neural network to learn both the policy and the estimated rewards. This network will have two outputs (or you can think of it as two separate networks):
* **Actor** suggests the action to take by providing a probability distribution over actions, similar to the policy gradient model.
* **Critic** estimates the potential reward from those actions. It outputs the total estimated future rewards for the given state.

Let's define such a model:


In [None]:
from itertools import count
import torch.nn.functional as F

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
env = gym.make("CartPole-v1")

state_size = env.observation_space.shape[0]
action_size = env.action_space.n
lr = 0.0001

class Actor(torch.nn.Module):
    def __init__(self, state_size, action_size):
        super(Actor, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.linear1 = torch.nn.Linear(self.state_size, 128)
        self.linear2 = torch.nn.Linear(128, 256)
        self.linear3 = torch.nn.Linear(256, self.action_size)

    def forward(self, state):
        output = F.relu(self.linear1(state))
        output = F.relu(self.linear2(output))
        output = self.linear3(output)
        distribution = torch.distributions.Categorical(F.softmax(output, dim=-1))
        return distribution


class Critic(torch.nn.Module):
    def __init__(self, state_size, action_size):
        super(Critic, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.linear1 = torch.nn.Linear(self.state_size, 128)
        self.linear2 = torch.nn.Linear(128, 256)
        self.linear3 = torch.nn.Linear(256, 1)

    def forward(self, state):
        output = F.relu(self.linear1(state))
        output = F.relu(self.linear2(output))
        value = self.linear3(output)
        return value

We need to slightly modify our `discounted_rewards` and `run_episode` functions:


In [None]:
def discounted_rewards(next_value, rewards, masks, gamma=0.99):
    R = next_value
    returns = []
    for step in reversed(range(len(rewards))):
        R = rewards[step] + gamma * R * masks[step]
        returns.insert(0, R)
    return returns

def run_episode(actor, critic, n_iters):
    optimizerA = torch.optim.Adam(actor.parameters())
    optimizerC = torch.optim.Adam(critic.parameters())
    for iter in range(n_iters):
        state = env.reset()
        log_probs = []
        values = []
        rewards = []
        masks = []
        entropy = 0

        for i in count():
            env.render()
            state = torch.FloatTensor(state).to(device)
            dist, value = actor(state), critic(state)

            action = dist.sample()
            next_state, reward, done, _ = env.step(action.cpu().numpy())

            log_prob = dist.log_prob(action).unsqueeze(0)
            entropy += dist.entropy().mean()

            log_probs.append(log_prob)
            values.append(value)
            rewards.append(torch.tensor([reward], dtype=torch.float, device=device))
            masks.append(torch.tensor([1-done], dtype=torch.float, device=device))

            state = next_state

            if done:
                print('Iteration: {}, Score: {}'.format(iter, i))
                break


        next_state = torch.FloatTensor(next_state).to(device)
        next_value = critic(next_state)
        returns = discounted_rewards(next_value, rewards, masks)

        log_probs = torch.cat(log_probs)
        returns = torch.cat(returns).detach()
        values = torch.cat(values)

        advantage = returns - values

        actor_loss = -(log_probs * advantage.detach()).mean()
        critic_loss = advantage.pow(2).mean()

        optimizerA.zero_grad()
        optimizerC.zero_grad()
        actor_loss.backward()
        critic_loss.backward()
        optimizerA.step()
        optimizerC.step()
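
To see what the update computes, here is a plain-Python sketch of the returns, the advantage, and the two losses for a three-step episode. All numbers are hypothetical, and `bootstrapped_returns` is an illustrative stand-in for the `discounted_rewards` function above, operating on floats instead of tensors.

```python
def bootstrapped_returns(next_value, rewards, masks, gamma=0.99):
    """Discounted returns; a mask of 0 stops value flowing past a terminal step."""
    running = next_value
    returns = []
    for reward, mask in zip(reversed(rewards), reversed(masks)):
        running = reward + gamma * running * mask
        returns.append(running)
    returns.reverse()
    return returns

rewards = [1.0, 1.0, 1.0]
masks = [1.0, 1.0, 0.0]           # the episode ended on the last step
values = [2.0, 1.5, 0.5]          # hypothetical critic estimates V(s_t)
log_probs = [-0.7, -0.4, -1.2]    # hypothetical log pi(a_t | s_t)

# next_value is ignored here because the final mask is 0.
returns = bootstrapped_returns(next_value=10.0, rewards=rewards, masks=masks, gamma=0.5)
advantages = [g - v for g, v in zip(returns, values)]

# The actor is pushed toward actions with positive advantage; the critic
# minimizes the squared error between its estimates and the returns.
actor_loss = -sum(lp * a for lp, a in zip(log_probs, advantages)) / len(advantages)
critic_loss = sum(a * a for a in advantages) / len(advantages)
print(returns, advantages, actor_loss, critic_loss)
```

A positive advantage means the action turned out better than the critic expected, so the actor loss rewards raising that action's probability.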


Now we will run the main training loop. The `run_episode` function trains the networks manually: it computes the actor and critic losses and updates the network parameters after each episode:


In [None]:

actor = Actor(state_size, action_size).to(device)
critic = Critic(state_size, action_size).to(device)
run_episode(actor, critic, n_iters=100)

In [None]:
env.close()

## Key Points

In this demo, we explored two reinforcement learning algorithms: the simple policy gradient and the more advanced actor-critic. These algorithms work with abstract concepts like state, action, and reward, which makes them adaptable to a wide range of environments.

Reinforcement learning enables us to discover the optimal strategy for solving a problem solely by observing the final reward. The advantage of not requiring labeled datasets allows us to run simulations repeatedly to refine our models. However, RL still presents numerous challenges, which you can delve into further if you choose to specialize in this fascinating field of AI.



---

**Disclaimer**:  
This document has been translated using the AI translation service [Co-op Translator](https://github.com/Azure/co-op-translator). While we aim for accuracy, please note that automated translations may include errors or inaccuracies. The original document in its native language should be regarded as the authoritative source. For critical information, professional human translation is advised. We are not responsible for any misunderstandings or misinterpretations resulting from the use of this translation.
