In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import matplotlib.pyplot as plt
from collections import deque
import random
import math

class RoboticArmEnvironment:
    """
    Entorno de simulación para el brazo robótico que debe voltear un vaso.
    Simplificado pero con las características esenciales del problema.
    """
    
    def __init__(self):
        # Configuración del entorno
        self.workspace_size = 1.0  # 1 metro de espacio de trabajo
        self.num_joints = 4  # 4 articulaciones del brazo
        self.max_steps = 200  # Máximo número de pasos por episodio
        
        # Estado del brazo (ángulos de articulaciones en radianes)
        self.joint_angles = np.zeros(self.num_joints)
        self.joint_limits = np.array([[-np.pi, np.pi]] * self.num_joints)
        
        # Posición del vaso (x, y, z) y orientación
        self.cup_position = np.zeros(3)
        self.cup_upright = True  # Si el vaso está derecho
        
        # Posición del end-effector del brazo
        self.end_effector_pos = np.zeros(3)
        
        # Contador de pasos
        self.current_step = 0
        
        # Configuración de dimensiones
        self.state_dim = self.num_joints + 6  # articulaciones + pos_vaso + pos_end_effector
        self.action_dim = self.num_joints + 1  # movimiento articulaciones + acción empuje
        
        self.reset()
    
    def reset(self):
        """Reinicia el entorno para un nuevo episodio"""
        # Posición inicial aleatoria del brazo
        self.joint_angles = np.random.uniform(-0.5, 0.5, self.num_joints)
        
        # Posición aleatoria del vaso en el espacio de trabajo
        self.cup_position = np.array([
            np.random.uniform(-0.4, 0.4),  # x
            np.random.uniform(0.2, 0.8),   # y (distancia del brazo)
            0.05  # z (altura fija sobre la mesa)
        ])
        self.cup_upright = True
        
        # Calcular posición inicial del end-effector
        self._update_end_effector_position()
        
        self.current_step = 0
        
        return self._get_state()
    
    def _update_end_effector_position(self):
        """Calcula la posición del end-effector basada en cinemática directa simplificada"""
        # Longitudes de los eslabones (simplificado)
        link_lengths = [0.3, 0.25, 0.2, 0.15]
        
        x = 0
        y = 0
        z = 0
        
        current_angle = 0
        for i, (angle, length) in enumerate(zip(self.joint_angles, link_lengths)):
            current_angle += angle
            x += length * np.cos(current_angle)
            y += length * np.sin(current_angle)
            if i == 0:  # Primera articulación controla altura
                z += length * np.sin(angle)
        
        self.end_effector_pos = np.array([x, y, max(0, z)])
    
    def _get_state(self):
        """Retorna el estado actual del entorno"""
        return np.concatenate([
            self.joint_angles,
            self.cup_position,
            self.end_effector_pos
        ])
    
    def step(self, action):
        """Ejecuta una acción en el entorno"""
        self.current_step += 1
        
        # Separar acciones de movimiento y empuje
        joint_actions = action[:-1]
        push_action = action[-1]
        
        # Aplicar acciones de movimiento (limitadas)
        delta_angles = np.clip(joint_actions * 0.1, -0.1, 0.1)
        self.joint_angles += delta_angles
        
        # Aplicar límites de articulaciones
        for i in range(self.num_joints):
            self.joint_angles[i] = np.clip(
                self.joint_angles[i], 
                self.joint_limits[i][0], 
                self.joint_limits[i][1]
            )
        
        # Actualizar posición del end-effector
        self._update_end_effector_position()
        
        # Calcular recompensa
        reward = self._calculate_reward(push_action)
        
        # Verificar si el episodio terminó
        done = self._check_done()
        
        return self._get_state(), reward, done, {}
    
    def _calculate_reward(self, push_action):
        """Calcula la recompensa basada en el estado actual"""
        reward = 0
        
        # Distancia al vaso
        distance_to_cup = np.linalg.norm(self.end_effector_pos - self.cup_position)
        
        # Recompensa por acercarse al vaso
        reward += -distance_to_cup * 2.0
        
        # Recompensa por estar cerca del vaso
        if distance_to_cup < 0.05:
            reward += 10.0
            
            # Si está cerca y ejecuta empuje
            if push_action > 0.5 and self.cup_upright:
                reward += 50.0  # Gran recompensa por voltear el vaso
                self.cup_upright = False
        
        # Penalización por salirse del espacio de trabajo
        if (abs(self.end_effector_pos[0]) > 0.5 or 
            self.end_effector_pos[1] < 0 or 
            self.end_effector_pos[1] > 1.0 or
            self.end_effector_pos[2] < 0):
            reward -= 20.0
        
        # Penalización por cada paso (incentiva eficiencia)
        reward -= 0.1
        
        return reward
    
    def _check_done(self):
        """Verifica si el episodio ha terminado"""
        # Termina si el vaso fue volteado o se alcanzó el límite de pasos
        return not self.cup_upright or self.current_step >= self.max_steps


class PPOAgent:
    """
    Agente PPO (Proximal Policy Optimization) para controlar el brazo robótico
    """
    
    def __init__(self, state_dim, action_dim, lr=3e-4):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.lr = lr
        
        # Redes neuronales
        self.policy_net = PolicyNetwork(state_dim, action_dim)
        self.value_net = ValueNetwork(state_dim)
        
        # Optimizadores
        self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=lr)
        
        # Hiperparámetros PPO
        self.gamma = 0.99
        self.lambda_gae = 0.95
        self.epsilon = 0.2
        self.entropy_coef = 0.01
        self.value_coef = 0.5
        
        # Buffer para almacenar experiencias
        self.buffer = []
    
    def get_action(self, state):
        """Selecciona una acción basada en la política actual"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        
        with torch.no_grad():
            mean, std = self.policy_net(state_tensor)
            dist = Normal(mean, std)
            action = dist.sample()
            log_prob = dist.log_prob(action).sum(-1)
            
        return action.squeeze().numpy(), log_prob.item()
    
    def store_transition(self, state, action, reward, next_state, done, log_prob):
        """Almacena una transición en el buffer"""
        self.buffer.append({
            'state': state,
            'action': action,
            'reward': reward,
            'next_state': next_state,
            'done': done,
            'log_prob': log_prob
        })
    
    def update(self):
        """Actualiza la política usando PPO"""
        if len(self.buffer) == 0:
            return
        
        # Procesar experiencias
        states = torch.FloatTensor([t['state'] for t in self.buffer])
        actions = torch.FloatTensor([t['action'] for t in self.buffer])
        rewards = [t['reward'] for t in self.buffer]
        dones = [t['done'] for t in self.buffer]
        old_log_probs = torch.FloatTensor([t['log_prob'] for t in self.buffer])
        
        # Calcular returns y advantages usando GAE
        returns, advantages = self._compute_gae(rewards, dones, states)
        
        # Múltiples épocas de entrenamiento
        for _ in range(4):
            # Calcular probabilidades actuales
            mean, std = self.policy_net(states)
            dist = Normal(mean, std)
            new_log_probs = dist.log_prob(actions).sum(-1)
            entropy = dist.entropy().sum(-1)
            
            # Ratio de probabilidades
            ratio = torch.exp(new_log_probs - old_log_probs)
            
            # Pérdida de política PPO
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.epsilon, 1 + self.epsilon) * advantages
            policy_loss = -torch.min(surr1, surr2).mean()
            
            # Pérdida de entropía
            entropy_loss = -self.entropy_coef * entropy.mean()
            
            # Pérdida total de política
            total_policy_loss = policy_loss + entropy_loss
            
            # Actualizar política
            self.policy_optimizer.zero_grad()
            total_policy_loss.backward()
            torch.nn.utils.clip_grad_norm_(self.policy_net.parameters(), 0.5)
            self.policy_optimizer.step()
            
            # Pérdida de valor
            values = self.value_net(states).squeeze()
            value_loss = F.mse_loss(values, returns)
            
            # Actualizar red de valor
            self.value_optimizer.zero_grad()
            value_loss.backward()
            torch.nn.utils.clip_grad_norm_(self.value_net.parameters(), 0.5)
            self.value_optimizer.step()
        
        # Limpiar buffer
        self.buffer.clear()
    
    def _compute_gae(self, rewards, dones, states):
        """Calcula Generalized Advantage Estimation"""
        values = self.value_net(states).squeeze().detach().numpy()
        
        returns = []
        advantages = []
        gae = 0
        
        for i in reversed(range(len(rewards))):
            if i == len(rewards) - 1:
                next_value = 0 if dones[i] else values[i]
            else:
                next_value = values[i + 1]
            
            delta = rewards[i] + self.gamma * next_value - values[i]
            gae = delta + self.gamma * self.lambda_gae * (1 - dones[i]) * gae
            advantages.insert(0, gae)
            returns.insert(0, gae + values[i])
        
        advantages = torch.FloatTensor(advantages)
        returns = torch.FloatTensor(returns)
        
        # Normalizar advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        
        return returns, advantages


class PolicyNetwork(nn.Module):
    """Red neuronal para la política"""
    
    def __init__(self, state_dim, action_dim):
        super(PolicyNetwork, self).__init__()
        
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 64)
        
        self.mean_head = nn.Linear(64, action_dim)
        self.log_std_head = nn.Linear(64, action_dim)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        
        mean = torch.tanh(self.mean_head(x))  # Acciones normalizadas
        log_std = torch.clamp(self.log_std_head(x), -20, 2)
        std = torch.exp(log_std)
        
        return mean, std


class ValueNetwork(nn.Module):
    """Red neuronal para la función de valor"""
    
    def __init__(self, state_dim):
        super(ValueNetwork, self).__init__()
        
        self.fc1 = nn.Linear(state_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, 1)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        value = self.fc4(x)
        
        return value


def train_agent(episodes=1000):
    """Función principal de entrenamiento"""
    env = RoboticArmEnvironment()
    agent = PPOAgent(env.state_dim, env.action_dim)
    
    # Métricas de entrenamiento
    episode_rewards = []
    success_rate = deque(maxlen=100)
    
    print("Iniciando entrenamiento del brazo robótico...")
    
    for episode in range(episodes):
        state = env.reset()
        episode_reward = 0
        
        for step in range(env.max_steps):
            # Seleccionar acción
            action, log_prob = agent.get_action(state)
            
            # Ejecutar acción
            next_state, reward, done, _ = env.step(action)
            
            # Almacenar transición
            agent.store_transition(state, action, reward, next_state, done, log_prob)
            
            state = next_state
            episode_reward += reward
            
            if done:
                success_rate.append(1 if not env.cup_upright else 0)
                break
        
        # Actualizar agente cada 32 episodios
        if episode % 32 == 0 and episode > 0:
            agent.update()
        
        episode_rewards.append(episode_reward)
        
        # Mostrar progreso
        if episode % 100 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            avg_success = np.mean(success_rate) if success_rate else 0
            print(f"Episodio {episode}: Recompensa promedio: {avg_reward:.2f}, "
                  f"Tasa de éxito: {avg_success:.2%}")
    
    return agent, episode_rewards, success_rate


def evaluate_agent(agent, env, num_episodes=10):
    """Evalúa el rendimiento del agente entrenado"""
    successes = 0
    total_rewards = []
    
    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        
        for step in range(env.max_steps):
            action, _ = agent.get_action(state)
            state, reward, done, _ = env.step(action)
            episode_reward += reward
            
            if done:
                if not env.cup_upright:
                    successes += 1
                break
        
        total_rewards.append(episode_reward)
    
    success_rate = successes / num_episodes
    avg_reward = np.mean(total_rewards)
    
    print(f"\n=== EVALUACIÓN FINAL ===")
    print(f"Tasa de éxito: {success_rate:.2%}")
    print(f"Recompensa promedio: {avg_reward:.2f}")
    print(f"Episodios exitosos: {successes}/{num_episodes}")
    
    return success_rate, avg_reward


def plot_training_progress(episode_rewards, success_rate):
    """Visualiza el progreso del entrenamiento"""
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
    
    # Recompensas por episodio
    ax1.plot(episode_rewards, alpha=0.6, color='blue')
    ax1.plot(np.convolve(episode_rewards, np.ones(100)/100, mode='valid'), 
             color='red', linewidth=2, label='Media móvil (100 episodios)')
    ax1.set_title('Recompensa por Episodio')
    ax1.set_xlabel('Episodio')
    ax1.set_ylabel('Recompensa')
    ax1.legend()
    ax1.grid(True, alpha=0.3)
    
    # Tasa de éxito
    if len(success_rate) > 0:
        success_episodes = range(len(episode_rewards) - len(success_rate), len(episode_rewards))
        ax2.plot(success_episodes, success_rate, alpha=0.6, color='green')
        ax2.set_title('Tasa de Éxito (Últimos 100 episodios)')
        ax2.set_xlabel('Episodio')
        ax2.set_ylabel('Tasa de Éxito')
        ax2.set_ylim(0, 1)
        ax2.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()


# Ejemplo de uso
if __name__ == "__main__":
    # Entrenar el agente
    trained_agent, rewards, success_rates = train_agent(episodes=1000)
    
    # Evaluar el agente entrenado
    env = RoboticArmEnvironment()
    evaluate_agent(trained_agent, env)
    
    # Visualizar resultados
    plot_training_progress(rewards, list(success_rates))
    
    print("\n=== SISTEMA COMPLETADO ===")
    print("El brazo robótico ha sido entrenado exitosamente!")
    print("Características del sistema:")
    print("- Algoritmo: PPO (Proximal Policy Optimization)")
    print("- Espacio de estados: Posición articulaciones + posición vaso + posición end-effector")
    print("- Espacio de acciones: Movimiento articulaciones + acción de empuje")
    print("- Objetivo: Localizar y voltear vaso en posición aleatoria")

ModuleNotFoundError: No module named 'torch'