In [4]:
%load_ext autoreload
%autoreload 2

import numpy as np
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt
from scipy.stats import poisson
from typing import Tuple

sns.set('notebook', font_scale=1.1, rc={'figure.figsize': (6, 6)})
sns.set_style('ticks', rc={'figure.facecolor': 'none', 'axes.facecolor': 'none'})
%config InlineBackend.figure_format = 'svg'
matplotlib.rcParams['figure.facecolor'] = 'white'

import logging
logging.basicConfig(level=logging.DEBUG)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


### 4.3 Policy Iteration: Jack's Car Rental

---

We implement **Exercise 4.7** and write a program for policy iteration to solve Jack's car rental problem exactly as shown in Figure 4.2 (See the policy iteration algorithm on page 80).

$$
$$

**Intuition:** start with a guess policy --> evaluate --> improve it --> evaluate again --> improve it --> and so on!

In [2]:
'''
Parameters
'''

SEED = 42
np.random.seed(SEED)

LOCATIONS = ["A", "B"]

# Pay for renting out a car 
CAR_REWARD = 10

# Cars can be moved from one loc to the other
CAR_MOVE_COST = 2

# Assume that the number of cars requested and returned at each location are Poisson random variables
LAMBDA_REQ = {"A": 3, "B": 4}
LAMBDA_RETURN = {"A": 4, "B": 2}

# Assume that there can be no more than 20 cars per location
MAX_CARS = 5

# The maximum cars that can be moved overnight
MAX_MOVING_CARS = 3

# Discount rate
GAMMA = 0.9

# The action indicates how many cars to move from point A to B or B to A
# Positive: move cars from A --> B | Negative: move cars from B --> A
ACTIONS = np.arange(-MAX_MOVING_CARS, MAX_MOVING_CARS + 1)

In [3]:
from functools import lru_cache

MAX_CARS_POISSON = 11

@lru_cache
def prob_cars(cars: int, lambda_: int) -> float:
    return poisson.pmf(k=cars, mu=lambda_)

@lru_cache
def prob_state_reward_given_state_action(
    state_prime: Tuple[int, int],
    reward: int,
    state: Tuple[int, int],
    action: int,
) -> float:
    net_cars_a = (state[0] + max(0, action)) - state_prime[0]
    net_cars_b = (state[1] - min(0, action)) - state_prime[1]
    return np.sum(
        [
            (
                prob_cars(req_cars_a, LAMBDA_REQ["A"]) *
                prob_cars(return_cars_a, LAMBDA_RETURN["A"])
            ) * (
                prob_cars(req_cars_b, LAMBDA_REQ["B"]) *
                prob_cars(return_cars_b, LAMBDA_RETURN["B"])
            )
            for req_cars_a in range(MAX_CARS_POISSON + 1)
            for return_cars_a in range(MAX_CARS_POISSON + 1)
            for req_cars_b in range(MAX_CARS_POISSON + 1)
            for return_cars_b in range(MAX_CARS_POISSON + 1)
            if (
                req_cars_a + return_cars_a == net_cars_a
            ) and (
                req_cars_b + return_cars_b == net_cars_b
            ) and (
                reward == (
                    (req_cars_a + req_cars_b) * CAR_REWARD
                        - np.abs(action) * CAR_MOVE_COST
                )
            )  
        ]
    )


In [None]:
MAX_ITER_POLICY_EVALUATION = 5
MAX_ITER_POLICY_IMPROVEMENT = 5

states = [
    (cars_a, cars_b)
    for cars_a in range(0, MAX_CARS + 1)
    for cars_b in range(0, MAX_CARS + 1)
]

rewards = np.sort(
    np.unique(
        [
            (
                (rent_cars_a + rent_cars_b) * CAR_REWARD
                    - np.abs(move_cars) * -CAR_MOVE_COST
            )
            for rent_cars_a in range(MAX_CARS + 1)
            for rent_cars_b in range(MAX_CARS + 1)
            for move_cars in ACTIONS
        ],
    )
)


# Step 1. Initialization
values = np.zeros(shape=(MAX_CARS + 1, MAX_CARS + 1), dtype=int)
policies = np.zeros(shape=(MAX_CARS + 1, MAX_CARS + 1), dtype=int)

policy_improvement_iters = 0
policy_stable = False
while not policy_stable and policy_improvement_iters < MAX_ITER_POLICY_IMPROVEMENT:
    policy_improvement_iters += 1

    delta = np.inf
    policy_evaluation_iters = 0
    # Step 2. Policy Evaluation
    while delta > THETA and policy_evaluation_iters < MAX_ITER_POLICY_EVALUATION:
        policy_evaluation_iters += 1
        delta = 0
        for state in states:
            state_value = values[state]
            action = policies[state]
            values[state] = np.sum(
                [
                    prob_state_reward_given_state_action(
                        state_prime=state_prime,
                        reward=reward,
                        state=state,
                        action=action,
                    ) * (reward + GAMMA * values[state_prime])
                    for state_prime in states
                    for reward in rewards
                ]
            )
            delta = max(delta, np.abs(state_value - values[state]))
        
        # _, ax = plt.subplots()
        # sns.heatmap(values, cbar=True, cmap='Blues', ax=ax)
        # ax.set_title(f'k = {policy_evaluation_iters}', loc='left')
        # ax.set_title(r'Policy evaluation | $v_{\pi}k$', y=1.05)
        # ax.set_xlabel('Number of cars at location A')
        # ax.set_ylabel('Number of cars at location B')
        # plt.show()
        
        logging.info(f"Completed Policy Evaluation {policy_evaluation_iters}")

        if delta < THETA:
            break

    # Step 3. Policy Improvement
    policy_stable = True
    for state in states:
        old_action = policies[state]
        policies[state] = np.argmax(
            [
                np.sum(
                    [
                        prob_state_reward_given_state_action(
                            state_prime=state_prime,
                            reward=reward,
                            state=state,
                            action=action,
                        ) * (reward + GAMMA * values[state_prime])
                        for reward in rewards
                        for state_prime in states
                    ]
                )
                for action in ACTIONS
            ]
        ) - MAX_MOVING_CARS

        if old_action != policies[state]:
            policy_stable = False

    _, ax = plt.subplots()
    sns.heatmap(policies, cbar=True, cmap='Blues', ax=ax)
    ax.set_title(f'k = {policy_improvement_iters}', loc='left')
    ax.set_title(r'Policy Improvement | $\pi_k$', y=1.05)
    ax.set_xlabel('Number of cars at location A')
    ax.set_ylabel('Number of cars at location B')
    plt.show()

    logging.info(f"Completed Policy Improvement {policy_improvement_iters}")

In [None]:
x = np.arange(MAX_CARS+1)
y = np.arange(MAX_CARS+1)
X, Y = np.meshgrid(x, y)

fig = plt.figure()
fig.suptitle(f'v π k = {policy_evaluation_iters}',)
ax = plt.axes(projection='3d')
im = ax.plot_surface(X, Y, values, rstride=1, cstride=1, cmap='viridis', edgecolor='none')
ax.set_zlabel('Return')
ax.set_xlabel('# of cars at loc A')
ax.set_ylabel('# of cars at loc B')
ax.view_init(30, 150)
plt.colorbar(im);