<a href="https://colab.research.google.com/github/FelixMaturano/SIS420/blob/main/brazo_Robotico.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

 1. Entorno del Brazo Robótico (ArmEnv)

In [None]:
!pip install gymnasium pybullet torch numpy opencv-python

In [None]:
import os, math, pybullet as p, pybullet_data, gymnasium as gym
from gymnasium import spaces
import numpy as np
from pathlib import Path

# Directory holding the URDF assets bundled with pybullet_data; it is registered
# as PyBullet's additional search path so relative names such as "plane.urdf" resolve.
URDF_ROOT = Path(pybullet_data.getDataPath())

class ArmEnv(gym.Env):
    """PyBullet scene with a fixed-base KUKA iiwa arm, a table and a cup.

    Observation (10 float32 values): the cup position relative to
    ``CUP_ORIGIN`` followed by the 7 joint angles divided by pi.

    Action (8 values): 7 joint-position increments, each clipped to [-1, 1]
    and scaled by ``JOINT_STEP``, plus one push strength clipped to [0, 1]
    that applies a downward force on the last arm link.

    An episode terminates when the cup's own z axis points below the horizon
    while the cup is still above ``CUP_MIN_Z`` (reward +1) or when the cup
    drops to or below ``CUP_MIN_Z`` (reward -1). It is truncated after
    ``max_steps`` steps. Every step also costs ``1 / max_steps``.

    NOTE(review): the stock ``table/table.urdf`` top appears to sit well below
    z = 0.79; if so, a cup spawned at z = 0.82 falls through ``CUP_MIN_Z``
    within a few simulation steps. Confirm the spawn height and threshold.
    """

    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 60}

    N_JOINTS = 7                   # controlled joints of the iiwa model
    JOINT_STEP = 0.05              # rad added per unit action per step
    PUSH_FORCE = 50.0              # N applied at full push strength
    CUP_ORIGIN = (0.5, 0.0, 0.82)  # nominal cup position used for sampling/normalising
    CUP_MIN_Z = 0.79               # below this the cup counts as fallen off

    def __init__(self, render_mode: str | None = None, max_steps: int = 300):
        """Connect to PyBullet and load the static scene.

        Args:
            render_mode: ``"human"`` opens the PyBullet GUI; anything else
                (including ``"rgb_array"``) uses the headless DIRECT mode.
            max_steps: number of steps after which the episode is truncated.

        Raises:
            RuntimeError: if no physics server connection could be created.
        """
        super().__init__()
        self.render_mode = render_mode
        self.client = p.connect(p.GUI if render_mode == "human" else p.DIRECT)
        if self.client < 0:
            raise RuntimeError("Could not connect to a PyBullet physics server")
        cid = self.client

        # Every call is pinned to this env's client so several envs can share a process.
        p.setAdditionalSearchPath(str(URDF_ROOT), physicsClientId=cid)
        p.setGravity(0, 0, -9.81, physicsClientId=cid)

        # Load the ground plane, table and arm; create the cup shapes once.
        self.plane = p.loadURDF("plane.urdf", physicsClientId=cid)
        self.table = p.loadURDF("table/table.urdf", [0.5, 0, 0],
                                p.getQuaternionFromEuler([0, 0, 0]),
                                physicsClientId=cid)
        self.arm = p.loadURDF("kuka_iiwa/model.urdf", [0, 0, 0.7],
                              useFixedBase=True, physicsClientId=cid)
        self.cup_visual = p.createVisualShape(
            p.GEOM_CYLINDER, radius=0.035, length=0.1,
            rgbaColor=[0.95, 0.9, 0.8, 1], physicsClientId=cid)
        self.cup_collision = p.createCollisionShape(
            p.GEOM_CYLINDER, radius=0.035, height=0.1, physicsClientId=cid)
        self.cup = None  # body id, created on every reset()

        self.max_steps = max_steps
        self.step_ctr = 0
        self._setup_spaces()

    def reset(self, seed: int | None = None, options: dict | None = None):
        """Respawn the cup at a random spot and move the arm to its home pose.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        self.step_ctr = 0
        if self.cup is not None:
            p.removeBody(self.cup, physicsClientId=self.client)

        # Random cup position: +/-0.5 m around the nominal spot in x and y.
        xy = self.np_random.uniform(low=-0.5, high=0.5, size=(2,))
        origin_x, origin_y, origin_z = self.CUP_ORIGIN
        cup_pos = [xy[0] + origin_x, xy[1] + origin_y, origin_z]
        self.cup = p.createMultiBody(baseMass=0.05,
                                     baseCollisionShapeIndex=self.cup_collision,
                                     baseVisualShapeIndex=self.cup_visual,
                                     basePosition=cup_pos,
                                     baseOrientation=[0, 0, 0, 1],
                                     physicsClientId=self.client)

        # Home configuration of the arm.
        home_q = [0, -1.0, 0, -1.7, 0, 1.2, 0]
        for j, q in enumerate(home_q):
            p.resetJointState(self.arm, j, q, physicsClientId=self.client)

        return self._get_obs(), {}

    def step(self, action: np.ndarray):
        """Apply one action and advance the simulation by one step.

        Args:
            action: 8 values; the first 7 are joint increments, the last one
                is the push strength.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
        """
        self.step_ctr += 1
        action = np.asarray(action, dtype=np.float32).reshape(-1)

        # Joint control: move each target a bounded increment away from the current angle.
        increments = np.clip(action[:self.N_JOINTS], -1, 1) * self.JOINT_STEP
        for j in range(self.N_JOINTS):
            cur_q = p.getJointState(self.arm, j, physicsClientId=self.client)[0]
            p.setJointMotorControl2(self.arm, j, p.POSITION_CONTROL,
                                    targetPosition=float(cur_q + increments[j]),
                                    force=200, physicsClientId=self.client)

        # Vertical push on the last link; negative strengths are clipped to zero.
        push_strength = float(np.clip(action[-1], 0, 1))
        p.applyExternalForce(objectUniqueId=self.arm, linkIndex=self.N_JOINTS - 1,
                             forceObj=[0, 0, -self.PUSH_FORCE * push_strength],
                             posObj=[0, 0, 0], flags=p.LINK_FRAME,
                             physicsClientId=self.client)

        p.stepSimulation(physicsClientId=self.client)
        if self.render_mode == "human":
            p.configureDebugVisualizer(p.COV_ENABLE_SINGLE_STEP_RENDERING, 1,
                                       physicsClientId=self.client)

        obs = self._get_obs()
        reward, terminated, truncated = self._get_reward_done()
        return obs, reward, terminated, truncated, {}

    def render(self):
        """Return an ``(H, W, 3)`` uint8 RGB frame in ``"rgb_array"`` mode, else ``None``."""
        if self.render_mode != "rgb_array":
            return None

        width, height = 640, 480
        view_matrix = p.computeViewMatrixFromYawPitchRoll(
            cameraTargetPosition=[0.5, 0, 0.5],
            distance=1.5,
            yaw=45,
            pitch=-30,
            roll=0,
            upAxisIndex=2,
            physicsClientId=self.client)
        proj_matrix = p.computeProjectionMatrixFOV(
            fov=60, aspect=width / height, nearVal=0.1, farVal=10.0,
            physicsClientId=self.client)
        # "rgb_array" runs on a DIRECT connection, which has no OpenGL context,
        # so the software renderer is the one that works here.
        rgba = p.getCameraImage(width, height, view_matrix, proj_matrix,
                                renderer=p.ER_TINY_RENDERER,
                                physicsClientId=self.client)[2]
        # The pixel buffer is RGBA (possibly flat); reshape and drop alpha.
        frame = np.asarray(rgba, dtype=np.uint8).reshape(height, width, 4)
        return frame[:, :, :3]

    def close(self):
        """Disconnect from the physics server; safe to call more than once."""
        client = getattr(self, "client", None)
        if client is None:
            return
        self.client = None
        p.disconnect(client)

    def _setup_spaces(self):
        """Declare the 10-dim observation box and the 8-dim action box."""
        high = np.array([1] * 10, dtype=np.float32)
        self.observation_space = spaces.Box(-high, high, dtype=np.float32)
        self.action_space = spaces.Box(low=-1, high=1, shape=(8,), dtype=np.float32)

    def _get_obs(self):
        """Build the observation: relative cup position followed by scaled joint angles."""
        cup_pos, _ = p.getBasePositionAndOrientation(self.cup, physicsClientId=self.client)
        cup_pos = np.array(cup_pos, dtype=np.float32)
        q = [p.getJointState(self.arm, j, physicsClientId=self.client)[0]
             for j in range(self.N_JOINTS)]
        q = np.array(q, dtype=np.float32)
        obs = np.concatenate([cup_pos, q])
        # Normalisation: centre the cup on its nominal spot, scale angles by pi.
        obs[:3] = obs[:3] - np.array(self.CUP_ORIGIN)
        obs[3:] = q / math.pi
        return obs

    def _get_reward_done(self):
        """Compute ``(reward, terminated, truncated)`` from the cup pose."""
        cup_pos, cup_orient = p.getBasePositionAndOrientation(self.cup, physicsClientId=self.client)
        # The matrix is returned as 9 row-major values; element 8 (row 2, col 2)
        # is the world-z component of the cup's own z axis: 1 upright, 0 on its
        # side, negative once tipped past horizontal.
        z_up = p.getMatrixFromQuaternion(cup_orient)[8]
        done_success = z_up < 0
        cup_height_ok = cup_pos[2] > self.CUP_MIN_Z

        reward = 0.0
        if done_success and cup_height_ok:
            reward += 1.0
        elif cup_height_ok:
            # Shaping: grows from 0 (upright) to 0.5 (lying on its side).
            reward += 0.5 * (1 - abs(z_up))
        else:
            reward -= 1.0
        reward -= 1.0 / self.max_steps  # small per-step time penalty

        terminated = bool(done_success or not cup_height_ok)
        truncated = self.step_ctr >= self.max_steps
        return reward, terminated, truncated

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ActorCritic(nn.Module):
    """Gaussian policy and value function over an image plus a state vector.

    The image goes through a small convolutional encoder, the state vector
    through an MLP; both feature vectors are concatenated and fused before
    the policy-mean and value heads. The policy standard deviation is a
    learned, state-independent parameter.
    """

    def __init__(self, img_channels: int, state_dim: int, action_dim: int):
        super().__init__()
        state_width = 128
        fused_width = 256

        # Image encoder (placeholder).
        conv_layers = [
            nn.Conv2d(img_channels, 32, 8, stride=4),
            nn.GELU(),
            nn.Conv2d(32, 64, 4, stride=2),
            nn.GELU(),
            nn.Conv2d(64, 64, 3, stride=1),
            nn.GELU(),
            nn.Flatten(),
        ]
        self.cnn = nn.Sequential(*conv_layers)
        flat_features = 64 * 7 * 7  # encoder output size for 84x84 inputs

        # State-vector encoder.
        state_layers = [
            nn.Linear(state_dim, state_width),
            nn.GELU(),
            nn.Linear(state_width, state_width),
            nn.GELU(),
        ]
        self.mlp_state = nn.Sequential(*state_layers)

        # Fusion of both feature vectors.
        self.fusion = nn.Sequential(
            nn.Linear(flat_features + state_width, fused_width),
            nn.GELU(),
        )

        # Policy and value heads.
        self.policy_mu = nn.Linear(fused_width, action_dim)
        self.policy_logstd = nn.Parameter(torch.zeros(action_dim))
        self.value_head = nn.Linear(fused_width, 1)

    def forward(self, img: torch.Tensor, state: torch.Tensor):
        """Return ``(mean, std, value)``; ``img`` is scaled by 1/255 first."""
        visual = self.cnn(img / 255.0)
        proprio = self.mlp_state(state)
        fused = self.fusion(torch.cat((visual, proprio), dim=-1))
        mean = self.policy_mu(fused)
        std = torch.exp(self.policy_logstd)
        value = self.value_head(fused).squeeze(-1)
        return mean, std, value

    def act(self, img, state):
        """Sample an action; return it with its joint log-prob and the distribution."""
        mean, std, _ = self.forward(img, state)
        policy = torch.distributions.Normal(mean, std)
        sampled = policy.sample()
        joint_logp = policy.log_prob(sampled).sum(-1)
        return sampled, joint_logp, policy

    def value(self, img, state):
        """Return only the value estimate."""
        return self.forward(img, state)[2]


In [None]:
class RolloutBuffer:
    """Fixed-size on-policy storage with GAE(lambda) post-processing.

    Transitions are written sequentially by ``add``. When several
    environments are stepped in lockstep, one transition per environment is
    expected per step, always in the same environment order, giving the
    layout ``[t0e0, t0e1, ..., t1e0, t1e1, ...]``.
    ``compute_returns_advantages`` relies on that layout when it receives one
    bootstrap value per environment.
    """

    def __init__(self, size, obs_shape, state_dim, action_dim, device):
        """Allocate storage for ``size`` transitions on ``device``."""
        self.size = size
        self.ptr = 0        # next write position
        self.full = False   # set once the write position has wrapped around
        self.device = device
        self.imgs = torch.zeros((size, *obs_shape), dtype=torch.uint8, device=device)
        self.states = torch.zeros((size, state_dim), device=device)
        self.actions = torch.zeros((size, action_dim), device=device)
        self.rewards = torch.zeros(size, device=device)
        self.dones = torch.zeros(size, device=device)
        # Per-action-dimension log-probs; consumers sum over the last axis.
        self.logprobs = torch.zeros((size, action_dim), device=device)
        self.values = torch.zeros(size, device=device)
        self.advantages = torch.zeros(size, device=device)
        self.returns = torch.zeros(size, device=device)

    def add(self, img, state, action, reward, done, logprob, value):
        """Store one transition at the write position and advance it.

        ``logprob`` may be a per-dimension vector of length ``action_dim`` or
        a single joint log-prob. A joint value is written to the first column
        with the remaining columns zeroed, so summing the row returns exactly
        that value instead of ``action_dim`` broadcast copies of it.
        """
        i = self.ptr
        self.imgs[i] = img
        self.states[i] = state
        self.actions[i] = action
        self.rewards[i] = reward
        self.dones[i] = float(done)

        logprob = torch.as_tensor(logprob, dtype=self.logprobs.dtype,
                                  device=self.logprobs.device).detach()
        if logprob.numel() == 1:
            self.logprobs[i].zero_()
            self.logprobs[i, 0] = logprob.reshape(())
        else:
            self.logprobs[i] = logprob

        self.values[i] = value
        self.ptr += 1
        if self.ptr >= self.size:
            self.full = True
            self.ptr = 0

    def compute_returns_advantages(self, last_value, gamma=0.99, lam=0.95):
        """Fill ``advantages`` and ``returns`` with GAE(lambda) estimates.

        Args:
            last_value: bootstrap value for the state after the last stored
                step. A scalar treats the buffer as a single trajectory; a
                vector with one entry per environment treats it as the
                interleaved multi-environment layout described on the class.
            gamma: discount factor.
            lam: GAE smoothing factor.

        Raises:
            ValueError: if ``last_value`` is empty or the buffer size is not
                a multiple of its length.
        """
        last_value = torch.as_tensor(
            last_value, dtype=self.values.dtype, device=self.values.device
        ).detach().reshape(-1)
        n_envs = last_value.numel()
        if n_envs == 0 or self.size % n_envs != 0:
            raise ValueError(
                f"buffer size {self.size} is not a multiple of the "
                f"{n_envs} bootstrap value(s) provided")
        steps = self.size // n_envs

        # (steps, n_envs) views onto the flat storage; writes go straight through.
        rewards = self.rewards.view(steps, n_envs)
        values = self.values.view(steps, n_envs)
        not_done = 1.0 - self.dones.view(steps, n_envs)
        advantages = self.advantages.view(steps, n_envs)

        gae = torch.zeros(n_envs, dtype=self.values.dtype, device=self.values.device)
        next_value = last_value
        for t in reversed(range(steps)):
            # A done flag cuts both the bootstrap and the accumulated trace.
            delta = rewards[t] + gamma * not_done[t] * next_value - values[t]
            gae = delta + gamma * lam * not_done[t] * gae
            advantages[t] = gae
            next_value = values[t]

        self.returns.copy_(self.advantages + self.values)

In [None]:
class PPO:
    """Clipped-surrogate PPO update for an actor-critic module.

    The module is called as ``actor_critic(imgs, states)`` and must return
    ``(mean, std, value)`` for a diagonal Gaussian policy.
    """

    def __init__(self, actor_critic: nn.Module, lr=3e-4, eps_clip=0.2,
                 vf_coef=0.5, ent_coef=0.01, max_grad_norm=0.5, device="cpu"):
        """Store hyper-parameters and build the Adam optimiser.

        Args:
            actor_critic: network to optimise.
            lr: Adam learning rate.
            eps_clip: clipping range of the probability ratio.
            vf_coef: weight of the value loss.
            ent_coef: weight of the entropy bonus.
            max_grad_norm: gradient-norm clipping threshold.
            device: kept for callers; minibatch indices follow the buffer's device.
        """
        self.ac = actor_critic
        # Referenced through torch.optim so the class does not depend on a
        # bare `Adam` name imported elsewhere in the notebook.
        self.optimizer = torch.optim.Adam(self.ac.parameters(), lr=lr)
        self.eps_clip = eps_clip
        self.vf_coef = vf_coef
        self.ent_coef = ent_coef
        self.max_grad_norm = max_grad_norm
        self.device = device

    def update(self, buffer: "RolloutBuffer", batch_size=64, epochs=10):
        """Run ``epochs`` passes of minibatch PPO over the whole buffer.

        Args:
            buffer: object exposing ``size``, ``imgs``, ``states``,
                ``actions``, ``logprobs``, ``returns`` and ``advantages``.
            batch_size: minibatch size (the last minibatch may be smaller).
            epochs: number of passes over the buffer.
        """
        data_device = buffer.advantages.device

        adv = buffer.advantages
        if adv.numel() > 1:
            # std() of a single element is NaN, so only normalise real batches.
            adv = (adv - adv.mean()) / (adv.std() + 1e-8)

        for _ in range(epochs):
            # Shuffle on the CPU generator, then index on the buffer's device.
            perm = torch.randperm(buffer.size).to(data_device)
            for start in range(0, buffer.size, batch_size):
                batch = perm[start:start + batch_size]
                imgs = buffer.imgs[batch].float()
                states = buffer.states[batch]
                actions = buffer.actions[batch]
                old_logp = buffer.logprobs[batch]
                returns = buffer.returns[batch]
                adv_b = adv[batch]

                # Per-dimension log-probs are reduced to the joint log-prob;
                # 1-D storage is already joint and must not be summed again.
                if old_logp.dim() > 1:
                    old_logp = old_logp.sum(-1)

                mu, std, values = self.ac(imgs, states)
                dist = torch.distributions.Normal(mu, std)
                logp = dist.log_prob(actions).sum(-1)
                ratio = torch.exp(logp - old_logp)

                surr1 = ratio * adv_b
                surr2 = torch.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * adv_b
                policy_loss = -torch.min(surr1, surr2).mean()
                value_loss = F.mse_loss(values, returns)
                entropy = dist.entropy().sum(-1).mean()

                loss = policy_loss + self.vf_coef * value_loss - self.ent_coef * entropy

                self.optimizer.zero_grad()
                loss.backward()
                nn.utils.clip_grad_norm_(self.ac.parameters(), self.max_grad_norm)
                self.optimizer.step()

In [None]:
from gymnasium.vector import AsyncVectorEnv
from torch.optim import Adam

# Configuration
# Use the GPU when one is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
N_ENVS = 4  # Number of parallel environments (reduced for Colab)
ROLLOUT_STEPS = 512  # Steps collected per environment before each update (reduced for Colab)
TOTAL_UPDATES = 1000  # Number of collect-then-update iterations (reduced for demonstration)

# Environment factory
def make_env(render=False):
    """Return a zero-argument callable that builds one ``ArmEnv``.

    The environment opens the PyBullet GUI (``render_mode="human"``) when
    ``render`` is truthy and runs without a render mode otherwise.
    """
    mode = "human" if render else None

    def _build():
        return ArmEnv(render_mode=mode)

    return _build

# NOTE(review): the notebook targets Colab, which presumably has no display
# for PyBullet's GUI mode, so rendering is off by default. Set this to True
# on a machine with a display to watch environment 0.
RENDER_FIRST_ENV = False

vec_env = AsyncVectorEnv(
    [make_env(render=(RENDER_FIRST_ENV and i == 0)) for i in range(N_ENVS)])

# try/finally so the worker processes are shut down even if training raises.
try:
    obs, _ = vec_env.reset()

    # Dimensions (a dummy image is used for now)
    img_shape = (1, 84, 84)  # 1 channel (grayscale)
    state_dim = obs.shape[1]  # 10
    action_dim = vec_env.single_action_space.shape[0]  # 8

    # Model and algorithm
    ac = ActorCritic(img_channels=img_shape[0], state_dim=state_dim,
                     action_dim=action_dim).to(DEVICE)
    ppo = PPO(ac, device=DEVICE)
    buffer = RolloutBuffer(ROLLOUT_STEPS * N_ENVS, img_shape, state_dim, action_dim, DEVICE)

    # The placeholder image never changes, so it is allocated once; the buffer
    # copies it into its own storage on every add().
    dummy_img = torch.zeros((N_ENVS, *img_shape), dtype=torch.uint8, device=DEVICE)

    # Training loop
    for update in range(TOTAL_UPDATES):
        for step in range(ROLLOUT_STEPS):
            img = dummy_img
            state = torch.tensor(obs, dtype=torch.float32, device=DEVICE)

            with torch.no_grad():
                action, logp, _ = ac.act(img, state)
                value = ac.value(img, state)

            actions_np = action.cpu().numpy()
            next_obs, rewards, terms, truncs, _ = vec_env.step(actions_np)

            # Store one transition per environment, always in the same env order.
            for e in range(N_ENVS):
                buffer.add(img[e], state[e], action[e], rewards[e],
                           terms[e] or truncs[e], logp[e], value[e])
            obs = next_obs

        # Bootstrap values (one per environment) for the state after the last
        # step; kept on DEVICE so they live where the buffer's tensors do.
        with torch.no_grad():
            last_state = torch.tensor(obs, dtype=torch.float32, device=DEVICE)
            last_value = ac.value(dummy_img, last_state)

        buffer.compute_returns_advantages(last_value)

        # Policy update
        ppo.update(buffer, batch_size=64, epochs=4)

        print(f"Update {update+1}/{TOTAL_UPDATES} - Avg Reward: {buffer.rewards.mean():.2f}")
finally:
    # Close the environments when finished (or on failure).
    vec_env.close()