# Entrenamiento y Evaluación del Agente KUKA Arm

Este notebook implementa el entrenamiento y evaluación de un agente de aprendizaje por refuerzo para controlar un brazo robótico KUKA en un entorno simulado.

## 0. Configuración del Entorno de Google Colab

Primero, montamos Google Drive y configuramos las dependencias necesarias.

In [None]:
# Montar Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Instalar dependencias necesarias
!pip install gymnasium pybullet stable-baselines3

## 1. Configuración del Entorno

Importamos las dependencias necesarias y definimos el entorno del brazo robótico KUKA.

In [None]:
import gymnasium as gym
from gymnasium import spaces
import pybullet as p
import pybullet_data
import numpy as np
import os
import time
from stable_baselines3 import PPO
import matplotlib.pyplot as plt
import csv

# Configurar rutas de Google Drive
DRIVE_PATH = '/content/drive/MyDrive/KUKA_Project'
ASSETS_PATH = os.path.join(DRIVE_PATH, 'assets')
LOGS_PATH = os.path.join(DRIVE_PATH, 'logs')

# Crear directorios necesarios si no existen
os.makedirs(ASSETS_PATH, exist_ok=True)
os.makedirs(LOGS_PATH, exist_ok=True)

### Subir el archivo glass.urdf a Google Drive

Necesitamos subir el archivo glass.urdf a la carpeta assets en Google Drive. Puedes hacerlo manualmente o usar el siguiente bloque para subirlo desde local.

In [None]:
from google.colab import files

# Descomenta y ejecuta esta celda para subir el archivo glass.urdf
# uploaded = files.upload()
# for filename in uploaded.keys():
#     if filename == 'glass.urdf':
#         !mv {filename} {ASSETS_PATH}/

### Definición del Entorno KUKA

In [None]:
class KukaArmEnv(gym.Env):
    def __init__(self, render=False):
        super().__init__()
        self.render_mode = render
        self.max_steps = 200
        self.step_count = 0
        self.episode_count = 0
        self.contact_made = False
        self.prev_dist = None
        self.cup_initial_pos = None

        if self.render_mode:
            p.connect(p.GUI)
        else:
            p.connect(p.DIRECT)

        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        self.kuka = None
        self.cup_id = None

        self.action_space = spaces.Box(low=-2.0, high=2.0, shape=(7,), dtype=np.float32)
        self.observation_space = spaces.Box(low=-10.0, high=10.0, shape=(10,), dtype=np.float32)

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        p.resetSimulation()
        p.setGravity(0, 0, -9.8)
        p.loadURDF("plane.urdf")

        self.kuka = p.loadURDF("kuka_iiwa/model.urdf", useFixedBase=True)

        if self.episode_count < 100:
            cup_x = 0.6
        else:
            cup_x = np.random.uniform(0.5, 0.65)

        # Usar la ruta de Google Drive para el archivo glass.urdf
        cup_path = os.path.join(ASSETS_PATH, "glass.urdf")
        self.cup_initial_pos = [cup_x, 0, 0.06]
        self.cup_id = p.loadURDF(cup_path, basePosition=self.cup_initial_pos)

        initial_angles = [0.0, -0.9, 0.0, -1.4, 0.0, 1.4, 0.0]
        for j in range(7):
            p.resetJointState(self.kuka, j, targetValue=initial_angles[j])

        self.step_count = 0
        self.contact_made = False
        self.episode_count += 1
        self.prev_dist = None

        return self._get_obs(), {}

    def _get_obs(self):
        joint_states = [p.getJointState(self.kuka, i)[0] for i in range(7)]
        cup_pos = p.getBasePositionAndOrientation(self.cup_id)[0]
        return np.array(joint_states + list(cup_pos), dtype=np.float32)

    def step(self, action):
        for i in range(7):
            p.setJointMotorControl2(self.kuka, i, p.POSITION_CONTROL,
                                    targetPosition=action[i], force=200)

        for _ in range(10):
            p.stepSimulation()
            if self.render_mode:
                time.sleep(1./240.)

        efector_pos = p.getLinkState(self.kuka, 6)[0]
        cup_pos, _ = p.getBasePositionAndOrientation(self.cup_id)
        dist = np.linalg.norm(np.array(efector_pos) - np.array(cup_pos))
        efector_height = efector_pos[2]

        terminated = False
        truncated = False
        reward = -0.2 * dist

        if self.prev_dist is not None:
            if dist < self.prev_dist:
                reward += 0.05
            else:
                reward -= 0.05
        self.prev_dist = dist

        if efector_height > 0.4:
            reward -= 0.1 * (efector_height - 0.4)

        y_error = abs(efector_pos[1] - cup_pos[1])
        if y_error < 0.05:
            reward += 0.05

        codo_angle = p.getJointState(self.kuka, 3)[0]
        if abs(codo_angle) < 0.5:
            reward -= 0.1
        elif -1.5 < codo_angle < -0.8:
            reward += 0.05

        if dist < 0.08:
            p.resetBaseVelocity(self.cup_id, linearVelocity=[0, 0, -1.0])
            reward = 1.0
            terminated = True
            return self._get_obs(), reward, terminated, truncated, {}

        dx = abs(cup_pos[0] - self.cup_initial_pos[0])
        dz = abs(cup_pos[2] - self.cup_initial_pos[2])
        if dx > 0.02 or dz > 0.02:
            reward = 1.0
            terminated = True
            return self._get_obs(), reward, terminated, truncated, {}

        self.step_count += 1
        if self.step_count >= self.max_steps:
            reward = -1.0
            truncated = True

        return self._get_obs(), reward, terminated, truncated, {}

    def close(self):
        p.disconnect()

## 2. Entrenamiento del Agente

Ahora entrenaremos el agente usando el algoritmo PPO (Proximal Policy Optimization).

In [None]:
# Inicializar el entorno
env = KukaArmEnv(render=False)

# Crear y entrenar el modelo
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=30000)

# Guardar el modelo entrenado en Google Drive
model_path = os.path.join(DRIVE_PATH, "ppo_kuka_arm")
model.save(model_path)

# Inicializar array para guardar recompensas
np.save(os.path.join(LOGS_PATH, "rewards_kuka.npy"), [])

## 3. Evaluación del Agente

Evaluaremos el agente entrenado en 50 episodios y analizaremos su rendimiento.

In [None]:
# Inicializar entorno con renderizado
env = KukaArmEnv(render=True)
model = PPO.load(model_path, env=env)

total_episodes = 50
success_count = 0
results = []

for ep in range(total_episodes):
    obs, _ = env.reset()
    total_reward = 0
    done = False
    step = 0
    hit = False

    while not done:
        action, _ = model.predict(obs)
        obs, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward
        if reward == 1.0:
            hit = True
        done = terminated or truncated
        step += 1

    success = 1 if hit else 0
    success_count += success
    print(f"[EPISODIO {ep+1:02d}] Recompensa total: {total_reward:.2f} → {'✅ ÉXITO' if success else '❌ FALLA'}")
    results.append([ep+1, float(total_reward), "Éxito" if success else "Falla"])

# Guardar resultados en CSV en Google Drive
csv_path = os.path.join(LOGS_PATH, "eval_results.csv")
with open(csv_path, "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["Episodio", "Recompensa", "Resultado"])
    writer.writerows(results)

# Imprimir resumen
print("\n📊 EVALUACIÓN COMPLETA")
print(f"Total de episodios: {total_episodes}")
print(f"Episodios exitosos: {success_count}")
print(f"Tasa de éxito: {100 * success_count / total_episodes:.2f}%")

## 4. Visualización de Resultados

Visualizaremos los resultados del entrenamiento y la evaluación.

In [None]:
# Gráfica de recompensas
rewards = [row[1] for row in results]
rolling_avg = np.convolve(rewards, np.ones(5)/5, mode='valid')

plt.figure(figsize=(8, 4))
plt.plot(rewards, label="Recompensa por episodio")
plt.plot(rolling_avg, label="Promedio móvil (5)", linestyle='--')
plt.xlabel("Episodio")
plt.ylabel("Recompensa")
plt.title("Evaluación del agente PPO")
plt.legend()
plt.grid(True)
plt.tight_layout()

# Guardar la gráfica en Google Drive
plot_path = os.path.join(LOGS_PATH, "eval_rewards_plot.png")
plt.savefig(plot_path)
plt.show()

# Cerrar entorno
env.close()