# Pendulum
## Description

The inverted pendulum swing-up problem is a classic problem in control theory.
The system consists of a pendulum attached at one end to a fixed point, with the other end free.
The pendulum starts in a random position, and the goal is to apply torque on the free end to swing it
into an upright position, with its center of gravity directly above the fixed point.

The diagram below specifies the coordinate system used for the implementation of the pendulum's
dynamic equations.

![Pendulum Coordinate System](https://gymnasium.farama.org/_images/pendulum.png)

- `x-y`: Cartesian coordinates of the pendulum's end in meters.
- `theta`: angle in radians.
- `tau`: torque in `N m`. Defined as positive _counter-clockwise_.

### Action Space

The action is an `ndarray` with shape `(1,)` representing the torque applied to the free end of the pendulum.

| Num | Action | Min  | Max |
|-----|--------|------|-----|
| 0   | Torque | -2.0 | 2.0 |


### Observation Space

The observation is an `ndarray` with shape `(3,)` representing the x-y coordinates of the pendulum's free
end and its angular velocity.

| Num | Observation      | Min  | Max |
|-----|------------------|------|-----|
| 0   | x = cos(theta)   | -1.0 | 1.0 |
| 1   | y = sin(theta)   | -1.0 | 1.0 |
| 2   | Angular Velocity | -8.0 | 8.0 |
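
Because the observation stores the angle as its cosine and sine, the angle itself can be recovered with a two-argument arctangent. The snippet below is a minimal sketch using only the standard library; the helper name `angle_from_observation` is hypothetical and not part of Gymnasium.

```python
import math


def angle_from_observation(obs):
    """Recover theta in [-pi, pi] from (cos(theta), sin(theta), angular velocity)."""
    x, y, _angular_velocity = obs
    return math.atan2(y, x)


# Upright pendulum: theta = 0, so x = cos(0) = 1 and y = sin(0) = 0.
upright = angle_from_observation((1.0, 0.0, 0.0))

# Round trip for an arbitrary angle inside [-pi, pi].
theta = 2.0
recovered = angle_from_observation((math.cos(theta), math.sin(theta), 0.5))
print(upright, recovered)
```

Using `atan2` rather than `acos(x)` keeps the sign of the angle, which a single coordinate cannot provide.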

### Rewards

The reward function is defined as:

*r = -(theta<sup>2</sup> + 0.1 * theta_dt<sup>2</sup> + 0.001 * torque<sup>2</sup>)*

where `theta` is the pendulum's angle normalized to *[-pi, pi]* (with 0 being the upright position).
Based on the above equation, the minimum reward that can be obtained is
*-(pi<sup>2</sup> + 0.1 * 8<sup>2</sup> + 0.001 * 2<sup>2</sup>) = -16.2736044*,
while the maximum reward is zero (pendulum is upright with zero velocity and no torque applied).
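
The bounds quoted above can be checked numerically. The following is a small sketch of the reward formula in plain Python; the function and constant names are illustrative choices, and the wrap-around helper is one common way to normalize an angle rather than a statement about the library's internals.

```python
import math

MAX_SPEED = 8.0   # angular velocity bound from the observation table
MAX_TORQUE = 2.0  # torque bound from the action table


def angle_normalize(theta):
    """Wrap an angle in radians into the interval [-pi, pi)."""
    return ((theta + math.pi) % (2 * math.pi)) - math.pi


def pendulum_reward(theta, theta_dt, torque):
    """r = -(theta^2 + 0.1 * theta_dt^2 + 0.001 * torque^2), with theta normalized."""
    theta = angle_normalize(theta)
    return -(theta ** 2 + 0.1 * theta_dt ** 2 + 0.001 * torque ** 2)


best = pendulum_reward(0.0, 0.0, 0.0)                   # upright, at rest, no torque
worst = pendulum_reward(math.pi, MAX_SPEED, MAX_TORQUE)  # hanging down, maximum speed and torque
print(best, worst)
```

The worst case evaluates to roughly -16.2736, matching the minimum stated above, and the best case is zero.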

### Starting State

The starting state is a random angle in *[-pi, pi]* and a random angular velocity in *[-1,1]*.

### Episode Truncation

The episode truncates at 200 time steps.

### Arguments

- `g`: acceleration of gravity measured in *m s<sup>-2</sup>*, used to calculate the pendulum dynamics.
    The default value is `g = 10.0`.

```python
import gymnasium as gym
gym.make('Pendulum-v1', g=9.81)
```

On reset, the `options` parameter allows the user to change the bounds used to determine
the new random state.

### Version History

* v1: Simplify the math equations, no difference in behavior.
* v0: Initial versions release (1.0.0)


In [1]:
import gymnasium as gym
from gymnasium.envs.classic_control.pendulum import PendulumEnv
for k,v in gym.envs.registry.items():
    print(str(k)+"\t"+str(v))

CartPole-v0	EnvSpec(id='CartPole-v0', entry_point='gymnasium.envs.classic_control.cartpole:CartPoleEnv', reward_threshold=195.0, nondeterministic=False, max_episode_steps=200, order_enforce=True, autoreset=False, disable_env_checker=False, apply_api_compatibility=False, kwargs={}, namespace=None, name='CartPole', version=0, additional_wrappers=(), vector_entry_point='gymnasium.envs.classic_control.cartpole:CartPoleVectorEnv')
CartPole-v1	EnvSpec(id='CartPole-v1', entry_point='gymnasium.envs.classic_control.cartpole:CartPoleEnv', reward_threshold=475.0, nondeterministic=False, max_episode_steps=500, order_enforce=True, autoreset=False, disable_env_checker=False, apply_api_compatibility=False, kwargs={}, namespace=None, name='CartPole', version=1, additional_wrappers=(), vector_entry_point='gymnasium.envs.classic_control.cartpole:CartPoleVectorEnv')
MountainCar-v0	EnvSpec(id='MountainCar-v0', entry_point='gymnasium.envs.classic_control.mountain_car:MountainCarEnv', reward_threshold=-110.

In [2]:
env=gym.make("Pendulum-v1")
print("Observation space: ",env.observation_space)
print("Action space: ",env.action_space)

Observation space:  Box([-1. -1. -8.], [1. 1. 8.], (3,), float32)
Action space:  Box(-2.0, 2.0, (1,), float32)


In [3]:
import torch
import torch.nn as nn
class Actor(nn.Module):
    def __init__(self,num_observations,num_actions,action_bound) -> None:
        super().__init__()
        self.num_hidden=128
        self.linear_1=nn.Linear(num_observations,self.num_hidden)
        self.selu=nn.SELU()
        self.linear_2=nn.Linear(self.num_hidden,num_actions)
        self.tanh=nn.Tanh()
        self.action_bound=action_bound
    
    def forward(self,x:torch.Tensor)->torch.Tensor:
        x=self.linear_1(x)
        x=self.selu(x)
        x=self.linear_2(x)
        x=self.tanh(x)
        x=self.action_bound*x
        return x
    
class Critic(nn.Module):
    def __init__(self,num_observations,num_actions) -> None:
        super().__init__()
        self.num_hidden=128
        self.linear_1=nn.Linear(num_observations+num_actions,self.num_hidden)
        self.selu=nn.SELU()
        self.linear_2=nn.Linear(self.num_hidden,1)
    
    def forward(self,state:torch.Tensor,action:torch.Tensor)->torch.Tensor:
        x=torch.cat([state,action],dim=1)
        x=self.linear_1(x)
        x=self.selu(x)
        x=self.linear_2(x)
        return x
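
The last two steps of `Actor.forward` keep the proposed torque inside the action space: `tanh` maps any real number into [-1, 1], and multiplying by `action_bound` stretches that to the torque limits. A standard-library sketch of the same arithmetic, with the hypothetical helper name `squash`:

```python
import math

ACTION_BOUND = 2.0  # maximum torque magnitude from the action table


def squash(pre_activation, bound=ACTION_BOUND):
    """Map any real number into [-bound, bound] via tanh followed by scaling."""
    return bound * math.tanh(pre_activation)


# Even extreme pre-activations stay within the torque limits.
for z in (-100.0, -1.0, 0.0, 1.0, 100.0):
    print(z, squash(z))
```

Note that this bound applies to the network output only; exploration noise added afterwards can still push an action slightly outside the limits.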

In [4]:
from collections import namedtuple,deque
import random
from typing import List
Transition=namedtuple("Transition",["state","action","next_state","next_action","reward"])
class Experience:
    def __init__(self,maxlen=1024) -> None:
        self.experience=deque([],maxlen)
    def append(self,transition:Transition)->None:
        self.experience.append(transition)
    def __len__(self)->int:
        return len(self.experience)
    def sample(self,batch_size:int)->List[Transition]:
        return random.sample(self.experience,batch_size)
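
The replay buffer relies on two standard-library behaviours: a `deque` created with a `maxlen` silently drops its oldest entry once it is full, and `random.sample` draws distinct elements without replacement. A minimal stand-alone sketch, using integers in place of transitions:

```python
import random
from collections import deque

buffer = deque([], maxlen=4)  # same construction as in Experience, with a tiny capacity
for step in range(6):
    buffer.append(step)       # steps 0 and 1 are evicted once the buffer is full

print(list(buffer))           # [2, 3, 4, 5]

batch = random.sample(buffer, 3)  # three distinct elements in random order
print(batch)
```

`random.sample` raises a `ValueError` when asked for more elements than the buffer holds, which is why a training loop has to wait until enough transitions have been collected before sampling a batch.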

In [5]:
import numpy as np


def update_actor_critic(actor: Actor, actor_target: Actor,
                        critic: Critic, critic_target: Critic,
                        experience: Experience, batch_size: int,
                        optimizer_actor: torch.optim.Optimizer, optimizer_critic: torch.optim.Optimizer,
                        discount_factor: float,
                        tau: float,
                        device: str):
    batch = experience.sample(batch_size)
    batch_state, batch_action, batch_next_state, batch_next_action, batch_reward = zip(*batch)
    # A next_state of None marks the last stored step of an episode: no bootstrapping there.
    mask_batch_next_state_is_not_none = [next_state is not None for next_state in batch_next_state]
    batch_not_none_next_state = [next_state for next_state in batch_next_state if next_state is not None]

    # Stack the observations with NumPy first; building a tensor directly from a
    # sequence of ndarrays is slow and makes PyTorch emit a UserWarning.
    batch_state = torch.tensor(np.array(batch_state), dtype=torch.float32, device=device)
    batch_action = torch.tensor(batch_action, dtype=torch.float32, device=device).reshape(-1, 1)
    batch_not_none_next_state = torch.tensor(np.array(batch_not_none_next_state), dtype=torch.float32, device=device)
    batch_reward = torch.tensor(batch_reward, dtype=torch.float32, device=device).reshape((-1, 1))
    mask_batch_next_state_is_not_none = torch.tensor(mask_batch_next_state_is_not_none, dtype=torch.bool, device=device)

    value_state = critic(batch_state, batch_action)
    value_next_state = torch.zeros(batch_reward.size(), dtype=torch.float32, device=device)
    # The TD target is a fixed regression target: no gradient should flow into the target networks.
    with torch.no_grad():
        if mask_batch_next_state_is_not_none.any():
            value_next_state[mask_batch_next_state_is_not_none] = critic_target(batch_not_none_next_state, actor_target(batch_not_none_next_state))
    td_target = batch_reward+discount_factor*value_next_state

    loss_critic_fn = nn.MSELoss()
    loss_critic = loss_critic_fn(value_state, td_target)
    optimizer_critic.zero_grad()
    loss_critic.backward()
    optimizer_critic.step()

    loss_actor = -torch.mean(critic(batch_state, actor(batch_state)))
    optimizer_actor.zero_grad()
    loss_actor.backward()
    optimizer_actor.step()

    actor_state_dict = actor.state_dict()
    actor_target_state_dict = actor_target.state_dict()
    for key in actor_target_state_dict:
        actor_target_state_dict[key] = actor_target_state_dict[key]*(1-tau)+actor_state_dict[key]*tau
    actor_target.load_state_dict(actor_target_state_dict)
    
    critic_state_dict = critic.state_dict()
    critic_target_state_dict = critic_target.state_dict()
    for key in critic_target_state_dict:
        critic_target_state_dict[key] = critic_target_state_dict[key]*(1-tau)+critic_state_dict[key]*tau
    critic_target.load_state_dict(critic_target_state_dict)
    
    return loss_actor.item(),loss_critic.item()
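
Two pieces of arithmetic in the update above are easy to check by hand: the TD target `reward + discount_factor * value_next_state` (with the next-state value taken as zero when there is no next state), and the soft update that moves each target parameter a fraction `tau` towards its online counterpart. The sketch below reproduces both with plain Python floats; the function names are hypothetical and the lists stand in for network parameters.

```python
def td_target(reward, next_value, discount_factor):
    """reward + discount * V(next state); next_value is None when there is no next state."""
    bootstrap = 0.0 if next_value is None else next_value
    return reward + discount_factor * bootstrap


def soft_update(target_params, online_params, tau):
    """Polyak averaging: target <- (1 - tau) * target + tau * online."""
    return [(1 - tau) * t + tau * o for t, o in zip(target_params, online_params)]


with_next = td_target(-1.0, -10.0, 0.9)   # bootstraps from the next state's value
without_next = td_target(-1.0, None, 0.9)  # last step: the target is just the reward
print(with_next, without_next)

target = [0.0, 0.0]
online = [1.0, -2.0]
for _ in range(100):
    target = soft_update(target, online, tau=0.05)
print(target)  # has moved most of the way towards [1.0, -2.0]
```

With `tau = 0.05` the gap between target and online parameters shrinks by a factor of 0.95 per update, so the target networks trail the online networks smoothly instead of jumping.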

In [6]:
from collections import defaultdict
import numpy as np


def actor_critic_learning(env: gym.Env, num_episodes: int = 1000,
                          experience_maxlen: int = 1024, batch_size: int = 128,
                          epsilon: float = 0.01,
                          learning_rate_actor: float = 1e-3, learning_rate_critic: float = 1e-4,
                          discount_factor: float = 0.9,
                          tau=0.05, device: str = "cpu",
                          print_step=100):
    actor = Actor(env.observation_space.shape[0], env.action_space.shape[0], env.action_space.high.item()).to(device)
    actor_target = Actor(env.observation_space.shape[0], env.action_space.shape[0], env.action_space.high.item()).to(device)
    actor_target.load_state_dict(actor.state_dict())

    critic = Critic(env.observation_space.shape[0], env.action_space.shape[0]).to(device)
    critic_target = Critic(env.observation_space.shape[0], env.action_space.shape[0]).to(device)
    critic_target.load_state_dict(critic.state_dict())

    optimizer_actor = torch.optim.SGD(actor.parameters(), lr=learning_rate_actor)
    optimizer_critic = torch.optim.SGD(critic.parameters(), lr=learning_rate_critic)

    experience = Experience(experience_maxlen)

    reward_per_episode = defaultdict(float)

    loss_actor = 0.0
    loss_critic = 0.0

    for episode_i in range(num_episodes):
        state = env.reset()[0]
        is_truncated = False
        while not is_truncated:
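            state_tensor = torch.tensor(state, dtype=torch.float32, device=device)
            # Action selection needs no gradient; Gaussian noise scaled by epsilon adds exploration.
            with torch.no_grad():
                action = actor(state_tensor).item()+epsilon*np.random.randn()
            next_state, reward, is_terminated, is_truncated, info = env.step([action])
            if is_truncated:
                # The time-limit step is stored as the end of the episode (next_state=None)
                # and its reward is zeroed out.
                reward = 0.0
                next_state = None
            experience.append(Transition(state, action, next_state, None, reward))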
            state = next_state

            reward_per_episode[episode_i] += reward

            if len(experience) > batch_size:
                loss_actor, loss_critic = update_actor_critic(actor, actor_target, critic, critic_target, experience, batch_size, optimizer_actor, optimizer_critic, discount_factor, tau, device)

        if episode_i % print_step == 0:
            print("episode %d, loss_actor %f, loss_critic %f, reward %f" % (episode_i, loss_actor, loss_critic, reward_per_episode[episode_i]))

In [7]:
actor_critic_learning(env, num_episodes=1500, experience_maxlen=1024, batch_size=128, epsilon=0.01, learning_rate_actor=3e-3, learning_rate_critic=3e-4, discount_factor=0.9, tau=0.05, device="cuda" if torch.cuda.is_available() else "cpu", print_step=1)

episode 0, loss_actor 10.512185, loss_critic 30.276833, reward -1770.783230
episode 1, loss_actor 27.716431, loss_critic 12.491794, reward -1518.686475
episode 2, loss_actor 44.098740, loss_critic 21.519272, reward -1169.299229
episode 3, loss_actor 56.730980, loss_critic 91.141937, reward -1488.669692
episode 4, loss_actor 64.612373, loss_critic 2.155486, reward -1406.448515
episode 5, loss_actor 67.242676, loss_critic 45.969559, reward -1493.003400
episode 6, loss_actor 66.703239, loss_critic 44.321308, reward -1498.863705
episode 7, loss_actor 69.414383, loss_critic 45.792313, reward -1402.397442
episode 8, loss_actor 67.405685, loss_critic 81.629982, reward -1266.794407
episode 9, loss_actor 66.223236, loss_critic 82.468948, reward -1377.986299
episode 10, loss_actor 63.418789, loss_critic 1.987171, reward -1143.166934
episode 11, loss_actor 63.779243, loss_critic 52.410198, reward -1488.604266
episode 12, loss_actor 59.983757, loss_critic 30.469446, reward -1050.722646
episode 13,