In [None]:
#!pip install gymnasium
#!pip install gymnasium[classic-control]

In [1]:
import gymnasium as gym
env = gym.make("CartPole-v1", render_mode="human")
observation, info = env.reset()

print(env.observation_space)
print(env.observation_space.low)
print(env.observation_space.high)
print(env.action_space)


valores = [0.0]

for _ in range(300):
    action = env.action_space.sample()  # agent policy that uses the observation and info
    observation, reward, terminated, truncated, info = env.step(action)

    valores.append(reward)

    if terminated or truncated:
        observation, info = env.reset()

env.close()

Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)
[-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38]
[4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38]
Discrete(2)


In [None]:
#import dpg

#%run dpg.py

# Código de prueba MPC 1

In [2]:
import logging
from typing import Any, Optional

import casadi as cs
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
from csnlp import Nlp
from csnlp.wrappers import Mpc
from gymnasium.wrappers import TimeLimit

from mpcrl import (
    LearnableParameter,
    LearnableParametersDict,
    LstdDpgAgent,
    UpdateStrategy,
)
from mpcrl import exploration as E
from mpcrl.optim import GradientDescent
from mpcrl.util.control import dlqr
from mpcrl.wrappers.agents import Log, RecordUpdates
from mpcrl.wrappers.envs import MonitorEpisodes


In [None]:
class CartPoleSystem:
    def __init__(self):
        self.env = gym.make("CartPole-v1")
        #self.env = gym.make("CartPole-v1", m_p=0.5, m_c=1.0, length=0.5)
        #self.env = gym.make("CartPole-v1", render_mode="human")
        self.observation_space = self.env.observation_space
        self.action_space = self.env.action_space

    def reset(self):
        return self.env.reset()

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        return obs, reward, done, info

    def get_stage_cost(self, state):
        # Define the individual cost components
        x, x_dot, theta, theta_dot = state  # Extract state variables

        # Penalty for being far from the center position (cart position)
        cart_pos_cost = abs(x)

        # Penalty for high angular velocity
        theta_dot_cost = abs(theta_dot)

        # Penalty for being far from upright (cart angle)
        pole_angle_cost = abs(theta)

        # Penalty for high cart velocity
        x_dot_cost = abs(x_dot) if pole_angle_cost < np.pi/2 else 0  # Penalize only when pole is upright

        # Combine individual costs with weights (adjust weights as needed)
        stage_cost = 0.1 * cart_pos_cost + 0.2 * theta_dot_cost + \
                     0.5 * pole_angle_cost + 0.1 * x_dot_cost

        return stage_cost


In [None]:
class CartPoleMpc(Mpc[cs.SX]):

    m_p=0.5
    m_c=1.0, 
    length=0.5
    horizon = 10                                        # Horizonte de predicción, pasos hacia el futuro a predecir
    discount_factor = 0.9                               # Para ponderar la importancia de las recompensas futuras en relación con las recompensas inmediatas
    learnable_pars_init = {                             # Parámetros que se pueden aprender durante el entrenamiento del controlador.
        "V0": np.asarray([0, 0]),                            # Valor inicial 
        "x_lb": np.asarray([-2.4, 0]),                          # Son los límites inferiores y superiores para el estado del sistema.
        "x_ub": np.asarray([2.4, 0]),
        "b": np.zeros(4),
        "f": np.zeros(4+1),
        "A": np.asarray([[0, 1, 0, 0], [0, 0, m_p*9.8/m_c, 0], [0, 0, 0, 1], [0, 0, (m)/, 0]]),
        "B": np.asarray([[0.0312], [0.25]]),
    }
    
    def __init__(self) -> None:
        N = self.horizon
        gamma = self.discount_factor
        w = LtiSystem.w
        nx, nu = LtiSystem.nx, LtiSystem.nu
        x_bnd, a_bnd = LtiSystem.x_bnd, LtiSystem.a_bnd
        nlp = Nlp[cs.SX]()
        super().__init__(nlp, N)

        # parameters
        V0 = self.parameter("V0", (nx,))
        x_lb = self.parameter("x_lb", (nx,))
        x_ub = self.parameter("x_ub", (nx,))
        b = self.parameter("b", (nx, 1))
        f = self.parameter("f", (nx + nu, 1))
        A = self.parameter("A", (nx, nx))
        B = self.parameter("B", (nx, nu))

        # variables (state, action, slack)                #########################
        x, _ = self.state("x", nx, bound_initial=False)
        u, _ = self.action("u", nu, lb=a_bnd[0], ub=a_bnd[1])
        s, _, _ = self.variable("s", (nx, N), lb=0)

        # dynamics
        self.set_dynamics(lambda x, u: A @ x + B * u + b, n_in=2, n_out=1)
        self.set_dynamics(self, x, u):
            

        # other constraints
        self.constraint("x_lb", x_bnd[0] + x_lb - s, "<=", x[:, 1:])
        self.constraint("x_ub", x[:, 1:], "<=", x_bnd[1] + x_ub + s)

        # objective
        A_init, B_init = self.learnable_pars_init["A"], self.learnable_pars_init["B"]
        S = cs.DM(dlqr(A_init, B_init, 0.5 * np.eye(nx), 0.25 * np.eye(nu))[1])
        gammapowers = cs.DM(gamma ** np.arange(N)).T
        self.minimize(
            V0.T @ x[:, 0]  # to have a derivative, V0 must be a function of the state
            + cs.bilin(S, x[:, -1])
            + cs.sum2(f.T @ cs.vertcat(x[:, :-1], u))
            + 0.5
            * cs.sum2(
                gammapowers
                * (cs.sum1(x[:, :-1] ** 2) + 0.5 * cs.sum1(u**2) + w.T @ s)
            )
        )

        # solver
        opts = {
            "expand": True,
            "print_time": False,
            "bound_consistency": True,
            "clip_inactive_lam": True,
            "calc_lam_x": False,
            "calc_lam_p": False,
            "jit": False,
            "ipopt": {
                # "linear_solver": "pardiso",
                # "tol": 1e-5,
                # "barrier_tol_factor": 1,
                "max_iter": 500,
                "sb": "yes",
                "print_level": 0,
            },
        }
        self.init_solver(opts, solver="ipopt")
        

In [None]:
# now, let's create the instances of such classes and start the training
#mpc = LinearMpc()
#learnable_pars = LearnableParametersDict[cs.SX](
#    (
#        LearnableParameter(name, val.shape, val, sym=mpc.parameters[name])
#        for name, val in mpc.learnable_pars_init.items()
#    )
#)


### env = MonitorEpisodes(TimeLimit(LtiSystem(), max_episode_steps=int(5e3)))
env = MonitorEpisodes(TimeLimit(gym.make("CartPole-v1"), max_episode_steps=500))

mpc = CartPoleMpc()

rollout_length = 100
agent = Log(
    RecordUpdates(
        LstdDpgAgent(
            mpc=mpc,
            learnable_parameters=learnable_pars,
            discount_factor=mpc.discount_factor,
            optimizer=GradientDescent(learning_rate=1e-6),
            update_strategy=UpdateStrategy(rollout_length, "on_timestep_end"),
            rollout_length=rollout_length,
            exploration=E.GreedyExploration(0.05),
            record_policy_performance=True,
            record_policy_gradient=True,
        )
    ),
    level=logging.DEBUG,
    log_frequencies={"on_timestep_end": 1000},
)
agent.train(env=env, episodes=1, seed=69)


# Código de prueba MPC 2

In [11]:
class CartPoleSystem(gym.Env):
    metadata = {'render.modes': ['human']}

    def __init__(self):
        super().__init__()
        self.env = gym.make("CartPole-v1")
        self.action_space = self.env.action_space
        self.observation_space = self.env.observation_space

    def reset(self):
        return self.env.reset()

    def step(self, action):
        return self.env.step(action)

    def render(self, mode='human'):
        return self.env.render(mode=mode)

    def close(self):
        self.env.close()

    def get_stage_cost(self, state):
        # Define the individual cost components
        x, x_dot, theta, theta_dot = state  # Extract state variables

        # Penalty for being far from the center position (cart position)
        cart_pos_cost = abs(x)

        # Penalty for high angular velocity
        theta_dot_cost = abs(theta_dot)

        # Penalty for being far from upright (cart angle)
        pole_angle_cost = abs(theta)

        # Penalty for high cart velocity
        x_dot_cost = abs(x_dot) if pole_angle_cost < np.pi/2 else 0  # Penalize only when pole is upright

        # Combine individual costs with weights (adjust weights as needed)
        stage_cost = 0.1 * cart_pos_cost + 0.2 * theta_dot_cost + \
                     0.5 * pole_angle_cost + 0.1 * x_dot_cost
        pass


class CartPoleMpc(Mpc[cs.SX]):
    def __init__(self, system):
        self.system = system  # Instancia de CartPoleSystem
        # Inicializa cualquier otro parámetro necesario para MPC
        m_p=0.5
        m_c=1.0 
        length=0.5
        g = 9.8
        # Definir las matrices A, B y el vector b
        #self.A = np.eye([[0, 1, 0, 0],[0, 0, (m_p*g)/m_c, 0],[0, 0, 0, 1],[0, 0, ((m_c+m_p)*g)/(m_c*length), 0]])  # Matriz de transición de estado (ya definida)
        self.A = np.eye(4)
        self.B = np.array([0, 1/m_c, 0, -1/(m_c * length)])  # Vector de entrada (acción)
        self.b = np.zeros(4)  # Término de sesgo (puedes ajustarlo según tu modelo)
        super()._init_(system)

    def dynamics(self, x, u):
        """
        Ecuaciones de dinámica del sistema.
        x: Estado (vector de 4 dimensiones)
        u: Acción (fuerza aplicada al carrito)
        """
        return self.A @ x + self.B * u + self.b
    
    def plan(self, current_state):
        # Implementa la lógica para planificar la secuencia de acciones
        # Esto puede incluir la predicción del estado futuro y la optimización del costo
        #pass
        """
        Planifica la secuencia de acciones basada en el estado actual.

        Args:
            current_state (np.ndarray): Vector de estado actual (por ejemplo, [x, x_dot, theta, theta_dot]).

        Returns:
            action (float): Acción planificada (por ejemplo, fuerza aplicada al carrito).
        """
        # Implementa aquí tu lógica de planificación:
        # - Puedes usar algoritmos de optimización, controladores PID, etc.
        # - Considera los objetivos de control y los costos asociados.

        # Ejemplo simple: Mantener el poste vertical
        # Calcula la acción como una función del ángulo actual del poste
        theta = current_state[2]
        action = -theta  # Invierte el ángulo como acción
        return action

    def execute(self):
        # Ejecuta la secuencia de acciones planificadas en el entorno
        state = self.system.reset()
        done = False
        while not done:
            action = self.plan(state)
            state, reward, done, _ = self.system.step(action)
            # Opcional: renderizar el entorno si es necesario
            self.system.render()
        self.system.close()

In [12]:
# Crear la instancia de la nueva clase MPC

env = MonitorEpisodes(TimeLimit(CartPoleSystem(), max_episode_steps=int(5e3)))

mpc = CartPoleMpc(system = env)


#cartpole_env = CartPoleSystem()
#mpc_agent = CartPoleMpc(cartpole_env)

# Definir los parámetros aprendibles para el nuevo MPC
#learnable_pars = LearnableParametersDictcs.SX
#        for name, val in mpc.learnable_pars_init.items()
#)

learnable_pars = LearnableParametersDict[cs.SX](
    (
        LearnableParameter(name, val.shape, val, sym=mpc.parameters[name])
        for name, val in mpc.learnable_pars_init.items()
    )
)


# Crear el entorno utilizando CartPoleSystem
#env = MonitorEpisodes(TimeLimit(CartPoleSystem(), max_episode_steps=int(5e3)))

# Definir la longitud del rollout
rollout_length = 100

# Crear y configurar el agente con la nueva clase MPC y los parámetros actualizados
agent = Log(
    RecordUpdates(
        LstdDpgAgent(
            mpc=mpc,
            learnable_parameters=learnable_pars,
            discount_factor=mpc.discount_factor,
            optimizer=GradientDescent(learning_rate=1e-6),
            update_strategy=UpdateStrategy(rollout_length, "on_timestep_end"),
            rollout_length=rollout_length,
            exploration=E.GreedyExploration(0.05),
            record_policy_performance=True,
            record_policy_gradient=True,
        )
    ),
    level=logging.DEBUG,
    log_frequencies={"on_timestep_end": 1000},
)

# Entrenar el agente con el nuevo entorno
agent.train(env=env, episodes=1, seed=69)


AttributeError: 'super' object has no attribute '_init_'