In [None]:
import numpy as np
import random
import math
from types import SimpleNamespace as sn
from tqdm import tqdm
from matplotlib import pyplot as plt
from scipy.stats import poisson
from mpl_toolkits.mplot3d import Axes3D



In [None]:

def poisson_pmf(lambd: float, n: int) -> float:
    """
    Calculate the Poisson probability of observing n events.
    
    :param lambd: The expected number of occurrences (rate parameter).
    :param n: The actual number of occurrences.
    :return: The probability of observing n events.
    """
    try:
        probability = (math.exp(-lambd) * lambd**n) / math.factorial(n)
        return probability
    except (OverflowError, ValueError):
        # If n is too large, math.factorial(n) will raise an OverflowError
        # If lambd or n is negative, it will raise a ValueError
        print("Error: The input values are not valid.")
        return 0

def all_states():
    """returns an iterator of all possible states (n_cars_a, n_cars_b)"""
    states = []
    for n_cars_a in range(MAX_CARS_AT_LOCATION + 1):
        for n_cars_b in range(MAX_CARS_AT_LOCATION + 1):
            states.append((n_cars_a, n_cars_b))
    #shuffle the states to make the learning more robust
    random.shuffle(states)
    return states

def simulate_environment(n_cars_a, n_cars_b, action):
    """"Simulate the environment for a given state and action. Return a list of (cars_a, cars_b, reward, probability) tuples."""

    max_requests_a = np.ceil(LAMBDA_REQUESTS_A + 3 * np.sqrt(LAMBDA_REQUESTS_A)).astype(int)
    max_requests_b = np.ceil(LAMBDA_REQUESTS_B + 3 * np.sqrt(LAMBDA_REQUESTS_B)).astype(int)
    max_returns_a = np.ceil(LAMBDA_RETURNS_A + 3 * np.sqrt(LAMBDA_RETURNS_A)).astype(int)
    max_returns_b = np.ceil(LAMBDA_RETURNS_B + 3 * np.sqrt(LAMBDA_RETURNS_B)).astype(int)

    all_transitions = []

    for requested_a in range(max_requests_a+1):
        for requested_b in range(max_requests_b+1):
            for returned_a in range(max_returns_a+1):
                for returned_b in range(max_returns_b+1):
                    prob_request_a = poisson_pmf(LAMBDA_REQUESTS_A, requested_a)
                    prob_request_b = poisson_pmf(LAMBDA_REQUESTS_B, requested_b)
                    prob_return_a = poisson_pmf(LAMBDA_RETURNS_A, returned_a)
                    prob_return_b = poisson_pmf(LAMBDA_RETURNS_B, returned_b)

                    probability = prob_request_a * prob_request_b * prob_return_a * prob_return_b
                    # calculate profit, ensure we dont rent more cars than we have
                    cars_rented_a = min(requested_a, n_cars_a)
                    cars_rented_b = min(requested_b, n_cars_b)
                    daily_profit = (cars_rented_a + cars_rented_b) * PROFIT_PER_CAR_RENTED

                    # update car counts
                    n_cars_a = min(n_cars_a - cars_rented_a + returned_a, MAX_CARS_AT_LOCATION)
                    n_cars_b = min(n_cars_b - cars_rented_b + returned_b, MAX_CARS_AT_LOCATION)

                    # Calculate the number of cars after moving them, without modifying n_cars_a and n_cars_b
                    if action > 0:
                        cars_moved = min(action, n_cars_a, MAX_CARS_AT_LOCATION - n_cars_b)
                    elif action < 0:
                        cars_moved = min(-action, n_cars_b, MAX_CARS_AT_LOCATION - n_cars_a)
                    else:
                        cars_moved = 0

                        
                    # Calculate new state without modifying the original state
                    new_n_cars_a = n_cars_a - cars_moved if action > 0 else n_cars_a + abs(action)
                    new_n_cars_b = n_cars_b + cars_moved if action > 0 else n_cars_b - abs(action)

                    # Ensure the new state is within the max car constraints
                    new_n_cars_a = int(min(max(new_n_cars_a, 0), MAX_CARS_AT_LOCATION))
                    new_n_cars_b = int(min(max(new_n_cars_b, 0), MAX_CARS_AT_LOCATION))

                    daily_profit -= abs(cars_moved) * COST_PER_CAR_MOVED
                    all_transitions.append((new_n_cars_a, new_n_cars_b, daily_profit, probability))
    
    return all_transitions

# def sample_transition(n_cars_a, n_cars_b, action):
#     # Sample the number of requests and returns for both locations
#     requests_a = poisson.rvs(LAMBDA_REQUESTS_A)
#     requests_b = poisson.rvs(LAMBDA_REQUESTS_B)
#     returns_a = poisson.rvs(LAMBDA_RETURNS_A)
#     returns_b = poisson.rvs(LAMBDA_RETURNS_B)

#     # Calculate rented cars, ensuring we don't rent more than we have
#     cars_rented_a = min(requests_a, n_cars_a)
#     cars_rented_b = min(requests_b, n_cars_b)
    
#     # Calculate the reward (profit from renting)
#     reward = (cars_rented_a + cars_rented_b) * PROFIT_PER_CAR_RENTED

#     # Move cars
#     if action > 0:
#         cars_moved = min(action, n_cars_a, MAX_CARS_AT_LOCATION - n_cars_b)
#     elif action < 0:
#         cars_moved = min(-action, n_cars_b, MAX_CARS_AT_LOCATION - n_cars_a)
#     else:
#         cars_moved = 0
    
#     # Update the number of cars for each location after moving them
#     new_n_cars_a = n_cars_a - cars_moved if action > 0 else n_cars_a + abs(action)
#     new_n_cars_b = n_cars_b + cars_moved if action > 0 else n_cars_b - abs(action)
    
#     # Ensure new car counts do not exceed location capacity or fall below zero
#     new_n_cars_a = max(min(new_n_cars_a + returns_a - cars_rented_a, MAX_CARS_AT_LOCATION), 0)
#     new_n_cars_b = max(min(new_n_cars_b + returns_b - cars_rented_b, MAX_CARS_AT_LOCATION), 0)
    
#     # Update the reward with the cost of moving cars
#     reward -= abs(cars_moved) * COST_PER_CAR_MOVED

#     # No need for probability as we are sampling directly
#     return new_n_cars_a, new_n_cars_b, reward


In [None]:

MAX_CARS_AT_LOCATION = 20
MAX_CARS_MOVED = 5
COST_PER_CAR_MOVED = 2
PROFIT_PER_CAR_RENTED = 10
LAMBDA_REQUESTS_A = 3
LAMBDA_RETURNS_A = 3
LAMBDA_REQUESTS_B = 4
LAMBDA_RETURNS_B = 2
GAMMA = 0.9
THETA = 1

policy = np.random.randint(-MAX_CARS_MOVED,MAX_CARS_MOVED,size=(MAX_CARS_AT_LOCATION + 1, MAX_CARS_AT_LOCATION + 1)) 
value = np.random.randint(-500,500, size=(MAX_CARS_AT_LOCATION + 1, MAX_CARS_AT_LOCATION + 1))


In [None]:

def policy_evaluation():

    while True:
        delta = 0
        for n_cars_a, n_cars_b in all_states():

            old_value = value[n_cars_a][ n_cars_b]
            action = policy[n_cars_a][ n_cars_b]
            
            # Get the list of all possible next states, rewards and probabilities
            transitions = simulate_environment(n_cars_a, n_cars_b, action)

            # Calculate the new value estimate
            new_value = 0

            for (next_n_cars_a, next_n_cars_b, reward, prob) in transitions:
                state_value = value[next_n_cars_a][next_n_cars_b]
                state_value = (state_value * GAMMA) + reward
                new_value += prob * state_value

            # Update the value function
            value[n_cars_a][ n_cars_b] = new_value
            delta = max(delta, abs(old_value - new_value))

        print("Delta: ", delta)
        if delta < THETA:
            break


def policy_improvement():
    policy_stable = True
    for n_car_a, n_car_b in all_states():
        old_action = policy[n_car_a][n_car_b]
        best_action_value = float('-inf')  # Reset the best action value for each state
        best_action = None

        # Evaluate each action's expected return
        for action in range(-MAX_CARS_MOVED, MAX_CARS_MOVED + 1):
            transitions = simulate_environment(n_car_a, n_car_b, action)
            action_value = 0

            # Calculate expected return for this action
            for (next_n_cars_a, next_n_cars_b, reward, prob) in transitions:
                state_value = value[next_n_cars_a][next_n_cars_b]
                action_value += prob * (reward + GAMMA * state_value)
                print("Action value: ", action_value, "best action value", best_action_value)
            # Update best action if this action is better than the current best
            if action_value > best_action_value:
                best_action = action
                best_action_value = action_value

        # Update the policy if a better action was found
        if best_action is not None and old_action != best_action:
            print("Policy changed for state: ", n_car_a, n_car_b, " from ", old_action, " to ", best_action)
            policy_stable = False
            policy[n_car_a][n_car_b] = best_action
        else:
            print("Policy stable for state: ", n_car_a, n_car_b)
            print("Old action: ", old_action)
            print("Best action: ", best_action)
            print("Best action value: ", best_action_value)


    return policy_stable



In [None]:
policy_improvement()

In [None]:
while True:
    policy_evaluation()
    if policy_improvement():
        break

In [None]:
def plot_policy(policy_matrix: np.ndarray):
    """
    Plot the optimal policy as a contour plot.

    Parameters:
    - policy_matrix: A numpy matrix representing the policy, where each entry policy_matrix[i, j] is the
                     optimal action to take in state (i, j).
    """
    plt.figure(figsize=(7, 7))
    plt.contourf(policy_matrix, levels=np.arange(-5, 6), cmap='jet')
    plt.colorbar()
    plt.title('Optimal Policy ($\pi^*$)')
    plt.xlabel('# Cars at first location')
    plt.ylabel('# Cars at second location')
    plt.show()

plot_policy(policy)


In [None]:
def plot_value_function(value_matrix: np.ndarray):
    """
    Plot the value function as a 3D surface plot.

    Parameters:
    - value_matrix: A numpy matrix representing the value function, where each entry value_matrix[i, j]
                    is the value of the state (i, j).
    """
    fig = plt.figure(figsize=(12, 7))
    ax = fig.add_subplot(111, projection='3d')
    X = np.arange(0, value_matrix.shape[0], 1)
    Y = np.arange(0, value_matrix.shape[1], 1)
    X, Y = np.meshgrid(X, Y)
    ax.plot_surface(X, Y, value_matrix, cmap='viridis')
    ax.set_title('Value Function $V^{\pi^*}$')
    ax.set_xlabel('# Cars at first location')
    ax.set_ylabel('# Cars at second location')
    plt.show()


plot_value_function(value)

In [None]:
policy
