# Elevator Dispatch with Multi-Agent RL

**From [rlbook.ai](https://rlbook.ai/applications/elevator-dispatch)**

Train a multi-agent DQN to control elevators in a building. Learn about:
- Multi-agent coordination without communication
- Reward shaping for complex objectives
- Independent Q-learning with shared replay
- Baseline policy comparisons

This notebook lets you train and evaluate elevator dispatch policies interactively!

## Setup

Install the rlbook package and dependencies.

In [None]:
# Install rlbook package from GitHub
!pip install -q git+https://github.com/ebilgin/rlbook.git#subdirectory=code

import numpy as np
import matplotlib.pyplot as plt
import torch

from rlbook.envs import ElevatorDispatch
from rlbook.agents import ElevatorDQN
from rlbook.agents.elevator_dqn import random_policy, nearest_car_policy, scan_policy

print(f"NumPy: {np.__version__}")
print(f"PyTorch: {torch.__version__}")
print(f"Device: {'cuda' if torch.cuda.is_available() else 'cpu'}")

---
## Part 1: Explore the Environment

Let's create an elevator dispatch environment and see how it works.

In [None]:
# Create environment
env = ElevatorDispatch(
    n_floors=10,
    n_elevators=3,
    traffic_pattern="morning_rush",
    max_timesteps=50,  # Short episode for demo
    render_mode="ansi"
)

print("Environment created!")
print(f"Floors: {env.n_floors}")
print(f"Elevators: {env.n_elevators}")
print(f"Traffic: {env.traffic_pattern.name}")
print(f"\nObservation space (per elevator): {env.observation_space}")
print(f"Action space (all elevators): {env.action_space}")

### Run a Random Episode

See what happens with random elevator actions.

In [None]:
obs, info = env.reset(seed=42)

# Run 10 steps with random actions
for step in range(10):
    # Random actions for all elevators
    actions = [np.random.randint(0, env.n_floors) for _ in range(env.n_elevators)]
    
    obs, reward, done, truncated, info = env.step(actions)
    
    # Print every 3 steps
    if step % 3 == 0:
        print(f"\n=== Step {step} ===")
        print(f"Actions: {actions}")
        print(f"Reward: {reward:.1f}")
        print(f"Waiting: {info['waiting_passengers']}, Served: {info['delivered_passengers']}")

# Report the running average once the loop has finished
print(f"\nFinal avg wait time: {info['avg_wait_time']:.1f}s")

### Visualize the Building

The environment has an ASCII renderer. Let's see it!

In [None]:
env.reset(seed=42)

# Run a few steps and render
for _ in range(5):
    actions = [np.random.randint(0, env.n_floors) for _ in range(env.n_elevators)]
    env.step(actions)

# Show current state
print(env.render())

---
## Part 2: Baseline Policies

Before training an RL agent, let's see how simple rule-based policies perform.

In [None]:
def evaluate_policy(env, policy_fn, n_episodes=10, policy_name="Policy"):
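    """Evaluate a policy over multiple episodes.

    Returns the mean and std of the per-episode average wait time and the
    mean number of passengers served. `policy_name` is accepted for labeling
    but is not used in the computation.
    """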
    wait_times = []
    passengers_served = []
    
    for episode in range(n_episodes):
        # Seed each episode so every policy is scored on the same traffic
        obs, info = env.reset(seed=episode)
        
        for step in range(env.max_timesteps):
            actions = policy_fn(env)
            obs, reward, done, truncated, info = env.step(actions)
            
            if done or truncated:
                break
        
        wait_times.append(info['avg_wait_time'])
        passengers_served.append(info['delivered_passengers'])
    
    return {
        'avg_wait': np.mean(wait_times),
        'std_wait': np.std(wait_times),
        'avg_served': np.mean(passengers_served),
    }

# Create longer environment for evaluation
eval_env = ElevatorDispatch(
    n_floors=10,
    n_elevators=3,
    traffic_pattern="morning_rush",
    max_timesteps=300
)

print("Evaluating baseline policies...")
print("(This takes ~30 seconds)\n")

baselines = {
    "Random": random_policy,
    "Nearest Car": nearest_car_policy,
    "SCAN": scan_policy,
}

results = {}
for name, policy in baselines.items():
    results[name] = evaluate_policy(eval_env, policy, n_episodes=10, policy_name=name)
    print(f"{name:15s} - Wait: {results[name]['avg_wait']:5.1f}s (±{results[name]['std_wait']:.1f}), "
          f"Served: {results[name]['avg_served']:.0f}")

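**Observations** (exact numbers vary from run to run):
- Random typically performs worst, as expected.
- Nearest Car is usually decent: it sends the closest elevator to each call.
- SCAN is typically the strongest baseline: each car keeps moving in one direction, serving calls along the way, and reverses only when no calls remain ahead, much like a disk-arm scheduler.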

Can RL do better? Let's find out!
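
To make the SCAN idea concrete, here is a minimal standalone sketch for a single car. It is not the `scan_policy` shipped with rlbook, whose internals are not shown in this notebook; the function name `scan_next_floor` and its arguments are hypothetical and chosen only for illustration.

```python
def scan_next_floor(current, direction, requests):
    """Pick the next stop for one car under a SCAN-style sweep (illustrative).

    current:   floor the car is on (0-based)
    direction: +1 when sweeping up, -1 when sweeping down
    requests:  set of floors with a pending call
    Returns (target_floor, new_direction).
    """
    if not requests:
        return current, direction  # nothing to do, stay put

    # Requests that lie ahead in the current sweep direction.
    ahead = [f for f in requests if (f - current) * direction > 0]
    if ahead:
        # Serve the closest request ahead before anything behind.
        return min(ahead, key=lambda f: abs(f - current)), direction

    # Nothing ahead: reverse and serve the closest request behind.
    behind = [f for f in requests if f != current]
    if not behind:
        return current, direction  # the only request is this floor
    return min(behind, key=lambda f: abs(f - current)), -direction


print(scan_next_floor(3, +1, {1, 5, 8}))  # keeps going up to floor 5
print(scan_next_floor(8, +1, {1, 5}))     # nothing above, so it reverses
```

The key property is that a car never turns around while calls remain ahead of it, which bounds how long any single floor can be skipped.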

---
## Part 3: Train a DQN Agent

Now we'll train a multi-agent DQN. Each elevator learns its own Q-network, but they share experiences.
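
The "own network, shared experiences" idea can be sketched with a tabular stand-in that needs no deep-learning library. This is an assumption-laden illustration, not the `ElevatorDQN` implementation: the class name `SharedReplayIQL`, its methods, and the choice of Q-tables instead of neural networks are all hypothetical.

```python
import random
from collections import defaultdict, deque


class SharedReplayIQL:
    """Tabular stand-in: one Q-table per agent, one replay buffer for all."""

    def __init__(self, n_agents, n_actions, alpha=0.1, gamma=0.99,
                 buffer_size=1000, seed=0):
        self.alpha, self.gamma = alpha, gamma
        self.rng = random.Random(seed)
        # Independent learners: each agent owns its Q-values.
        self.q = [defaultdict(lambda: [0.0] * n_actions) for _ in range(n_agents)]
        # Shared experience: every agent's transitions land in one buffer.
        self.buffer = deque(maxlen=buffer_size)

    def store(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def train_step(self, batch_size=8):
        """Update each agent's table on its own minibatch from the shared buffer."""
        if len(self.buffer) < batch_size:
            return
        for q in self.q:
            batch = self.rng.sample(list(self.buffer), batch_size)
            for s, a, r, s2, done in batch:
                # One-step Q-learning target; no bootstrap past a terminal state.
                target = r if done else r + self.gamma * max(q[s2])
                q[s][a] += self.alpha * (target - q[s][a])


learner = SharedReplayIQL(n_agents=3, n_actions=2)
for _ in range(20):
    learner.store(state=0, action=1, reward=1.0, next_state=1, done=True)
learner.train_step()
print([round(q[0][1], 3) for q in learner.q])
```

Each learner treats the other agents as part of the environment, so no explicit communication is needed; the shared buffer simply gives every learner more data per environment step.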

**Training parameters:**
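- 200 episodes (more episodes generally give better results but take longer)
- ε starts at 1.0 and decays by a factor of 0.99 per episode toward a floor of 0.01 (assuming `decay_epsilon()` multiplies ε by the decay factor, ε is still about 0.13 after 200 episodes)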
- Shared replay buffer (50k transitions)
- [128, 128] hidden layers
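
The exploration schedule can be checked with a few lines of arithmetic. This sketch assumes that `decay_epsilon()` multiplies ε by `epsilon_decay` once per episode and clips at `epsilon_min`; the notebook does not show that method, so treat the helper names `epsilon_after` and `episodes_to_floor` as hypothetical.

```python
import math


def epsilon_after(episodes, start=1.0, decay=0.99, floor=0.01):
    """Exploration rate after `episodes` multiplicative decays, clipped at `floor`."""
    return max(floor, start * decay ** episodes)


def episodes_to_floor(start=1.0, decay=0.99, floor=0.01):
    """Smallest number of decays after which epsilon has reached the floor."""
    return math.ceil(math.log(floor / start) / math.log(decay))


print(f"{epsilon_after(200):.3f}")  # prints 0.134
print(episodes_to_floor())          # prints 459
```

Under that assumption, a 200-episode run ends while the agent still takes a random action roughly 13% of the time, which is one reason longer training can help.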

**Expected training time:**
- ~5 minutes on Colab GPU
- ~15 minutes on Colab CPU

In [None]:
# Create training environment
train_env = ElevatorDispatch(
    n_floors=10,
    n_elevators=3,
    traffic_pattern="morning_rush",
    max_timesteps=300
)

# Get observation dimension
obs, _ = train_env.reset()
obs_dim = obs["elevator_0"].shape[0]

# Create agent
agent = ElevatorDQN(
    n_floors=10,
    n_elevators=3,
    observation_dim=obs_dim,
    hidden_dims=(128, 128),
    learning_rate=1e-3,
    gamma=0.99,
    epsilon=1.0,
    epsilon_decay=0.99,  # Faster decay for shorter training
    epsilon_min=0.01,
    buffer_size=50000,
    batch_size=64
)

print("Agent created!")
print(f"Observation dim: {obs_dim}")
print(f"Q-networks: {agent.n_elevators} (one per elevator)")
print(f"Parameters per network: {sum(p.numel() for p in agent.q_networks[0].parameters()):,}")

### Training Loop

Run the training! This cell will take several minutes.

In [None]:
n_episodes = 200  # Increase to 500-1000 for better results

episode_rewards = []
avg_wait_times = []
epsilon_history = []

print(f"Training for {n_episodes} episodes...\n")

for episode in range(n_episodes):
    obs, info = train_env.reset()
    episode_reward = 0
    
    for step in range(train_env.max_timesteps):
        # Select actions
        actions = agent.select_actions(obs, training=True)
        
        # Take step
        next_obs, reward, done, truncated, info = train_env.step(actions)
        
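        # Store transition. Only a true terminal state (done) should stop
        # bootstrapping; a time-limit cutoff (truncated) is not terminal.
        agent.store_transitions(obs, actions, reward, next_obs, done)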
        
        # Train
        loss = agent.train_step()
        
        episode_reward += reward
        obs = next_obs
        
        if done or truncated:
            break
    
    # Decay epsilon
    agent.decay_epsilon()
    
    # Record metrics
    episode_rewards.append(episode_reward)
    avg_wait_times.append(info['avg_wait_time'])
    epsilon_history.append(agent.epsilon)
    
    # Print progress
    if (episode + 1) % 20 == 0:
        recent_reward = np.mean(episode_rewards[-20:])
        recent_wait = np.mean(avg_wait_times[-20:])
        print(f"Episode {episode + 1:3d}/{n_episodes} | "
              f"Reward: {recent_reward:6.1f} | "
              f"Wait: {recent_wait:5.1f}s | "
              f"ε: {agent.epsilon:.3f} | "
              f"Buffer: {len(agent.replay_buffer)}")

print("\nTraining complete!")

### Visualize Training Progress

In [None]:
fig, axes = plt.subplots(1, 3, figsize=(15, 4))

# Smooth data for plotting
def smooth(data, window=20):
    return np.convolve(data, np.ones(window)/window, mode='valid')

# Episode rewards
axes[0].plot(smooth(episode_rewards), color='#06b6d4', linewidth=2)
axes[0].set_xlabel('Episode')
axes[0].set_ylabel('Episode Reward')
axes[0].set_title('Training Reward (20-episode avg)')
axes[0].grid(alpha=0.3)

# Wait times
axes[1].plot(smooth(avg_wait_times), color='#f97316', linewidth=2)
axes[1].set_xlabel('Episode')
axes[1].set_ylabel('Avg Wait Time (s)')
axes[1].set_title('Average Wait Time (20-episode avg)')
axes[1].grid(alpha=0.3)

# Epsilon decay
axes[2].plot(epsilon_history, color='#8b5cf6', linewidth=2)
axes[2].set_xlabel('Episode')
axes[2].set_ylabel('Epsilon')
axes[2].set_title('Exploration Rate')
axes[2].grid(alpha=0.3)

plt.tight_layout()
plt.show()

print("Final performance (last 20 episodes):")
print(f"  Average reward: {np.mean(episode_rewards[-20:]):.1f}")
print(f"  Average wait time: {np.mean(avg_wait_times[-20:]):.1f}s")

---
## Part 4: Evaluate the Trained Agent

How does our trained DQN compare to the baselines?

In [None]:
def evaluate_dqn(env, agent, n_episodes=10):
    """Evaluate trained DQN agent."""
    wait_times = []
    passengers_served = []
    
    for episode in range(n_episodes):
        # Seed each episode so the DQN is scored on reproducible traffic
        obs, info = env.reset(seed=episode)
        
        for step in range(env.max_timesteps):
            actions = agent.select_actions(obs, training=False)  # No exploration
            obs, reward, done, truncated, info = env.step(actions)
            
            if done or truncated:
                break
        
        wait_times.append(info['avg_wait_time'])
        passengers_served.append(info['delivered_passengers'])
    
    return {
        'avg_wait': np.mean(wait_times),
        'std_wait': np.std(wait_times),
        'avg_served': np.mean(passengers_served),
    }

# Evaluate
dqn_results = evaluate_dqn(eval_env, agent, n_episodes=10)

# Compare all policies
print("\n" + "="*60)
print("FINAL COMPARISON")
print("="*60)
print(f"{'Algorithm':<20} {'Avg Wait (s)':<15} {'Passengers Served':<20}")
print("-"*60)

for name, res in results.items():
    print(f"{name:<20} {res['avg_wait']:<15.2f} {res['avg_served']:<20.1f}")

print(f"{'DQN (Trained)':<20} {dqn_results['avg_wait']:<15.2f} {dqn_results['avg_served']:<20.1f}")
print("="*60)

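# Calculate relative improvement (positive means DQN passengers wait less)
best_baseline = min(results.items(), key=lambda x: x[1]['avg_wait'])
improvement = (best_baseline[1]['avg_wait'] - dqn_results['avg_wait']) / best_baseline[1]['avg_wait'] * 100

if improvement > 0:
    print(f"\nDQN improves over best baseline ({best_baseline[0]}) by {improvement:.1f}%!")
else:
    print(f"\nDQN trails the best baseline ({best_baseline[0]}) by {-improvement:.1f}%. "
          "Try training for more episodes.")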

### Visualize the Comparison

In [None]:
# Prepare data for plotting
algorithms = list(results.keys()) + ['DQN (Trained)']
wait_times_plot = [results[name]['avg_wait'] for name in results.keys()] + [dqn_results['avg_wait']]
colors = ['#ef4444', '#f97316', '#eab308', '#06b6d4']

# Create bar chart
fig, ax = plt.subplots(figsize=(10, 6))
bars = ax.bar(algorithms, wait_times_plot, color=colors, alpha=0.8, edgecolor='black')

# Highlight the DQN bar (it is not guaranteed to be the best performer)
bars[-1].set_color('#10b981')
bars[-1].set_linewidth(3)

ax.set_ylabel('Average Wait Time (seconds)', fontsize=12)
ax.set_title('Elevator Dispatch Performance Comparison', fontsize=14, fontweight='bold')
ax.grid(axis='y', alpha=0.3)

# Add value labels on bars
for bar in bars:
    height = bar.get_height()
    ax.text(bar.get_x() + bar.get_width()/2., height,
            f'{height:.1f}s',
            ha='center', va='bottom', fontweight='bold')

plt.tight_layout()
plt.show()

---
## Part 5: Analyze Learned Behavior

What did the agent learn? Let's run an episode and watch what happens.

In [None]:
# Create environment with rendering
demo_env = ElevatorDispatch(
    n_floors=10,
    n_elevators=3,
    traffic_pattern="morning_rush",
    max_timesteps=50,  # Short for demo
    render_mode="ansi"
)

obs, info = demo_env.reset(seed=42)

print("Running trained agent...\n")
print("Watch how the elevators coordinate!\n")

for step in range(20):
    actions = agent.select_actions(obs, training=False)
    obs, reward, done, truncated, info = demo_env.step(actions)
    
    # Print every 5 steps
    if step % 5 == 0:
        print(f"\n{'='*50}")
        print(f"Step {step}")
        print(f"Actions: {actions} (target floors)")
        print(demo_env.render())
        print(f"Reward: {reward:.1f}")
    
    if done or truncated:
        break

print("\nFinal metrics:")
print(f"  Wait time: {info['avg_wait_time']:.1f}s")
print(f"  Passengers served: {info['delivered_passengers']}")
print(f"  Utilization: {info['elevator_utilization']*100:.1f}%")

**What to look for:**
- Do elevators naturally partition the floors? (e.g., one handles low, one mid, one high)
- Do they position themselves anticipating future requests?
- Do they avoid clustering together?

Any coordination you observe **emerged** from learning: we never programmed it explicitly!
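
Eyeballing the ASCII render only goes so far, so the questions above can also be turned into numbers. The sketch below is a hypothetical helper (`coordination_stats` is not part of rlbook) that assumes you have collected the per-step `actions` lists from the demo loop as plain Python ints, for example with `[int(a) for a in actions]`.

```python
from statistics import mean


def coordination_stats(action_log):
    """Summarize a list of per-step target-floor lists, one entry per elevator."""
    n_elevators = len(action_log[0])
    # Average target floor per elevator: well-separated values hint at zoning.
    mean_targets = [mean(step[i] for step in action_log) for i in range(n_elevators)]
    # Share of steps where at least two elevators chase the same floor.
    clashes = sum(len(set(step)) < n_elevators for step in action_log)
    return {"mean_target": mean_targets, "clash_rate": clashes / len(action_log)}


# Made-up log of four steps for three elevators, purely for illustration.
log = [[0, 4, 9], [1, 5, 9], [0, 5, 5], [2, 4, 8]]
stats = coordination_stats(log)
print(stats)
```

Widely spaced mean targets together with a low clash rate would be consistent with the elevators partitioning the building, though a single short episode is not conclusive.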

---
## Part 6: Extensions & Experiments

Try these on your own:

### Exercise 1: Different Traffic Patterns

How does the agent perform on evening_rush (upper floors → lobby)?

In [None]:
# TODO: Create environment with traffic_pattern="evening_rush"
# TODO: Evaluate the agent trained on morning_rush
# Does it generalize? Or does performance drop?

# Your code here

### Exercise 2: Reward Shaping

Try modifying the reward function. What happens if you:
- Increase the starvation penalty (line 377 in elevator.py)
- Increase the delivery bonus
- Add a "fairness" term (penalize variance in wait times)
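
As a starting point for this exercise, here is a minimal sketch of a reward that combines the terms listed above. It is not the environment's `_calculate_reward`; the function name `shaped_reward`, its arguments, and every weight are hypothetical values chosen for illustration.

```python
from statistics import pvariance


def shaped_reward(wait_times, delivered, starvation_threshold=60.0,
                  wait_weight=0.1, delivery_bonus=1.0,
                  starvation_penalty=5.0, fairness_weight=0.01):
    """Combine wait, delivery, starvation, and fairness terms into one scalar.

    wait_times: current wait (seconds) of each passenger still waiting
    delivered:  number of passengers dropped off this step
    """
    reward = delivery_bonus * delivered
    reward -= wait_weight * sum(wait_times)
    # Starvation: extra penalty for anyone waiting past the threshold.
    reward -= starvation_penalty * sum(w > starvation_threshold for w in wait_times)
    # Fairness: penalize spread in wait times (needs at least one passenger).
    if wait_times:
        reward -= fairness_weight * pvariance(wait_times)
    return reward


# Same total waiting time, but the uneven case is penalized more.
print(shaped_reward([50.0, 50.0], delivered=0))
print(shaped_reward([0.0, 100.0], delivered=0))
```

Raising `fairness_weight` or `starvation_penalty` pushes the agent to avoid leaving any one passenger stranded, usually at some cost to average wait time, which is the trade-off this exercise asks you to explore.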

In [None]:
# TODO: Modify the environment's _calculate_reward method
# TODO: Retrain and compare performance

# Your code here

### Exercise 3: Larger Buildings

What happens with 20 floors and 5 elevators?

In [None]:
# TODO: Create environment with n_floors=20, n_elevators=5
# TODO: Train an agent (may take longer)
# TODO: Does coordination become harder?

# Your code here

### Exercise 4: Network Architecture

How does performance change with different hidden layer sizes?

In [None]:
# TODO: Try hidden_dims=(64, 64) vs (256, 256)
# TODO: Compare training speed and final performance

# Your code here

---
## Summary

In this notebook, you:

1. ✅ **Explored** a multi-elevator environment with realistic traffic patterns
2. ✅ **Compared** baseline policies (Random, Nearest Car, SCAN)
3. ✅ **Trained** a multi-agent DQN with independent Q-learning
4. ✅ **Evaluated** the learned policy against the baselines
5. ✅ **Analyzed** emergent coordination behaviors

**Key Takeaways:**
- Multi-agent RL can learn implicit coordination without communication
- Reward shaping (wait time + delivery bonus + starvation prevention) is critical
- Independent Q-learning with shared replay is a simple but effective approach
- RL can outperform rule-based algorithms when the environment is complex, provided it gets enough training

**Next Steps:**
- Read the full application guide: [rlbook.ai/applications/elevator-dispatch](https://rlbook.ai/applications/elevator-dispatch)
- Explore advanced multi-agent methods (QMIX, MADDPG)
- Try deploying to a real building (sim-to-real challenges!)
- Experiment with the extensions above

**Questions or feedback?** Join the discussion at [discord.gg/mJ7n3zNf7r](https://discord.gg/mJ7n3zNf7r)

---
## Save Your Trained Model (Optional)

Save the trained agent to continue later or share with others.

In [None]:
# Save the model
agent.save("elevator_dqn_trained.pt")
print("Model saved to elevator_dqn_trained.pt")

# To load later:
# new_agent = ElevatorDQN(...same parameters...)
# new_agent.load("elevator_dqn_trained.pt")