# Basis _reinforcement learning_

## CartPole

CartPole is een klassiek controleprobleem waarbij een staaf rechtop moet blijven op een kar die heen en weer kan bewegen.

[![CartPole](https://gymnasium.farama.org/_images/cart_pole.gif)](https://gymnasium.farama.org/)

### Het Probleem
- **State**: 4 continue waarden (positie kar, snelheid kar, hoek staaf, hoeksnelheid staaf)
- **Actions**: 2 discrete acties (duw naar links of rechts)
- **Reward**: +1 voor elke tijdstap waarbij de staaf rechtop blijft
- **Doel**: Hou de staaf zo lang mogelijk rechtop (max 500 tijdstappen)

### Setup
We gebruiken:
- [Gymnasium](https://gymnasium.farama.org/index.html): Een framework voor RL omgevingen (oorspronkelijk van OpenAI)
- [Stable-Baselines3](https://stable-baselines3.readthedocs.io/en/master/#): Kwalitatieve PyTorch implementaties van RL algoritmes

In [None]:
import gymnasium as gym
import numpy as np
import pandas as pd
import plotly.express as px
import torch
# from matplotlib import animation


# print(f"CUDA available: {torch.cuda.is_available()}")

### De Omgeving Verkennen

In [None]:
# Build a CartPole environment that renders frames as RGB arrays
env = gym.make("CartPole-v1", render_mode="rgb_array")

# A seeded reset returns the initial observation together with an info dict
state, info = env.reset(seed=42)

print("=== CartPole Environment ===")
print(f"State space: {env.observation_space}")
print(f"Action space: {env.action_space}")
print(f"\nInitial state: {state}")

# One labelled line per observation component, in index order
print("\nState components:")
component_labels = ("Cart Position", "Cart Velocity", "Pole Angle", "Pole Angular Velocity")
for idx, label in enumerate(component_labels):
    print(f"  [{idx}] {label}: {state[idx]:.3f}")

# The two discrete actions, listed by their integer id
print("\nPossible actions:")
for action_id, direction in enumerate(("LEFT", "RIGHT")):
    print(f"  {action_id}: Push cart to the {direction}")

### Random baseline agent

Voordat we een intelligent model trainen, kijken we eerst hoe een **random agent** (die willekeurige acties neemt) presteert. Dit geeft ons een baseline.

In [None]:
# Test random agent
def evaluate_random_agent(env, n_episodes=10, seed=42):
    """
    Run a uniformly random policy and record the summed reward of each episode.

    Episode ``i`` resets the environment with ``seed + i``. The action space is
    seeded once with ``seed`` as well: ``env.reset(seed=...)`` does not seed
    ``env.action_space.sample()``, so without this the sampled actions (and
    therefore the returned rewards) would differ from run to run.

    Args:
        env: Gymnasium environment
        n_episodes: Number of episodes to run
        seed: Base random seed for reproducibility

    Returns:
        List of episode rewards, one entry per episode
    """
    # Make the random action stream itself reproducible
    env.action_space.seed(seed)

    episode_rewards = []

    for episode in range(n_episodes):
        env.reset(seed=seed + episode)
        episode_reward = 0
        terminated = False
        truncated = False

        # An episode ends when the environment terminates or hits its time limit
        while not (terminated or truncated):
            action = env.action_space.sample()
            _, reward, terminated, truncated, _ = env.step(action)
            episode_reward += reward

        episode_rewards.append(episode_reward)

    return episode_rewards


# Evaluate random agent
random_rewards = evaluate_random_agent(env, n_episodes=100)

# Summary statistics are reused by the printout and the plot annotation
random_mean = np.mean(random_rewards)
random_std = np.std(random_rewards)

print("=== Random Agent Performance ===")
print(f"Average reward: {random_mean:.2f} ± {random_std:.2f}")
print(f"Min reward: {np.min(random_rewards):.2f}")
print(f"Max reward: {np.max(random_rewards):.2f}")

# Histogram of episode rewards with the mean marked as a dashed red line
px.histogram(random_rewards, nbins=20, title="Random Agent: Reward Distribution").add_vline(
    x=random_mean,
    line_dash="dash",
    line_color="red",
    annotation_text=f"Mean: {random_mean:.1f}",
).show()

In [None]:
# Reward of the random agent per episode, with its mean as a dashed reference line
mean_random_reward = np.mean(random_rewards)

fig_random_episodes = px.line(
    y=random_rewards,
    title="Random Agent: Reward per Episode",
    labels={"x": "Episode", "y": "Reward"},
)
fig_random_episodes.add_hline(
    y=mean_random_reward,
    line_dash="dash",
    line_color="red",
    annotation_text=f"Mean: {mean_random_reward:.1f}",
)
fig_random_episodes.show()


### Training met Deep Q-Network (DQN)

Nu gaan we een **Deep Q-Network (DQN)** trainen om een intelligente policy te leren. DQN is een value-based methode die een neural network gebruikt om de optimale $Q$-functie $Q^*(s,a)$ te benaderen.

In [None]:
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor

In [None]:
# Fresh environment (no rendering) dedicated to training
env = gym.make("CartPole-v1")

# Hyperparameters of the DQN agent, gathered in one place.
# The network learns Q(s, a) for each state-action pair.
# Add "verbose": 1 to show training progress.
dqn_settings = {
    "learning_rate": 1e-3,
    "buffer_size": 50000,
    "learning_starts": 1000,  # experiences collected before learning starts
    "batch_size": 64,
    "tau": 1.0,
    "gamma": 0.99,  # discount factor
    "train_freq": 4,
    "target_update_interval": 250,
    "exploration_fraction": 0.1,
    "exploration_initial_eps": 1.0,
    "exploration_final_eps": 0.02,  # final exploration rate
    "tensorboard_log": None,
}

# "MlpPolicy" selects a Multi-Layer Perceptron policy network
model = DQN("MlpPolicy", env, **dqn_settings)

In [None]:
# Train the agent
model.learn(total_timesteps=50000, progress_bar=True)

# Evaluate the trained model (wrap env with Monitor to avoid warning)
eval_env = Monitor(gym.make("CartPole-v1"))
try:
    mean_reward, std_reward = evaluate_policy(
        model, eval_env, n_eval_episodes=100, deterministic=True
    )
finally:
    # Release the evaluation environment even if evaluation fails
    eval_env.close()

print(f"  Random Agent: {np.mean(random_rewards):.2f} ± {np.std(random_rewards):.2f}")
print(f"  Trained DQN:  {mean_reward:.2f} ± {std_reward:.2f}")

### Visualisatie

In [None]:
from IPython.display import HTML
from matplotlib import animation
from matplotlib import pyplot as plt


def create_animation(frames, interval=50):
    """
    Build a matplotlib animation that plays the given frames in order.

    Args:
        frames: Non-empty list of RGB arrays
        interval: Delay between frames in milliseconds

    Returns:
        matplotlib animation object

    Raises:
        ValueError: If ``frames`` is empty
    """
    if len(frames) == 0:
        raise ValueError("frames must contain at least one image")

    fig, ax = plt.subplots(figsize=(8, 6))
    ax.axis("off")

    # Show the first frame; later frames only swap the image data
    img = ax.imshow(frames[0])
    last_step = len(frames) - 1

    def animate(frame_idx):
        img.set_array(frames[frame_idx])
        ax.set_title(f"Step {frame_idx}/{last_step}", fontsize=14)
        return [img]

    anim = animation.FuncAnimation(fig, animate, frames=len(frames), interval=interval, blit=True)

    # Close exactly this figure so the static figure is not shown as well
    plt.close(fig)
    return anim

In [None]:
# Visualize trained agent
def visualize_agent(model, env, n_steps=1000, seed=42):
    """
    Run the agent for one episode and collect frames for visualization.

    The rollout stops as soon as the environment reports termination or
    truncation, or after ``n_steps`` steps, whichever comes first.

    Args:
        model: Trained RL model providing ``predict(state, deterministic=True)``
        env: Gymnasium environment whose ``render()`` returns a frame
        n_steps: Maximum number of steps
        seed: Seed passed to ``env.reset``; defaults to the previously
            hard-coded value 42

    Returns:
        Tuple ``(frames, rewards, actions)``. ``frames`` holds the frame
        rendered after reset plus one frame per step taken; ``rewards`` and
        ``actions`` hold one entry per step taken.
    """
    rewards_list = []
    actions_list = []

    state, _ = env.reset(seed=seed)
    frames = [env.render()]

    for _ in range(n_steps):
        # Get action from trained policy (deterministic)
        action, _ = model.predict(state, deterministic=True)
        actions_list.append(int(action))

        # Take action in environment
        state, reward, terminated, truncated, _ = env.step(action)
        rewards_list.append(reward)
        frames.append(env.render())

        if terminated or truncated:
            break

    return frames, rewards_list, actions_list


# Roll out one episode in a rendering environment, then release it
env_render = gym.make("CartPole-v1", render_mode="rgb_array")
rollout = visualize_agent(model, env_render, n_steps=1000)
frames, rewards_list, actions_list = rollout
env_render.close()

# Summarize the episode: length, total reward and how often each action was taken
n_left = actions_list.count(0)
n_right = actions_list.count(1)
print(f"\nEpisode lasted {len(rewards_list)} steps")
print(f"Total reward: {sum(rewards_list):.0f}")
print(f"Action distribution: LEFT={n_left}, RIGHT={n_right}")

In [None]:
# Turn the recorded CartPole frames into an animation
print("Creating animation...")
anim = create_animation(frames, interval=50)

# The HTML object is the last expression of the cell, so the notebook displays it
animation_html = anim.to_jshtml()
HTML(animation_html)

### Analyse

DQN leert een **Q-function** $Q(s,a)$ die voor elke state-action combinatie voorspelt wat de **verwachte cumulatieve reward** (_return_) is.

Bij een greedy policy kiest de agent altijd de actie met de hoogste Q-waarde:
$$\pi(s) = \arg\max_a Q(s,a)$$

In [None]:
# Analyze Q-values for different states
def analyze_q_values(model, env, n_samples=100):
    """
    Query the Q-network on freshly reset environment states.

    Every sample is the observation returned by ``env.reset()``, so only
    initial states of the environment are covered.

    Args:
        model: Trained DQN model exposing ``q_net`` and ``device``
        env: Gymnasium environment
        n_samples: Number of states to sample

    Returns:
        Tuple ``(states, q_values_left, q_values_right, chosen_actions)``:
        ``states`` is a NumPy array of the sampled observations; the other
        three are lists with, per sample, the Q-value of action 0, the Q-value
        of action 1 and the index of the action with the highest Q-value.
    """
    states = []
    q_values_left = []
    q_values_right = []
    chosen_actions = []

    for _ in range(n_samples):
        state, _ = env.reset()
        states.append(state)

        # Put the observation on the model's device; a CPU-only tensor fails
        # when the Q-network lives on a GPU
        obs = torch.as_tensor(state, dtype=torch.float32, device=model.device).unsqueeze(0)

        # Get Q-values for both actions
        with torch.no_grad():
            q_values = model.q_net(obs)

        q_values_left.append(q_values[0, 0].item())
        q_values_right.append(q_values[0, 1].item())
        chosen_actions.append(torch.argmax(q_values).item())

    return np.array(states), q_values_left, q_values_right, chosen_actions


# Analyze Q-values
states, q_left, q_right, actions = analyze_q_values(model, env, n_samples=300)

print("\n=== Q-Value Analysis ===")
print(f"Average Q-value for LEFT: {np.mean(q_left):.2f}")
print(f"Average Q-value for RIGHT: {np.mean(q_right):.2f}")
print(f"Q-value range: [{min(q_left + q_right):.2f}, {max(q_left + q_right):.2f}]")

# Heatmap: Q-values over a grid of pole angles and cart positions,
# with both velocities fixed at zero
print("\nCreating Q-value heatmap...")
angle_range = np.linspace(-0.3, 0.3, 40)
position_range = np.linspace(-2.4, 2.4, 40)

# Rows follow angle_range, columns follow position_range
position_mesh, angle_mesh = np.meshgrid(position_range, angle_range)
zero_velocity = np.zeros_like(position_mesh)
grid_states = np.stack(
    [position_mesh, zero_velocity, angle_mesh, zero_velocity], axis=-1
).reshape(-1, 4)

# Evaluate the whole grid in one forward pass, on the same device as the Q-network
# (a CPU-only tensor fails when the network lives on a GPU)
with torch.no_grad():
    grid_tensor = torch.as_tensor(grid_states, dtype=torch.float32, device=model.device)
    grid_q_values = model.q_net(grid_tensor).cpu().numpy().astype(np.float64)

grid_shape = (len(angle_range), len(position_range))
q_grid_left = grid_q_values[:, 0].reshape(grid_shape)
q_grid_right = grid_q_values[:, 1].reshape(grid_shape)

# Plot Q-value difference heatmap: positive where RIGHT has the higher Q-value
q_diff_grid = q_grid_right - q_grid_left

fig = px.imshow(
    q_diff_grid,
    x=position_range,
    y=angle_range,
    color_continuous_scale="RdBu_r",
    color_continuous_midpoint=0,
    title="Learned Policy: Q(RIGHT) - Q(LEFT) for Different States",
    labels={"x": "Cart Position", "y": "Pole Angle (radians)", "color": "Q(RIGHT) - Q(LEFT)"},
    aspect="auto",
)
fig.update_layout(
    xaxis_title="Cart Position (negative = left of center, positive = right of center)",
    yaxis_title="Pole Angle (negative = leaning left, positive = leaning right)",
    height=500,
)
fig.show()

print("\n💡 Interpretation of the Heatmap:")
print("- BLUE: Agent prefers action LEFT (push cart to the left)")
print("- RED: Agent prefers action RIGHT (push cart to the right)")
print("- The diagonal structure shows the learned strategy:")
print("  → When pole leans left, push left")
print("  → When pole leans right, push right")
print("- At the edges (extreme cart positions), the agent adjusts the strategy")
print("  to keep the cart within bounds")

### Training met PPO (Proximal Policy Optimization)

In [None]:
# Import PPO
from stable_baselines3 import PPO

# Create fresh environment
env_ppo = gym.make("CartPole-v1")

# Create PPO model
# PPO learns a policy π(a|s) directly, not Q-values
model_ppo = PPO(
    "MlpPolicy",
    env_ppo,
    learning_rate=3e-4,
    n_steps=2048,
    batch_size=64,
    n_epochs=10,
    gamma=0.99,
    gae_lambda=0.95,
    clip_range=0.2,
    verbose=0,
)

# Train PPO
model_ppo.learn(total_timesteps=50000, progress_bar=True)

# Evaluate PPO (wrap env with Monitor to avoid warning)
eval_env_ppo = Monitor(gym.make("CartPole-v1"))
try:
    mean_reward_ppo, std_reward_ppo = evaluate_policy(
        model_ppo, eval_env_ppo, n_eval_episodes=100, deterministic=True
    )
finally:
    # Release the evaluation environment even if evaluation fails
    eval_env_ppo.close()

print(f"Random Agent: {np.mean(random_rewards):.2f} ± {np.std(random_rewards):.2f}")
print(f"DQN Agent:    {mean_reward:.2f} ± {std_reward:.2f}")
print(f"PPO Agent:    {mean_reward_ppo:.2f} ± {std_reward_ppo:.2f}")

# Visualize comparison
df_comparison = pd.DataFrame(
    {
        "Algorithm": ["Random", "DQN", "PPO"],
        "Mean Reward": [np.mean(random_rewards), mean_reward, mean_reward_ppo],
        "Std": [np.std(random_rewards), std_reward, std_reward_ppo],
    }
)
fig = px.bar(
    df_comparison,
    x="Algorithm",
    y="Mean Reward",
    error_y="Std",
    title="Algorithm Performance Comparison on CartPole-v1",
    color="Algorithm",
    color_discrete_map={"Random": "gray", "DQN": "blue", "PPO": "green"},
    # Both columns come from the same frame, so their lengths must match
    text=[
        f"{m:.1f}±{s:.1f}"
        for m, s in zip(df_comparison["Mean Reward"], df_comparison["Std"], strict=True)
    ],
)
# Reference line at the maximum episode reward of CartPole-v1
fig.add_hline(y=500, line_dash="dash", line_color="red", annotation_text="Maximum possible (500)")
fig.update_layout(yaxis_range=[0, 550])
fig.show()

## LunarLander

Laten we nu een complexer probleem bekijken: **LunarLander-v3**. Hier moet een maanlander veilig landen op een landingsplatform.

[![LunarLander](https://huggingface.co/datasets/huggingface-deep-rl-course/course-images/resolve/main/en/unit1/lunarLander.gif)](https://huggingface.co/learn/deep-rl-course)

### Het Probleem
- **State**: 8 continue waarden (positie, snelheid, hoek, hoeksnelheid, been-contact)
- **Actions**: 4 discrete acties (niets, linker motor, hoofd motor, rechter motor)
- **Rewards**: 
  - +100 tot +140 voor succesvolle landing
  - -100 voor crash
  - Kleine negatieve rewards voor brandstofverbruik
  - Positieve rewards voor dichter bij landingszone
- **Doel**: Land veilig met minimaal brandstofverbruik


In [None]:
# Create LunarLander environment
env_lunar = gym.make("LunarLander-v3")

# Explore the environment
state, info = env_lunar.reset(seed=42)

# The banner names the same environment version that gym.make creates above
print("=== LunarLander-v3 Environment ===")
print(f"State space: {env_lunar.observation_space}")
print(f"Action space: {env_lunar.action_space}")
print(f"\nInitial state shape: {state.shape}")
print(f"State: {state}")
print("\nState components:")
print("  [0] X position")
print("  [1] Y position")
print("  [2] X velocity")
print("  [3] Y velocity")
print("  [4] Angle")
print("  [5] Angular velocity")
print("  [6] Left leg contact (0=no, 1=yes)")
print("  [7] Right leg contact (0=no, 1=yes)")
print("\nActions:")
print("  0: Do nothing")
print("  1: Fire left engine")
print("  2: Fire main engine")
print("  3: Fire right engine")

# Test random agent on LunarLander
print("\n=== Testing Random Agent ===")
random_rewards_lunar = evaluate_random_agent(env_lunar, n_episodes=20, seed=42)
print(f"Random Agent: {np.mean(random_rewards_lunar):.2f} ± {np.std(random_rewards_lunar):.2f}")
print("(Note: Negative rewards mean crashes!)")


### Training met PPO op LunarLander

In [None]:
# Train PPO on LunarLander
model_lunar = PPO(
    "MlpPolicy",
    env_lunar,
    learning_rate=3e-4,
    n_steps=1024,
    batch_size=64,
    n_epochs=4,
    gamma=0.999,
    gae_lambda=0.98,
    clip_range=0.2,
    verbose=0,
)

# Train for longer since this is more complex
model_lunar.learn(total_timesteps=500000, progress_bar=True)

# Evaluate (wrap env with Monitor to avoid warning)
eval_env_lunar = Monitor(gym.make("LunarLander-v3"))
try:
    mean_reward_lunar, std_reward_lunar = evaluate_policy(
        model_lunar, eval_env_lunar, n_eval_episodes=50, deterministic=True
    )
finally:
    # Release the evaluation environment even if evaluation fails
    eval_env_lunar.close()

print(f"Random Agent: {np.mean(random_rewards_lunar):.2f} ± {np.std(random_rewards_lunar):.2f}")
print(f"Trained PPO:  {mean_reward_lunar:.2f} ± {std_reward_lunar:.2f}")
print("\nNote: Score > 200 is considered solved!")
status = "SOLVED ✓" if mean_reward_lunar > 200 else "Needs more training"
print(f"Status: {status}")

# Visualize performance
df_lunar = pd.DataFrame(
    {
        "Algorithm": ["Random", "PPO"],
        "Mean Reward": [np.mean(random_rewards_lunar), mean_reward_lunar],
        "Std": [np.std(random_rewards_lunar), std_reward_lunar],
    }
)
fig = px.bar(
    df_lunar,
    x="Algorithm",
    y="Mean Reward",
    error_y="Std",
    # The title names the environment version that is actually evaluated above
    title="LunarLander-v3 Performance",
    color="Algorithm",
    color_discrete_map={"Random": "gray", "PPO": "green"},
)
fig.add_hline(y=200, line_dash="dash", line_color="red", annotation_text="Solved threshold (200)")
fig.add_hline(y=0, line_color="black")
fig.show()

### Visualisatie

In [None]:
# Visualize trained LunarLander agent
env_lunar_render = gym.make("LunarLander-v3", render_mode="rgb_array")
frames_lunar, rewards_lunar, actions_lunar = visualize_agent(
    model_lunar, env_lunar_render, n_steps=500
)
env_lunar_render.close()

# Classify the episode by its total reward
total_reward_lunar = sum(rewards_lunar)
if total_reward_lunar > 200:
    outcome = "SUCCESS ✓"
elif total_reward_lunar < 0:
    outcome = "CRASH"
else:
    outcome = "PARTIAL"

print("\n=== Episode Analysis ===")
print(f"Episode length: {len(rewards_lunar)} steps")
print(f"Total reward: {total_reward_lunar:.1f}")
print(f"Final outcome: {outcome}")

# Count each action once; the counts feed both the printout and the bar chart
action_names = ["Do nothing", "Left engine", "Main engine", "Right engine"]
action_counts = [actions_lunar.count(action_id) for action_id in range(len(action_names))]

print("\nAction usage:")
for action_name, count in zip(action_names, action_counts):
    percentage = (count / len(actions_lunar)) * 100
    print(f"  {action_name}: {count} times ({percentage:.1f}%)")

# Show action distribution
df_actions = pd.DataFrame({"Action": action_names, "Count": action_counts})
px.bar(
    df_actions,
    x="Action",
    y="Count",
    title=f"LunarLander Action Distribution (Total Reward: {total_reward_lunar:.1f})",
).show()


In [None]:
# Turn the recorded LunarLander frames into an animation
print("Creating LunarLander animation...")
anim_lunar = create_animation(frames_lunar, interval=50)

# The HTML object is the last expression of the cell, so the notebook displays it
lunar_animation_html = anim_lunar.to_jshtml()
HTML(lunar_animation_html)