# Introduction to Reinforcement Learning: Pendulum Demo

[![Open In Colab <](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ShawnHymel/reinforcement-learning-demos/blob/main/rl-demo-pendulum.ipynb)

This tutorial acts as a solution to the inverted pendulum swingup challenge at the end of the Introduction to Reinforcement Learning video. Execute each of the cells and follow the directions. That being said, I highly recommend trying to find the solution yourself prior to reading this solution!

The goal is to apply some amount of torque to a freely swinging pendulum to cause it to swing up and stay in the upright position.

The action space is continuous (theta is the pendulum's angle between [-pi, pi]):

 * Apply an amount of torque in the counter-clockwise direction to the end of the pendulum in Newton-meters between the values [-2.0, 2.0]

The observation space is 3 continuous numerical values:

 0. x cartesian coordinate of the pendulum's end (*=cos(theta)*): [-1.0, 1.0]
 1. y cartesian coordinate of the pendulum's end (*=sin(theta)*): [-1.0, 1.0]
 2. Angular velocity: [-8.0, 8.0]

The reward is calculated based on the angle of the pendulum, its angular velocity, and the applied torque:

*r = -(theta<sup>2</sup> + 0.1 * theta_dt<sup>2</sup> + 0.001 * torque<sup>2</sup>)*

From this, we find that the minimum reward is -16.2736044 (pendulum in the downward position moving quickly and with maximum torque applied), whereas the maximum reward is 0 (pendulum in the upright position with 0 velocity and no torque applied).

Read more about the pendulum environment [here](https://gymnasium.farama.org/environments/classic_control/pendulum/).

This tutorial uses the *Proximal Policy Optimization* (PPO) algorithm to learn a policy that keeps the pendulum in the upright position. You can read about the Stable Baselines3 implementation and usage of PPO [here](https://stable-baselines3.readthedocs.io/en/master/modules/ppo.html).

In [None]:
!python -m pip install gymnasium==0.28.1
!python -m pip install stable-baselines3[extra]==2.0.0a1

In [None]:
import gymnasium as gym
import stable_baselines3 as sb3
import matplotlib.pyplot as plt
import cv2
import numpy as np

# Report the version of each key library so results can be reproduced
for lib_name, lib in (("gym", gym), ("cv2", cv2)):
    print(f"{lib_name} version: {lib.__version__}")

In [None]:
# Create the environment
# https://gymnasium.farama.org/api/env/
# render_mode='rgb_array' is requested so that env.render() hands back image
# frames, which the cells below plot and write to video
env = gym.make('Pendulum-v1', render_mode='rgb_array')

In [None]:
# Reset the pendulum environment to start a new episode
# https://gymnasium.farama.org/environments/classic_control/pendulum/
# reset() returns the first observation along with an info object
obs, info = env.reset()
print(obs)
print(info)

In [None]:
# Render the environment (render is not the observation!)
# The rendered frame is an image array: print its shape and display it.
# This frame is also reused later to pick the video resolution.
frame = env.render()
print(frame.shape)
plt.imshow(frame)

In [None]:
# Inspect the environment's observation and action spaces, then draw one
# random sample from each to see what valid values look like
# https://gymnasium.farama.org/api/spaces/fundamental/
observation_space = env.observation_space
action_space = env.action_space
print(f"Observation space: {observation_space}")
print(f"Action space: {action_space}")
print(f"Random observation: {observation_space.sample()}")
print(f"Random action: {action_space.sample()}")

In [None]:
# Apply zero torque for a single step, then show the result.
# Run this cell a few times to see the pendulum fall.
torque = np.array([0.0])
obs, reward, terminated, truncated, info = env.step(torque)
print(obs)
print(reward)
print(terminated)
# Render the environment after the step to visualize the pendulum
frame = env.render()
plt.imshow(frame)

In [None]:
# Function that tests the model in the given environment
def test_model(env, model, video=None, msg=None):

    # Reset environment
    obs, info = env.reset()
    frame = env.render()
    ep_len = 0
    ep_rew = 0

    # Run episode until complete
    while True:

        # Provide observation to policy to predict the next action
        action, _ = model.predict(obs)

        # Perform action, update total reward
        obs, reward, terminated, truncated, info = env.step(action)
        ep_rew += reward

        # Record frame to video
        if video:
            frame = env.render()
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            frame = cv2.putText(
                frame,                    # Image
                msg,                      # Text to add
                (10, 25),                 # Origin of text in imagg
                cv2.FONT_HERSHEY_SIMPLEX, # Font
                1,                        # Font scale
                (0, 0, 0,),               # Color
                2,                        # Thickness
                cv2.LINE_AA               # Line type
            )
            video.write(frame)

        # Increase step counter
        ep_len += 1

        # Check to see if episode has ended
        if terminated or truncated:
            return ep_len, ep_rew

In [None]:
# Baseline "model" that acts at random, used to compare against a trained policy
class DummyModel:
    """Stand-in policy that ignores observations and samples random actions."""

    def __init__(self, env):
        # Keep the environment so its action space can be sampled later
        self.env = env

    def predict(self, obs):
        """Return `(action, None)` where the action is a random sample.

        The observation is accepted but never used. The second element is
        always None.
        """
        return self.env.action_space.sample(), None

In [None]:
# Recorder settings
FPS = 30
FOURCC = cv2.VideoWriter.fourcc('m', 'p', '4', 'v')
VIDEO_FILENAME = "1-random.mp4"

# Use frame from environment to compute resolution
# (frame comes from an earlier render; shape[0] is height, shape[1] is width)
width = frame.shape[1]
height = frame.shape[0]

# Create recorder
video = cv2.VideoWriter(VIDEO_FILENAME, FOURCC, FPS, (width, height))

# Try running a few episodes with the environment and random actions
dummy_model = DummyModel(env)
try:
    for ep in range(5):
        ep_len, ep_rew = test_model(env, dummy_model, video, f"Random, episode {ep}")
        print(f"Episode {ep} | length: {ep_len}, reward: {ep_rew}")
finally:
    # Close the video writer even if an episode fails or the cell is interrupted
    video.release()

In [None]:
# Initialize model
# PPO docs: https://stable-baselines3.readthedocs.io/en/master/modules/ppo.html
# Policy networks: https://stable-baselines.readthedocs.io/en/master/modules/policies.html
# NOTE(review): the policy networks link above is on the stable-baselines docs
# site rather than stable-baselines3 - confirm it is the intended reference
# Hyperparameters from: https://github.com/DLR-RM/rl-baselines3-zoo/blob/master/hyperparams/ppo.yml
model = sb3.PPO(
    'MlpPolicy',
    env,
    learning_rate=0.001,        # Learning rate of neural network (default: 0.0003)
    n_steps=1024,               # Number of steps per update (default: 2048)
    batch_size=64,              # Minibatch size for NN update (default: 64)
    gamma=0.9,                  # Discount factor (default: 0.99)
    ent_coef=0.0,               # Entropy, how much to explore (default: 0.0)
    use_sde=True,               # Use generalized State Dependent Exploration (default: False)
    sde_sample_freq=4,          # Number of steps before sampling new noise matrix (default -1)
    policy_kwargs={'net_arch': [64, 64]}, # 2 hidden layers, 1 output layer (default: [64, 64])
    verbose=0                   # Print training metrics (default: 0)
)

In [None]:
# Training and testing hyperparameters
NUM_ROUNDS = 20
NUM_TRAINING_STEPS_PER_ROUND = 5000
NUM_TESTS_PER_ROUND = 100
MODEL_FILENAME_BASE = "pendulum-ppo"
VIDEO_FILENAME = "2-training.mp4"

# Create recorder
video = cv2.VideoWriter(VIDEO_FILENAME, FOURCC, FPS, (width, height))

# Train and test the model for a number of rounds, keeping the per-round
# averages so they can be plotted afterwards
avg_ep_lens = []
avg_ep_rews = []
try:
    for rnd in range(NUM_ROUNDS):

        # Train the model
        model.learn(total_timesteps=NUM_TRAINING_STEPS_PER_ROUND)

        # Save the model (one file per round, suffixed with the round number)
        model.save(f"{MODEL_FILENAME_BASE}_{rnd}")

        # Test the model in several episodes
        avg_ep_len = 0
        avg_ep_rew = 0
        for ep in range(NUM_TESTS_PER_ROUND):

            # Only record the first test
            if ep == 0:
                ep_len, ep_rew = test_model(env, model, video, f"Round {rnd}")
            else:
                ep_len, ep_rew = test_model(env, model)

            # Accumulate totals; they are turned into averages below
            avg_ep_len += ep_len
            avg_ep_rew += ep_rew

        # Record and display average episode length and reward
        avg_ep_len /= NUM_TESTS_PER_ROUND
        avg_ep_lens.append(avg_ep_len)
        avg_ep_rew /= NUM_TESTS_PER_ROUND
        avg_ep_rews.append(avg_ep_rew)
        print(f"Round {rnd} | average test length: {avg_ep_len}, average test reward: {avg_ep_rew}")
finally:
    # Close the video writer even if training fails or is interrupted, so the
    # rounds recorded so far are not left in an unreleased writer
    video.release()

In [None]:
# Plot average test episode lengths and rewards for each round.
# Titles and axis labels are added so the two panels can be told apart.
fig, axs = plt.subplots(1, 2)

# Left panel: average episode length per round
axs[0].plot(avg_ep_lens)
axs[0].set_title("Average episode length")
axs[0].set_xlabel("Round")
axs[0].set_ylabel("Steps")

# Right panel: average episode reward per round
axs[1].plot(avg_ep_rews)
axs[1].set_title("Average episode reward")
axs[1].set_xlabel("Round")
axs[1].set_ylabel("Total reward")

# Keep the labels of the two panels from overlapping
fig.tight_layout()

In [None]:
# Model and video settings
# (the model name matches the "<base>_<round>" files saved during training)
MODEL_FILENAME = "pendulum-ppo_15"
VIDEO_FILENAME = "3-testing.mp4"

# Create recorder
video = cv2.VideoWriter(VIDEO_FILENAME, FOURCC, FPS, (width, height))

try:
    # Load the model
    model = sb3.PPO.load(MODEL_FILENAME)

    # Test the model for one recorded episode, captioned with the model name
    ep_len, ep_rew = test_model(env, model, video, MODEL_FILENAME)
    print(f"Episode length: {ep_len}, reward: {ep_rew}")
finally:
    # Close the video writer even if loading or testing fails
    video.release()

In [None]:
# We're done with the environment, so close it now that all training,
# testing, and recording cells have run
env.close()