# Melliguer controller and MPPI Planner

## MPPI Planner

In [5]:
import numpy as np

def clamp_vec(x, lo, hi):
    return np.minimum(np.maximum(x, lo), hi)

class MPPIPlanner:
    """
    MPPI para dron como punto-masa:
      x = [p(3), v(3)]
      u = a_cmd(3)  (aceleración deseada)
    Obstáculos: esferas y cajas AABB (axis-aligned).
    """

    def __init__(
        self,
        dt,
        horizon=30,           # pasos
        num_samples=300,      # rollouts
        lambda_=1.0,
        noise_sigma=np.array([2.0, 2.0, 2.0]),  # ruido en aceleración
        u_min=np.array([-6.0, -6.0, -4.0]),
        u_max=np.array([ 6.0,  6.0,  4.0]),
        w_goal=6.0,
        w_terminal=50.0,
        w_u=0.05,
        w_smooth=0.02,
        w_obs=80.0,
        obs_margin=0.20,
        obs_softness=0.15,
        goal_tolerance=0.10
    ):
        self.dt = float(dt)
        self.H = int(horizon)
        self.K = int(num_samples)
        self.lambda_ = float(lambda_)

        self.noise_sigma = noise_sigma.astype(float)
        self.u_min = u_min.astype(float)
        self.u_max = u_max.astype(float)

        self.w_goal = float(w_goal)
        self.w_terminal = float(w_terminal)
        self.w_u = float(w_u)
        self.w_smooth = float(w_smooth)
        self.w_obs = float(w_obs)
        self.obs_margin = float(obs_margin)
        self.obs_softness = float(obs_softness)
        self.goal_tolerance = float(goal_tolerance)

        self.goal = np.zeros(3, dtype=float)

        # Secuencia nominal (H x 3)
        self.u_nom = np.zeros((self.H, 3), dtype=float)

        # Obstáculos: lista de dicts
        self.obstacles = []

        # RNG
        self.rng = np.random.default_rng(0)

    def set_goal(self, goal_xyz):
        self.goal = np.array(goal_xyz, dtype=float)

    def set_obstacles(self, obstacles):
        """
        obstacles: lista de dicts, ejemplos:
          {"type":"sphere", "c":[x,y,z], "r":0.25}
          {"type":"box", "c":[x,y,z], "h":[hx,hy,hz]}   # AABB half-sizes
        """
        self.obstacles = obstacles

    # ---------------------------
    # Distancias a obstáculos
    # ---------------------------
    def _sphere_penalty(self, p, c, r):
        # distancia al borde (positiva fuera)
        d = np.linalg.norm(p - c) - (r + self.obs_margin)
        # penalización suave: grande cerca / dentro
        # inside => d<0 => exp(-d/s) crece muchísimo
        return np.exp(-d / self.obs_softness)

    def _box_penalty(self, p, c, h):
        # AABB: distancia signed-like (aprox) usando distancia a caja
        q = np.abs(p - c) - h - self.obs_margin
        outside = np.maximum(q, 0.0)
        d_out = np.linalg.norm(outside)  # 0 si está dentro/proyectado
        # Si está dentro: q tiene componentes negativas
        inside = np.all(q <= 0.0)
        if inside:
            # dentro => penaliza muy fuerte
            return np.exp(+2.0 / self.obs_softness)
        else:
            return np.exp(-d_out / self.obs_softness)

    def _obstacle_cost(self, p):
        if not self.obstacles:
            return 0.0
        cost = 0.0
        for obs in self.obstacles:
            if obs["type"] == "sphere":
                cost += self._sphere_penalty(p, np.array(obs["c"], float), float(obs["r"]))
            elif obs["type"] == "box":
                cost += self._box_penalty(p, np.array(obs["c"], float), np.array(obs["h"], float))
        return self.w_obs * cost

    # ---------------------------
    # Dinámica discreta simple
    # ---------------------------
    def _step_dynamics(self, p, v, u):
        # semi-implicit Euler
        v2 = v + u * self.dt
        p2 = p + v2 * self.dt
        return p2, v2

    # ---------------------------
    # MPPI core
    # ---------------------------
    def compute_action(self, p0, v0):
        p0 = np.array(p0, dtype=float)
        v0 = np.array(v0, dtype=float)

        # Si ya estás cerca del goal, apaga aceleración (suaviza)
        if np.linalg.norm(p0 - self.goal) < self.goal_tolerance:
            self.u_nom[:] = 0.0
            return np.zeros(3), p0, v0

        # Muestreo de ruido: (K, H, 3)
        eps = self.rng.normal(0.0, 1.0, size=(self.K, self.H, 3)) * self.noise_sigma

        # Construir controles muestreados u = clamp(u_nom + eps)
        U = self.u_nom[None, :, :] + eps
        U = clamp_vec(U, self.u_min[None, None, :], self.u_max[None, None, :])

        costs = np.zeros(self.K, dtype=float)

        # Rollouts
        for k in range(self.K):
            p = p0.copy()
            v = v0.copy()
            J = 0.0
            u_prev = None
            for t in range(self.H):
                u = U[k, t]

                # costos de tracking + control
                # (puedes cambiar w_goal si quieres que sea más agresivo)
                J += self.w_goal * np.dot(p - self.goal, p - self.goal)
                J += self.w_u * np.dot(u, u)

                if u_prev is not None:
                    du = (u - u_prev)
                    J += self.w_smooth * np.dot(du, du)
                u_prev = u

                # obstáculo
                J += self._obstacle_cost(p)

                # dinámica
                p, v = self._step_dynamics(p, v, u)

            # costo terminal
            J += self.w_terminal * np.dot(p - self.goal, p - self.goal)
            J += self._obstacle_cost(p)

            costs[k] = J

        # Pesos MPPI
        beta = np.min(costs)
        w = np.exp(-(costs - beta) / self.lambda_)
        w_sum = np.sum(w) + 1e-12
        w = w / w_sum

        # Actualiza u_nom con promedio ponderado
        # u_nom <- sum_k w_k * U_k
        self.u_nom = np.tensordot(w, U, axes=(0, 0))

        # Acción receding-horizon: primer control
        u0 = self.u_nom[0].copy()

        # Shift del plan (para el próximo step)
        self.u_nom[:-1] = self.u_nom[1:]
        self.u_nom[-1] = self.u_nom[-2] * 0.5  # decaimiento suave

        # Predicción 1-step (para p_d, v_d)
        p1, v1 = self._step_dynamics(p0, v0, u0)
        return u0, p1, v1


## Mellinguer

In [8]:
import time
import numpy as np
import mujoco
from mujoco import viewer


# ============================================================
# 1) Utilidades matemáticas (simples)
# ============================================================

def clamp(x, lo, hi):
    """Limita un escalar entre [lo, hi]."""
    return max(lo, min(hi, x))

def vee(M):
    """
    Operador vee para matrices skew-symmetric:
    [  0  -z   y
       z   0  -x
      -y   x   0 ]  -> [x, y, z]
    """
    return np.array([M[2, 1], M[0, 2], M[1, 0]])

def quat_to_rotmat(q):
    """Quaternion (w,x,y,z) -> matriz de rotación 3x3."""
    w, x, y, z = q
    R = np.array([
        [1 - 2*(y*y + z*z),     2*(x*y - z*w),     2*(x*z + y*w)],
        [    2*(x*y + z*w), 1 - 2*(x*x + z*z),     2*(y*z - x*w)],
        [    2*(x*z - y*w),     2*(y*z + x*w), 1 - 2*(x*x + y*y)]
    ])
    return R


# ============================================================
# 2) Disturbancia: fuerza desconocida f^o (depende de posición)
# ============================================================

def unknown_force_world(pos):
    """
    Fuerza externa "desconocida" en coordenadas mundo.
    Puedes cambiarla por cualquier función suave de la posición.
    Unidades: Newtons (aprox).
    """
    x, y, z = pos
    fx = 0.8 * np.sin(0.7 * x)
    fy = 0.8 * np.cos(0.7 * y)
    fz = 0.4 * np.sin(0.5 * z)
    # return np.array([fx, fy, fz])
    return np.array([0.0, 0.0, 0.0])  # solo en XY para ver efecto horizontal
    


# ============================================================
# 3) Trayectoria deseada (por ahora: hover con pequeña oscilación)
# ============================================================

# ----------------------------
# Globals para que desired_trajectory no cambie firma
# ----------------------------
MPPI = None
CURRENT_POS = np.zeros(3)
CURRENT_VEL = np.zeros(3)

def desired_trajectory(t, p0):
    """
    Devuelve (p_d, v_d, a_d, yaw_d, yawdot_d) usando MPPI.
    Requiere que CURRENT_POS y CURRENT_VEL se actualicen en el loop.
    """
    global MPPI, CURRENT_POS, CURRENT_VEL

    # Si MPPI aún no inicializa, hover
    if MPPI is None:
        p_d = p0.copy()
        v_d = np.zeros(3)
        a_d = np.zeros(3)
        yaw_d = 0.0
        yawdot_d = 0.0
        return p_d, v_d, a_d, yaw_d, yawdot_d

    a_cmd, p_d, v_d = MPPI.compute_action(CURRENT_POS, CURRENT_VEL)

    # MPPI produce aceleración deseada (mundo)
    a_d = a_cmd

    yaw_d = 0.0
    yawdot_d = 0.0
    return p_d, v_d, a_d, yaw_d, yawdot_d



# ============================================================
# 4) Controlador Mellinger (versión clara y práctica)
# ============================================================

def mellinger_control(pos, vel, R, omega_body, p_d, v_d, a_d, yaw_d, yawdot_d, m, J):
    """
    Implementación simple del controlador tipo Mellinger:
      - Control de posición -> fuerza deseada F_des (mundo)
      - Construir R_des desde F_des y yaw_d
      - Control de actitud -> torque deseado M (en body)
      - Salida: thrust_total (N) y momentos M (N*m)

    NOTA:
    - omega_body debe estar en el marco body.
    - J aquí lo tomamos diagonal para simplificar.
    """

    # ----------------------------
    # Ganancias (tuneables)
    # ----------------------------
    Kp = np.diag([4.0, 4.0, 8.0])     # posición
    Kv = np.diag([3.0, 3.0, 5.0])     # velocidad

    KR = np.diag([8.0, 8.0, 2.0])     # actitud
    Kw = np.diag([0.25, 0.25, 0.15])  # velocidad angular

    g = 9.81
    e3 = np.array([0.0, 0.0, 1.0])

    # ----------------------------
    # (A) Control de posición -> aceleración deseada
    # e_p = p - p_d (convención común)
    # ----------------------------
    e_p = pos - p_d
    e_v = vel - v_d

    # a_cmd = a_d - Kp*e_p - Kv*e_v + g*e3
    a_cmd = a_d - Kp @ e_p - Kv @ e_v + g * e3

    # Fuerza deseada en el mundo
    F_des = m * a_cmd

    # ----------------------------
    # (B) Construir orientación deseada R_des
    # - b3_des apunta en dirección de F_des
    # - b1_des fija el yaw
    # ----------------------------
    F_norm = np.linalg.norm(F_des)
    if F_norm < 1e-6:
        F_norm = 1e-6

    b3_des = F_des / F_norm

    # Vector "frontal" según yaw deseado (en plano XY del mundo)
    b1_c = np.array([np.cos(yaw_d), np.sin(yaw_d), 0.0])

    # b2_des = b3 x b1_c (normalizado)
    b2_des = np.cross(b3_des, b1_c)
    if np.linalg.norm(b2_des) < 1e-6:
        b2_des = np.array([0.0, 1.0, 0.0])
    else:
        b2_des = b2_des / np.linalg.norm(b2_des)

    # b1_des = b2 x b3
    b1_des = np.cross(b2_des, b3_des)

    # Matriz deseada
    R_des = np.column_stack((b1_des, b2_des, b3_des))

    # ----------------------------
    # (C) Thrust total (N)
    # thrust = F_des · b3_actual
    # b3_actual = tercera columna de R
    # ----------------------------
    b3 = R[:, 2]
    thrust = float(F_des.dot(b3))

    # ----------------------------
    # (D) Error de actitud (forma geométrica)
    # e_R = 0.5 * vee(R_des^T R - R^T R_des)
    # ----------------------------
    e_R_mat = 0.5 * (R_des.T @ R - R.T @ R_des)
    e_R = vee(e_R_mat)

    # Velocidad angular deseada (simplificada)
    # Para empezar: solo yawdot alrededor de e3 en mundo -> lo pasamos a body aprox
    omega_des_world = np.array([0.0, 0.0, yawdot_d])
    omega_des_body = R.T @ omega_des_world

    e_omega = omega_body - omega_des_body

    # ----------------------------
    # (E) Torque deseado (en body)
    # M = -KR e_R - Kw e_omega + omega x (J omega)
    # ----------------------------
    Jomega = J @ omega_body
    coriolis = np.cross(omega_body, Jomega)

    M = -KR @ e_R - Kw @ e_omega + coriolis

    return thrust, M


# ============================================================
# 5) Mixer simple (thrust + torques -> 4 motores)
# ============================================================

def simple_mixer(thrust, M, ctrl_min=0.0, ctrl_max=2.0):
    """
    Mixer "práctico" para tu XML (4 motores con ctrlrange 0..2).
    OJO: esto NO usa parámetros reales (l, kf, km).
    Es un mezclador simple para que funcione en simulación.

    thrust: (N) lo escalamos a un valor de control
    M: torque en body (Mx, My, Mz) lo escalamos
    """

    # Escalas (tuneables según tu modelo/masa)
    kT = 0.6     # convierte Newton -> señal de motor (aprox)
    kMx = 0.8    # torque roll -> diferencia motores
    kMy = 0.8    # torque pitch
    kMz = 0.3    # torque yaw

    # Componente base igual para los 4 motores
    uT = kT * thrust

    Mx, My, Mz = M

    # Mezcla tipo "plus" (ajusta signos si tu dron gira raro)
    u0 = uT - kMy*My - kMz*Mz   # motor0
    u1 = uT + kMy*My + kMz*Mz   # motor1
    u2 = uT + kMx*Mx - kMz*Mz   # motor2
    u3 = uT - kMx*Mx + kMz*Mz   # motor3

    # Saturación al rango del XML
    ctrl = np.array([
        clamp(u0, ctrl_min, ctrl_max),
        clamp(u1, ctrl_min, ctrl_max),
        clamp(u2, ctrl_min, ctrl_max),
        clamp(u3, ctrl_min, ctrl_max),
    ], dtype=float)

    return ctrl


# ============================================================
# 6) Main: correr MuJoCo + control
# ============================================================

def main():
    # --- Cambia aquí tu XML ---
    XML_PATH = "models/jiawei_model_rep_single.xml"
    XML_PATH = "models/single_quadrotor_obstacles.xml"


    # Cargar modelo
    model = mujoco.MjModel.from_xml_path(XML_PATH)
    data = mujoco.MjData(model)

    dt = model.opt.timestep

    # Obtener ID del body del dron
    drone_id = mujoco.mj_name2id(model, mujoco.mjtObj.mjOBJ_BODY, "drone_3d")

    # Masa total del dron (incluye subcuerpos)
    m = float(model.body_subtreemass[drone_id])

    # Inercia (simplificada): usamos diagonal del body
    # (en modelos complejos esto no es perfecto, pero sirve para empezar)
    J_diag = model.body_inertia[drone_id].copy()
    J = np.diag(J_diag)

    # Posición inicial como referencia de hover
    mujoco.mj_forward(model, data)
    p0 = data.xpos[drone_id].copy()

    # ----------------------------
    # MPPI init
    # ----------------------------
    global MPPI
    MPPI = MPPIPlanner(
        dt=dt,
        horizon=15,
        num_samples=100,
        lambda_=2.0,
        noise_sigma=np.array([2.5, 2.5, 2.0]),
        u_min=np.array([-6.0, -6.0, -4.0]),
        u_max=np.array([ 6.0,  6.0,  4.0]),
        w_goal=4.0,
        w_terminal=80.0,
        w_u=0.04,
        w_smooth=0.02,
        w_obs=120.0,
        obs_margin=0.22,
        obs_softness=0.12
    )

    # Punto B (goal): ejemplo
    goal = p0 + np.array([2.5, 0.0, 0.0])
    MPPI.set_goal(goal)

    # Obstáculos (deben coincidir con el XML que te dejo abajo)
    obstacles = [
        {"type":"sphere", "c":[1.0,  0.0, 1.0], "r":0.25},
        {"type":"box",    "c":[1.6,  0.4, 1.0], "h":[0.20, 0.20, 0.40]},
        {"type":"box",    "c":[1.6, -0.4, 1.0], "h":[0.20, 0.20, 0.40]},
    ]
    MPPI.set_obstacles(obstacles)



    # Abrir visor
    with viewer.launch_passive(model, data) as v:
        t = 0.0

        while v.is_running():
            # ------------------------------------------------
            # (1) Leer estado del dron desde MuJoCo
            # ------------------------------------------------
            pos = data.xpos[drone_id].copy()

            # # cvel: [omega_world, v_world] en el marco mundo (aprox)
            # omega_world = data.cvel[drone_id, 0:3].copy()
            # vel_world   = data.cvel[drone_id, 3:6].copy()

            # quat = data.xquat[drone_id].copy()
            # R = quat_to_rotmat(quat)

            # Convertir omega a marco body (para control geométrico)
            # omega_body = R.T @ omega_world

            # ------------------------------------------------
            # (2) Aplicar fuerza desconocida al dron (disturbancia)
            # ------------------------------------------------
            f_unk = unknown_force_world(pos)

            # xfrc_applied: [torque(3), force(3)] en mundo, aplicado al body
            data.xfrc_applied[drone_id, 0:3] = 0.0     # sin torque externo
            data.xfrc_applied[drone_id, 3:6] = f_unk   # fuerza externa


            # Actualizar variables globales para desired_trajectory
            # ------------------------------------------------
            # (1) Leer estado
            # ------------------------------------------------
            pos = data.xpos[drone_id].copy()
            print("Current Position:", pos)
            omega_world = data.cvel[drone_id, 0:3].copy()
            vel_world   = data.cvel[drone_id, 3:6].copy()

            quat = data.xquat[drone_id].copy()
            R = quat_to_rotmat(quat)
            omega_body = R.T @ omega_world

            # >>> MPPI globals
            global CURRENT_POS, CURRENT_VEL
            CURRENT_POS = pos
            CURRENT_VEL = vel_world


            # ------------------------------------------------
            # (3) Generar referencia deseada
            # ------------------------------------------------
            p_d, v_d, a_d, yaw_d, yawdot_d = desired_trajectory(t, p0)
            print("Desired Position:", p_d)

            # ------------------------------------------------
            # (4) Control Mellinger -> thrust (N) y torque M (N*m)
            # ------------------------------------------------
            thrust, M = mellinger_control(
                pos=pos,
                vel=vel_world,
                R=R,
                omega_body=omega_body,
                p_d=p_d,
                v_d=v_d,
                a_d=a_d,
                yaw_d=yaw_d,
                yawdot_d=yawdot_d,
                m=m,
                J=J
            )

            # ------------------------------------------------
            # (5) Mixer -> comandos a 4 motores (0..2 según tu XML)
            # ------------------------------------------------
            ctrl = simple_mixer(thrust, M, ctrl_min=0.0, ctrl_max=2.0)
            data.ctrl[:] = ctrl

            # ------------------------------------------------
            # (6) Paso de simulación
            # ------------------------------------------------
            start = time.time()

            mujoco.mj_step(model, data)
            v.sync()
            t += dt

            # --- sincronizar con tiempo real ---
            elapsed = time.time() - start
            sleep_time = dt - elapsed
            if sleep_time > 0:
                time.sleep(sleep_time)



if __name__ == "__main__":
    main()


Current Position: [0. 0. 1.]
Desired Position: [1.12074494e-04 1.59418898e-05 9.99965666e-01]
Current Position: [-4.71897321e-06 -4.33594402e-09  1.00045659e+00]
Desired Position: [ 2.66631026e-04 -2.06660030e-04  1.00134889e+00]
Current Position: [-1.34728981e-05  9.32659644e-06  1.00160221e+00]
Desired Position: [4.44901867e-04 4.18333178e-05 1.00297448e+00]
Current Position: [-3.17741672e-06  3.39540249e-06  1.00325922e+00]
Desired Position: [ 5.51209371e-04 -9.14068921e-05  1.00528647e+00]
Current Position: [ 1.11945739e-04 -4.22822998e-05  1.00564516e+00]
Desired Position: [ 8.31052568e-04 -9.02175916e-05  1.00847435e+00]
Current Position: [ 4.30033298e-04 -1.46281390e-04  1.00890760e+00]
Desired Position: [ 1.44708154e-03 -3.59416393e-04  1.01281933e+00]
Current Position: [ 1.06838146e-03 -3.17907886e-04  1.01310183e+00]
Desired Position: [ 2.41499635e-03 -4.81640319e-04  1.01772949e+00]
Current Position: [ 2.10250750e-03 -5.62797444e-04  1.01821331e+00]
Desired Position: [ 3.869

KeyboardInterrupt: 