# CartPole Action Shaping

In [1]:
import math
from typing import Optional, Tuple, Union

import numpy as np
import pandas as pd

import gymnasium as gym
from gymnasium import logger, spaces
from gymnasium.envs.classic_control import utils
from gymnasium.error import DependencyNotInstalled
from gymnasium.experimental.vector import VectorEnv
from gymnasium.vector.utils import batch_space
from gymnasium.wrappers import TimeLimit

# CartPole_ActionShaping Environment

In [2]:
class CartPoleEnv_ActionShaping(gym.Env[np.ndarray, Union[int, np.ndarray]]):
    """Cart-pole environment with a third, "apply no force" action.

    Actions (``spaces.Discrete(3)``):

    * ``0`` -- apply a force of ``-force_mag`` to the cart
    * ``1`` -- apply a force of ``+force_mag`` to the cart
    * ``2`` -- apply no force

    Observations are ``float32`` arrays ``[x, x_dot, theta, theta_dot]``.
    An episode terminates once ``x`` leaves ``[-x_threshold, x_threshold]``
    or ``theta`` leaves
    ``[-theta_threshold_radians, theta_threshold_radians]``.  Every step up
    to and including the terminating one yields a reward of 1.0; stepping
    further without ``reset()`` yields 0.0.
    """

    metadata = {
        "render_modes": ["human", "rgb_array"],
        "render_fps": 50,
    }

    def __init__(self, render_mode: Optional[str] = None):
        """Initialise physical constants, spaces and rendering state.

        Args:
            render_mode: One of ``metadata["render_modes"]`` or ``None``
                to disable rendering.
        """
        # Physical parameters of the cart-pole system.
        self.gravity = 9.8
        self.masscart = 1.0
        self.masspole = 0.1
        self.total_mass = self.masspole + self.masscart
        self.length = 0.5  # actually half the pole's length
        self.polemass_length = self.masspole * self.length
        self.force_mag = 10.0
        self.tau = 0.02  # seconds between state updates
        self.kinematics_integrator = "euler"

        # Limits beyond which the episode is considered failed.
        self.theta_threshold_radians = 12 * 2 * math.pi / 360
        self.x_threshold = 2.4

        # Position and angle bounds are twice the failure thresholds so that
        # the observation that triggers termination is still inside the box.
        float32_max = np.finfo(np.float32).max
        obs_limit = np.array(
            [
                self.x_threshold * 2,
                float32_max,
                self.theta_threshold_radians * 2,
                float32_max,
            ],
            dtype=np.float32,
        )

        # Action shaping: three actions instead of two; the extra action
        # (index 2) applies no force (see ``step``).
        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(-obs_limit, obs_limit, dtype=np.float32)

        self.render_mode = render_mode

        # Rendering resources are created lazily by ``render``.
        self.screen_width = 600
        self.screen_height = 400
        self.screen = None
        self.clock = None
        self.isopen = True
        self.state = None

        # ``None`` until the episode terminates, then counts extra steps.
        self.steps_beyond_terminated = None

    def step(self, action):
        """Advance the simulation by one time step of length ``tau``.

        Args:
            action: Member of ``action_space``; ``0`` applies ``-force_mag``,
                ``1`` applies ``+force_mag`` and ``2`` applies no force.

        Returns:
            Tuple ``(observation, reward, terminated, truncated, info)``.
            ``truncated`` is always ``False`` and ``info`` is an empty dict.

        Raises:
            AssertionError: If ``action`` is not in ``action_space`` or
                ``reset`` has not been called yet.
        """
        assert self.action_space.contains(
            action
        ), f"{action!r} ({type(action)}) invalid"
        assert self.state is not None, "Call reset before using step method."

        x, x_dot, theta, theta_dot = self.state

        # Map the discrete action onto the force applied to the cart.
        if action == 1:
            force = self.force_mag
        elif action == 2:
            force = 0
        else:
            force = -self.force_mag

        cos_theta = math.cos(theta)
        sin_theta = math.sin(theta)
        tau = self.tau

        # Equations of motion; for the interested reader:
        # https://coneural.org/florian/papers/05_cart_pole.pdf
        scaled_push = (
            force + self.polemass_length * theta_dot**2 * sin_theta
        ) / self.total_mass
        theta_acc = (self.gravity * sin_theta - cos_theta * scaled_push) / (
            self.length * (4.0 / 3.0 - self.masspole * cos_theta**2 / self.total_mass)
        )
        x_acc = scaled_push - self.polemass_length * theta_acc * cos_theta / self.total_mass

        if self.kinematics_integrator == "euler":
            # Explicit Euler: positions advance with the *old* velocities
            # (the right-hand sides are evaluated before any assignment).
            x, x_dot = x + tau * x_dot, x_dot + tau * x_acc
            theta, theta_dot = theta + tau * theta_dot, theta_dot + tau * theta_acc
        else:  # semi-implicit euler
            # Velocities are updated first and then used for the positions.
            x_dot = x_dot + tau * x_acc
            x = x + tau * x_dot
            theta_dot = theta_dot + tau * theta_acc
            theta = theta + tau * theta_dot

        self.state = (x, x_dot, theta, theta_dot)

        cart_out_of_range = x < -self.x_threshold or x > self.x_threshold
        pole_out_of_range = (
            theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )
        terminated = bool(cart_out_of_range or pole_out_of_range)

        reward = 1.0
        if terminated:
            if self.steps_beyond_terminated is None:
                # Pole just fell! The terminating step is still rewarded.
                self.steps_beyond_terminated = 0
            else:
                if self.steps_beyond_terminated == 0:
                    logger.warn(
                        "You are calling 'step()' even though this "
                        "environment has already returned terminated = True. You "
                        "should always call 'reset()' once you receive 'terminated = "
                        "True' -- any further steps are undefined behavior."
                    )
                self.steps_beyond_terminated += 1
                reward = 0.0

        if self.render_mode == "human":
            self.render()
        return np.array(self.state, dtype=np.float32), reward, terminated, False, {}

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[dict] = None,
    ):
        """Start a new episode from a uniformly sampled state.

        Args:
            seed: Forwarded to ``gym.Env.reset``.
            options: Forwarded to ``utils.maybe_parse_reset_bounds`` to obtain
                the sampling bounds; the defaults passed are -0.05 and 0.05.

        Returns:
            Tuple ``(observation, info)`` with an empty ``info`` dict.
        """
        super().reset(seed=seed)

        # Note that custom reset bounds may lead to out-of-bound
        # state/observations.
        default_low, default_high = -0.05, 0.05
        low, high = utils.maybe_parse_reset_bounds(options, default_low, default_high)

        self.state = self.np_random.uniform(low=low, high=high, size=(4,))
        self.steps_beyond_terminated = None

        if self.render_mode == "human":
            self.render()
        return np.array(self.state, dtype=np.float32), {}

    def render(self):
        """Draw the current state with pygame.

        Returns:
            In ``"rgb_array"`` mode, the frame as an array with the first two
            axes of the pygame pixel array swapped.  ``None`` in ``"human"``
            mode, when no render mode is set, or when there is no state yet.

        Raises:
            DependencyNotInstalled: If pygame cannot be imported.
        """
        if self.render_mode is None:
            assert self.spec is not None
            gym.logger.warn(
                "You are calling render method without specifying any render mode. "
                "You can specify the render_mode at initialization, "
                f'e.g. gym.make("{self.spec.id}", render_mode="rgb_array")'
            )
            return

        try:
            import pygame
            from pygame import gfxdraw
        except ImportError as e:
            raise DependencyNotInstalled(
                "pygame is not installed, run `pip install gymnasium[classic-control]`"
            ) from e

        canvas_size = (self.screen_width, self.screen_height)

        # Lazily create the drawing target and the frame-rate clock.
        if self.screen is None:
            pygame.init()
            if self.render_mode == "human":
                pygame.display.init()
                self.screen = pygame.display.set_mode(canvas_size)
            else:  # mode == "rgb_array"
                self.screen = pygame.Surface(canvas_size)
        if self.clock is None:
            self.clock = pygame.time.Clock()

        # Pixels per world unit and sprite dimensions.
        world_width = self.x_threshold * 2
        scale = self.screen_width / world_width
        polewidth = 10.0
        polelen = scale * (2 * self.length)
        cartwidth = 50.0
        cartheight = 30.0

        if self.state is None:
            return None

        state = self.state

        self.surf = pygame.Surface(canvas_size)
        self.surf.fill((255, 255, 255))

        # --- cart -------------------------------------------------------
        left, right = -cartwidth / 2, cartwidth / 2
        top, bottom = cartheight / 2, -cartheight / 2
        axleoffset = cartheight / 4.0
        cartx = state[0] * scale + self.screen_width / 2.0  # MIDDLE OF CART
        carty = 100  # TOP OF CART
        cart_coords = [
            (corner_x + cartx, corner_y + carty)
            for corner_x, corner_y in (
                (left, bottom),
                (left, top),
                (right, top),
                (right, bottom),
            )
        ]
        gfxdraw.aapolygon(self.surf, cart_coords, (0, 0, 0))
        gfxdraw.filled_polygon(self.surf, cart_coords, (0, 0, 0))

        # --- pole: rotated by -theta about the axle ---------------------
        left, right = -polewidth / 2, polewidth / 2
        top, bottom = polelen - polewidth / 2, -polewidth / 2
        rotated_corners = (
            pygame.math.Vector2(corner).rotate_rad(-state[2])
            for corner in ((left, bottom), (left, top), (right, top), (right, bottom))
        )
        pole_coords = [
            (vec[0] + cartx, vec[1] + carty + axleoffset) for vec in rotated_corners
        ]
        gfxdraw.aapolygon(self.surf, pole_coords, (202, 152, 101))
        gfxdraw.filled_polygon(self.surf, pole_coords, (202, 152, 101))

        # --- axle -------------------------------------------------------
        axle_x = int(cartx)
        axle_y = int(carty + axleoffset)
        axle_radius = int(polewidth / 2)
        axle_colour = (129, 132, 203)
        gfxdraw.aacircle(self.surf, axle_x, axle_y, axle_radius, axle_colour)
        gfxdraw.filled_circle(self.surf, axle_x, axle_y, axle_radius, axle_colour)

        # --- track ------------------------------------------------------
        gfxdraw.hline(self.surf, 0, self.screen_width, carty, (0, 0, 0))

        # The scene was drawn with y pointing up; flip vertically for display.
        self.surf = pygame.transform.flip(self.surf, False, True)
        self.screen.blit(self.surf, (0, 0))

        if self.render_mode == "human":
            pygame.event.pump()
            self.clock.tick(self.metadata["render_fps"])
            pygame.display.flip()

        elif self.render_mode == "rgb_array":
            pixels = np.array(pygame.surfarray.pixels3d(self.screen))
            return np.transpose(pixels, axes=(1, 0, 2))

    def close(self):
        """Shut down pygame if a screen was ever created."""
        if self.screen is None:
            return

        import pygame

        pygame.display.quit()
        pygame.quit()
        self.isopen = False

# CartPole_ActionShaping Learning

In [3]:
from stable_baselines3.common.callbacks import CheckpointCallback
from stable_baselines3.common.logger import configure
from stable_baselines3 import PPO
import os

In [4]:
# Top-level output directories for checkpoints, training logs and
# evaluation results.  ``exist_ok=True`` makes creation idempotent without a
# separate existence check (no check-then-create race), and still raises if
# the path exists but is not a directory.
model_dir = "models"
os.makedirs(model_dir, exist_ok=True)

log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)

result_dir = "results"
os.makedirs(result_dir, exist_ok=True)

In [5]:
# Per-experiment sub-directories for the ActionShaping run.
# ``exist_ok=True`` makes creation idempotent without a separate existence
# check (no check-then-create race).
model_actionshaping_dir = f"{model_dir}/ActionShaping"
os.makedirs(model_actionshaping_dir, exist_ok=True)

log_actionshaping_dir = f"{log_dir}/ActionShaping"
os.makedirs(log_actionshaping_dir, exist_ok=True)

result_actionshaping_dir = f"{result_dir}/ActionShaping"
os.makedirs(result_actionshaping_dir, exist_ok=True)

# Training metrics go to stdout, a CSV file and TensorBoard, all under
# ``log_actionshaping_dir``.
logger_actionshaping = configure(log_actionshaping_dir, ["stdout", "csv", "tensorboard"])

Logging to logs/ActionShaping


In [6]:
# Training environment: the three-action cart-pole, truncated after 500 steps.
env_train_actionshaping = TimeLimit(
    CartPoleEnv_ActionShaping(),
    max_episode_steps=500,
)

In [7]:
# PPO agent with an MLP policy, reporting through the ActionShaping logger.
model_actionshaping = PPO(
    policy="MlpPolicy",
    env=env_train_actionshaping,
    verbose=1,
)
model_actionshaping.set_logger(logger_actionshaping)

# Save a checkpoint every 2048 steps into the ActionShaping model directory,
# then train for 100000 timesteps.
checkpoint_callback = CheckpointCallback(
    save_freq=2048,
    save_path=model_actionshaping_dir,
)
model_actionshaping.learn(total_timesteps=100000, callback=checkpoint_callback)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 22.9     |
|    ep_rew_mean     | 22.9     |
| time/              |          |
|    fps             | 2086     |
|    iterations      | 1        |
|    time_elapsed    | 0        |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 28           |
|    ep_rew_mean          | 28           |
| time/                   |              |
|    fps                  | 1339         |
|    iterations           | 2            |
|    time_elapsed         | 3            |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0077794017 |
|    clip_fraction        | 0.091        |
|    clip_range           | 0.2          |
|    en

------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 159          |
|    ep_rew_mean          | 159          |
| time/                   |              |
|    fps                  | 1142         |
|    iterations           | 11           |
|    time_elapsed         | 19           |
|    total_timesteps      | 22528        |
| train/                  |              |
|    approx_kl            | 0.0064440807 |
|    clip_fraction        | 0.0564       |
|    clip_range           | 0.2          |
|    entropy_loss         | -0.964       |
|    explained_variance   | 0.916        |
|    learning_rate        | 0.0003       |
|    loss                 | 3.21         |
|    n_updates            | 100          |
|    policy_gradient_loss | -0.00621     |
|    value_loss           | 16.7         |
------------------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_m

------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 319          |
|    ep_rew_mean          | 319          |
| time/                   |              |
|    fps                  | 1128         |
|    iterations           | 21           |
|    time_elapsed         | 38           |
|    total_timesteps      | 43008        |
| train/                  |              |
|    approx_kl            | 0.0072129667 |
|    clip_fraction        | 0.0689       |
|    clip_range           | 0.2          |
|    entropy_loss         | -0.778       |
|    explained_variance   | 0.367        |
|    learning_rate        | 0.0003       |
|    loss                 | 0.0256       |
|    n_updates            | 200          |
|    policy_gradient_loss | -0.0105      |
|    value_loss           | 0.246        |
------------------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len

------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 455          |
|    ep_rew_mean          | 455          |
| time/                   |              |
|    fps                  | 1125         |
|    iterations           | 31           |
|    time_elapsed         | 56           |
|    total_timesteps      | 63488        |
| train/                  |              |
|    approx_kl            | 0.0052743545 |
|    clip_fraction        | 0.038        |
|    clip_range           | 0.2          |
|    entropy_loss         | -0.747       |
|    explained_variance   | -0.0587      |
|    learning_rate        | 0.0003       |
|    loss                 | -0.0157      |
|    n_updates            | 300          |
|    policy_gradient_loss | -0.00167     |
|    value_loss           | 0.00283      |
------------------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len

------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 500          |
|    ep_rew_mean          | 500          |
| time/                   |              |
|    fps                  | 1124         |
|    iterations           | 41           |
|    time_elapsed         | 74           |
|    total_timesteps      | 83968        |
| train/                  |              |
|    approx_kl            | 0.0047179563 |
|    clip_fraction        | 0.0531       |
|    clip_range           | 0.2          |
|    entropy_loss         | -0.72        |
|    explained_variance   | -0.484       |
|    learning_rate        | 0.0003       |
|    loss                 | -0.0103      |
|    n_updates            | 400          |
|    policy_gradient_loss | -0.00333     |
|    value_loss           | 9.04e-05     |
------------------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len

<stable_baselines3.ppo.ppo.PPO at 0x1a9dca08f70>

In [8]:
# Training is finished; release the training environment.
env_train_actionshaping.close()

In [9]:
# Select the checkpoint with the highest logged mean episode reward.
# The CSV is read from the same directory the logger was configured with,
# rather than from a second hard-coded copy of that path.
model_learning_data_actionshaping = pd.read_csv(f"{log_actionshaping_dir}/progress.csv")

# Row k of the log (0-based) is converted to the step count (k + 1) * 2048,
# where 2048 is the save_freq given to CheckpointCallback during training;
# the training output shows total_timesteps advancing by 2048 per iteration.
model_id_best_actionshaping = (model_learning_data_actionshaping["rollout/ep_rew_mean"].idxmax() + 1) * 2048
model_path_best_actionshaping = f"{model_actionshaping_dir}/rl_model_{model_id_best_actionshaping}_steps.zip"
model_best_actionshaping = PPO.load(model_path_best_actionshaping)

# CartPole_ActionShaping Evaluation

In [10]:
# Evaluation environment: a fresh three-action cart-pole, truncated after
# 500 steps.
env_test_actionshaping = TimeLimit(
    CartPoleEnv_ActionShaping(),
    max_episode_steps=500,
)

In [11]:
# Evaluate the selected checkpoint over 100 episodes.
# ``actions_actionshaping`` collects every action taken (across all
# episodes); the other two lists hold one entry per episode.
actions_actionshaping, rewards_actionshaping, steps_actionshaping = [], [], []

for i in range(100):
    # Each episode gets its own fixed seed: 100, 101, ..., 199.
    step, score = 0, 0
    terminated = truncated = False
    observation, info = env_test_actionshaping.reset(seed=100 + i)

    # NOTE(review): predict() is called with its default settings -- confirm
    # whether deterministic=True is wanted for evaluation.
    while not (terminated or truncated):
        action = model_best_actionshaping.predict(observation)[0]
        observation, reward, terminated, truncated, info = env_test_actionshaping.step(action)

        actions_actionshaping.append(int(action))
        score += reward
        step += 1

    rewards_actionshaping.append(score)
    steps_actionshaping.append(step)

In [12]:
# Evaluation is finished; release the evaluation environment.
env_test_actionshaping.close() 

In [13]:
# Write each evaluation series to its own text file in the ActionShaping
# results directory (rewards, then steps, then actions).
for label, series in (
    ("rewards", rewards_actionshaping),
    ("steps", steps_actionshaping),
    ("actions", actions_actionshaping),
):
    np.savetxt(f"{result_actionshaping_dir}/{label}_actionshaping.txt", series)