# Checkpoint 9: Reward Engineering for Mario Kart

**The reward function is the heart of reinforcement learning.** It defines what behavior we want our agent to learn. In this notebook, we'll explore the art and science of reward engineering, building progressively more sophisticated reward functions for racing games.

## Learning Objectives
- Understand why reward design is critical for RL success
- Learn the difference between sparse and dense rewards
- Implement modular, composable reward functions
- Build and compare multiple reward function versions
- Recognize and avoid common reward hacking pitfalls

In [None]:
# Install required packages
%pip install numpy matplotlib

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple

print("Imports successful!")

## Why Reward Design Matters

In reinforcement learning, **the reward function is the only way we communicate our goals to the agent**. The agent will learn to maximize whatever reward signal we provide - nothing more, nothing less.

### Key Principles

1. **The agent learns what you reward, not what you want**
   - If your reward doesn't capture your true objective, the agent will find unintended shortcuts
   
2. **Reward shaping can dramatically accelerate learning**
   - Good intermediate rewards guide the agent toward the goal
   
3. **Bad rewards lead to bad behavior**
   - "Reward hacking" occurs when agents exploit loopholes in the reward function

### Racing Game Example

Consider a simple reward: `+1` for winning the race, `0` otherwise.

**Problems:**
- The agent might never experience the `+1` reward during exploration
- No feedback about whether it's improving
- Learning from such sparse signals is extremely difficult

## Sparse vs Dense Rewards

### Sparse Rewards
Reward is only given at significant events (e.g., winning, scoring, completing a level).

**Pros:**
- Simple to define
- Directly tied to the true objective
- Less risk of reward hacking

**Cons:**
- Hard to learn from (credit assignment problem)
- Agent may never discover the reward during exploration
- Slow convergence

### Dense Rewards
Continuous feedback at every timestep based on intermediate progress.

**Pros:**
- Faster learning
- Clear signal for improvement
- Easier credit assignment

**Cons:**
- More complex to design correctly
- Risk of reward hacking
- May not align with true objective

### Comparison Table

| Aspect | Sparse Reward | Dense Reward |
|--------|---------------|---------------|
| **Example** | +100 for completing lap | +1 per checkpoint, +0.1 per speed unit |
| **Learning Speed** | Slow | Fast |
| **Design Complexity** | Low | High |
| **Hacking Risk** | Low | High |
| **Credit Assignment** | Difficult | Easy |
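
To make the table concrete, the sketch below scores one toy trajectory under a sparse and a dense reward. The trajectory, the `sparse_reward` and `dense_reward` helpers, the 100-checkpoint lap, and the normalized speed of 0.8 are illustrative assumptions, not values from any game.

```python
from typing import Dict, List

TOTAL_CHECKPOINTS = 100  # assumed lap length for this toy example


def sparse_reward(prev: Dict, curr: Dict) -> float:
    """+100 only on the step where a lap is completed, 0 otherwise."""
    return 100.0 if curr["lap"] > prev["lap"] else 0.0


def dense_reward(prev: Dict, curr: Dict) -> float:
    """+1 per checkpoint gained plus 0.1 per unit of normalized speed.

    Forward-only toy: the modulo treats every change as forward movement.
    """
    gained = (curr["checkpoint"] - prev["checkpoint"]) % TOTAL_CHECKPOINTS
    return 1.0 * gained + 0.1 * curr["speed"]


# A kart that covers 40 checkpoints in 20 steps and never finishes the lap.
trajectory: List[Dict] = [
    {"checkpoint": 2 * t, "lap": 0, "speed": 0.8} for t in range(21)
]

pairs = list(zip(trajectory, trajectory[1:]))
sparse = [sparse_reward(a, b) for a, b in pairs]
dense = [dense_reward(a, b) for a, b in pairs]

nonzero_sparse = sum(1 for r in sparse if r != 0.0)
nonzero_dense = sum(1 for r in dense if r != 0.0)
print(f"sparse: total={sum(sparse):.1f}, informative steps={nonzero_sparse}")
print(f"dense:  total={sum(dense):.1f}, informative steps={nonzero_dense}")
```

Under the sparse reward this episode looks the same as one where the kart never moved, while the dense reward gives feedback on every step.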

## Reward Shaping and Potential-Based Shaping

### What is Reward Shaping?
Adding intermediate rewards to guide the agent toward the goal. Arbitrary shaping can change which policy is optimal; potential-based shaping, described next, is the form that provably does not.

### Potential-Based Reward Shaping (PBRS)

Ng, Harada, and Russell (1999) proved that shaping rewards of the form:

$$F(s, s') = \gamma \Phi(s') - \Phi(s)$$

where $\Phi(s)$ is a potential function, **preserve the optimal policy**.
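
One way to see why the guarantee holds is the standard telescoping argument, sketched here for an episode $s_0, \dots, s_T$ (the episode notation is an assumption added for illustration). The discounted shaping terms collapse to:

$$\sum_{t=0}^{T-1} \gamma^t \left[\gamma \Phi(s_{t+1}) - \Phi(s_t)\right] = \gamma^T \Phi(s_T) - \Phi(s_0)$$

If $\Phi$ is zero at terminal states (or $\Phi$ is bounded, $\gamma < 1$, and $T \to \infty$), the shaped return differs from the original return only by $-\Phi(s_0)$. That term does not depend on the agent's actions, so the ranking of policies is unchanged.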

### Example: Distance-Based Potential

For racing, we could define:
$$\Phi(s) = -\text{distance\_to\_finish}(s)$$

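This yields a positive shaping reward when the kart moves closer to the finish line and a negative one when it moves clearly away. With $\gamma < 1$ the break-even point shifts slightly, so a stationary kart still receives a small positive term.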
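
A minimal sketch of this distance-based potential follows. The discount factor, the helper names `potential` and `shaping_term`, and the list of distances are assumptions chosen for illustration.

```python
from typing import List

GAMMA = 0.99  # assumed discount factor


def potential(distance_to_finish: float) -> float:
    """Phi(s) = -distance_to_finish(s): closer to the finish means higher potential."""
    return -distance_to_finish


def shaping_term(dist: float, next_dist: float, gamma: float = GAMMA) -> float:
    """F(s, s') = gamma * Phi(s') - Phi(s)."""
    return gamma * potential(next_dist) - potential(dist)


# Hypothetical distances to the finish line along one short episode.
distances: List[float] = [10.0, 8.0, 9.0, 5.0, 0.0]

terms = [shaping_term(d, nd) for d, nd in zip(distances, distances[1:])]
discounted_sum = sum(GAMMA**t * f for t, f in enumerate(terms))

# Telescoping check: the discounted sum depends only on the first and last state.
T = len(terms)
expected = GAMMA**T * potential(distances[-1]) - potential(distances[0])
print("per-step shaping terms:", [round(f, 3) for f in terms])
print("discounted sum:", discounted_sum, "expected:", expected)
```

The second term is negative because the kart moved from distance 8 to distance 9. The discounted sum matches the closed form, which depends only on the start and end states.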

### Benefits of PBRS
1. **Theoretical guarantee**: Optimal policy unchanged
2. **Faster learning**: Dense feedback signal
3. **Flexible**: Can incorporate domain knowledge

### Practical Considerations
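- The guarantee assumes $\Phi$ depends only on the state; a game may not expose the information (such as exact distance to the finish) needed to compute it
- An approximate or inconsistent potential can still change the learned policy in subtle ways
- Magnitude matters: shaping that is too strong can dominate the true reward during learning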

## Common Racing Reward Components

| Component | Description | Typical Weight | Implementation Notes |
|-----------|-------------|----------------|----------------------|
| **Speed** | Current velocity normalized by max speed | 0.1 - 0.5 | Encourages fast driving; normalize to [0, 1] |
| **Progress** | Change in track checkpoint/position | 0.5 - 1.0 | Main objective; handle lap wraparound |
| **Position** | Race position (1st through 8th) | 0.1 - 0.3 | For competitive behavior in multi-agent |
| **Collision** | Penalty for hitting walls/karts | -0.1 to -0.5 | Detect via sudden speed drops or collision flag |
| **Lap Completion** | Bonus for completing a lap | 5.0 - 100.0 | Sparse milestone reward |
| **Time Penalty** | Small negative per timestep | -0.01 to -0.001 | Encourages faster completion |
| **Drift Bonus** | Reward for successful drifts | 0.1 - 0.5 | Game-specific mechanic |
| **Item Usage** | Reward/penalty for item use | Variable | Context-dependent |
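
Weights only mean something relative to how often and how strongly each component fires. The sketch below estimates each component's share of one episode's reward. Every number in `components` is a made-up estimate for illustration, not a measurement from a real game.

```python
# Hypothetical per-episode estimates. None of these numbers come from a real game.
components = {
    # name: (weight, typical raw value per event, events per episode)
    "speed":     (0.3,   0.8, 1000),  # normalized speed, every step
    "progress":  (1.0,   0.3, 1000),  # checkpoints gained per step
    "collision": (-0.5,  1.0, 5),     # one unit per detected collision
    "lap_bonus": (10.0,  1.0, 3),     # one unit per completed lap
    "time":      (-0.01, 1.0, 1000),  # constant per-step penalty
}

# Expected contribution of each component over one episode.
budget = {
    name: weight * value * count
    for name, (weight, value, count) in components.items()
}
total_magnitude = sum(abs(v) for v in budget.values())

for name, contribution in sorted(budget.items(), key=lambda kv: -abs(kv[1])):
    share = abs(contribution) / total_magnitude
    print(f"{name:10s} {contribution:+8.1f}  ({share:5.1%} of total magnitude)")
```

In this made-up budget the collision term is well under 1% of the total magnitude, so the agent would have little incentive to avoid walls. A check like this can catch such imbalances before training starts.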

## Reward Hacking Pitfalls

Reward hacking occurs when agents find unintended ways to maximize reward without achieving the true objective.

### Common Racing Game Exploits

1. **Spinning in Place**
   - **Cause**: Speed reward without direction check
   - **Solution**: Reward forward progress, not just speed

2. **Driving Backwards**
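   - **Cause**: Progress measured without regard to direction, or wrap-around logic that misreads a backward finish-line crossing as forward progress
   - **Solution**: Use signed progress so backward movement earns negative reward, and penalize it explicitly if needed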

3. **Exploiting Shortcuts**
   - **Cause**: Only rewarding checkpoints, not actual path
   - **Solution**: May be acceptable (creative!) or add path constraints

4. **Wall Grinding**
   - **Cause**: No collision penalty or too weak
   - **Solution**: Add meaningful collision penalties

5. **Camping at Bonus Locations**
   - **Cause**: High bonus item rewards without completion incentive
   - **Solution**: Time penalties, progress requirements

6. **Intentional Crashing**
   - **Cause**: Reset puts agent in advantageous position
   - **Solution**: Penalize deaths/resets heavily
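
As one possible fix for the first two exploits, the speed term can reward only the component of velocity that points along the track. This sketch assumes hypothetical state keys `heading` and `track_heading` (both in radians); a real environment may expose direction differently or not at all.

```python
import math
from typing import Dict


def forward_speed_reward(state: Dict, max_speed: float = 100.0) -> float:
    """Reward the speed component along the track direction, clipped to [-1, 1].

    Assumes hypothetical state keys: 'speed' (>= 0), 'heading' and
    'track_heading' (both in radians).
    """
    speed = state.get("speed", 0.0)
    angle_error = state.get("heading", 0.0) - state.get("track_heading", 0.0)
    forward = speed * math.cos(angle_error)  # negative when facing backwards
    return max(-1.0, min(forward / max_speed, 1.0))


examples = {
    "forward":  {"speed": 80, "heading": 0.0,         "track_heading": 0.0},
    "sideways": {"speed": 80, "heading": math.pi / 2, "track_heading": 0.0},
    "backward": {"speed": 80, "heading": math.pi,     "track_heading": 0.0},
}
for name, state in examples.items():
    print(f"{name:9s} -> {forward_speed_reward(state):+.2f}")
```

A kart spinning in place averages out to roughly zero reward under this term, and one driving the wrong way is penalized in proportion to its speed.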

### Detection Strategies
- Visualize learned policies regularly
- Track auxiliary metrics (actual lap times, collisions)
- Compare to human performance patterns
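
Tracking auxiliary metrics can be partly automated. The sketch below flags episodes whose reward is high while real progress is low. The log format (`total_reward`, `checkpoints_gained`, `steps`) and both thresholds are assumptions to adapt to your own logging.

```python
from typing import Dict, List


def flag_suspicious_episodes(
    episodes: List[Dict],
    min_reward: float = 50.0,
    min_checkpoints_per_step: float = 0.5,
) -> List[int]:
    """Return indices of episodes with high reward but little real progress.

    Each episode dict is assumed to hold 'total_reward', 'checkpoints_gained'
    and 'steps'. The thresholds are placeholders to tune per track.
    """
    flagged = []
    for i, ep in enumerate(episodes):
        progress_rate = ep["checkpoints_gained"] / max(ep["steps"], 1)
        high_reward = ep["total_reward"] >= min_reward
        if high_reward and progress_rate < min_checkpoints_per_step:
            flagged.append(i)
    return flagged


# Made-up logs: episode 1 earns a lot of reward while barely moving.
logs = [
    {"total_reward": 220.0, "checkpoints_gained": 190, "steps": 200},
    {"total_reward": 180.0, "checkpoints_gained": 12,  "steps": 200},
    {"total_reward": 15.0,  "checkpoints_gained": 8,   "steps": 200},
]
print(flag_suspicious_episodes(logs))
```

Flagged episodes are candidates for manual replay, not proof of reward hacking.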

In [None]:
# RewardFunction Abstract Base Class

class RewardFunction(ABC):
    """
    Abstract base class for reward functions.
    
    All reward functions must implement:
    - compute(state): Calculate reward for current state
    - reset(): Reset internal state for new episode
    """
    
    @abstractmethod
    def compute(self, state: Dict) -> float:
        """
        Compute reward given current state.
        
        Args:
            state: Dictionary containing game state information
                   Expected keys may include: 'speed', 'checkpoint', 'lap',
                   'position', 'collision', etc.
        
        Returns:
            float: Reward value for this timestep
        """
        pass
    
    @abstractmethod
    def reset(self) -> None:
        """
        Reset internal state for a new episode.
        
        Called at the start of each episode to clear any
        accumulated state (e.g., previous checkpoint).
        """
        pass

print("RewardFunction base class defined.")

In [None]:
# SpeedReward Class

class SpeedReward(RewardFunction):
    """
    Reward based on normalized speed.
    
    Encourages the agent to drive fast by giving reward
    proportional to current speed normalized by maximum speed.
    """
    
    def __init__(self, max_speed: float = 100.0, weight: float = 1.0):
        """
        Args:
            max_speed: Maximum possible speed for normalization
            weight: Multiplier for the reward
        """
        self.max_speed = max_speed
        self.weight = weight
    
    def compute(self, state: Dict) -> float:
        """
        Compute speed-based reward.
        
        Returns value in range [0, weight] based on current speed.
        """
        speed = state.get('speed', 0)
        # Clamp normalized speed to [0, 1] (negative readings count as 0), then apply weight
        normalized = max(0.0, min(speed / self.max_speed, 1.0))
        return self.weight * normalized
    
    def reset(self) -> None:
        """No internal state to reset."""
        pass

# Test SpeedReward
speed_reward = SpeedReward(max_speed=100.0, weight=0.5)
test_states = [
    {'speed': 0},
    {'speed': 50},
    {'speed': 100},
    {'speed': 120},  # Over max
]

print("SpeedReward Test:")
for state in test_states:
    reward = speed_reward.compute(state)
    print(f"  Speed {state['speed']:3d} -> Reward {reward:.3f}")

In [None]:
# ProgressReward Class

class ProgressReward(RewardFunction):
    """
    Reward based on checkpoint progress with lap wrap handling.
    
    Tracks progress through checkpoints and rewards forward movement.
    Handles the wrap-around when completing a lap (checkpoint N -> 0).
    """
    
    def __init__(self, total_checkpoints: int = 100, weight: float = 1.0):
        """
        Args:
            total_checkpoints: Number of checkpoints per lap
            weight: Multiplier for the reward
        """
        self.total_checkpoints = total_checkpoints
        self.weight = weight
        self.last_checkpoint = 0
    
    def compute(self, state: Dict) -> float:
        """
        Compute progress-based reward.
        
        Returns reward proportional to checkpoints passed since last call.
        Handles lap wrap-around correctly.
        """
        current_cp = state.get('checkpoint', 0)
        delta = current_cp - self.last_checkpoint
        
        # Map delta to the shortest way around the lap. This assumes the kart
        # moves less than half a lap between calls.
        half_lap = self.total_checkpoints // 2
        if delta < -half_lap:
            # Forward across the finish line (e.g., checkpoint 99 -> 0)
            delta += self.total_checkpoints
        elif delta > half_lap:
            # Backward across the finish line (e.g., checkpoint 0 -> 99)
            delta -= self.total_checkpoints
        
        self.last_checkpoint = current_cp
        return self.weight * delta
    
    def reset(self) -> None:
        """Reset checkpoint tracking for new episode."""
        self.last_checkpoint = 0

# Test ProgressReward
progress_reward = ProgressReward(total_checkpoints=100, weight=1.0)

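# Each jump stays under half a lap; a larger jump (e.g., 10 -> 98) would be
# read as backward movement across the finish line and yield -12.
test_sequence = [
    {'checkpoint': 0},   # Start
    {'checkpoint': 5},   # Normal forward
    {'checkpoint': 10},  # More forward
    {'checkpoint': 55},  # Large forward jump (still under half a lap)
    {'checkpoint': 98},  # Near lap end
    {'checkpoint': 2},   # Lap wrap!
    {'checkpoint': 1},   # Slight backward
]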

print("ProgressReward Test (wrap at 100):")
progress_reward.reset()
for i, state in enumerate(test_sequence):
    reward = progress_reward.compute(state)
    print(f"  Step {i}: Checkpoint {state['checkpoint']:2d} -> Reward {reward:+.1f}")
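
The wrap-around rule is easier to check in isolation. The standalone function below mirrors the logic of `ProgressReward.compute`; the name `wrapped_delta` and the test cases are introduced here for illustration only.

```python
def wrapped_delta(prev_cp: int, curr_cp: int, total: int = 100) -> int:
    """Signed checkpoint change, mapped to the shortest way around the lap."""
    half = total // 2
    delta = curr_cp - prev_cp
    if delta < -half:      # e.g. 98 -> 2 crosses the line going forward
        delta += total
    elif delta > half:     # e.g. 2 -> 98 crosses the line going backward
        delta -= total
    return delta


cases = [(0, 5), (98, 2), (2, 98), (10, 98), (10, 60)]
for prev_cp, curr_cp in cases:
    print(f"{prev_cp:2d} -> {curr_cp:2d}: delta = {wrapped_delta(prev_cp, curr_cp):+d}")
```

A single-step jump of more than half a lap is ambiguous. The rule resolves it as the shorter path, so `10 -> 98` counts as 12 checkpoints backward rather than 88 forward.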

In [None]:
# CompositeReward Class

class CompositeReward(RewardFunction):
    """
    Combine multiple reward functions with weights.
    
    Allows building complex reward functions from simpler components.
    Each component has its own weight for fine-tuning.
    """
    
    def __init__(self, reward_fns: List[Tuple[RewardFunction, float]]):
        """
        Args:
            reward_fns: List of (RewardFunction, weight) tuples
        """
        self.reward_fns = reward_fns
    
    def compute(self, state: Dict) -> float:
        """
        Compute weighted sum of all component rewards.
        """
        total = 0.0
        for fn, weight in self.reward_fns:
            total += weight * fn.compute(state)
        return total
    
    def reset(self) -> None:
        """Reset all component reward functions."""
        for fn, _ in self.reward_fns:
            fn.reset()
    
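    def compute_breakdown(self, state: Dict) -> Dict[str, float]:
        """
        Get individual rewards from each component (for debugging).

        Note: this calls compute() on every component, so stateful components
        such as ProgressReward advance as a side effect. Call it instead of
        compute() for a given step, not in addition to it. Components of the
        same class share a key, so the later one overwrites the earlier one.
        """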
        breakdown = {}
        for fn, weight in self.reward_fns:
            name = fn.__class__.__name__
            breakdown[name] = weight * fn.compute(state)
        return breakdown

# Test CompositeReward
composite = CompositeReward([
    (SpeedReward(max_speed=100), 0.3),
    (ProgressReward(total_checkpoints=100), 1.0),
])

composite.reset()
test_state = {'speed': 80, 'checkpoint': 5}
reward = composite.compute(test_state)
print(f"Composite Reward Test:")
print(f"  State: {test_state}")
print(f"  Total Reward: {reward:.3f}")
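
Because components such as `ProgressReward` keep internal state, a debugging breakdown is safest when each component is evaluated once per step. The sketch below shows one way to do that. `LoggingComposite`, the explicit component names, and the use of plain callables are assumptions made to keep the example short; it is not a drop-in replacement for `CompositeReward`.

```python
from typing import Callable, Dict, List, Tuple

# For brevity, components here are plain callables (state -> float) rather
# than RewardFunction subclasses.
Component = Tuple[str, Callable[[Dict], float], float]  # (name, fn, weight)


class LoggingComposite:
    """Weighted sum that records each component's share of the last step."""

    def __init__(self, components: List[Component]):
        self.components = components
        self.last_breakdown: Dict[str, float] = {}

    def compute(self, state: Dict) -> float:
        # Each component is evaluated exactly once per step, so stateful
        # components are not advanced a second time just for logging.
        self.last_breakdown = {
            name: weight * fn(state) for name, fn, weight in self.components
        }
        return sum(self.last_breakdown.values())


logged = LoggingComposite([
    ("speed", lambda s: min(s.get("speed", 0) / 100.0, 1.0), 0.3),
    ("time", lambda s: -1.0, 0.01),
])
total = logged.compute({"speed": 80})
print(f"total={total:.3f}", logged.last_breakdown)
```

Explicit names also avoid key collisions when two components share a class.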

In [None]:
# MarioKartRewardV1: Speed + Progress

class MarioKartRewardV1(RewardFunction):
    """
    Version 1: Basic speed and progress rewards.
    
    Combines:
    - Speed reward (weight 0.3): Encourages fast driving
    - Progress reward (weight 1.0): Main objective - move forward
    """
    
    def __init__(self, max_speed: float = 100.0, total_checkpoints: int = 100):
        self.max_speed = max_speed
        self.total_checkpoints = total_checkpoints
        
        self.speed_reward = SpeedReward(max_speed, weight=0.3)
        self.progress_reward = ProgressReward(total_checkpoints, weight=1.0)
    
    def compute(self, state: Dict) -> float:
        speed_r = self.speed_reward.compute(state)
        progress_r = self.progress_reward.compute(state)
        return speed_r + progress_r
    
    def reset(self) -> None:
        self.speed_reward.reset()
        self.progress_reward.reset()

print("MarioKartRewardV1 defined: Speed + Progress")

In [None]:
# MarioKartRewardV2: V1 + Collision Penalty

class MarioKartRewardV2(MarioKartRewardV1):
    """
    Version 2: V1 + Collision penalty.
    
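    Adds collision detection via sudden speed drops.
    When speed falls by more than `collision_threshold` between two
    consecutive steps, we assume a collision occurred. Hard braking can
    trigger the same penalty, so treat this as a heuristic.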
    """
    
    def __init__(self, max_speed: float = 100.0, total_checkpoints: int = 100,
                 collision_threshold: float = 20.0, collision_penalty: float = -0.5):
        """
        Args:
            collision_threshold: Speed drop that indicates collision
            collision_penalty: Penalty applied on collision
        """
        super().__init__(max_speed, total_checkpoints)
        self.collision_threshold = collision_threshold
        self.collision_penalty = collision_penalty
        self.last_speed = 0
    
    def compute(self, state: Dict) -> float:
        # Get base reward from V1
        base_reward = super().compute(state)
        
        # Detect collision via sudden speed drop
        current_speed = state.get('speed', 0)
        speed_drop = self.last_speed - current_speed
        
        collision_r = 0.0
        if speed_drop > self.collision_threshold:
            collision_r = self.collision_penalty
        
        self.last_speed = current_speed
        return base_reward + collision_r
    
    def reset(self) -> None:
        super().reset()
        self.last_speed = 0

print("MarioKartRewardV2 defined: V1 + Collision Penalty")
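
If the environment exposes a collision flag, it is usually a more reliable signal than the speed-drop heuristic. The sketch below prefers an assumed boolean `state['collision']` and falls back to the heuristic when the key is absent. The function name and the key are hypothetical.

```python
from typing import Dict


def collision_penalty(
    state: Dict,
    last_speed: float,
    threshold: float = 20.0,
    penalty: float = -0.5,
) -> float:
    """Return the penalty if a collision is detected, else 0.0.

    Uses the (assumed) boolean state['collision'] when the environment
    provides it and falls back to the speed-drop heuristic otherwise.
    """
    if "collision" in state:
        return penalty if state["collision"] else 0.0
    speed_drop = last_speed - state.get("speed", 0.0)
    return penalty if speed_drop > threshold else 0.0


# Hard braking from 80 to 40: the heuristic fires, the explicit flag does not.
heuristic_result = collision_penalty({"speed": 40}, last_speed=80)
flag_result = collision_penalty({"speed": 40, "collision": False}, last_speed=80)
print(heuristic_result, flag_result)
```

The braking example shows the heuristic's main weakness: it penalizes any sharp deceleration, including deliberate braking before a corner.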

In [None]:
# MarioKartRewardV3: V2 + Lap Bonus + Time Penalty

class MarioKartRewardV3(MarioKartRewardV2):
    """
    Version 3: V2 + Lap bonus + Time penalty.
    
    Adds:
    - Lap completion bonus: Big reward for finishing laps
    - Time penalty: Small constant penalty to encourage speed
    """
    
    def __init__(self, max_speed: float = 100.0, total_checkpoints: int = 100,
                 collision_threshold: float = 20.0, collision_penalty: float = -0.5,
                 lap_bonus: float = 10.0, time_penalty: float = -0.01):
        """
        Args:
            lap_bonus: Reward for completing a lap
            time_penalty: Penalty per timestep (encourages faster completion)
        """
        super().__init__(max_speed, total_checkpoints, 
                        collision_threshold, collision_penalty)
        self.lap_bonus = lap_bonus
        self.time_penalty = time_penalty
        self.last_lap = 0
    
    def compute(self, state: Dict) -> float:
        # Get base reward from V2
        base_reward = super().compute(state)
        
        # Lap completion bonus
        current_lap = state.get('lap', 0)
        lap_r = 0.0
        if current_lap > self.last_lap:
            lap_r = self.lap_bonus
        self.last_lap = current_lap
        
        # Time penalty (constant per step)
        time_r = self.time_penalty
        
        return base_reward + lap_r + time_r
    
    def reset(self) -> None:
        super().reset()
        self.last_lap = 0

print("MarioKartRewardV3 defined: V2 + Lap Bonus + Time Penalty")

In [None]:
# Test with Simulated State Sequence

def generate_simulated_episode(n_steps: int = 50, seed: int = 42) -> List[Dict]:
    """
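    Generate a scripted racing episode for testing reward functions.
    
    Simulates:
    - Acceleration from start
    - Cruising at high speed
    - Collision event (speed drops at step 25)
    - Recovery
    - Lap completion (only for longer runs; the default 50 steps end
      at checkpoint 95, just short of a full lap)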
    """
    np.random.seed(seed)
    states = []
    
    speed = 0
    checkpoint = 0
    lap = 0
    
    for i in range(n_steps):
        # Acceleration phase (first 10 steps)
        if i < 10:
            speed = min(speed + 10, 80)
        # Collision at step 25
        elif i == 25:
            speed = 20  # Sudden drop
        # Recovery after collision
        elif 25 < i < 35:
            speed = min(speed + 8, 80)
        # Normal cruising with small variations
        else:
            speed = max(60, min(90, speed + np.random.randint(-5, 6)))
        
        # Progress through checkpoints
        if speed > 30:
            checkpoint = (checkpoint + 2) % 100
        else:
            checkpoint = (checkpoint + 1) % 100
        
        # Lap completion (when checkpoint wraps)
        if checkpoint < 5 and i > 0 and states[-1]['checkpoint'] > 95:
            lap += 1
        
        states.append({
            'speed': speed,
            'checkpoint': checkpoint,
            'lap': lap
        })
    
    return states

# Generate test episode
test_episode = generate_simulated_episode(n_steps=50)

# Evaluate all three reward versions
reward_v1 = MarioKartRewardV1()
reward_v2 = MarioKartRewardV2()
reward_v3 = MarioKartRewardV3()

reward_v1.reset()
reward_v2.reset()
reward_v3.reset()

rewards_v1 = []
rewards_v2 = []
rewards_v3 = []

for state in test_episode:
    rewards_v1.append(reward_v1.compute(state))
    rewards_v2.append(reward_v2.compute(state))
    rewards_v3.append(reward_v3.compute(state))

print(f"Episode Summary (50 steps):")
print(f"  V1 Total Reward: {sum(rewards_v1):.2f}")
print(f"  V2 Total Reward: {sum(rewards_v2):.2f}")
print(f"  V3 Total Reward: {sum(rewards_v3):.2f}")

In [None]:
# Visualization: Comparison of V1/V2/V3

fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Plot 1: State information
ax1 = axes[0, 0]
steps = range(len(test_episode))
speeds = [s['speed'] for s in test_episode]
checkpoints = [s['checkpoint'] for s in test_episode]

ax1.plot(steps, speeds, 'b-', label='Speed', linewidth=2)
ax1.axvline(x=25, color='r', linestyle='--', alpha=0.5, label='Collision Event')
ax1.set_xlabel('Step')
ax1.set_ylabel('Speed')
ax1.set_title('Simulated Episode: Speed Profile')
ax1.legend()
ax1.grid(True, alpha=0.3)

# Plot 2: Per-step rewards
ax2 = axes[0, 1]
ax2.plot(steps, rewards_v1, 'g-', label='V1: Speed+Progress', alpha=0.8)
ax2.plot(steps, rewards_v2, 'b-', label='V2: +Collision', alpha=0.8)
ax2.plot(steps, rewards_v3, 'r-', label='V3: +Lap+Time', alpha=0.8)
ax2.axvline(x=25, color='gray', linestyle='--', alpha=0.5)
ax2.set_xlabel('Step')
ax2.set_ylabel('Reward')
ax2.set_title('Per-Step Rewards by Version')
ax2.legend()
ax2.grid(True, alpha=0.3)

# Plot 3: Cumulative rewards
ax3 = axes[1, 0]
ax3.plot(steps, np.cumsum(rewards_v1), 'g-', label='V1', linewidth=2)
ax3.plot(steps, np.cumsum(rewards_v2), 'b-', label='V2', linewidth=2)
ax3.plot(steps, np.cumsum(rewards_v3), 'r-', label='V3', linewidth=2)
ax3.set_xlabel('Step')
ax3.set_ylabel('Cumulative Reward')
ax3.set_title('Cumulative Rewards Over Episode')
ax3.legend()
ax3.grid(True, alpha=0.3)

# Plot 4: Total reward comparison
ax4 = axes[1, 1]
versions = ['V1\n(Speed+Progress)', 'V2\n(+Collision)', 'V3\n(+Lap+Time)']
totals = [sum(rewards_v1), sum(rewards_v2), sum(rewards_v3)]
colors = ['green', 'blue', 'red']
bars = ax4.bar(versions, totals, color=colors, alpha=0.7, edgecolor='black')
ax4.set_ylabel('Total Reward')
ax4.set_title('Total Episode Reward Comparison')
for bar, total in zip(bars, totals):
    ax4.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.5, 
             f'{total:.1f}', ha='center', va='bottom', fontsize=12)
ax4.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

print("\nKey Observations:")
print("- V2 and V3 show a penalty dip at step 25 (collision detected)")
print("- V3 is slightly lower than V2 due to the constant time penalty (no lap bonus fires in 50 steps)")
print("- All versions share the same progress term, which dominates the per-step reward")

In [None]:
# Extended Simulation: 200 Steps with Multiple Events

def generate_extended_episode(n_steps: int = 200, seed: int = 123) -> List[Dict]:
    """
    Generate a longer episode with multiple events:
    - Multiple collisions
    - Multiple lap completions
    - Speed variations
    """
    np.random.seed(seed)
    states = []
    
    speed = 0
    checkpoint = 0
    lap = 0
    collision_steps = [30, 80, 150]  # Collision events
    
    for i in range(n_steps):
        # Handle collisions
        if i in collision_steps:
            speed = max(10, speed - 50)  # Big speed drop
        # Acceleration
        elif speed < 80:
            speed = min(speed + 5, 80)
        # Normal variation
        else:
            speed = max(60, min(90, speed + np.random.randint(-3, 4)))
        
        # Progress (faster when speed is higher)
        progress_rate = 1 + int(speed / 40)
        old_checkpoint = checkpoint
        checkpoint = (checkpoint + progress_rate) % 100
        
        # Detect lap completion
        if checkpoint < old_checkpoint:
            lap += 1
        
        states.append({
            'speed': speed,
            'checkpoint': checkpoint,
            'lap': lap
        })
    
    return states

# Generate extended episode
extended_episode = generate_extended_episode(n_steps=200)

# Reset and evaluate
reward_v1 = MarioKartRewardV1()
reward_v2 = MarioKartRewardV2()
reward_v3 = MarioKartRewardV3()

reward_v1.reset()
reward_v2.reset()
reward_v3.reset()

ext_rewards_v1 = []
ext_rewards_v2 = []
ext_rewards_v3 = []

for state in extended_episode:
    ext_rewards_v1.append(reward_v1.compute(state))
    ext_rewards_v2.append(reward_v2.compute(state))
    ext_rewards_v3.append(reward_v3.compute(state))

# Visualization
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

steps = range(len(extended_episode))
speeds = [s['speed'] for s in extended_episode]
laps = [s['lap'] for s in extended_episode]

# Speed and lap profile
ax1 = axes[0, 0]
ax1.plot(steps, speeds, 'b-', label='Speed', linewidth=1.5)
ax1_twin = ax1.twinx()
ax1_twin.plot(steps, laps, 'g--', label='Lap', linewidth=2)
ax1.set_xlabel('Step')
ax1.set_ylabel('Speed', color='blue')
ax1_twin.set_ylabel('Lap', color='green')
ax1.set_title('Extended Episode: Speed and Lap Progress')
for cs in [30, 80, 150]:
    ax1.axvline(x=cs, color='red', linestyle='--', alpha=0.3)
ax1.grid(True, alpha=0.3)

# Cumulative rewards
ax2 = axes[0, 1]
ax2.plot(steps, np.cumsum(ext_rewards_v1), 'g-', label='V1', linewidth=2)
ax2.plot(steps, np.cumsum(ext_rewards_v2), 'b-', label='V2', linewidth=2)
ax2.plot(steps, np.cumsum(ext_rewards_v3), 'r-', label='V3', linewidth=2)
ax2.set_xlabel('Step')
ax2.set_ylabel('Cumulative Reward')
ax2.set_title('Cumulative Rewards (200 Steps)')
ax2.legend()
ax2.grid(True, alpha=0.3)

# Rolling average reward
ax3 = axes[1, 0]
window = 20
rolling_v1 = np.convolve(ext_rewards_v1, np.ones(window)/window, mode='valid')
rolling_v2 = np.convolve(ext_rewards_v2, np.ones(window)/window, mode='valid')
rolling_v3 = np.convolve(ext_rewards_v3, np.ones(window)/window, mode='valid')
ax3.plot(rolling_v1, 'g-', label='V1', alpha=0.8)
ax3.plot(rolling_v2, 'b-', label='V2', alpha=0.8)
ax3.plot(rolling_v3, 'r-', label='V3', alpha=0.8)
ax3.set_xlabel('Step')
ax3.set_ylabel('Rolling Avg Reward (20 steps)')
ax3.set_title('Rolling Average Reward')
ax3.legend()
ax3.grid(True, alpha=0.3)

# Final comparison
ax4 = axes[1, 1]
versions = ['V1', 'V2', 'V3']
totals = [sum(ext_rewards_v1), sum(ext_rewards_v2), sum(ext_rewards_v3)]
colors = ['green', 'blue', 'red']
bars = ax4.bar(versions, totals, color=colors, alpha=0.7, edgecolor='black')
ax4.set_ylabel('Total Reward')
ax4.set_title('Total Episode Reward (200 Steps)')
for bar, total in zip(bars, totals):
    ax4.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1, 
             f'{total:.1f}', ha='center', va='bottom', fontsize=12)
ax4.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

# Summary statistics
final_lap = extended_episode[-1]['lap']
print(f"\nExtended Episode Summary:")
print(f"  Total Steps: 200")
print(f"  Laps Completed: {final_lap}")
print(f"  Collisions: 3")
print(f"\nTotal Rewards:")
print(f"  V1 (Speed+Progress):     {sum(ext_rewards_v1):.2f}")
print(f"  V2 (+Collision Penalty): {sum(ext_rewards_v2):.2f}")
print(f"  V3 (+Lap Bonus+Time):    {sum(ext_rewards_v3):.2f}")

## Exercise: Design Your Own Reward Function

Create `MarioKartRewardV4` that extends V3 with a **position-based reward** for competitive racing.

### Requirements

1. Add a `position` field to the state (1-8, where 1 is first place)
2. Implement position-based reward:
   - Higher reward for better positions
   - Bonus for overtaking (improving position)
   - Penalty for being overtaken

### Hints

```python
class MarioKartRewardV4(MarioKartRewardV3):
    def __init__(self, *args, position_weight: float = 0.2, **kwargs):
        super().__init__(*args, **kwargs)  # forward V3's arguments unchanged
        self.position_weight = position_weight
        self.last_position = 4  # Start mid-pack
    
    def compute(self, state):
        base_reward = super().compute(state)
        position = state.get('position', 4)
        
        # Position reward: 1st place = position_weight, 8th place = 0.0
        position_r = (8 - position) / 7 * self.position_weight
        
        # Overtake bonus/penalty
        overtake_r = 0.0  # ... your implementation here ...
        self.last_position = position
        
        return base_reward + position_r + overtake_r
    
    def reset(self):
        super().reset()
        self.last_position = 4
```

### Discussion Questions

1. How might position rewards interact with other reward components?
2. Could position rewards lead to aggressive/dangerous driving behavior?
3. Should the position reward scale differently for 1st vs 8th place?

## Quiz

Test your understanding of reward engineering concepts:

### Question 1
Why can sparse rewards be problematic for RL training?

<details>
<summary>Click for answer</summary>

Sparse rewards make learning difficult because:
- The agent may rarely or never experience the reward during exploration
- Credit assignment is hard (which actions led to the reward?)
- Learning is slow with infrequent feedback
- The agent gets no guidance on whether it's improving
</details>

### Question 2
What is potential-based reward shaping and why is it useful?

<details>
<summary>Click for answer</summary>

Potential-based reward shaping adds intermediate rewards of the form F(s,s') = gamma * Phi(s') - Phi(s). It's useful because:
- It preserves the optimal policy (theoretically guaranteed)
- Provides dense feedback to accelerate learning
- Allows incorporating domain knowledge (e.g., distance to goal)
</details>

### Question 3
How does MarioKartRewardV2 detect collisions?

<details>
<summary>Click for answer</summary>

V2 detects collisions by monitoring sudden speed drops. When the current speed is much lower than the previous speed (drop exceeds `collision_threshold`), it assumes a collision occurred and applies the `collision_penalty`. This is a heuristic approach that works when direct collision information isn't available.
</details>

### Question 4
Why include a time penalty in MarioKartRewardV3?

<details>
<summary>Click for answer</summary>

The time penalty (small negative reward per timestep) serves several purposes:
- Encourages faster completion of laps/races
- Prevents the agent from "stalling" or moving very slowly
- Creates urgency that better matches the racing objective
- Helps distinguish between completing quickly vs slowly
</details>

In [None]:
# Save reward_functions.py Module

reward_module_code = '''
"""
Reward Functions for Mario Kart Reinforcement Learning

This module provides modular, composable reward functions for training
RL agents on racing games. Each reward function follows the RewardFunction
abstract base class interface.

Classes:
    RewardFunction: Abstract base class
    SpeedReward: Reward based on normalized speed
    ProgressReward: Reward based on checkpoint progress
    CompositeReward: Combine multiple reward functions
    MarioKartRewardV1: Speed + Progress
    MarioKartRewardV2: V1 + Collision penalty
    MarioKartRewardV3: V2 + Lap bonus + Time penalty

Example:
    >>> reward_fn = MarioKartRewardV3()
    >>> reward_fn.reset()
    >>> state = {\'speed\': 80, \'checkpoint\': 50, \'lap\': 1}
    >>> reward = reward_fn.compute(state)
"""

from abc import ABC, abstractmethod
from typing import Dict, List, Tuple


class RewardFunction(ABC):
    """Abstract base class for reward functions."""
    
    @abstractmethod
    def compute(self, state: Dict) -> float:
        """Compute reward given current state."""
        pass
    
    @abstractmethod
    def reset(self) -> None:
        """Reset internal state for new episode."""
        pass


class SpeedReward(RewardFunction):
    """Reward based on normalized speed."""
    
    def __init__(self, max_speed: float = 100.0, weight: float = 1.0):
        self.max_speed = max_speed
        self.weight = weight
    
    def compute(self, state: Dict) -> float:
        speed = state.get(\'speed\', 0)
        normalized = max(0.0, min(speed / self.max_speed, 1.0))
        return self.weight * normalized
    
    def reset(self) -> None:
        pass


class ProgressReward(RewardFunction):
    """Reward based on checkpoint progress with lap wrap handling."""
    
    def __init__(self, total_checkpoints: int = 100, weight: float = 1.0):
        self.total_checkpoints = total_checkpoints
        self.weight = weight
        self.last_checkpoint = 0
    
    def compute(self, state: Dict) -> float:
        current_cp = state.get(\'checkpoint\', 0)
        delta = current_cp - self.last_checkpoint
        
        half_lap = self.total_checkpoints // 2
        if delta < -half_lap:
            delta += self.total_checkpoints
        elif delta > half_lap:
            delta -= self.total_checkpoints
        
        self.last_checkpoint = current_cp
        return self.weight * delta
    
    def reset(self) -> None:
        self.last_checkpoint = 0


class CompositeReward(RewardFunction):
    """Combine multiple reward functions with weights."""
    
    def __init__(self, reward_fns: List[Tuple[RewardFunction, float]]):
        self.reward_fns = reward_fns
    
    def compute(self, state: Dict) -> float:
        total = 0.0
        for fn, weight in self.reward_fns:
            total += weight * fn.compute(state)
        return total
    
    def reset(self) -> None:
        for fn, _ in self.reward_fns:
            fn.reset()


class MarioKartRewardV1(RewardFunction):
    """V1: Speed + Progress rewards."""
    
    def __init__(self, max_speed: float = 100.0, total_checkpoints: int = 100):
        self.speed_reward = SpeedReward(max_speed, weight=0.3)
        self.progress_reward = ProgressReward(total_checkpoints, weight=1.0)
    
    def compute(self, state: Dict) -> float:
        return (self.speed_reward.compute(state) + 
                self.progress_reward.compute(state))
    
    def reset(self) -> None:
        self.speed_reward.reset()
        self.progress_reward.reset()


class MarioKartRewardV2(MarioKartRewardV1):
    """V2: V1 + Collision penalty (via speed drop detection)."""
    
    def __init__(self, max_speed: float = 100.0, total_checkpoints: int = 100,
                 collision_threshold: float = 20.0, collision_penalty: float = -0.5):
        super().__init__(max_speed, total_checkpoints)
        self.collision_threshold = collision_threshold
        self.collision_penalty = collision_penalty
        self.last_speed = 0
    
    def compute(self, state: Dict) -> float:
        base_reward = super().compute(state)
        current_speed = state.get(\'speed\', 0)
        speed_drop = self.last_speed - current_speed
        
        collision_r = self.collision_penalty if speed_drop > self.collision_threshold else 0
        self.last_speed = current_speed
        return base_reward + collision_r
    
    def reset(self) -> None:
        super().reset()
        self.last_speed = 0


class MarioKartRewardV3(MarioKartRewardV2):
    """V3: V2 + Lap bonus + Time penalty."""
    
    def __init__(self, max_speed: float = 100.0, total_checkpoints: int = 100,
                 collision_threshold: float = 20.0, collision_penalty: float = -0.5,
                 lap_bonus: float = 10.0, time_penalty: float = -0.01):
        super().__init__(max_speed, total_checkpoints, 
                        collision_threshold, collision_penalty)
        self.lap_bonus = lap_bonus
        self.time_penalty = time_penalty
        self.last_lap = 0
    
    def compute(self, state: Dict) -> float:
        base_reward = super().compute(state)
        current_lap = state.get(\'lap\', 0)
        
        lap_r = self.lap_bonus if current_lap > self.last_lap else 0
        self.last_lap = current_lap
        
        return base_reward + lap_r + self.time_penalty
    
    def reset(self) -> None:
        super().reset()
        self.last_lap = 0


if __name__ == "__main__":
    # Quick test
    reward_fn = MarioKartRewardV3()
    reward_fn.reset()
    
    test_states = [
        {\'speed\': 50, \'checkpoint\': 0, \'lap\': 0},
        {\'speed\': 80, \'checkpoint\': 5, \'lap\': 0},
        {\'speed\': 80, \'checkpoint\': 10, \'lap\': 0},
    ]
    
    print("Testing MarioKartRewardV3:")
    for state in test_states:
        r = reward_fn.compute(state)
        print(f"  State: {state} -> Reward: {r:.3f}")
'''

# Write the module
with open('reward_functions.py', 'w') as f:
    f.write(reward_module_code)

print("Saved: reward_functions.py")

# Verify import works
import importlib.util
spec = importlib.util.spec_from_file_location("reward_functions", "reward_functions.py")
reward_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(reward_module)

print("\nModule contents:")
print([name for name in dir(reward_module) if not name.startswith('_')])
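
To use these classes in a training script, the reward function has to be reset together with the environment and called once per step. The sketch below shows that pattern with a stub environment. `StubKartEnv`, its made-up dynamics, and `StandInReward` are hypothetical stand-ins so the example runs on its own; they do not reflect a real emulator API.

```python
from typing import Callable, Dict, Tuple


class StubKartEnv:
    """Hypothetical stand-in for a real environment; not an actual emulator API."""

    def __init__(self) -> None:
        self.checkpoint = 0

    def reset(self) -> Dict:
        self.checkpoint = 0
        return {"speed": 0, "checkpoint": 0, "lap": 0}

    def step(self, action: int) -> Tuple[Dict, bool]:
        # action 1 = accelerate, action 0 = coast (made-up dynamics)
        speed = 80 if action == 1 else 20
        self.checkpoint += 2 if action == 1 else 0
        state = {
            "speed": speed,
            "checkpoint": self.checkpoint % 100,
            "lap": self.checkpoint // 100,
        }
        return state, self.checkpoint >= 100  # done after one lap


class StandInReward:
    """Same compute/reset interface as the classes in reward_functions.py."""

    def __init__(self) -> None:
        self.last_checkpoint = 0

    def reset(self) -> None:
        self.last_checkpoint = 0

    def compute(self, state: Dict) -> float:
        # Forward-only progress plus a small per-step time penalty.
        delta = (state["checkpoint"] - self.last_checkpoint) % 100
        self.last_checkpoint = state["checkpoint"]
        return float(delta) - 0.01


def run_episode(env, reward_fn, policy: Callable[[Dict], int],
                max_steps: int = 500) -> float:
    """Roll out one episode, resetting the reward function alongside the env."""
    state = env.reset()
    reward_fn.reset()  # stale last_checkpoint/last_speed would corrupt step 0
    total = 0.0
    for _ in range(max_steps):
        state, done = env.step(policy(state))
        total += reward_fn.compute(state)
        if done:
            break
    return total


# In the notebook, StandInReward() could be swapped for MarioKartRewardV3().
episode_return = run_episode(StubKartEnv(), StandInReward(), policy=lambda s: 1)
print(f"Episode return: {episode_return:.2f}")
```

Keeping the reward function outside the environment makes it easy to replay the same trajectory under V1, V2, and V3 and compare the totals.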

## Summary

In this notebook, we learned:

1. **Reward Design Fundamentals**
   - The reward function defines what behavior the agent learns
   - Sparse vs dense rewards trade off simplicity for learning speed

2. **Reward Shaping**
   - Potential-based shaping can accelerate learning
   - Shaping that is not potential-based can change the optimal policy, so validate it carefully

3. **Modular Reward Architecture**
   - Abstract base class for consistent interface
   - Composable components (speed, progress, etc.)
   - Versioned rewards for iterative improvement

4. **Racing-Specific Considerations**
   - Progress tracking with lap wraparound
   - Collision detection via speed drops
   - Balance between speed and safety

5. **Avoiding Pitfalls**
   - Reward hacking detection and prevention
   - Importance of visualization and monitoring

### Next Steps
- Implement V4 with position rewards
- Integrate with actual Mario Kart environment
- Experiment with different weight configurations
- Use reward_functions.py in training scripts