# Env

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import time
from datetime import datetime
import os
import sys

import mujoco
import mediapy as media
from gym import Env
from gym.spaces import Discrete, Box

import stable_baselines3
from stable_baselines3.common.env_checker import check_env
import torch as th
from stable_baselines3 import PPO
from stable_baselines3.ppo import MlpPolicy
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.logger import configure
from stable_baselines3.common.logger import Logger, CSVOutputFormat, HumanOutputFormat

import torch
import torch.nn as nn


from stable_baselines3.common.callbacks import BaseCallback
from torch.utils.data import DataLoader, TensorDataset

In [2]:

datos = pd.read_csv('datos_seguimiento_3_polyfit15_conpos.csv',sep=",")
array_datos = datos.to_numpy() #guarda los datos del csv cargado como arreglo de numpy

with open("mujoco56.xml", "r", encoding="utf-8") as f:
    xml = f.read() #Leer el archivo de mujoco cargado al entorno de ejecución y lo guarda en la variable xml

In [None]:
#Discriminador en PyTorch
class AMPDiscriminator(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.net(x)

In [None]:
#Se Modifica el entorno para usar el discriminador
#Se cambia step() para usar el discriminador:


class RobotBipedoEnv(Env):
  def __init__(self, xml_text, tracking_array, tiempo_max_sim, tracking_fps=30):
    self.simulation_time_step=0.002 
    self.motor_velocity_deg_s=90 
    self.motor_velocity_rad_s=(self.motor_velocity_deg_s)*(np.pi/180) 
    self.max_step_motors=self.motor_velocity_rad_s*self.simulation_time_step
    self.model = mujoco.MjModel.from_xml_string(xml_text)
    self.data = mujoco.MjData(self.model)
    self.observation_space = Box(low=-np.inf, high=np.inf, shape=(19, ))
    self.action_space = Box(low=-1, high=1, shape=(8,))
    self.state=np.concatenate((self.data.geom('TrackerCadera_pierna1_geom').xpos,
                              self.data.geom('TrackerRodilla_pierna1_geom').xpos,
                              self.data.geom('TrackerTobillo_pierna1_geom').xpos,
                              self.data.geom('TrackerCadera_pierna2_geom').xpos,
                              self.data.geom('TrackerRodilla_pierna2_geom').xpos,
                              self.data.geom('TrackerTobillo_pierna2_geom').xpos,
                              np.array([self.data.time])))
    if(tiempo_max_sim<=4.9):
      self.final_simulation_time=tiempo_max_sim
    else:
      self.final_simulation_time=4.9
    self.framerate=tracking_fps
    self.tracking=tracking_array
    self.height=480
    self.width=640
    self.frames=[]
    self.renderer = mujoco.Renderer(self.model, self.height, self.width)
    self.obj_frame=1
    self.objective_pos=-self.tracking[self.obj_frame-1][7]



    #Iniciliza Discriminador
    self.discriminator = AMPDiscriminator(input_dim=7)
    self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    self.discriminator.to(self.device)
    print("¿AMP usando cpu o gpu?")
    print("Utilizando "+str(self.device)+" para entrenar AMP.")
    self.amp_rew_prop=0.125
    print("AMP Reward: 12.5%")
    self.agent_buffer = []
    # Cargar estados reales de tu CSV
    self.real_data = tracking_array[:, 1:8]
    

  def actuate_model(self, action):
    self.data.actuator("motor1_cadera_pierna1").ctrl+=np.clip(action[0],-self.max_step_motors,self.max_step_motors)
    self.data.actuator("motor2_cadera_pierna1").ctrl+=np.clip(action[1],-self.max_step_motors,self.max_step_motors)
    self.data.actuator("motor_rodilla_pierna1").ctrl+=np.clip(action[2],-self.max_step_motors,self.max_step_motors)
    self.data.actuator("motor_tobillo_pierna1").ctrl+=np.clip(action[3],-self.max_step_motors,self.max_step_motors)
    self.data.actuator("motor1_cadera_pierna2").ctrl+=np.clip(action[4],-self.max_step_motors,self.max_step_motors)
    self.data.actuator("motor2_cadera_pierna2").ctrl+=np.clip(action[5],-self.max_step_motors,self.max_step_motors)
    self.data.actuator("motor_rodilla_pierna2").ctrl+=np.clip(action[6],-self.max_step_motors,self.max_step_motors)
    self.data.actuator("motor_tobillo_pierna2").ctrl+=np.clip(action[7],-self.max_step_motors,self.max_step_motors)
    self.data.actuator("arnes1").ctrl=-self.objective_pos+0.05
    self.data.actuator("arnes2").ctrl=-self.objective_pos+0.05
    self.data.actuator("arnes3").ctrl=-self.objective_pos+0.05
    self.data.actuator("motor1_cadera_pierna1").ctrl=np.clip(self.data.actuator("motor1_cadera_pierna1").ctrl,-0.95,0.698)
    self.data.actuator("motor2_cadera_pierna1").ctrl=np.clip(self.data.actuator("motor2_cadera_pierna1").ctrl,-0.698,0.95)
    self.data.actuator("motor_rodilla_pierna1").ctrl=np.clip(self.data.actuator("motor_rodilla_pierna1").ctrl,0,1.48)
    self.data.actuator("motor_tobillo_pierna1").ctrl=np.clip(self.data.actuator("motor_tobillo_pierna1").ctrl,-0.31,0.61)
    self.data.actuator("motor1_cadera_pierna2").ctrl=np.clip(self.data.actuator("motor1_cadera_pierna2").ctrl,-0.95,0.698)
    self.data.actuator("motor2_cadera_pierna2").ctrl=np.clip(self.data.actuator("motor2_cadera_pierna2").ctrl,-0.698,0.95)
    self.data.actuator("motor_rodilla_pierna2").ctrl=np.clip(self.data.actuator("motor_rodilla_pierna2").ctrl,0,1.48)
    self.data.actuator("motor_tobillo_pierna2").ctrl=np.clip(self.data.actuator("motor_tobillo_pierna2").ctrl,-0.31,0.61)
    self.data.actuator("arnes1").ctrl=np.clip(self.data.actuator("arnes1").ctrl,-1,2)
    self.data.actuator("arnes2").ctrl=np.clip(self.data.actuator("arnes2").ctrl,-1,2)
    self.data.actuator("arnes3").ctrl=np.clip(self.data.actuator("arnes3").ctrl,-1,2) 

  def step(self,action):
    self.actuate_model(action)
    mujoco.mj_step(self.model, self.data)
    self.state=np.concatenate((self.data.geom('TrackerCadera_pierna1_geom').xpos,
                              self.data.geom('TrackerRodilla_pierna1_geom').xpos,
                              self.data.geom('TrackerTobillo_pierna1_geom').xpos,
                              self.data.geom('TrackerCadera_pierna2_geom').xpos,
                              self.data.geom('TrackerRodilla_pierna2_geom').xpos,
                              self.data.geom('TrackerTobillo_pierna2_geom').xpos,
                              np.array([self.data.time])))
    current_angle_cadera_rodilla_pierna1=np.arctan((-(self.state[4]-self.state[1]))/np.abs((self.state[5]-self.state[2])))
    current_angle_rodilla_tobillo_pierna1=np.arctan((-(self.state[7]-self.state[4]))/np.abs((self.state[8]-self.state[5])))
    current_angle_cadera_tobillo_pierna1=np.arctan((-(self.state[7]-self.state[1]))/np.abs((self.state[8]-self.state[2])))
    current_angle_cadera_rodilla_pierna2=np.arctan((-(self.state[13]-self.state[10]))/np.abs((self.state[14]-self.state[11])))
    current_angle_rodilla_tobillo_pierna2=np.arctan((-(self.state[16]-self.state[13]))/np.abs((self.state[17]-self.state[14])))
    current_angle_cadera_tobillo_pierna2=np.arctan((-(self.state[16]-self.state[10]))/np.abs((self.state[17]-self.state[11])))
    current_pos=self.state[1]
    if(self.data.time > self.obj_frame/self.framerate):
      self.obj_frame+=1
    objective_angle_cadera_rodilla_pierna1=self.tracking[self.obj_frame-1][1]
    objective_angle_rodilla_tobillo_pierna1=self.tracking[self.obj_frame-1][2]
    objective_angle_cadera_tobillo_pierna1=self.tracking[self.obj_frame-1][3]
    objective_angle_cadera_rodilla_pierna2=self.tracking[self.obj_frame-1][4]
    objective_angle_rodilla_tobillo_pierna2=self.tracking[self.obj_frame-1][5]
    objective_angle_cadera_tobillo_pierna2=self.tracking[self.obj_frame-1][6]
    self.objective_pos=-self.tracking[self.obj_frame-1][7]
    rp=np.exp(-2*((objective_angle_cadera_rodilla_pierna1-current_angle_cadera_rodilla_pierna1)**2+(objective_angle_rodilla_tobillo_pierna1-current_angle_rodilla_tobillo_pierna1)**2+(objective_angle_cadera_rodilla_pierna2-current_angle_cadera_rodilla_pierna2)**2+(objective_angle_rodilla_tobillo_pierna2-current_angle_rodilla_tobillo_pierna2)**2))
    rv=0
    re=np.exp(-40*((objective_angle_cadera_tobillo_pierna1-current_angle_cadera_tobillo_pierna1)**2+(objective_angle_cadera_tobillo_pierna2-current_angle_cadera_tobillo_pierna2)**2))
    rc=np.exp(-10*((self.objective_pos-current_pos)**2))
    env_reward=0.65*rp+0.1*rv+0.15*re+0.1*rc
    done=False
    if(self.state[2]<-0.15 or self.state[11]<-0.15):
      done=True
      reward=-80
    if(self.data.time >= self.final_simulation_time):
      done=True
    info={
      "reward": env_reward,
      "state": self.state,
      "objective": self.tracking[self.obj_frame-1],
      "time": self.data.time
    }


    #Guardar una observación para entrenar el discriminador más adelante
    joint_angles = np.array([current_angle_cadera_rodilla_pierna1,
                              current_angle_rodilla_tobillo_pierna1,
                              current_angle_cadera_tobillo_pierna1,
                              current_angle_cadera_rodilla_pierna2,
                              current_angle_rodilla_tobillo_pierna2,
                              current_angle_cadera_tobillo_pierna2,
                              -current_pos])
    
    #print("1")
    self.agent_buffer.append(joint_angles.copy())
    #print("2")
    #Calcula recompensa AMP
    with torch.no_grad():
        amp_input = torch.from_numpy(joint_angles).float().unsqueeze(0).to(self.device)
        amp_reward = self.discriminator(amp_input).item()
    #print("3")
    
    
    reward = (1-self.amp_rew_prop) * env_reward + self.amp_rew_prop * (0.9*amp_reward)



    return self.state, reward, done, info


  def render(self):
    if len(self.frames) < self.data.time * self.framerate:
      self.renderer.update_scene(self.data, camera='camara_rodilla')
      pixels = self.renderer.render()
      self.frames.append(pixels)

  def reset(self):
    self.frames=[]
    self.obj_frame=1
    mujoco.mj_resetData(self.model, self.data) 
    self.state=np.concatenate((self.data.geom('TrackerCadera_pierna1_geom').xpos,
                              self.data.geom('TrackerRodilla_pierna1_geom').xpos,
                              self.data.geom('TrackerTobillo_pierna1_geom').xpos,
                              self.data.geom('TrackerCadera_pierna2_geom').xpos,
                              self.data.geom('TrackerRodilla_pierna2_geom').xpos,
                              self.data.geom('TrackerTobillo_pierna2_geom').xpos,
                              np.array([self.data.time])))
    self.objective_pos=-self.tracking[self.obj_frame-1][7]
    return self.state

In [None]:
# Se Entrena el discriminador con un Callback cada n pasos

class AMPCallback(BaseCallback):
    def __init__(self, env, train_freq=2048, batch_size=128, **kwargs):
        super().__init__(**kwargs)
        self.env = env
        self.train_freq = train_freq
        self.batch_size = batch_size
        self.disc_train_step = 0   #un contador de monitoreo para debugging

    def _on_step(self):
        if self.n_calls % self.train_freq == 0:
            self.train_discriminator()
        return True

    def train_discriminator(self):
        # Datos del agente
        fake = np.array(self.env.agent_buffer)
        self.env.agent_buffer = []  # Limpia el buffer
        
        # Datos reales
        real = self.env.real_data[np.random.choice(len(self.env.real_data), len(fake))]
        
        # Tensors
        fake_t = torch.tensor(fake, dtype=torch.float32).to(self.env.device)
        real_t = torch.tensor(real, dtype=torch.float32).to(self.env.device)
        labels_fake = torch.zeros((len(fake), 1), device=self.env.device)
        labels_real = torch.ones((len(real), 1), device=self.env.device)

        # Mezclar
        x = torch.cat([real_t, fake_t])
        y = torch.cat([labels_real, labels_fake])

        dataset = TensorDataset(x, y)
        loader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True)

        # Entrenamiento
        self.env.discriminator.train()
        opt = torch.optim.Adam(self.env.discriminator.parameters(), lr=1e-4)
        loss_fn = nn.BCELoss()

        for batch_x, batch_y in loader:
            opt.zero_grad()
            pred = self.env.discriminator(batch_x)
            loss = loss_fn(pred, batch_y)
            loss.backward()
            opt.step()
            self.disc_train_step += 1
            # Debugging
            #if self.disc_train_step % 100 == 0:
            #    print(f"[Discriminator] Step: {self.disc_train_step} | Loss: {loss.item():.4f}")


In [6]:
#Crea el ambiente
env=RobotBipedoEnv(xml_text=xml, tracking_array=array_datos,tiempo_max_sim=2,tracking_fps=30)
#Crea el callback
callback = AMPCallback(env)
#Crea el modelo normal
policy_kwargs = dict(activation_fn=th.nn.ReLU,net_arch=dict(pi=[1024,512,256,128], vf=[1024,512,256,128]))
model = PPO(
        "MlpPolicy", env,
        learning_rate=0.0001,
        n_steps=4096,
        batch_size=1024,
        n_epochs=10,
        gamma=0.99,
        gae_lambda=0.99,
        clip_range=0.15,
        ent_coef=0.001,
        policy_kwargs=policy_kwargs,
        verbose=0
    )


#Configura el logger
# Crear el nombre del archivo CSV con timestamp
timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M')
csv_path = f"./logs/ppo_amp2_{timestamp}.csv"
# Crear un nuevo logger con CSV
new_logger = Logger(folder="./logs/",output_formats=[CSVOutputFormat(csv_path)])
# Asignar el nuevo logger al modelo
model.set_logger(new_logger)


#Entrena con callback
model.learn(total_timesteps=5_000_000, progress_bar=True, callback=callback)

Output()

¿AMP usando cpu o gpu?
Utilizando cuda para entrenar AMP.
AMP Reward: 12.5%




<stable_baselines3.ppo.ppo.PPO at 0x18a91df98e0>

In [7]:
model.save(f"ppo_amp2_{timestamp}")
print(f"model saved as ppo_amp2_{timestamp}")

model saved as ppo_amp2_2025-05-11_21-55


In [8]:
eval_env=RobotBipedoEnv(xml_text=xml, tracking_array=array_datos, tiempo_max_sim=2)
state=eval_env.reset()
new_state=state
done=False
recompensa_acumulada=0
pasos=0

while not done:
  state=new_state
  pasos+=1
  action, _states = model.predict(state, deterministic=True)
  new_state,reward,done,info=eval_env.step(action)
  recompensa_acumulada+=reward
  eval_env.render()

print("Recompensa total {} en {} pasos de tiempo.".format(recompensa_acumulada,pasos))
media.show_video(eval_env.frames, fps=eval_env.framerate)

¿AMP usando cpu o gpu?
Utilizando cuda para entrenar AMP.
AMP Reward: 12.5%
Recompensa total 398.85315582538317 en 1000 pasos de tiempo.


0
This browser does not support the video tag.
