## Módulo de inputs

In [None]:
import time
import platform
import subprocess
import abc
from enum import Enum, auto
from pynput.keyboard import Controller as KBController, Key

# TECLADO (MOVIMIENTO)

kb = KBController()

KEY_FORWARD = "w"
KEY_LEFT = "a"
KEY_RIGHT = "d"
KEY_JUMP = Key.space

class MoveHold(Enum):
    NEUTRAL = auto()
    FORWARD = auto()
    LEFT = auto()
    RIGHT = auto()

def _release_keys(keys):
    for k in keys:
        try:
            kb.release(k)
        except Exception:
            pass

def _tap(k, press_time=0.03):
    try:
        kb.press(k)
        time.sleep(press_time)
    finally:
        try:
            kb.release(k)
        except Exception:
            pass

class KeyboardMoveManager:
    """Mantiene 1 tecla de movimiento pulsada (W/A/D). Jump es tap independiente."""
    def __init__(self):
        self.current = MoveHold.NEUTRAL

    def set_hold(self, hold: MoveHold):
        if hold == self.current:
            return

        _release_keys([KEY_FORWARD, KEY_LEFT, KEY_RIGHT])

        if hold == MoveHold.FORWARD:
            kb.press(KEY_FORWARD)
        elif hold == MoveHold.LEFT:
            kb.press(KEY_LEFT)
        elif hold == MoveHold.RIGHT:
            kb.press(KEY_RIGHT)

        self.current = hold

    def jump(self):
        _tap(KEY_JUMP, press_time=0.03)

    def release_all(self):
        _release_keys([KEY_FORWARD, KEY_LEFT, KEY_RIGHT])
        self.current = MoveHold.NEUTRAL



# RATÓN (FACTORY)
class MouseInput(abc.ABC):
    """Clase envoltorio para enviar los inputs del ratón al juego"""
    @abc.abstractmethod
    def move(self, dx, dy): pass # Método para enviar movimientos del ratón

    @abc.abstractmethod
    def left_click(self): pass # Método para enviar evento de clic izquierdo


class LinuxMouseInput(MouseInput):
    """Implementación en Linux. Requiere instalar la libreria 'xdotools'."""
    def move(self, dx, dy):
        if dx == 0 and dy == 0:
            return
        try:
            subprocess.Popen(
                ["xdotool", "mousemove_relative", "--", str(int(dx)), str(int(dy))],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
            )
        except FileNotFoundError:
            print("Error: xdotool no instalado")

    def left_click(self):
        try:
            subprocess.Popen(
                ["xdotool", "click", "1"],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
            )
        except FileNotFoundError:
            print("Error: xdotool no instalado")


class WindowsMouseInput(MouseInput):
    """Implementación en Windows. Requiere del uso de  'pydirectinput'. Desarrollada por si algún compañero usa Windows"""
    def __init__(self):
        try:
            import pydirectinput
            self.engine = pydirectinput
            self.engine.FAILSAFE = False
            self.available = True
        except ImportError:
            print("Error: pydirectinput no instalado (pip install pydirectinput)")
            self.available = False

    def move(self, dx, dy):
        if not self.available or (dx == 0 and dy == 0):
            return
        self.engine.moveRel(int(dx), int(dy), relative=True)

    def left_click(self):
        if not self.available:
            return
        self.engine.click()


class MouseInputFactory:
    """Factory para crear una instnacia de MouseInput de manera sencilla y sin tener que realizar comprobaciones"""
    @staticmethod
    def create():
        system = platform.system()
        if system == "Linux":
            print("Sistema Linux detectado: Usando xdotool")
            return LinuxMouseInput()
        elif system == "Windows":
            print("Sistema Windows detectado: Usando pydirectinput")
            return WindowsMouseInput()
        else:
            print(f"Sistema {system} no soportado oficialmente")
            return None

## Módulo de detección de gestos.

In [2]:
import time
import math
import os
import urllib.request
from collections import deque
import cv2
import numpy as np
import mediapipe as mp


# CONTROLADOR DE POSE (MEDIAPIPE)
# - Usa MediaPipe para detectar landmarks del cuerpo.
# - Hace una calibración (neutral).
# - En ejecución decide: izquierda/derecha, avanzar (forward) y salto (jump).

LS, RS = 11, 12
LH, RH = 23, 24
MIN_VIS = 0.60

# Tiempos de calibración (espera inicial, neutral, forward)
PRE_CALIB_WAIT_SECONDS = 5.0
CALIB_NEUTRAL_SECONDS = 2.0
CALIB_FORWARD_SECONDS = 1.2

# Umbrales y filtros para detectar inclinación lateral y forward (con histéresis y suavizado)
LEAN_X_THRESH = 0.25
EMA_DY = 0.35
DY_ENTER_MIN = 0.030
DY_STD_MULT = 6.0
DY_FWD_FRAC = 0.70
DY_EXIT_RATIO = 0.65
STABLE_FRAMES_FWD = 3
DY_UP_BLOCK_MIN = 0.020

# Umbrales y filtros para detectar salto (altura + “velocidad”) y evitar dobles disparos
EMA_Y = 0.30
JUMP_UP_HIP = 0.10
JUMP_VEL = 0.06
JUMP_COOLDOWN_SEC = 0.55
JUMP_PREBLOCK_UP = 0.06
JUMP_PREBLOCK_VEL = 0.03
STABLE_FRAMES_SIDE = 2

# Conexiones entre landmarks para dibujar el esqueleto en pantalla (debug/visualización)
POSE_CONNECTIONS = [
    (11, 12), (11, 23), (12, 24), (23, 24),
    (11, 13), (13, 15), (12, 14), (14, 16),
    (23, 25), (25, 27), (24, 26), (26, 28),
    (27, 31), (28, 32), (27, 29), (28, 30),
    (29, 31), (30, 32)
]

def _vis(lm):
    """Lee la visibilidad del landmark (si no existe, asume 1.0)"""
    return float(getattr(lm, "visibility", 1.0))

def draw_pose(frame_bgr, lms):
    """ Dibuja puntos y líneas de la pose sobre el frame (útil para ver qué está detectando)"""
    h, w = frame_bgr.shape[:2]
    for lm in lms:
        cv2.circle(frame_bgr, (int(lm.x*w), int(lm.y*h)), 3, (255,255,255), -1)
    for a, b in POSE_CONNECTIONS:
        ax, ay = int(lms[a].x*w), int(lms[a].y*h)
        bx, by = int(lms[b].x*w), int(lms[b].y*h)
        cv2.line(frame_bgr, (ax, ay), (bx, by), (255,255,255), 2)

def center_norm(lms, a, b):
    """ Punto medio entre dos landmarks (sirve para centros de hombros y caderas)"""
    return (
        (lms[a].x + lms[b].x) * 0.5,
        (lms[a].y + lms[b].y) * 0.5,
        (lms[a].z + lms[b].z) * 0.5
    )

def extract_features(lms_norm):
    """Extrae las señales mínimas para controlar:
     - lean_x: inclinación lateral normalizada (izq/der)
     - shy: altura del centro de hombros (para forward)
     - hip_y_norm: altura de cadera normalizada (para salto)"""
    if not all(_vis(lms_norm[i]) >= MIN_VIS for i in (LS, RS, LH, RH)):
        return None

    shx, shy, _ = center_norm(lms_norm, LS, RS)
    hix, hiy, _ = center_norm(lms_norm, LH, RH)

    shoulder_w = abs(lms_norm[LS].x - lms_norm[RS].x) + 1e-6
    lean_x = (shx - hix) / shoulder_w

    torso_len = abs(hiy - shy) + 1e-6
    hip_y_norm = hiy / torso_len

    return {"lean_x": lean_x, "shy": shy, "hip_y_norm": hip_y_norm}

class Debouncer:
    """ Antirrebote: evita que la acción cambie por ruido exigiendo N frames seguidos iguales"""
    def __init__(self):
        self.last = None
        self.count = 0
    def reset(self):
        self.last = None
        self.count = 0
    def update(self, proposed, need):
        if proposed == self.last:
            self.count += 1
        else:
            self.last = proposed
            self.count = 1
        return self.count >= need

class PoseMovementController:
    """ Controlador principal:
     - Descarga el modelo si hace falta
     - Inicializa PoseLandmarker en modo VIDEO (requiere timestamp)
     - Calibra neutral/forward
     - En RUN decide movimientos a partir de las features"""
    MODEL_URL = "https://storage.googleapis.com/mediapipe-models/pose_landmarker/pose_landmarker_lite/float16/1/pose_landmarker_lite.task"
    MODEL_PATH = "pose_landmarker_lite.task"

    def __init__(self):
        self._ensure_model()
        self._init_landmarker()

        # Base de reloj monótono para generar timestamps consistentes
        self._clock0 = time.monotonic()
        self._last_ts_ms = 0

        self.reset()

    def _ensure_model(self):
        """ Si no existe el archivo del modelo, lo descarga"""
        if not os.path.exists(self.MODEL_PATH):
            print("Descargando modelo PoseLandmarker...")
            urllib.request.urlretrieve(self.MODEL_URL, self.MODEL_PATH)
            print("OK:", self.MODEL_PATH)

    def _init_landmarker(self):
        """ Crea el detector de pose con umbrales de confianza y modo VIDEO"""
        BaseOptions = mp.tasks.BaseOptions
        PoseLandmarker = mp.tasks.vision.PoseLandmarker
        PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
        RunningMode = mp.tasks.vision.RunningMode

        self._landmarker = PoseLandmarker.create_from_options(
            PoseLandmarkerOptions(
                base_options=BaseOptions(model_asset_path=self.MODEL_PATH),
                running_mode=RunningMode.VIDEO,
                num_poses=1,
                min_pose_detection_confidence=0.5,
                min_pose_presence_confidence=0.5,
                min_tracking_confidence=0.5
            )
        )

    def reset(self):
        """ Reinicia el flujo de calibración y los estados internos (filtros, historial, debouncer)"""
        self.stage = "WAIT"
        self.wait_start = None
        self.calib_start = None
        self.neu_feats = []
        self.fwd_feats = []
        self.neutral = None

        self.dy_s = 0.0
        self.y_s = 0.0
        self.y_hist = deque(maxlen=10)
        self.last_jump_time = 0.0
        self.deb = Debouncer()
        self.current_hold = MoveHold.NEUTRAL  # viene de la CELDA 1

    def close(self):
        """ Cierre seguro del landmarker"""
        try:
            self._landmarker.close()
        except Exception:
            pass

    def _decide(self, feat):
        """ Lógica de decisión:
         1) Prioriza lateral (izq/der) si la inclinación supera el umbral.
         2) Detecta salto con altura + velocidad + cooldown.
         3) Detecta forward con umbral dinámico + histéresis y bloqueos para no mezclarlo con salto."""
        lean_dx = feat["lean_x"] - self.neutral["lean0"]
        if abs(lean_dx) > LEAN_X_THRESH:
            action = MoveHold.RIGHT if lean_dx > 0 else MoveHold.LEFT
            return action, False, None

        y_raw = feat["hip_y_norm"] - self.neutral["hip_y0"]
        self.y_s = EMA_Y * y_raw + (1.0 - EMA_Y) * self.y_s
        self.y_hist.append(y_raw)

        vel = 0.0
        if len(self.y_hist) >= 4:
            vel = (self.y_hist[-4] - self.y_hist[-1])

        up = -(self.y_s)
        now = time.time()
        can_jump = (now - self.last_jump_time) >= JUMP_COOLDOWN_SEC
        do_jump = can_jump and (up > JUMP_UP_HIP) and (vel > JUMP_VEL)
        preblock_jump = (up > JUMP_PREBLOCK_UP and vel > JUMP_PREBLOCK_VEL)

        dy_raw = (feat["shy"] - self.neutral["shy0"])
        self.dy_s = EMA_DY * dy_raw + (1.0 - EMA_DY) * self.dy_s

        dy_enter = max(DY_ENTER_MIN, DY_STD_MULT * self.neutral["dy_std"], DY_FWD_FRAC * self.neutral["dy_fwd_delta"])
        dy_exit = dy_enter * DY_EXIT_RATIO

        if self.dy_s < -DY_UP_BLOCK_MIN:
            fb = MoveHold.NEUTRAL
        else:
            if self.current_hold != MoveHold.FORWARD and (do_jump or preblock_jump):
                fb = MoveHold.NEUTRAL
            else:
                if self.current_hold == MoveHold.FORWARD:
                    fb = MoveHold.FORWARD if self.dy_s > dy_exit else MoveHold.NEUTRAL
                else:
                    fb = MoveHold.FORWARD if self.dy_s > dy_enter else MoveHold.NEUTRAL

        return fb, do_jump, None

    def process_frame(self, frame_bgr):
        """ Procesamiento de frame:
         - Convierte a RGB y crea mp.Image
         - Genera timestamp monótono creciente (requisito del modo VIDEO)
         - Detecta pose, dibuja y extrae features
         - Si no está calibrado: avanza por etapas WAIT/NEUTRAL/FORWARD y calcula baseline
         - Si está calibrado: decide acción con debouncer y gestiona cooldown de salto"""
        rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)

        timestamp_ms = int((time.monotonic() - self._clock0) * 1000)
        if timestamp_ms <= self._last_ts_ms:
            timestamp_ms = self._last_ts_ms + 1
        self._last_ts_ms = timestamp_ms

        result = self._landmarker.detect_for_video(mp_image, timestamp_ms)
        feat = None

        if result.pose_landmarks:
            lms = result.pose_landmarks[0]
            draw_pose(frame_bgr, lms)
            feat = extract_features(lms)

        do_jump = False

        if self.stage != "DONE":
            self.current_hold = MoveHold.NEUTRAL

            if self.stage == "WAIT":
                if feat is None:
                    self.wait_start = None
                else:
                    if self.wait_start is None:
                        self.wait_start = time.time()
                    if (time.time() - self.wait_start) >= PRE_CALIB_WAIT_SECONDS:
                        self.stage = "NEUTRAL"
                        self.calib_start = None
                        self.neu_feats.clear()
                        self.fwd_feats.clear()

            elif self.stage in ("NEUTRAL", "FORWARD"):
                if feat is None:
                    pass
                else:
                    if self.calib_start is None:
                        self.calib_start = time.time()
                        if self.stage == "NEUTRAL":
                            self.neu_feats.clear()
                        else:
                            self.fwd_feats.clear()

                    elapsed = time.time() - self.calib_start

                    if self.stage == "NEUTRAL":
                        self.neu_feats.append(feat)
                        if elapsed >= CALIB_NEUTRAL_SECONDS and len(self.neu_feats) >= 10:
                            self.stage = "FORWARD"
                            self.calib_start = None

                    elif self.stage == "FORWARD":
                        self.fwd_feats.append(feat)
                        if elapsed >= CALIB_FORWARD_SECONDS and len(self.fwd_feats) >= 8:
                            lean0 = sum(f["lean_x"] for f in self.neu_feats) / len(self.neu_feats)
                            shy0 = sum(f["shy"] for f in self.neu_feats) / len(self.neu_feats)
                            hip_y0 = sum(f["hip_y_norm"] for f in self.neu_feats) / len(self.neu_feats)

                            dys = [f["shy"] for f in self.neu_feats]
                            dy_std = math.sqrt(sum((x - shy0) ** 2 for x in dys) / len(dys))

                            shy_fwd = sum(f["shy"] for f in self.fwd_feats) / len(self.fwd_feats)
                            dy_fwd_delta = max(0.0, shy_fwd - shy0)

                            self.neutral = {
                                "lean0": lean0,
                                "shy0": shy0,
                                "dy_std": dy_std,
                                "dy_fwd_delta": dy_fwd_delta,
                                "hip_y0": hip_y0
                            }
                            self.stage = "DONE"
                            self.deb.reset()
                            self.dy_s = 0.0
                            self.y_s = 0.0
                            self.y_hist.clear()
                            self.current_hold = MoveHold.NEUTRAL

        else:
            if feat is None:
                self.current_hold = MoveHold.NEUTRAL
                do_jump = False
                self.dy_s = 0.0
                self.y_s = 0.0
                self.y_hist.clear()
                self.deb.reset()
            else:
                proposed_raw, do_jump, _ = self._decide(feat)

                need = STABLE_FRAMES_FWD if proposed_raw == MoveHold.FORWARD else STABLE_FRAMES_SIDE
                if self.deb.update(proposed_raw, need):
                    self.current_hold = proposed_raw

                if do_jump:
                    self.last_jump_time = time.time()

        return self.current_hold, do_jump, self.stage, None