# Continuous Pendulum Control
Zero reward is the best condition for the pendulum control

In [1]:
%matplotlib inline
%load_ext autoreload
%autoreload 2

import sys
import os

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

from mvp.env_pendulum import PendulumEnv

from itertools import count
import torch
import gym
from gym.envs.registration import register

import matplotlib
import matplotlib.pyplot as plt
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display
plt.ion()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Pendulum Control Gym

## Description
The inverted pendulum swingup problem is a classic problem in control theory. The system consists of a pendulum attached at one end to a fixed point, with the other end being free. The pendulum starts in a random position, and the goal is to apply torque to the free end to swing it into an upright position, where its center of gravity is right above the fixed point.

### Pendulum Coordinate System

- **x-y**: Cartesian coordinates of the pendulum’s end in meters.
- **theta**: Angle in radians.
- **tau**: Torque in N·m, defined as positive counter-clockwise.

### Action Space
The action is an ndarray with shape `(1,)` representing the torque applied to the free end of the pendulum.

| Num | Action | Min | Max |
|-----|--------|-----|-----|
| 0   | Torque | -2.0| 2.0 |

### Observation Space
The observation is an ndarray with shape `(3,)` representing the x-y coordinates of the pendulum’s free end and its angular velocity.

| Num | Observation      | Min | Max |
|-----|------------------|-----|-----|
| 0   | x = cos(theta)   | -1.0| 1.0 |
| 1   | y = sin(theta)   | -1.0| 1.0 |
| 2   | Angular Velocity | -8.0| 8.0 |

### Rewards
The reward function is defined as:

$r = -(theta^2 + 0.1 * theta_dt^2 + 0.001 * torque^2)$

where `theta` is the pendulum’s angle normalized between `[-pi, pi]` (with 0 being in the upright position). The minimum reward that can be obtained is `-(pi^2 + 0.1 * 8^2 + 0.001 * 2^2) = -16.2736044`, while the maximum reward is zero (pendulum is upright with zero velocity and no torque applied).

In [2]:
register(
    id='Pendulum-v0',
    entry_point='mvp.env_pendulum:PendulumEnv',
    max_episode_steps=1000)
env = gym.make('Pendulum-v0')

  logger.warn(


In [3]:
env.action_space

Box(-2.0, 2.0, (1,), float32)

In [4]:
env.reward_range

(-inf, inf)

In [5]:
env.observation_space

Box([-1. -1. -8.], [1. 1. 8.], (3,), float32)

# Evaluation of Model

In [None]:
from mvp.ppo_continuous import ActorCritic

path = os.path.join(os.getcwd(), "..", "mvp", "params", "pendulum_ppo_continuous.pth")

env = PendulumEnv(render_mode="human")
n_actions = env.action_space.shape[0]
state, info = env.reset()
n_observations = len(state)
num_eval_episodes = 10

model = ActorCritic(n_observations, n_actions).to(device)
model.load_state_dict(torch.load(path, map_location=device))
model.eval()

for i_episode in range(num_eval_episodes):
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        env.render()
        # Get action from the model (assuming it outputs action_mean, action_std, and value)
        with torch.no_grad():
            action_mean, _, _ = model(state)
        
        # Take the mean action (no sampling here for deterministic behavior)
        action = action_mean.cpu().numpy()[0]
        print(action)

        observation, reward, terminated, truncated, _ = env.step(action)
        if terminated or truncated:
            print(f"Episode finished after {t+1} timesteps")
            break

        state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
        
env.close()

In [8]:
from mvp.fmppo_continuous import ActorCriticUPN

path = os.path.join(os.getcwd(), "..", "mvp", "params", "pendulum_ppo_upn_continuous.pth")

env = PendulumEnv(render_mode="human")
n_actions = env.action_space.shape[0]
state, _ = env.reset()
n_observations = len(state)
num_eval_episodes = 10

# Create and load the model
model = ActorCriticUPN(n_observations, n_actions).to(device)
model.load_state_dict(torch.load(path, map_location=device))
model.eval()

for i_episode in range(num_eval_episodes):
    state, _ = env.reset()
    state = torch.FloatTensor(state).unsqueeze(0).to(device)
    for t in count():
        env.render()
        # Get action from the model
        with torch.no_grad():
            action_mean, _, _ = model(state)
        
        # Take the mean action (no sampling here for deterministic behavior)
        action = action_mean.cpu().numpy()[0]
        print(action)

        next_state, reward, terminated, truncated, _ = env.step(action)
        if terminated or truncated:
            print(f"Episode finished after {t+1} timesteps")
            break

        state = torch.FloatTensor(next_state).unsqueeze(0).to(device)
        
env.close()

[-0.39533386]
[-0.40201777]
[-0.4065948]
[-0.41026467]
[-0.41303012]
[-0.41503626]
[-0.41654584]
[-0.4176262]
[-0.4180265]
[-0.4174115]
[-0.41591188]
[-0.41306525]
[-0.40932482]
[-0.403414]
[-0.40000105]
[-0.39917445]
[-0.39850575]
[-0.39685243]
[-0.3957921]
[-0.39484376]
[-0.3931168]
[-0.39055592]
[-0.3874311]
[-0.38381696]
[-0.37985164]
[-0.375094]
[-0.37066114]
[-0.3662364]
[-0.36483496]
[-0.36597803]
[-0.3687826]
[-0.37680963]
[-0.38630557]
[-0.3957914]
[-0.40228394]
[-0.4068883]
[-0.41052338]
[-0.41326147]
[-0.41525012]
[-0.4167564]
[-0.41786176]
[-0.41831136]
[-0.41770178]
[-0.41623068]
[-0.4133582]
[-0.40954986]
[-0.4036768]
[-0.40041083]
[-0.39961144]
[-0.39888108]
[-0.3972978]
[-0.39629805]
[-0.3954124]
[-0.39342123]
[-0.3907783]
[-0.38769904]
[-0.38402343]
[-0.38003206]
[-0.37526643]
[-0.3706262]
[-0.36609763]
[-0.36444962]
[-0.36566842]
[-0.36854452]
[-0.3767456]
[-0.38644177]
[-0.39620495]
[-0.40244985]
[-0.4070899]
[-0.41070265]
[-0.4134178]
[-0.41540846]
[-0.41694075]
[-0

KeyboardInterrupt: 

In [None]:
from mvp.dqn_networks import DQN

path = os.path.join(os.getcwd(), "..", "mvp", "params", "pendulum_dqn_discrete_retrain.pth")

env = PendulumEnv(render_mode="human")
ACTION_MAP = np.linspace(-2, 2, 5)  # 5 actions ranging from -2 to 2
n_actions = len(ACTION_MAP)
state, info = env.reset()
n_observations = len(state)

model = DQN(n_observations, n_actions).to(device)
model.load_state_dict(torch.load(path, map_location=device))
model.eval()

num_eval_episodes = 10
for i_episode in range(num_eval_episodes):
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        env.render()

        action_idx = model(state).max(1)[1]
        actual_action = ACTION_MAP[action_idx.item()]
        print(action)

        observation, reward, terminated, truncated, _ = env.step([actual_action])
        if terminated or truncated:
            print(f"Episode finished after {t+1} timesteps")
            break

        state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
        
env.close()