In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import time

import gymnasium as gym
from gymnasium import spaces
import stable_baselines3 as sb

import pygame
import pygame.font

from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.env_util import make_vec_env
# from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.env_checker import check_env



In [2]:
# --- Colours (RGB) ---
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
GREEN = (0, 150, 40)

# --- Window and cart geometry (pixels) ---
WIDTH = 500
HEIGHT = 300
CART_SIZE = (40, 20)
# Top-left corner that centres the cart in the window: (screen - cart) / 2
CART_POS = ((WIDTH - CART_SIZE[0]) / 2, (HEIGHT - CART_SIZE[1]) / 2)

# --- Physical parameters of the simulation ---
gamma = 0.1    # damping coefficient on the cart velocity (used by cart_evol)
gammath = 0.1  # damping coefficient on the angular velocity (used by pend_evol)
L = 0.2        # pendulum length; scaled by WIDTH when drawn
G = 0.98       # gravity constant in the pendulum equation

def RK4(fun, x, dt, t = 0, a = 0):
    """Advance ``x`` by one classical fourth-order Runge-Kutta step.

    :param fun: derivative function, called as ``fun(t, x, a)``
    :param x: state at the start of the step
    :param dt: step size
    :param t: time at the start of the step
    :param a: extra argument forwarded unchanged to every ``fun`` call
    :return: state at the end of the step
    """
    t_mid = t + dt/2
    t_end = t + dt

    # Slopes at the start, twice at the midpoint, and at the end of the step.
    slope_start = fun(t, x, a)
    slope_mid_a = fun(t_mid, x + dt*slope_start/2, a)
    slope_mid_b = fun(t_mid, x + dt*slope_mid_a/2, a)
    slope_end = fun(t_end, x + dt*slope_mid_b, a)

    # Midpoint slopes carry twice the weight of the endpoint slopes.
    weighted_sum = slope_start + 2*slope_mid_a + 2*slope_mid_b + slope_end
    return x + dt/6*weighted_sum

# TODO: all of this should eventually live inside the Env

def cart_evol(t, x, a = 0):
    """Time derivative of the cart state, in the form RK4 expects.

    :param t: time (not used by the equations)
    :param x: cart state, indexed as ``(position, velocity)``
    :param a: acceleration applied to the cart
    :return: np.array ``[velocity, acceleration]``
    """
    velocity = x[1]
    # Applied acceleration minus damping proportional to the velocity.
    acceleration = a - gamma * velocity
    return np.array([velocity, acceleration])

def pend_evol(t, x, a = 0):
    """Time derivative of the pendulum state, in the form RK4 expects.

    :param t: time (not used by the equations)
    :param x: pendulum state, indexed as ``(angle, angular velocity)``
    :param a: acceleration of the pivot
    :return: np.array ``[angular velocity, angular acceleration]``
    """
    theta, omega = x[0], x[1]
    drive = a * np.cos(theta)      # forcing from the accelerating pivot
    gravity = G * np.sin(theta)    # restoring term, zero at theta = 0
    alpha = (drive - gravity)/L - gammath * omega
    return np.array([omega, alpha])

def get_pos_pend(ang, x):
    """Position of the pendulum tip for a given angle and pivot position.

    :param ang: pendulum angle in radians
    :param x: pivot position, indexed as ``(x, y)``
    :return: tuple ``(tip_x, tip_y)``
    """
    length_px = L*WIDTH
    pivot_x, pivot_y = x[0], x[1]
    tip_x = pivot_x - length_px*np.sin(ang)
    tip_y = pivot_y + length_px*np.cos(ang)
    return tip_x, tip_y

def draw_player(screen, cart, pend_pos):
    """Draw the rod, the cart and the pendulum bob onto ``screen``.

    :param screen: pygame surface to draw on
    :param cart: pygame.Rect of the cart; the rod starts at its centre
    :param pend_pos: (x, y) position of the pendulum bob
    """
    bob_radius = 20

    # Rod first, so the cart and the bob are painted over its two ends.
    pygame.draw.line(screen, BLACK, cart.center, pend_pos, width = 6)

    # Cart and bob: white fill (border width 0) followed by a black outline.
    for colour, border in ((WHITE, 0), (BLACK, 4)):
        pygame.draw.rect(screen, colour, cart, border)
    for colour, border in ((WHITE, 0), (BLACK, 5)):
        pygame.draw.circle(screen, colour, pend_pos, bob_radius, border)

def update_screen(screen, cart, th):
    """Redraw the whole scene: background, rail, cart and pendulum.

    :param screen: pygame surface to draw on
    :param cart: pygame.Rect of the cart
    :param th: pendulum angle in radians
    """
    background = (255, 255, 255)
    rail_y = HEIGHT/2

    screen.fill(background)
    # Horizontal rail across the full width of the window.
    pygame.draw.line(screen, BLACK, (0, rail_y), (WIDTH, rail_y), 4)
    draw_player(screen, cart, get_pos_pend(th, cart.center))
    
def observations(x,th,xDot,thDot):
    """Pack the raw state into the observation vector handed to the agent.

    The angle is encoded as (cos, sin); the cart velocity is divided by 2 and
    the angular velocity by 20.

    :return: float32 np.array ``[x, cos(th), sin(th), xDot/2, thDot/20]``
    """
    features = [
        x,
        np.cos(th),
        np.sin(th),
        xDot/2,
        thDot/20,
    ]
    return np.array(features).astype(np.float32)

In [None]:
class Pendulo(gym.Env):
    """
    Cart-pendulum environment that follows the Gymnasium interface.

    A cart moves along a rail and carries a damped pendulum. The single
    continuous action in [-1, 1] sets the cart acceleration. ``th = 0`` is the
    stable equilibrium of the pendulum equation; the agent collects positive
    reward only while ``cos(th) < 0``. The episode terminates with a large
    penalty once the cart position leaves ``[-1, 1]``.

    Observation (5 floats): ``(x, cos th, sin th, xDot/2, thDot/20)``.

    :param fps: simulation steps per second; the integration step is ``1/fps``
    :param render_mode: ``None``, ``"data"`` (append observations to
        ``output.txt``) or ``"pygame"`` (draw in a pygame window)
    """
    # Gymnasium looks up the supported modes under the "render_modes" key.
    metadata = {'render_modes': ['data', "pygame"]}

    # Acceleration produced by an action of magnitude 1.
    MAX_ACCELERATION = 3
    # Reward returned when the cart has left the rail (|x| > 1).
    CRASH_PENALTY = -10000
    # File written by the "data" render mode.
    OUTPUT_FILE = "output.txt"
    # NOTE(review): the original author was unsure about distance units (mm?).

    def __init__(self, fps = 120, render_mode = None):
        super().__init__()

        # Integration time step, in seconds.
        self.dt = 1/fps

        # Reward accumulated over the current episode (shown as the score).
        self.total_reward = 0

        # Action and observation spaces must be gym.spaces objects.
        self.action_space = spaces.Box(low = -1, high = 1,
                                       shape=(1,), dtype=np.float32)
        # Observation: (x, cos th, sin th, xDot/2, thDot/20).
        # Sine and cosine are passed instead of the angle because they stay in
        # [-1, 1] and do not have the discontinuity of th at +-pi.
        # NOTE(review): x, xDot/2 and thDot/20 are not clipped, so an
        # observation can fall outside the declared [-1, 1] box (e.g. the
        # terminal observation returned once |x| > 1).
        self.observation_space = spaces.Box(low = -1, high = 1,
                                            shape=(5,), dtype=np.float32)

        # State layout: ((x, th), (xDot, thDot)).
        self.agent_vars = self._initial_state()

        self.render_mode = render_mode
        if render_mode == "data":
            # Start a fresh log file containing only the header row.
            with open(self.OUTPUT_FILE, "w") as f:
                f.write("x\tcos(th)\tsin(th)\tv\tthDot\t\n")
        elif render_mode == "pygame":
            pygame.init()
            pygame.font.init()

            self.screen = pygame.display.set_mode((WIDTH, HEIGHT))
            self.player = pygame.Rect(CART_POS + CART_SIZE)
            self.my_font = pygame.font.SysFont('Comic Sans MS', 50)

    def _initial_state(self):
        """Return a start state: cart at rest at x = 0, small random angle.

        The angle is drawn from the environment's own generator
        (``self.np_random``) so that ``reset(seed=...)`` is reproducible.
        """
        th0 = self.np_random.normal(0, .1)
        return np.array(((0., th0), (0., 0.)))

    def _place_cart(self, x):
        """Centre the on-screen cart at rail position ``x``.

        ``x = -1`` maps to the left edge of the window and ``x = 1`` to the
        right edge. The cart centre is used because the pendulum pivot is
        drawn at ``cart.center``.
        """
        self.player.centerx = int(round(WIDTH/2 * (x + 1)))

    def reset(self, seed = None, options = None):
        """
        Start a new episode.

        :param seed: optional seed for the environment's random generator;
            ``None`` keeps the current generator
        :param options: accepted for Gymnasium API compatibility; unused
        :return: (np.array, dict) initial observation and an empty info dict
        """
        super().reset(seed=seed)

        self.agent_vars = self._initial_state()
        self.total_reward = 0
        if self.render_mode == "pygame":
            # Draw the fresh state first, then flip it onto the display.
            self._place_cart(self.agent_vars[0][0])
            update_screen(self.screen, self.player, self.agent_vars[0][1])
            pygame.display.update()

        return observations(*np.ravel(self.agent_vars)), {}

    def step(self, action):
        """
        Advance the simulation by one time step.

        :param action: array-like whose first element, in [-1, 1], is scaled
            by ``MAX_ACCELERATION`` to give the cart acceleration
        :return: (observation, reward, terminated, truncated, info)
        :raises ValueError: if ``abs(action[0]) > 1``
        """
        x, th = self.agent_vars[0]
        v, thDot = self.agent_vars[1]

        if abs(action[0]) > 1:
            raise ValueError("Received invalid action={} which is not part of the action space".format(action))

        if abs(x) > 1:
            # The cart left the rail on a previous step: end the episode
            # without integrating any further.
            return observations(*np.ravel(self.agent_vars)),\
                self.CRASH_PENALTY, True, False, {"Crashed" : True}

        # The same acceleration drives the cart and forces the pendulum pivot.
        accel = action[0] * self.MAX_ACCELERATION
        x, v = RK4(cart_evol, [x, v], self.dt, a = accel)
        th, thDot = RK4(pend_evol, [th, thDot], self.dt, a = accel)

        self.agent_vars = np.array(((x,th),(v,thDot)))

        if np.cos(th) < 0:
            # Positive reward, largest for a centred cart and a still pendulum.
            reward = 1/(1 + 10*abs(x) + abs(thDot))
        else:
            # Negative reward, growing with cos(th) and with distance from centre.
            reward = -np.cos(th)*(4+abs(x))
        reward = float(reward)
        self.total_reward += reward

        return observations(*np.ravel(self.agent_vars)),\
                reward, False, False, {}

    def render(self, seed = 0):
        """
        Output the current state according to ``render_mode``.

        :param seed: unused; kept so existing callers keep working
        :raises NotImplementedError: if ``render_mode`` is neither ``"data"``
            nor ``"pygame"``
        """
        if self.render_mode == 'data':
            # Append one tab-separated row matching the header written in __init__.
            data = observations(*np.ravel(self.agent_vars))
            with open(self.OUTPUT_FILE, "a") as f:
                f.write("".join(str(var) + "\t" for var in data))
                f.write("\n")
        elif self.render_mode == "pygame":
            # Let pygame process window events so the OS does not flag the
            # window as unresponsive during long rollouts.
            pygame.event.pump()

            x, th = self.agent_vars[0]
            self._place_cart(x)

            update_screen(self.screen, self.player, th)
            self.screen.blit(self.my_font.render("SCORE: %.1f"%(self.total_reward), False, GREEN), (100,50))
            pygame.display.update()
        else:
            raise NotImplementedError()

    def close(self):
        """Release the pygame window, if this environment opened one."""
        if self.render_mode == "pygame":
            pygame.display.quit()
            pygame.quit()

In [44]:
env = Pendulo()
# check_env raises an error if the environment does not follow the Gym interface
check_env(env, warn=True)
# Optional registration under an id, left disabled:
# gym.register(
#     id="Pendulo",
#     entry_point=Pendulo,
# )
# Next step: wrap the environment in a vectorised env.

In [45]:
# Vectorised environment holding 16 independent copies of Pendulo.
N_ENVS = 16
multi_env = make_vec_env(Pendulo, n_envs = N_ENVS)

# PPO with the default MLP policy; rollout length and minibatch size set explicitly.
ppo_kwargs = dict(n_steps = 1024, batch_size = 64)
model = sb.PPO('MlpPolicy', multi_env, **ppo_kwargs)
# To resume from a saved model instead:
# model = sb.PPO.load(save_dir + "/PendulumV2_PPO_2E6Iterations_2envs", multi_env)

In [89]:
# reset_num_timesteps is a boolean flag in Stable-Baselines3, not a step count.
# The previous value (1500) only worked because it is truthy, so True keeps the
# same behaviour: the timestep counter restarts at 0 for this call.
# Add progress_bar = True to show a progress bar while training.
model.learn(total_timesteps = int(6e6), reset_num_timesteps = True)

<stable_baselines3.ppo.ppo.PPO at 0x1e92d024f50>

In [61]:
# Shut pygame down (uninitialises every pygame module, including the display).
pygame.quit()

In [91]:
# Watch the trained policy drive a single environment in a pygame window.
single_env = Pendulo(render_mode = "pygame")
obs, _ = single_env.reset()
info = {}

try:
    for step in range(3000):
        # Small pause so the rollout is slow enough to follow on screen.
        time.sleep(0.005)
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = single_env.step(action)
        single_env.render()
        if terminated or truncated:
            # A bare environment does not reset itself, so stop here.
            break
finally:
    # Release the environment even if the rollout is interrupted.
    single_env.close()

# Report once: number of steps executed and the info dict of the last step.
print(step+1, info)

3000 {}


In [37]:
# Modification: limit the number of time steps per episode.
class CustomWrapper(gym.Wrapper):
    """
    Truncate episodes of the wrapped environment after ``max_steps`` steps.

    :param env: (gym.Env) Gym environment that will be wrapped
    :param max_steps: (int) Max number of steps per episode
    """
    def __init__(self, env, max_steps = 100):
        # Call the parent constructor, so we can access self.env later
        super().__init__(env)
        self.max_steps = max_steps
        # Counter of steps taken in the current episode
        self.current_step = 0

    def reset(self, seed = None, options = None):
        """
        Reset the step counter and the wrapped environment.

        :param seed: forwarded to the wrapped environment's ``reset``
        :param options: forwarded to the wrapped environment's ``reset`` only
            when given, so environments without an ``options`` parameter
            still work
        :return: whatever the wrapped environment's ``reset`` returns
        """
        self.current_step = 0
        reset_kwargs = {"seed": seed}
        if options is not None:
            reset_kwargs["options"] = options
        return self.env.reset(**reset_kwargs)

    def step(self, action):
        """
        :param action: ([float] or int) Action taken by the agent
        :return: (np.ndarray, float, bool, bool, dict) observation, reward,
            terminated, truncated, additional information
        """
        self.current_step += 1
        obs, reward, terminated, truncated, info = self.env.step(action)
        # Flag the episode as truncated once the step limit is reached
        if self.current_step >= self.max_steps:
            truncated = True
            # Update the info dict to signal that the limit was exceeded
            info['time_limit_reached'] = True
        return obs, reward, terminated, truncated, info

In [38]:
from stable_baselines3.common.monitor import Monitor
# Evaluate the policy on episodes capped at 2400 steps (20 s at the default 120 fps).
MAX_EVAL_STEPS = 2400
N_EVAL_EPISODES = 100

eval_env = Monitor(CustomWrapper(Pendulo(), MAX_EVAL_STEPS))
mean_reward, std_reward = evaluate_policy(
    model,
    eval_env,
    n_eval_episodes=N_EVAL_EPISODES,
    deterministic=True,
)

print(mean_reward, std_reward)

-225.53949138000004 959.1036871679634


In [None]:
# Persist the trained model under save_dir.
save_dir = "/tmp/gym/"
model_path = save_dir + "/PendulumV2_PPO_2E6Iterations_2envs"
model.save(model_path)

# Data?

In [None]:
# Load the log written by Pendulo(render_mode="data"). Each row is one
# observation, so the "v" and "thDot" columns hold the scaled values xDot/2 and
# thDot/20 rather than the raw velocities.
data = pd.read_table("output.txt")
x, costh, sinth, v, thDot = (
    data[column] for column in ("x", "cos(th)", "sin(th)", "v", "thDot")
)

fig, ax = plt.subplots(2,2)

ax[0,0].plot(x)
ax[0,0].set(title="Cart position", xlabel="step", ylabel="x")

# Pendulum tip in the plane; .2 is the pendulum length (same value as L).
ax[0,1].plot(x - .2*sinth, - .2*costh)
ax[0,1].set(title="Pendulum tip", xlabel="x", ylabel="y")

ax[1,0].plot(v)
ax[1,0].set(title="Cart velocity (logged as xDot/2)", xlabel="step", ylabel="v")

ax[1,1].plot(thDot)
ax[1,1].set(title="Angular velocity (logged as thDot/20)", xlabel="step", ylabel="thDot")

# Keep titles and axis labels of neighbouring panels from overlapping.
fig.tight_layout()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import cm
import matplotlib.collections as mcoll
import matplotlib.path as mpath
from cycler import cycler

# Pendulum-tip trajectory coloured by time step: segment i gets the colormap
# value i/(NPOINTS-1), so early steps sit at the low end of the map.
xx = x - .2*sinth
yy = - .2*costh

MAP = 'jet'
NPOINTS = len(xx)

fig = plt.figure()
ax1 = fig.add_subplot(111)
ax1.set(xlim = (-1,1), ylim = (-.25,.25),
        title = "Pendulum tip trajectory", xlabel = "x", ylabel = "y")

# Build every consecutive-point segment once and draw them as a single
# LineCollection, instead of creating one Line2D per segment (which the
# previous loop additionally repeated ten times over).
points = np.column_stack([xx, yy]).reshape(-1, 1, 2)
segments = np.concatenate([points[:-1], points[1:]], axis=1)
trajectory = mcoll.LineCollection(segments, cmap=MAP, norm=plt.Normalize(0, 1))
# max(..., 1) avoids a division by zero when there are fewer than two points.
trajectory.set_array(np.arange(NPOINTS - 1) / max(NPOINTS - 1, 1))
ax1.add_collection(trajectory)

plt.show() # Show the figure

In [None]:
from matplotlib import animation

# Canvas for the animation.
fig, ax = plt.subplots(figsize = (6,3), dpi = 100)

# Trajectories to animate; .2 is the pendulum length (same value as L).
x1 = x                   # cart position along the rail
zz = np.zeros(len(x))    # cart height: always 0, i.e. on the rail
xx = x - .2*sinth        # pendulum tip, horizontal coordinate
yy = - .2*costh          # pendulum tip, vertical coordinate

# Static rail plus the rod joining the tip to the cart at the first frame.
(rail,) = ax.plot([-1,1],[0,0], "k")
(line,) = ax.plot([xx[0],x[0]],[yy[0],zz[0]], "k:")

# Markers for the pendulum tip (blue) and the cart (black).
scat = ax.scatter(xx[0], yy[0], c="b", s=25)
scat2 = ax.scatter(x[0], zz[0], c="k", s=25)

ax.set(xlim=[-1, 1], ylim=[-.5, .5], xticks=[], yticks=[])


def update(frame):
    """Move the animation artists to the state stored at index ``frame``.

    Reads the module-level trajectories ``x1``, ``zz``, ``xx``, ``yy`` and
    updates the artists ``scat``, ``scat2`` and ``line`` in place.

    :param frame: index of the time step to display
    :return: tuple with every artist that was modified, so the animation also
        works if blitting is enabled
    """
    cart_x, cart_y = x1[frame], zz[frame]
    tip_x, tip_y = xx[frame], yy[frame]

    # Scatter markers: pendulum tip and cart.
    scat.set_offsets(np.stack([tip_x, tip_y]).T)
    scat2.set_offsets(np.stack([cart_x, cart_y]).T)

    # Rod joining the cart to the pendulum tip.
    line.set_xdata([cart_x, tip_x])
    line.set_ydata([cart_y, tip_y])
    return (line, scat, scat2)


# One animation frame per logged step, saved at 120 frames per second
# (the same rate as the environment's default fps).
n_frames = len(x1)
ani = animation.FuncAnimation(fig=fig, func=update, frames = n_frames)
ani.save("mov.mp4", fps = 120)