# Mountain Car Task

Consider the task of driving an underpowered car up a steep mountain road, as suggested by the diagram below.

In [7]:
from __future__ import annotations

import os
import math
from dataclasses import dataclass
from typing import List, Tuple

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from tqdm import tqdm

# Use a non-interactive backend – we only save figures to files
matplotlib.use("Agg")


# ============================================================
# Mountain Car environment
# ============================================================
@dataclass
class MCConfig:
    min_position: float = -1.2
    max_position: float = 0.5
    max_speed: float = 0.07
    goal_position: float = 0.5
    force: float = 0.001
    gravity: float = 0.0025
    max_steps: int = 300  # same cap used in the book


class MountainCar:
    """
    Minimal Mountain Car with three discrete actions:
      0 = push left, 1 = coast, 2 = push right
    Reward is -1 on every step until the goal is reached.
    """

    def __init__(self, cfg: MCConfig | None = None) -> None:
        self.cfg = cfg or MCConfig()
        self.reset()

    def reset(self) -> np.ndarray:
        self.position = np.random.uniform(-0.6, -0.4)
        self.velocity = 0.0
        self.t = 0
        return np.array([self.position, self.velocity], dtype=np.float32)

    def step(self, action: int) -> Tuple[np.ndarray, float, bool]:
        assert action in (0, 1, 2)
        cfg = self.cfg

        # convert action to acceleration {-1, 0, +1}
        accel = action - 1

        self.velocity += accel * cfg.force - cfg.gravity * math.cos(3 * self.position)
        self.velocity = float(np.clip(self.velocity, -cfg.max_speed, cfg.max_speed))

        self.position += self.velocity
        self.position = float(np.clip(self.position, cfg.min_position, cfg.max_position))

        # stop if we bounce into the left wall
        if self.position == cfg.min_position and self.velocity < 0:
            self.velocity = 0.0

        self.t += 1
        done = bool(self.position >= cfg.goal_position or self.t >= cfg.max_steps)
        reward = -1.0

        state = np.array([self.position, self.velocity], dtype=np.float32)
        return state, reward, done


# Global environment used in all experiments
ENV = MountainCar()


# ============================================================
# Tile coding
# ============================================================
class IHT:
    """
    Index Hash Table for tile coding: maps coordinate tuples to
    integer indices up to `size`, then falls back to hashing.
    """

    def __init__(self, size: int) -> None:
        self.size = size
        self._dict: dict[Tuple[int, ...], int] = {}
        self.overfull_count = 0

    def get_index(self, key: Tuple[int, ...]) -> int:
        if key in self._dict:
            return self._dict[key]

        if len(self._dict) >= self.size:
            if self.overfull_count == 0:
                print("[IHT] Capacity reached – using hashed indices (collisions possible).")
            self.overfull_count += 1
            return hash(key) % self.size

        idx = len(self._dict)
        self._dict[key] = idx
        return idx


class TileCoder:
    """
    Tile coder for 2D state (position, velocity) + action.
    """

    def __init__(
        self,
        num_tilings: int,
        num_tiles: int,
        low: np.ndarray,
        high: np.ndarray,
        iht_size: int,
    ) -> None:
        self.num_tilings = num_tilings
        self.num_tiles = num_tiles
        self.low = low.astype(np.float32)
        self.high = high.astype(np.float32)
        self.iht = IHT(iht_size)

        self.tile_width = (self.high - self.low) / (num_tiles - 1)

    def _coords_for_tiling(self, tiling: int, state: np.ndarray, action: int) -> Tuple[int, ...]:
        # each tiling is slightly shifted
        offsets = (tiling / self.num_tilings) * self.tile_width
        shifted = state + offsets
        ratios = (shifted - self.low) / self.tile_width
        indices = np.floor(ratios).astype(int)
        return (tiling, int(indices[0]), int(indices[1]), int(action))

    def tiles(self, state: np.ndarray, action: int) -> List[int]:
        active: List[int] = []
        for tiling in range(self.num_tilings):
            coords = self._coords_for_tiling(tiling, state, action)
            idx = self.iht.get_index(coords)
            active.append(idx)
        return active


# ============================================================
# Action-value function with tile coding
# ============================================================
class ActionValueFunction:
    """
    Linear approximator for Q(s, a) using tile-coded features:

        Q(s, a) = sum_{i in active_tiles(s, a)} w_i
    """

    def __init__(
        self,
        step_size_scaled: float,   # this is α × num_tilings (like in the book)
        num_tilings: int = 8,
        num_tiles: int = 8,
        iht_size: int = 4096,
        gamma: float = 1.0,
    ) -> None:
        self.num_actions = 3
        self.num_tilings = num_tilings
        self.gamma = gamma

        low = np.array([ENV.cfg.min_position, -ENV.cfg.max_speed], dtype=np.float32)
        high = np.array([ENV.cfg.max_position, ENV.cfg.max_speed], dtype=np.float32)

        self.tile_coder = TileCoder(num_tilings, num_tiles, low, high, iht_size)
        self.weights = np.zeros(iht_size, dtype=np.float32)

        # real step-size per update
        self.alpha = step_size_scaled / num_tilings

    def _features(self, state: np.ndarray, action: int) -> List[int]:
        return self.tile_coder.tiles(state, action)

    def q(self, state: np.ndarray, action: int) -> float:
        idxs = self._features(state, action)
        return float(self.weights[idxs].sum())

    def q_all(self, state: np.ndarray) -> np.ndarray:
        return np.array([self.q(state, a) for a in range(self.num_actions)], dtype=np.float32)

    def greedy_action(self, state: np.ndarray) -> int:
        return int(np.argmax(self.q_all(state)))

    def epsilon_greedy(self, state: np.ndarray, epsilon: float) -> int:
        if np.random.rand() < epsilon:
            return int(np.random.randint(self.num_actions))
        return self.greedy_action(state)

    def update(self, state: np.ndarray, action: int, target: float) -> None:
        idxs = self._features(state, action)
        estimate = self.weights[idxs].sum()
        delta = target - estimate
        self.weights[idxs] += self.alpha * delta


# ============================================================
# One episode of n-step semi-gradient SARSA
# ============================================================
def run_n_step_sarsa_episode(
    value_function: ActionValueFunction,
    n: int = 4,
    epsilon: float = 0.0,
) -> int:
    """
    Run a single n-step semi-gradient SARSA episode.

    Returns the number of steps in that episode.
    """
    gamma = value_function.gamma
    state = ENV.reset()
    action = value_function.epsilon_greedy(state, epsilon)

    states = [state]
    actions = [action]
    rewards = [0.0]  # placeholder for R_0

    T = math.inf
    t = 0

    while True:
        if t < T:
            next_state, reward, done = ENV.step(actions[t])
            states.append(next_state)
            rewards.append(reward)

            if done:
                T = t + 1
            else:
                next_action = value_function.epsilon_greedy(next_state, epsilon)
                actions.append(next_action)

        tau = t - n + 1
        if tau >= 0:
            # n-step return
            G = 0.0
            for i in range(tau + 1, min(tau + n, T) + 1):
                G += (gamma ** (i - tau - 1)) * rewards[i]

            if tau + n < T:
                G += (gamma ** n) * value_function.q(states[tau + n], actions[tau + n])

            value_function.update(states[tau], actions[tau], G)

        if tau == T - 1:
            return int(T - 1)  # number of actual steps

        t += 1


# ============================================================
# Cost-to-go surface
# ============================================================
def plot_cost_surface(
    value_function: ActionValueFunction,
    ax,
    title: str,
    num_positions: int = 50,
    num_velocities: int = 50,
) -> None:
    """
    Approximate cost-to-go by -max_a Q(s, a) on a grid of states.
    """
    cfg = ENV.cfg
    positions = np.linspace(cfg.min_position, cfg.max_position, num_positions)
    velocities = np.linspace(-cfg.max_speed, cfg.max_speed, num_velocities)
    P, V = np.meshgrid(positions, velocities)
    Z = np.zeros_like(P)

    for i in range(P.shape[0]):
        for j in range(P.shape[1]):
            s = np.array([P[i, j], V[i, j]], dtype=np.float32)
            q_vals = value_function.q_all(s)
            Z[i, j] = -np.max(q_vals)

    ax.plot_surface(P, V, Z, rstride=1, cstride=1, linewidth=0, antialiased=True)
    ax.set_title(title)
    ax.set_xlabel("Position")
    ax.set_ylabel("Velocity")
    ax.set_zlabel("Cost-to-go")


# ============================================================
# Run experiments (analogues of Figures 10.1–10.4)
# ============================================================
output_dir = "generated_pictures"
os.makedirs(output_dir, exist_ok=True)

np.random.seed(0)
tilings = 8

# ---------- Figure 1: evolving cost-to-go ----------
episodes_total = 9000
snapshots = [0, 99, episodes_total - 1]

fig = plt.figure(figsize=(18, 5))
axes = [fig.add_subplot(1, len(snapshots), i + 1, projection="3d") for i in range(len(snapshots))]

step_size_for_surface = 0.3  # α × tilings
vf_surface = ActionValueFunction(step_size_scaled=step_size_for_surface, num_tilings=tilings)

for ep in tqdm(range(episodes_total), desc="Figure 1 training"):
    run_n_step_sarsa_episode(vf_surface, n=4, epsilon=0.0)
    if ep in snapshots:
        idx = snapshots.index(ep)
        plot_cost_surface(vf_surface, axes[idx], f"Episode {ep + 1}")

plt.tight_layout()
plt.savefig(os.path.join(output_dir, "figure_10_1.png"))
plt.close()


# ---------- Figure 2: effect of step-size ----------
runs = 10
episodes = 500
step_sizes_list = [0.1, 0.2, 0.5]  # these are α × tilings
times = np.zeros((len(step_sizes_list), episodes))

for run in range(runs):
    vfs = [ActionValueFunction(step_size_scaled=s, num_tilings=tilings) for s in step_sizes_list]
    for idx, vf in enumerate(vfs):
        for ep in tqdm(range(episodes), desc=f"Figure 2 run {run+1}, α×m={step_sizes_list[idx]}", leave=False):
            steps_taken = run_n_step_sarsa_episode(vf, n=4, epsilon=0.0)
            times[idx, ep] += steps_taken

times /= runs

plt.figure(figsize=(6, 4))
for i, s in enumerate(step_sizes_list):
    plt.plot(times[i], label=rf"$\alpha = {s}/{tilings}$")
plt.xlabel("Episode")
plt.ylabel(f"Steps per episode (log scale, averaged over {runs} runs)")
plt.yscale("log")
plt.legend()
plt.tight_layout()
plt.savefig(os.path.join(output_dir, "figure_10_2.png"))
plt.close()


# ---------- Figure 3: n = 1 vs n = 8 ----------
runs = 10
episodes = 500
step_sizes_n = [0.5, 0.3]  # α × tilings for n=1 and n=8
n_values = [1, 8]
times_n = np.zeros((len(step_sizes_n), episodes))

for run in range(runs):
    vfs = [ActionValueFunction(step_size_scaled=step_sizes_n[i], num_tilings=tilings)
           for i in range(len(step_sizes_n))]
    for idx, vf in enumerate(vfs):
        n_current = n_values[idx]
        for ep in tqdm(range(episodes), desc=f"Figure 3 run {run+1}, n={n_current}", leave=False):
            steps_taken = run_n_step_sarsa_episode(vf, n=n_current, epsilon=0.0)
            times_n[idx, ep] += steps_taken

times_n /= runs

plt.figure(figsize=(6, 4))
for i, n_val in enumerate(n_values):
    plt.plot(times_n[i], label=f"n = {n_val:.1f}")
plt.xlabel("Episode")
plt.ylabel(f"Steps per episode (log scale, averaged over {runs} runs)")
plt.yscale("log")
plt.legend()
plt.tight_layout()
plt.savefig(os.path.join(output_dir, "figure_10_3.png"))
plt.close()


# ---------- Figure 4: effect of α × tilings and n ----------
runs = 5
episodes = 50
max_steps = ENV.cfg.max_steps

alpha_scaled_values = np.arange(0.25, 1.75, 0.25)   # α × tilings
n_grid = np.power(2, np.arange(0, 5))               # 1, 2, 4, 8, 16
mean_steps = np.zeros((len(n_grid), len(alpha_scaled_values)))

for run in range(runs):
    for n_idx, n_val in enumerate(n_grid):
        for a_idx, alpha_scaled in enumerate(alpha_scaled_values):
            # for some (n, α) pairs the algorithm diverges – treat as max cost
            if (n_val == 8 and alpha_scaled > 1.0) or (n_val == 16 and alpha_scaled > 0.75):
                mean_steps[n_idx, a_idx] += max_steps * episodes
                continue

            vf = ActionValueFunction(step_size_scaled=alpha_scaled, num_tilings=tilings)
            total_steps = 0
            for ep in tqdm(range(episodes), desc=f"Figure 4 run {run+1}, n={n_val}, α×m={alpha_scaled}", leave=False):
                total_steps += run_n_step_sarsa_episode(vf, n=n_val, epsilon=0.0)

            mean_steps[n_idx, a_idx] += total_steps

# average over runs and episodes
mean_steps /= (runs * episodes)

plt.figure(figsize=(6, 4))
for i, n_val in enumerate(n_grid):
    plt.plot(alpha_scaled_values, mean_steps[i], label=f"n = {n_val}")
plt.xlabel(r"$\alpha \times$ number of tilings (8)")
plt.ylabel(f"Steps per episode (avg over first {episodes} episodes and {runs} runs)")
plt.ylim([220, max_steps])
plt.legend()
plt.tight_layout()
plt.savefig(os.path.join(output_dir, "figure_10_4.png"))
plt.close()

print(f"All figures saved into: {os.path.abspath(output_dir)}")


Figure 1 training: 100%|██████████| 9000/9000 [01:25<00:00, 105.11it/s]
                                                                               

All figures saved into: /Users/antonstepanyan/Desktop/RL/Reinforcement-Learning/mountain-car/notebooks/generated_pictures
