# Highway-Env: Conducci√≥n Aut√≥noma con RL

## Objetivos de este Notebook

1. **Entender la observaci√≥n kinematics**: ¬øC√≥mo ve el agente a otros veh√≠culos?
2. **Estudiar gamma bajo**: ¬øPor qu√© Œ≥=0.8 funciona mejor que Œ≥=0.99?
3. **Transfer learning**: ¬øGeneraliza de highway a merge?
4. **Reward shaping**: Velocidad vs seguridad
5. **M√∫ltiples entornos**: Highway, Parking, Intersection

---

## Prerequisitos

```bash
pip install highway-env stable-baselines3
```

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import time

import gymnasium as gym

try:
    import highway_env
    HIGHWAY_AVAILABLE = True
    print("Highway-Env disponible")
except ImportError:
    HIGHWAY_AVAILABLE = False
    print("Instalar con: pip install highway-env")

from stable_baselines3 import DQN, PPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor

---

# 1. Entornos Disponibles

| Entorno | ID | Descripci√≥n |
|---------|-----|-------------|
| Highway | `highway-v0` | Adelantar coches en autopista |
| Merge | `merge-v0` | Incorporarse a autopista |
| Roundabout | `roundabout-v0` | Navegar rotonda |
| Parking | `parking-v0` | Aparcar en plaza |
| Intersection | `intersection-v0` | Cruzar intersecci√≥n |

In [None]:
if HIGHWAY_AVAILABLE:
    env = gym.make("highway-v0")
    
    print("="*60)
    print("ENTORNO: Highway-v0")
    print("="*60)
    print(f"\nObservaci√≥n: {env.observation_space}")
    print(f"Acciones: {env.action_space}")
    
    # Ver observaci√≥n
    obs, info = env.reset()
    print(f"\nObservaci√≥n shape: {obs.shape}")
    print(f"Ejemplo: {obs}")
    
    env.close()

---

# 2. An√°lisis de la Observaci√≥n Kinematics

## ¬øC√≥mo ve el agente a otros veh√≠culos?

La observaci√≥n es una **matriz 5√ó5** donde:
- Fila 0: Ego-vehicle (tu coche)
- Filas 1-4: Veh√≠culos cercanos

Cada fila tiene 5 features:

| Columna | Feature | Descripci√≥n |
|---------|---------|-------------|
| 0 | presence | ¬øHay veh√≠culo? (0/1) |
| 1 | x | Posici√≥n X relativa |
| 2 | y | Posici√≥n Y relativa |
| 3 | vx | Velocidad X relativa |
| 4 | vy | Velocidad Y relativa |

In [None]:
if HIGHWAY_AVAILABLE:
    env = gym.make("highway-v0")
    obs, _ = env.reset()
    
    print("="*60)
    print("OBSERVACI√ìN KINEMATICS (5 veh√≠culos √ó 5 features)")
    print("="*60)
    
    features = ["presence", "x", "y", "vx", "vy"]
    vehiculos = ["Ego (t√∫)", "Veh√≠culo 1", "Veh√≠culo 2", "Veh√≠culo 3", "Veh√≠culo 4"]
    
    print(f"\n{'Veh√≠culo':<15}", end="")
    for f in features:
        print(f"{f:>10}", end="")
    print()
    print("-" * 65)
    
    for i, (nombre, fila) in enumerate(zip(vehiculos, obs)):
        print(f"{nombre:<15}", end="")
        for val in fila:
            print(f"{val:>10.3f}", end="")
        print()
    
    env.close()

## Acciones Disponibles

| Acci√≥n | Descripci√≥n |
|--------|-------------|
| 0 | LANE_LEFT (cambiar carril izquierda) |
| 1 | IDLE (mantener) |
| 2 | LANE_RIGHT (cambiar carril derecha) |
| 3 | FASTER (acelerar) |
| 4 | SLOWER (frenar) |

## Funci√≥n de Recompensa

| Evento | Recompensa |
|--------|------------|
| Colisi√≥n | -1 |
| Velocidad alta | +0.4 (proporcional) |
| Carril derecho | +0.1 |
| Cambio de carril | -0.1 (peque√±a penalizaci√≥n) |

---

# 3. C√≥digo Base

In [None]:
class HighwayCallback(BaseCallback):
    """Callback para highway-env."""
    
    def __init__(self, verbose=0):
        super().__init__(verbose)
        self.episode_rewards = []
        self.crashes = []
    
    def _on_step(self) -> bool:
        for info in self.locals.get('infos', []):
            if 'episode' in info:
                self.episode_rewards.append(info['episode']['r'])
            if 'crashed' in info:
                self.crashes.append(info['crashed'])
        return True


def entrenar_highway(env_id="highway-v0", timesteps=30000, algoritmo="DQN", **kwargs):
    """Entrena en un entorno de highway-env."""
    env = gym.make(env_id)
    env = Monitor(env)
    
    config = {
        "learning_rate": 5e-4,
        "gamma": 0.8,  # Horizonte corto para conducci√≥n
        "buffer_size": 50000,
    }
    config.update(kwargs)
    
    if algoritmo == "DQN":
        model = DQN("MlpPolicy", env, verbose=0, **config)
    else:
        model = PPO("MlpPolicy", env, verbose=0, 
                    learning_rate=config['learning_rate'],
                    gamma=config['gamma'])
    
    callback = HighwayCallback()
    model.learn(total_timesteps=timesteps, callback=callback, progress_bar=True)
    
    env.close()
    return model, callback

print("Funciones cargadas")

---

# 4. VARIANTE A: Transfer Learning (Highway ‚Üí Merge)

In [None]:
def estudiar_transfer_learning(timesteps=20000):
    """
    Estudia si un modelo entrenado en highway generaliza a merge.
    """
    if not HIGHWAY_AVAILABLE:
        return {}
    
    print("="*60)
    print("VARIANTE A: Transfer Learning")
    print("="*60)
    
    resultados = {}
    
    # 1. Entrenar en highway
    print("\n1. Entrenando en Highway...")
    model_highway, cb_highway = entrenar_highway("highway-v0", timesteps)
    
    # Evaluar en highway
    env_eval = gym.make("highway-v0")
    mean_hw, std_hw = evaluate_policy(model_highway, env_eval, n_eval_episodes=10)
    env_eval.close()
    print(f"   En Highway: {mean_hw:.1f} ¬± {std_hw:.1f}")
    
    # 2. Evaluar en merge (sin reentrenar)
    print("\n2. Evaluando en Merge (sin reentrenar)...")
    env_merge = gym.make("merge-v0")
    mean_merge_transfer, std_merge = evaluate_policy(model_highway, env_merge, n_eval_episodes=10)
    env_merge.close()
    print(f"   Transfer a Merge: {mean_merge_transfer:.1f} ¬± {std_merge:.1f}")
    
    # 3. Entrenar desde cero en merge
    print("\n3. Entrenando desde cero en Merge...")
    model_merge, cb_merge = entrenar_highway("merge-v0", timesteps)
    
    env_merge2 = gym.make("merge-v0")
    mean_merge_scratch, _ = evaluate_policy(model_merge, env_merge2, n_eval_episodes=10)
    env_merge2.close()
    print(f"   Desde cero en Merge: {mean_merge_scratch:.1f}")
    
    resultados = {
        'highway_rewards': cb_highway.episode_rewards,
        'highway_eval': mean_hw,
        'transfer_to_merge': mean_merge_transfer,
        'merge_from_scratch': mean_merge_scratch
    }
    
    # Conclusi√≥n
    print("\n" + "-"*60)
    print("CONCLUSI√ìN:")
    if mean_merge_transfer > mean_merge_scratch * 0.5:
        print("  El modelo S√ç transfiere conocimiento a merge")
    else:
        print("  El modelo NO transfiere bien a merge")
    
    return resultados

if HIGHWAY_AVAILABLE:
    resultados_a = estudiar_transfer_learning(timesteps=15000)

---

# 5. VARIANTE B: Estudio de Gamma

### ¬øPor qu√© gamma bajo para conducci√≥n?

En conducci√≥n aut√≥noma:
- Las decisiones deben ser **r√°pidas y locales**
- El futuro lejano es **muy incierto** (otros conductores)
- Œ≥=0.8 funciona mejor que Œ≥=0.99

In [None]:
def estudiar_gamma_highway(timesteps=15000):
    """
    Compara diferentes valores de gamma en highway.
    """
    if not HIGHWAY_AVAILABLE:
        return {}
    
    print("="*60)
    print("VARIANTE B: Estudio de Gamma en Highway")
    print("="*60)
    
    gammas = [0.8, 0.9, 0.95, 0.99]
    resultados = {}
    
    for gamma in gammas:
        print(f"\nEntrenando con gamma={gamma}...")
        model, callback = entrenar_highway("highway-v0", timesteps, gamma=gamma)
        
        env_eval = gym.make("highway-v0")
        mean_reward, std_reward = evaluate_policy(model, env_eval, n_eval_episodes=10)
        env_eval.close()
        
        resultados[gamma] = {
            'rewards': callback.episode_rewards,
            'mean': mean_reward,
            'std': std_reward
        }
        print(f"  gamma={gamma}: {mean_reward:.1f} ¬± {std_reward:.1f}")
    
    return resultados

if HIGHWAY_AVAILABLE:
    resultados_b = estudiar_gamma_highway(timesteps=10000)

In [None]:
# Visualizar
if HIGHWAY_AVAILABLE and 'resultados_b' in dir() and resultados_b:
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    
    for gamma, data in resultados_b.items():
        rewards = data['rewards']
        if len(rewards) > 5:
            smoothed = np.convolve(rewards, np.ones(5)/5, mode='valid')
            axes[0].plot(smoothed, label=f'Œ≥={gamma}')
    axes[0].set_xlabel('Episodio')
    axes[0].set_ylabel('Recompensa')
    axes[0].set_title('Efecto de Gamma en Highway')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)
    
    gammas = list(resultados_b.keys())
    means = [resultados_b[g]['mean'] for g in gammas]
    axes[1].bar([str(g) for g in gammas], means)
    axes[1].set_xlabel('Gamma')
    axes[1].set_ylabel('Recompensa Final')
    axes[1].set_title('Rendimiento por Gamma')
    
    plt.tight_layout()
    plt.show()

---

# 6. VARIANTE C: Reward Shaping (Velocidad vs Seguridad)

In [None]:
def crear_entorno_custom(reward_speed_range=[0.2, 0.5], collision_reward=-1.0):
    """
    Crea entorno con recompensa personalizada.
    """
    env = gym.make("highway-v0")
    
    # Configurar recompensa
    env.unwrapped.config["reward_speed_range"] = reward_speed_range
    env.unwrapped.config["collision_reward"] = collision_reward
    
    return Monitor(env)


def estudiar_reward_shaping(timesteps=15000):
    """
    Compara diferentes configuraciones de recompensa.
    """
    if not HIGHWAY_AVAILABLE:
        return {}
    
    print("="*60)
    print("VARIANTE C: Reward Shaping")
    print("="*60)
    
    configs = {
        "Balanceado": {"reward_speed_range": [0.2, 0.5], "collision_reward": -1.0},
        "Solo Velocidad": {"reward_speed_range": [0.5, 1.0], "collision_reward": -0.5},
        "Solo Seguridad": {"reward_speed_range": [0.0, 0.2], "collision_reward": -5.0},
    }
    
    resultados = {}
    
    for nombre, config in configs.items():
        print(f"\nEntrenando '{nombre}'...")
        print(f"  speed_range={config['reward_speed_range']}, collision={config['collision_reward']}")
        
        env = crear_entorno_custom(**config)
        
        model = DQN("MlpPolicy", env, verbose=0,
                    learning_rate=5e-4, gamma=0.8, buffer_size=50000)
        
        callback = HighwayCallback()
        model.learn(total_timesteps=timesteps, callback=callback, progress_bar=True)
        
        # Evaluar
        env_eval = gym.make("highway-v0")
        mean_reward, std_reward = evaluate_policy(model, env_eval, n_eval_episodes=10)
        
        resultados[nombre] = {
            'rewards': callback.episode_rewards,
            'crashes': callback.crashes,
            'mean': mean_reward,
            'std': std_reward
        }
        
        crash_rate = np.mean(callback.crashes) if callback.crashes else 0
        print(f"  Resultado: {mean_reward:.1f} (crash rate: {crash_rate:.2%})")
        
        env.close()
        env_eval.close()
    
    return resultados

if HIGHWAY_AVAILABLE:
    resultados_c = estudiar_reward_shaping(timesteps=10000)

---

# 7. VARIANTE D: DQN vs PPO en Parking

In [None]:
def comparar_en_parking(timesteps=20000):
    """
    Compara DQN y PPO en el entorno de parking.
    """
    if not HIGHWAY_AVAILABLE:
        return {}
    
    print("="*60)
    print("VARIANTE D: DQN vs PPO en Parking")
    print("="*60)
    
    resultados = {}
    
    for algo in ["DQN", "PPO"]:
        print(f"\nEntrenando {algo} en parking...")
        
        env = gym.make("parking-v0")
        env = Monitor(env)
        
        if algo == "DQN":
            model = DQN("MlpPolicy", env, verbose=0,
                       learning_rate=5e-4, gamma=0.8, buffer_size=50000)
        else:
            model = PPO("MlpPolicy", env, verbose=0,
                       learning_rate=5e-4, gamma=0.8)
        
        callback = HighwayCallback()
        model.learn(total_timesteps=timesteps, callback=callback, progress_bar=True)
        
        env_eval = gym.make("parking-v0")
        mean_reward, std_reward = evaluate_policy(model, env_eval, n_eval_episodes=10)
        env_eval.close()
        
        resultados[algo] = {
            'rewards': callback.episode_rewards,
            'mean': mean_reward,
            'std': std_reward
        }
        
        print(f"  {algo}: {mean_reward:.1f} ¬± {std_reward:.1f}")
        env.close()
    
    return resultados

if HIGHWAY_AVAILABLE:
    resultados_d = comparar_en_parking(timesteps=10000)

---

# 8. Conclusiones

## ¬øQu√© aprendimos?

1. **Observaci√≥n kinematics**: Matriz 5√ó5 con informaci√≥n relativa de veh√≠culos cercanos

2. **Gamma bajo funciona mejor**: Œ≥=0.8 es mejor que Œ≥=0.99 para conducci√≥n porque:
   - Las decisiones son locales
   - El futuro es muy incierto

3. **Transfer learning**: El modelo puede generalizar parcialmente entre entornos similares

4. **Reward shaping**: El balance velocidad/seguridad es crucial

## Referencias

- [Highway-Env Documentation](https://highway-env.readthedocs.io/)
- [RL for Autonomous Driving Survey](https://arxiv.org/abs/2002.00444)

---

## üöó Variantes de Entrenamiento ‚Äî Highway

Las variantes en Highway-Env exploran una pregunta diferente: **¬øc√≥mo transferir conocimiento entre entornos?**

Con 7 entornos disponibles (autopista, rotonda, intersecci√≥n...), podemos entrenar un agente en uno y ver c√≥mo se adapta a otros.

| Variante | Estrategia | Entornos | Concepto |
|----------|-----------|---------|---------|
| A | Entorno √∫nico *(actual)* | 1 entorno | Baseline |
| B | Transfer Learning | 2 entornos | Conocimiento reutilizable |
| C | Curriculum Learning | 5 entornos prog. | Aprender gradualmente |

**Entornos disponibles** (orden de dificultad aproximado):
`highway-fast` ‚Üí `highway` ‚Üí `merge` ‚Üí `roundabout` ‚Üí `intersection`

### Variante A ‚Äî Entrenamiento en Entorno √önico *(actual)*

```python
python highway_conduccion.py --env highway --algorithm DQN
python highway_conduccion.py --env intersection --algorithm DQN
```

Entrena DQN o PPO en un entorno espec√≠fico. Es el baseline para comparar con las otras variantes.

**Observaci√≥n**: matriz 5√ó5 de cinem√°tica (5 veh√≠culos √ó 5 caracter√≠sticas: presencia, x, y, vx, vy)
**Acciones**: LANE_LEFT, IDLE, LANE_RIGHT, FASTER, SLOWER
**Reward**: velocidad alta (+0.4), carril derecho (+0.1), colisi√≥n (-1.0)

In [None]:
# Variante A: Entrenamiento en un solo entorno
# from highway_conduccion import entrenar_highway
# model, callback = entrenar_highway(env_name="highway", timesteps=50000, algorithm="DQN")

print("Variante A: Entrenamiento en entorno √∫nico")
print()
print("Entornos disponibles:")
entornos = {
    "highway":      "Autopista: adelantar coches, mantener velocidad",
    "highway-fast": "Autopista simplificada (m√°s r√°pida)",
    "merge":        "Incorporarse a autopista",
    "roundabout":   "Rotonda",
    "parking":      "Aparcar en plaza (acciones continuas)",
    "intersection": "Cruzar intersecci√≥n",
    "racetrack":    "Circuito de carreras",
}
for name, desc in entornos.items():
    print(f"  {name:<15}: {desc}")
print()
print("Para entrenar en cualquier entorno:")
print("  entrenar_highway(env_name='intersection', timesteps=50000, algorithm='DQN')")

### Variante B ‚Äî Transfer Learning entre Entornos

```python
python highway_conduccion.py --transfer
python highway_conduccion.py --transfer --source highway --target intersection
```

**Idea**: las habilidades aprendidas en autopista (mantener velocidad, no chocar) son parcialmente √∫tiles al cruzar una intersecci√≥n, aunque el escenario sea diferente.

**Flujo**:
1. **Fase 1**: Entrenar en entorno *fuente* (m√°s sencillo, ej. highway) hasta convergencia
2. **Fase 2**: Transferir pesos al entorno *objetivo* (m√°s complejo, ej. intersection) y hacer fine-tuning

```
highway (simple) ‚îÄ‚îÄ‚ñ∫ entrenar ‚îÄ‚îÄ‚ñ∫ pesos red
                                       ‚îÇ
                                  set_env(intersection)
                                       ‚îÇ
intersection (complejo) ‚îÄ‚îÄ‚ñ∫ fine-tune ‚îÄ‚îÄ‚ñ∫ pol√≠tica final
```

**Hiperpar√°metro cr√≠tico**: learning rate del fine-tuning. Debe ser menor que el entrenamiento base para no "olvidar" lo aprendido (*catastrophic forgetting*).

In [None]:
# Variante B: Transfer Learning
# from highway_conduccion import transfer_learning
# model, cb_src, cb_tgt = transfer_learning(
#     source_env="highway",
#     target_env="intersection",
#     source_timesteps=80000,
#     target_timesteps=40000,
#     algorithm="DQN"
# )
# Genera: highway_transfer_learning.png

print("Variante B: Transfer Learning highway ‚Üí intersection")
print()
transfer_code = """
# Fase 1: entrenar en fuente
model, cb_source = entrenar_highway("highway", timesteps=80000, algorithm="DQN")

# Fase 2: transferir al objetivo
env_target = crear_entorno("intersection")
model.set_env(env_target)            # Mismo modelo, nuevo entorno
model.learning_rate = 1e-4           # LR reducido: fine-tuning suave

model.learn(
    total_timesteps=40000,
    reset_num_timesteps=False,       # No reiniciar el contador
)
"""
print(transfer_code)
print("La funci√≥n tambi√©n entrena un modelo desde cero en el objetivo")
print("para comparar 'Con transfer' vs 'Sin transfer'.")

### Variante C ‚Äî Curriculum Learning Progresivo

```python
python highway_conduccion.py --curriculum
```

El agente aprende conducci√≥n de forma gradual, como un estudiante humano:

| Nivel | Entorno | Habilidad a√±adida |
|-------|---------|-----------------|
| 1 | highway-fast | Flujo b√°sico, no chocar |
| 2 | highway | Adelantamientos, cambios de carril |
| 3 | merge | Incorporaci√≥n, gesti√≥n de huecos |
| 4 | roundabout | Decisiones de giro, prioridad |
| 5 | intersection | Coordinaci√≥n compleja |

**Por qu√© funciona**: cada nivel a√±ade una habilidad nueva sobre las anteriores. El agente no tiene que aprender todo desde cero al cambiar de entorno.

**Comparaci√≥n con Variante A** (entrenamiento directo en intersection):
- Sin curriculum: el agente ve colisiones constantemente al principio ‚Üí se√±al de reward muy escasa
- Con curriculum: el agente ya sabe conducir cuando llega a intersection ‚Üí aprende m√°s r√°pido

In [None]:
# Variante C: Curriculum Learning
# from highway_conduccion import curriculum_learning
# model, historial = curriculum_learning(timesteps_per_env=20000, algorithm="DQN")
# Genera: highway_curriculum.png

print("Variante C: Curriculum Learning (5 niveles)")
print()
curriculum_code = """
curriculum = [
    ("highway-fast",  "Autopista simple"),     # Nivel 1
    ("highway",       "Autopista completa"),   # Nivel 2
    ("merge",         "Incorporaci√≥n"),         # Nivel 3
    ("roundabout",    "Rotonda"),               # Nivel 4
    ("intersection",  "Intersecci√≥n"),          # Nivel 5
]

# En cada nivel:
if nivel == 0:
    model = DQN("MlpPolicy", env, ...)  # Crear modelo nuevo
else:
    model.set_env(env)                   # Transferir al siguiente entorno
    model.learning_rate *= 0.8           # Reducir LR progresivamente

model.learn(timesteps_per_env, reset_num_timesteps=(nivel==0))
model.save(f"nivel_{nivel}_{env_name}.zip")  # Checkpoint por nivel
"""
print(curriculum_code)

### Comparativa de Variantes

| Aspecto | A: √önico | B: Transfer | C: Curriculum |
|---------|----------|-------------|---------------|
| Entornos | 1 | 2 | 5 |
| Complejidad impl. | Baja | Media | Media |
| Rendimiento en entorno f√°cil | Alto | Alto | Alto |
| Rendimiento en entorno dif√≠cil | Bajo | Medio | Alto |
| Tiempo total | Corto | Medio | Largo |

**Cu√°ndo usar cada variante**:
- **A** ‚Äî Baseline o cuando solo interesa un entorno
- **B** ‚Äî Cuando tienes un entorno fuente y quieres adaptar a otro
- **C** ‚Äî Cuando quieres el mejor rendimiento posible en el entorno m√°s dif√≠cil

**Lecci√≥n**: el curriculum learning es una de las t√©cnicas m√°s potentes para entornos dif√≠ciles donde el reward es escaso al principio.