In [14]:
import numpy as np
from math import exp, factorial

MAX_CARS = 20
MAX_MOVE = 5

RENT_REWARD = 10
MOVE_COST = 2
DISCOUNT = 0.9

# Poisson parameters
LAMBDA_REQ_1 = 3
LAMBDA_REQ_2 = 4
LAMBDA_RET_1 = 3
LAMBDA_RET_2 = 2

# Exercise 4.7 modifications
FREE_MOVE_1_TO_2 = 1
PARKING_LIMIT = 10
PARKING_COST = 4

POISSON_MAX = 11   # truncation

In [15]:
def poisson_pmf(lam, n):
    return (lam ** n) * exp(-lam) / factorial(n)

In [16]:
def poisson_probs(lam, max_n=POISSON_MAX):
    probs = np.zeros(max_n + 1)
    for n in range(max_n):
        probs[n] = poisson_pmf(lam,n)
        
    probs[max_n] = 1 - probs[:max_n].sum()
    return probs

In [17]:
P_REQ_1 = poisson_probs(LAMBDA_REQ_1)
P_REQ_2 = poisson_probs(LAMBDA_REQ_2)
P_RET_1 = poisson_probs(LAMBDA_RET_1)
P_RET_2 = poisson_probs(LAMBDA_RET_2)

In [18]:
def valid_actions(state):
    c1, c2 = state
    actions = []
    for a in range(-MAX_MOVE, MAX_MOVE + 1):
        if a < 0:
            move = -a
            if move <= c2 and c1 + move <= MAX_CARS:
                actions.append(a)
        else:
            move = a
            if move <= c1 and c2 + move <= MAX_CARS:
                actions.append(a)
    return actions

In [19]:
def moving_cost(a):
    if a > 0:
        paid = max(a - FREE_MOVE_1_TO_2, 0)
        return MOVE_COST * paid
    else:
        return MOVE_COST * abs(a)

In [20]:
def parking_cost(c1_after_move, c2_after_move):
    cost = 0
    if c1_after_move > PARKING_LIMIT:
        cost += PARKING_COST
    if c2_after_move > PARKING_LIMIT:
        cost += PARKING_COST
    return cost

In [21]:
def transition(state, action, req1, req2, ret1, ret2):
    c1, c2 = state
    c1_m = c1 - action
    c2_m = c2 + action
    
    cost_move = moving_cost(action)
    cost_park = parking_cost(c1_m, c2_m)
    
    rent1 = min(c1_m, req1)
    rent2 = min(c2_m, req2)
    
    reward = RENT_REWARD * (rent1 + rent2)
    reward -= cost_move + cost_park
    
    c1_after = c1_m - rent1
    c2_after = c2_m - rent2
    
    next_c1 = min(c1_after + ret1, MAX_CARS)
    next_c2 = min(c2_after + ret2, MAX_CARS)
    
    return (next_c1, next_c2), reward

In [22]:
def expected_return(state, action, V):

    total = 0.0

    for req1 in range(POISSON_MAX + 1):
        for req2 in range(POISSON_MAX + 1):
            for ret1 in range(POISSON_MAX + 1):
                for ret2 in range(POISSON_MAX + 1):

                    prob = P_REQ_1[req1] * P_REQ_2[req2] * P_RET_1[ret1] * P_RET_2[ret2]

                    next_state, reward = transition(
                        state, action, req1, req2, ret1, ret2
                    )

                    total += prob * (reward + DISCOUNT * V[next_state])

    return total

In [23]:
def policy_evaluation(policy, V, theta=1e-3):

    deltas = []  # store max change per sweep

    while True:
        delta = 0.0

        for c1 in range(MAX_CARS + 1):
            for c2 in range(MAX_CARS + 1):

                s = (c1, c2)
                v_old = V[s]

                a = policy[s]
                V[s] = expected_return(s, a, V)

                delta = max(delta, abs(v_old - V[s]))

        deltas.append(delta)

        if delta < theta:
            break

    return V, deltas

In [24]:
def policy_improvement(policy, V):

    stable = True

    for c1 in range(MAX_CARS + 1):
        for c2 in range(MAX_CARS + 1):

            s = (c1, c2)
            old_action = policy[s]

            actions = valid_actions(s)
            values = [expected_return(s, a, V) for a in actions]

            best_action = actions[np.argmax(values)]
            policy[s] = best_action

            if best_action != old_action:
                stable = False

    return policy, stable

In [25]:
def policy_iteration_with_tracking():

    V = np.zeros((MAX_CARS + 1, MAX_CARS + 1))
    policy = np.zeros((MAX_CARS + 1, MAX_CARS + 1), dtype=int)

    all_deltas = []
    policy_returns = []

    while True:

        V, deltas = policy_evaluation(policy, V)
        all_deltas.extend(deltas)

        # total expected value under policy (sum over states)
        policy_returns.append(np.sum(V))

        policy, stable = policy_improvement(policy, V)

        if stable:
            break

    return policy, V, all_deltas, policy_returns

In [None]:
import matplotlib.pyplot as plt

policy, V, deltas, returns = policy_iteration_with_tracking()

# ---- Plot convergence (Bellman error) ----
plt.figure()
plt.plot(deltas)
plt.xlabel("Evaluation Sweep")
plt.ylabel("Max Value Change")
plt.title("Policy Evaluation Convergence")
plt.show()

# ---- Plot expected return per policy improvement ----
plt.figure()
plt.plot(returns)
plt.xlabel("Policy Iteration Step")
plt.ylabel("Sum of V(s)")
plt.title("Policy Improvement Progress")
plt.show()