In [1]:
import numpy as np
from food_truck_env import FoodTruck

In [2]:
def expected_update(env, v, s, prob_a, gamma):
    """
    One-step expected backup of state s under a given action distribution.

    Computes  sum_a pi(a|s) * sum_{s', r} p(s', r | s, a) * [r + gamma * v(s')]

    env    -- object whose get_transition_prob(s, a) returns the possible
              (next_state, reward) outcomes, indexable by that pair to get
              the outcome's probability
    v      -- current state-value estimates, indexed by state
    s      -- the state being backed up
    prob_a -- mapping action -> probability of taking that action in s
    gamma  -- discount factor applied to the next state's value

    Returns the accumulated expected value (0 if prob_a is empty).
    """
    total = 0
    for action, pi_a in prob_a.items():
        outcomes = env.get_transition_prob(s, action)
        for outcome in outcomes:
            next_state, reward = outcome
            # Bootstrapped return of this outcome: immediate reward plus the
            # discounted value of the state we land in.
            backup = reward + gamma * v[next_state]
            total += pi_a * outcomes[outcome] * backup

    return total

In [3]:
def policy_evaluation(env, policy, max_iter=100, v=None, eps=0.1, gamma=1):
    """
    Iterative policy evaluation.

    Repeatedly sweeps over all states, replacing the value of every
    non-terminal state with its expected_update under `policy`, until the
    largest change in a sweep drops below `eps` or `max_iter` sweeps have run.

    env      -- environment exposing state_space and is_terminal(s), plus
                whatever expected_update needs
    policy   -- mapping state -> {action: probability}
    max_iter -- upper bound on the number of sweeps; at least one sweep is
                always performed, even for max_iter <= 0
    v        -- optional initial state values (dict). It is updated in place.
                When None or empty, values start at 0 for every state in
                env.state_space
    eps      -- convergence threshold on the largest absolute value change
    gamma    -- discount factor

    Returns the state-value dict v.
    """
    if not v:
        v = {s: 0 for s in env.state_space}

    k = 0
    while True:
        max_delta = 0
        for s in v:
            if not env.is_terminal(s):
                # The old value is kept only to measure how much this sweep
                # moved the estimate, which drives the stopping test below.
                v_old = v[s]
                prob_a = policy[s]
                v[s] = expected_update(env, v, s, prob_a, gamma)

                max_delta = max(max_delta, abs(v[s] - v_old))

        k += 1
        if max_delta < eps:
            print(f"Converged in {k} iterations")
            break
        elif k >= max_iter:
            # ">=" rather than "==": with max_iter <= 0 an equality test can
            # never fire and a non-converging run would loop forever.
            print(f"Terminating after {k} iterations")
            break

    return v

In [4]:
def some_policy(states):
    """
    Build a simple hand-written stochastic policy.

    Each state is a (day, inventory) pair. With an inventory of 300 or more
    the policy always takes action 0; otherwise it picks, with equal
    probability, the action that tops the inventory up to 200 or to 300.

    Returns a dict mapping state -> {action: probability}.
    """
    def action_distribution(inventory):
        if inventory >= 300:
            return {0: 1}
        return {200 - inventory: 0.5, 300 - inventory: 0.5}

    policy = {}
    for state in states:
        _, inventory = state
        policy[state] = action_distribution(inventory)

    return policy

In [5]:
# Create the environment and build the hand-written stochastic policy over its states.
food_truck = FoodTruck()
policy = some_policy(food_truck.state_space)

In [6]:
# Evaluate the hand-written policy with the default settings (eps=0.1, gamma=1)
# and report the value of state ("Mon", 0) — presumably the start of the week; confirm against FoodTruck.
v = policy_evaluation(food_truck, policy)
print("Expected weekly profit for some policy is: ", v["Mon", 0])

Converged in 6 iterations
Expected weekly profit for some policy is:  2515.0


Let's simulate the environment to check whether we really earn about 2515 per week.

In [7]:
def choose_action(state, policy):
    """
    Sample an action for `state` from the policy.

    policy[state] is a mapping action -> probability; one action is drawn
    with numpy's global random generator according to those probabilities.
    """
    distribution = policy[state]
    candidates = list(distribution)
    weights = list(distribution.values())

    return np.random.choice(a=candidates, p=weights)

In [8]:
def simulate_policy(policy, n_episodes):
    """
    Roll out `policy` in a fresh FoodTruck environment.

    Runs n_episodes episodes, sampling every action with choose_action, sums
    the rewards of each episode and prints the mean episode total.
    Returns nothing.
    """
    env = FoodTruck()
    episode_totals = []

    for _ in range(n_episodes):
        state = env.reset()
        finished = False
        total = 0

        while not finished:
            state, reward, finished, _ = env.step(choose_action(state, policy))
            total += reward

        episode_totals.append(total)

    print(f"Expected weekly profit: {np.mean(episode_totals)}")

In [9]:
# Sanity check by sampling: average the total reward over 100 simulated episodes.
simulate_policy(policy, 100)

Expected weekly profit: 2629.0


The simulation gives a very similar result. Now let's use policy iteration to find a better policy.

In [10]:
def policy_improvement(env, v, s, actions, gamma):
    """
    Greedy one-step lookahead for a single state.

    For a non-terminal state, evaluates every action in `actions` with
    expected_update (as a deterministic choice of that action) and picks the
    one with the highest value. Ties go to the action that appears last in
    `actions`.

    env     -- environment exposing is_terminal(s), plus whatever
               expected_update needs
    v       -- current state-value estimates, indexed by state
    s       -- the state to improve
    actions -- iterable of candidate actions
    gamma   -- discount factor

    Returns (prob_a, max_q):
      prob_a -- {best_action: 1} for a non-terminal state, {} for a terminal one
      max_q  -- value of the best action, or 0 for a terminal state
    """
    if env.is_terminal(s):
        # No reward can be collected from a terminal state, so its value is
        # always 0 and there is no action to choose.
        return {}, 0

    # -np.inf instead of np.NINF: the NINF alias was removed in NumPy 2.0.
    max_q = -np.inf
    best_a = None

    for a in actions:
        q_sa = expected_update(env, v, s, {a: 1}, gamma)
        if q_sa >= max_q:
            max_q = q_sa
            best_a = a

    return {best_a: 1}, max_q

In [11]:
def policy_iteration(env, eps=0.1, gamma=1):
    """
    Policy iteration: alternate policy evaluation and greedy improvement
    until the policy stops changing.

    env   -- environment exposing state_space and action_space, plus whatever
             policy_evaluation and policy_improvement need
    eps   -- convergence threshold passed to policy_evaluation
    gamma -- discount factor

    Returns (policy, v): the final state -> {action: 1} mapping and the
    state values from the last evaluation.
    """
    states, actions = env.state_space, env.action_space

    # Start from a random deterministic policy and an all-zero value function.
    policy = {state: {np.random.choice(actions): 1} for state in states}
    v = {state: 0 for state in states}

    stable = False
    while not stable:
        v = policy_evaluation(env, policy, v=v, eps=eps, gamma=gamma)
        greedy = {
            state: policy_improvement(env, v, state, actions, gamma)[0]
            for state in states
        }
        # Stop once the greedy policy reproduces the policy just evaluated.
        stable = policy == greedy
        policy = greedy

    print("Optimal policy found!")
    return policy, v

In [12]:
# Run policy iteration on the same environment and report the value of state ("Mon", 0).
optimal_policy, v = policy_iteration(food_truck)
print(f"Expected weekly profit for optimal policy: {v['Mon', 0]}")

Converged in 6 iterations
Converged in 6 iterations
Converged in 5 iterations
Optimal policy found!
Expected weekly profit for optimal policy: 2880.0


We successfully found the optimal policy using policy iteration, but we can achieve the same result with value iteration, which is more efficient.

In [13]:
def value_iteration(env, max_iter=100, eps=0.1, gamma=1):
    """
    Value iteration: sweep over all states, replacing each state's value with
    the value of its best action (via policy_improvement), until the largest
    change in a sweep drops below `eps` or `max_iter` sweeps have run.

    env      -- environment exposing state_space and action_space, plus
                whatever policy_improvement needs
    max_iter -- upper bound on the number of sweeps; at least one sweep is
                always performed, even for max_iter <= 0
    eps      -- convergence threshold on the largest absolute value change
    gamma    -- discount factor

    Returns (policy, v): the greedy state -> {action: 1} mapping recorded
    during the last sweep, and the state-value dict.
    """
    states = env.state_space
    actions = env.action_space

    v = {s: 0 for s in states}
    policy = {}

    k = 0
    while True:
        max_delta = 0
        for s in states:
            old_v = v[s]
            # The greedy action and its value come from the same lookahead,
            # so the policy is tracked alongside the value function.
            policy[s], v[s] = policy_improvement(env, v, s, actions, gamma)

            max_delta = max(max_delta, abs(v[s] - old_v))

        k += 1
        if max_delta < eps:
            print(f"Converged in {k} iterations")
            break
        elif k >= max_iter:
            # ">=" rather than "==": with max_iter <= 0 an equality test can
            # never fire and a non-converging run would loop forever.
            print(f"Terminating after {k} iterations")
            break

    return policy, v

In [14]:
# Run value iteration on the same environment and report the value of state ("Mon", 0).
optimal_policy, v = value_iteration(food_truck)
print(f"Expected weekly profit for optimal policy: {v['Mon', 0]}")

Converged in 6 iterations
Expected weekly profit for optimal policy: 2880.0
