In [1]:
import numpy as np 
import gym

In [2]:
# create a FoodTruck class that subclasses gym.Env
class FoodTruck(gym.Env):
    """Inventory-management environment for a food truck over one work week.

    A state is a ``(day, inventory)`` tuple. Each step the agent buys
    ``action`` units, a random demand is drawn from ``v_demand`` with
    probabilities ``p_demand``, and the reward is the revenue from the units
    sold minus the cost of the units bought. An episode starts at
    ``("Mon", 0)`` and is over once the day becomes ``"Weekend"``.
    """

    def __init__(self):
        # possible daily demand levels and their probabilities
        self.v_demand = [100, 200, 300, 400]
        self.p_demand = [0.3, 0.4, 0.2, 0.1]
        # inventory can never exceed the largest possible demand
        self.capacity = self.v_demand[-1]
        self.days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', "Weekend"]
        # cost per unit bought and revenue per unit sold
        self.unit_cost = 4
        self.net_revenue = 7
        # purchase quantities the agent may choose from
        self.action_space = [0, 100, 200, 300, 400]
        # Monday always starts empty; later days can start with 0-300 left over
        self.state_space = [("Mon", 0)] + [(d, i) for d in self.days[1:]
                                           for i in [0, 100, 200, 300]]
        # Put the environment in its initial state right away so that step()
        # does not fail with AttributeError when called before reset().
        self.reset()

    def get_next_state_reward(self, state, action, demand):
        """Compute the outcome of buying ``action`` units when ``demand`` occurs.

        Args:
            state: ``(day, inventory)`` tuple describing the current state.
            action: number of units bought at the start of the day.
            demand: number of units customers want to buy that day.

        Returns:
            dict with the keys ``next_day``, ``starting_inventory``, ``cost``,
            ``sales``, ``revenue``, ``next_inventory`` and ``reward``.

        Raises:
            ValueError: if ``day`` is not one of ``self.days``.
            IndexError: if ``day`` is the last entry of ``self.days``.
        """
        day, inventory = state
        result = {}
        result['next_day'] = self.days[self.days.index(day) + 1]
        # stock is clipped at capacity ...
        result['starting_inventory'] = min(self.capacity, inventory + action)
        # ... but every purchased unit is paid for, even if it does not fit
        result['cost'] = self.unit_cost * action
        result['sales'] = min(result['starting_inventory'], demand)
        result['revenue'] = self.net_revenue * result['sales']
        result['next_inventory'] = result['starting_inventory'] - result['sales']
        result['reward'] = result['revenue'] - result['cost']
        return result

    def get_transition_prob(self, state, action):
        """Return the distribution over outcomes of taking ``action`` in ``state``.

        Returns:
            dict mapping ``(next_state, reward)`` to its probability, where
            ``next_state`` is a ``(next_day, next_inventory)`` tuple. Demand
            levels that lead to the same outcome have their probabilities
            added together.
        """
        next_s_r_prob = {}
        for demand, prob in zip(self.v_demand, self.p_demand):
            result = self.get_next_state_reward(state, action, demand)
            outcome = ((result['next_day'], result['next_inventory']),
                       result['reward'])
            next_s_r_prob[outcome] = next_s_r_prob.get(outcome, 0) + prob
        return next_s_r_prob

    def reset(self):
        """Restart the week and return the initial state ``("Mon", 0)``."""
        self.day = "Mon"
        self.inventory = 0
        state = (self.day, self.inventory)
        return state

    def is_terminal(self, state):
        """Return True when the day component of ``state`` is ``"Weekend"``."""
        day, _ = state
        return day == "Weekend"

    def step(self, action):
        """Advance one day after buying ``action`` units.

        The day's demand is sampled with ``np.random.choice`` from
        ``v_demand`` using the probabilities ``p_demand``.

        Returns:
            ``(state, reward, done, info)`` where ``state`` is the new
            ``(day, inventory)`` tuple and ``info`` holds the sampled
            ``demand`` and the resulting ``sales``.
        """
        demand = np.random.choice(self.v_demand, p=self.p_demand)
        result = self.get_next_state_reward((self.day, self.inventory),
                                            action, demand)
        self.day = result['next_day']
        self.inventory = result['next_inventory']
        state = (self.day, self.inventory)
        reward = result['reward']
        done = self.is_terminal(state)
        info = {'demand': demand, 'sales': result['sales']}
        return state, reward, done, info

In [3]:
# simulating an arbitrary policy: every day, buy just enough to start with 300 units
np.random.seed(0)
foodtruck = FoodTruck()
rewards = []
for i_episode in range(10000):
    ep_reward = 0
    done = False
    state = foodtruck.reset()
    while not done:
        inventory = state[1]
        # never order a negative amount when more than 300 units are in stock
        action = max(0, 300 - inventory)
        state, reward, done, info = foodtruck.step(action)
        ep_reward += reward
    rewards.append(ep_reward)
# average weekly profit over all simulated episodes
np.mean(rewards)

2590.83

In [4]:
# # single day expected reward
# ucost = 4
# uprice = 7
# v_demand = [100, 200, 300, 400]
# p_demand = [0.3, 0.4, 0.2, 0.1]
# inv = 400
# profit = uprice * np.sum([p_demand[i]*min(v_demand[i], inv) for i in range(4)]) - inv * ucost
# print(profit)

In [5]:
# v_demand = [100, 200, 300, 400]
# p_demand = [0.3, 0.4, 0.2, 0.1]
# capacity = v_demand[-1]
# days = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', "Weekend"]
# unit_cost = 4
# net_revenue = 7
# action_space = [0, 100, 200, 300, 400]
# state_space = [("Mon", 0)] + [(d, i) for d in days[1:] for i in [0, 100, 200, 300]]
# state_space

In [6]:
# def get_next_state_reward(state, action, demand):
#     day, inventory = state
#     result = {}
#     result['next_day'] = days[days.index(day) + 1]
#     result['starting_inventory'] = min(capacity, inventory + action)
#     result['cost'] = unit_cost * action
#     result['sales'] = min(result['starting_inventory'], demand)
#     result['revenue'] = net_revenue * result['sales']
#     result['next_inventory'] = result['starting_inventory'] - result['sales']
#     result['reward'] = result['revenue'] - result['cost']
#     return result
    
# next_s_r_prob = {}
# for ix, demand in enumerate(v_demand):
#     result = get_next_state_reward(state_space[1], action_space[-1], demand)
#     next_s = (result['next_day'], result['next_inventory'])
#     reward = result['reward']
#     prob = p_demand[ix]
#     if (next_s, reward) not in next_s_r_prob:
#         next_s_r_prob[next_s, reward] = prob
#     else:
#         next_s_r_prob[next_s, reward] += prob
# next_s_r_prob

## Policy Evaluation

In [7]:
def base_policy(states):
    """Build the stochastic base policy for the given states.

    Args:
        states: iterable of ``(day, inventory)`` tuples.

    Returns:
        dict mapping each state to an ``{action: probability}`` dict. With an
        inventory of 300 or more nothing is bought; otherwise the purchase
        tops the inventory up to 200 or to 300 with probability 0.5 each.
    """
    def action_probs(inventory):
        if inventory < 300:
            return {200 - inventory: 0.5, 300 - inventory: 0.5}
        return {0: 1}

    return {s: action_probs(s[1]) for s in states}

In [8]:
def expected_update(env, v, s, prob_a, gamma):
    """Return the expected one-step return of state ``s`` under ``prob_a``.

    Args:
        env: environment providing ``get_transition_prob(state, action)``,
            which returns a ``{(next_state, reward): probability}`` dict.
        v: dict mapping states to their current value estimates.
        s: the state being backed up.
        prob_a: ``{action: probability}`` dict of the policy in ``s``.
        gamma: discount factor applied to the next state's value.

    Returns:
        Sum over actions and outcomes of
        ``P(action) * P(outcome) * (reward + gamma * v[next_state])``.
    """
    total = 0
    for action, action_prob in prob_a.items():
        outcomes = env.get_transition_prob(s, action)
        for (next_state, reward), outcome_prob in outcomes.items():
            backed_up = reward + gamma * v[next_state]
            total += action_prob * outcome_prob * backed_up
    return total

In [9]:
def policy_evaluate(env, policy, max_iter=100,
                    v=None, eps=0.1, gamma=1):
    """Iteratively estimate the state values of ``policy``.

    Values are updated in place, state by state, so later states in a sweep
    already see the new values of earlier ones.

    Args:
        env: environment providing ``state_space`` and ``is_terminal(state)``.
        policy: dict mapping each non-terminal state to ``{action: prob}``.
        max_iter: number of sweeps after which evaluation stops regardless.
        v: optional starting values; when missing or empty, every state in
            ``env.state_space`` starts at 0. A supplied dict is modified.
        eps: stop once the largest change in a sweep is below this.
        gamma: discount factor.

    Returns:
        The dict of state values.
    """
    if not v:
        v = dict.fromkeys(env.state_space, 0)
    sweeps = 0
    while True:
        largest_change = 0
        for state in v:
            # terminal states keep their value
            if env.is_terminal(state):
                continue
            previous = v[state]
            v[state] = expected_update(env, v, state, policy[state], gamma)
            largest_change = max(largest_change, abs(v[state] - previous))

        sweeps += 1

        if largest_change < eps:
            print("Converged in ", sweeps, "iterations.")
            break
        if sweeps == max_iter:
            print("Terminating after ", sweeps, 'iterations.')
            break
    return v

In [10]:
# create a fresh FoodTruck environment for the evaluation below
foodtruck = FoodTruck()

# get the base policy for the environment: a dict that maps every state in
# the state space to an {action: probability} dict
policy = base_policy(foodtruck.state_space)

In [11]:
# evaluate the base policy and get the corresponding state values
v = policy_evaluate(foodtruck, policy)
# ("Mon", 0) is the state every episode starts from, so its value is the
# expected profit of a whole week
print("Expected weekly profit:", v["Mon", 0])

Converged in  6 iterations.
Expected weekly profit: 2515.0


In [12]:
# display the value estimate of every (day, inventory) state
print("The state values:")
v

The state values:


{('Mon', 0): 2515.0,
 ('Tue', 0): 1960.0,
 ('Tue', 100): 2360.0,
 ('Tue', 200): 2760.0,
 ('Tue', 300): 3205.0,
 ('Wed', 0): 1405.0,
 ('Wed', 100): 1805.0,
 ('Wed', 200): 2205.0,
 ('Wed', 300): 2650.0,
 ('Thu', 0): 850.0000000000001,
 ('Thu', 100): 1250.0,
 ('Thu', 200): 1650.0,
 ('Thu', 300): 2095.0,
 ('Fri', 0): 295.00000000000006,
 ('Fri', 100): 695.0000000000001,
 ('Fri', 200): 1095.0,
 ('Fri', 300): 1400.0,
 ('Weekend', 0): 0,
 ('Weekend', 100): 0,
 ('Weekend', 200): 0,
 ('Weekend', 300): 0}

In [14]:
def choose_action(state, policy):
    """Sample an action for ``state`` from a stochastic policy.

    Args:
        state: key into ``policy``.
        policy: dict mapping states to ``{action: probability}`` dicts.

    Returns:
        One of the actions listed for ``state``, drawn with
        ``np.random.choice`` using the listed probabilities.
    """
    distribution = policy[state]
    candidates = list(distribution)
    weights = [distribution[candidate] for candidate in candidates]
    return np.random.choice(candidates, p=weights)

def simulate_policy(policy, n_episodes):
    """Simulate ``policy`` on a new FoodTruck and print the mean episode reward.

    The NumPy global random generator is seeded with 0 first, so repeated
    calls with the same arguments use the same random numbers.

    Args:
        policy: dict mapping states to ``{action: probability}`` dicts.
        n_episodes: number of episodes to simulate.
    """
    np.random.seed(0)
    foodtruck = FoodTruck()
    rewards = []
    for _ in range(n_episodes):
        ep_reward = 0
        done = False
        state = foodtruck.reset()
        while not done:
            state, reward, done, info = foodtruck.step(
                choose_action(state, policy))
            ep_reward += reward
        rewards.append(ep_reward)
    print("Expected weekly profit:", np.mean(rewards))

In [15]:
# estimate the weekly profit of the current `policy` from 1000 simulated episodes
simulate_policy(policy, 1000)

Expected weekly profit: 2518.1


## Policy Iteration

In [20]:
def policy_improvement(env, v, s, actions, gamma):
    """Pick the greedy action for state ``s`` with respect to the values ``v``.

    Args:
        env: environment providing ``is_terminal(state)`` and whatever
            ``expected_update`` needs from it.
        v: dict mapping states to value estimates.
        s: the state to improve the policy for.
        actions: iterable of candidate actions.
        gamma: discount factor.

    Returns:
        ``(prob_a, max_q)``: a deterministic ``{best_action: 1}`` dict and the
        action value of that action. For a terminal state the result is
        ``({}, 0)``.
    """
    prob_a = {}
    if env.is_terminal(s):
        return prob_a, 0

    # np.NINF no longer exists in NumPy 2.0; -np.inf is the same value and
    # works on every NumPy version.
    max_q = -np.inf
    best_a = None
    for a in actions:
        q_sa = expected_update(env, v, s, {a: 1}, gamma)
        # ">=" means that among equally good actions the last one wins
        if q_sa >= max_q:
            max_q = q_sa
            best_a = a
    prob_a[best_a] = 1
    return prob_a, max_q

In [21]:
def policy_iteration(env, eps=0.1, gamma=1):
    """Alternate policy evaluation and greedy improvement until the policy is stable.

    The NumPy global random generator is seeded with 1 and used to draw a
    random initial action for every state.

    Args:
        env: environment providing ``state_space`` and ``action_space``.
        eps: convergence threshold passed on to ``policy_evaluate``.
        gamma: discount factor.

    Returns:
        ``(policy, v)``: the final ``{state: {action: 1}}`` policy and the
        state values from its evaluation.
    """
    np.random.seed(1)
    states, actions = env.state_space, env.action_space
    # start from a deterministic policy with one randomly drawn action per state
    policy = {}
    for s in states:
        policy[s] = {np.random.choice(actions): 1}
    v = dict.fromkeys(states, 0)

    stable = False
    while not stable:
        v = policy_evaluate(env, policy, v=v,
                            eps=eps, gamma=gamma)
        improved = {s: policy_improvement(env, v, s, actions, gamma)[0]
                    for s in states}
        # stop as soon as an improvement step leaves the policy unchanged
        stable = improved == policy
        policy = improved
    print("Optimal policy found!")
    return policy, v

In [22]:
# run policy iteration on the food truck environment
policy, v = policy_iteration(foodtruck)
# ("Mon", 0) is the state every episode starts from, so its value is the
# expected profit of a whole week under the returned policy
print("Expected weekly profit:", v["Mon", 0])

Converged in  6 iterations.
Converged in  6 iterations.
Converged in  5 iterations.
Optimal policy found!
Expected weekly profit: 2880.0


In [23]:
# show the policy returned by policy_iteration as state -> {action: probability}
print(policy)

{('Mon', 0): {400: 1}, ('Tue', 0): {400: 1}, ('Tue', 100): {300: 1}, ('Tue', 200): {200: 1}, ('Tue', 300): {100: 1}, ('Wed', 0): {400: 1}, ('Wed', 100): {300: 1}, ('Wed', 200): {200: 1}, ('Wed', 300): {100: 1}, ('Thu', 0): {300: 1}, ('Thu', 100): {200: 1}, ('Thu', 200): {100: 1}, ('Thu', 300): {0: 1}, ('Fri', 0): {200: 1}, ('Fri', 100): {100: 1}, ('Fri', 200): {0: 1}, ('Fri', 300): {0: 1}, ('Weekend', 0): {}, ('Weekend', 100): {}, ('Weekend', 200): {}, ('Weekend', 300): {}}


## Value Iteration

In [26]:
def value_iteration(env, max_iter=100, eps=0.1, gamma=1):
    """Sweep greedy backups over all states until the values stop changing.

    Args:
        env: environment providing ``state_space`` and ``action_space``.
        max_iter: number of sweeps after which iteration stops regardless.
        eps: stop once the largest value change in a sweep is below this.
        gamma: discount factor.

    Returns:
        ``(policy, v)``: the greedy ``{state: {action: 1}}`` policy and the
        state values from the last sweep.
    """
    states, actions = env.state_space, env.action_space
    v = dict.fromkeys(states, 0)
    policy = {}
    sweeps = 0
    while True:
        sweeps += 1
        largest_change = 0
        for state in states:
            previous = v[state]
            # the greedy action and its value replace the old entries in place
            policy[state], v[state] = policy_improvement(env, v, state,
                                                         actions, gamma)
            largest_change = max(largest_change, abs(v[state] - previous))
        if largest_change < eps:
            print("Converged in", sweeps, "iterations.")
            break
        if sweeps == max_iter:
            print('Terminating after', sweeps, 'iterations.')
            break
    return policy, v

In [27]:
# run value iteration on the food truck environment
policy, v = value_iteration(foodtruck)
# ("Mon", 0) is the state every episode starts from, so its value is the
# expected profit of a whole week under the returned policy
print("Expected weekly profit:", v["Mon", 0])

Converged in 6 iterations.
Expected weekly profit: 2880.0


In [28]:
# show the policy returned by value_iteration as state -> {action: probability}
print(policy)

{('Mon', 0): {400: 1}, ('Tue', 0): {400: 1}, ('Tue', 100): {300: 1}, ('Tue', 200): {200: 1}, ('Tue', 300): {100: 1}, ('Wed', 0): {400: 1}, ('Wed', 100): {300: 1}, ('Wed', 200): {200: 1}, ('Wed', 300): {100: 1}, ('Thu', 0): {300: 1}, ('Thu', 100): {200: 1}, ('Thu', 200): {100: 1}, ('Thu', 300): {0: 1}, ('Fri', 0): {200: 1}, ('Fri', 100): {100: 1}, ('Fri', 200): {0: 1}, ('Fri', 300): {0: 1}, ('Weekend', 0): {}, ('Weekend', 100): {}, ('Weekend', 200): {}, ('Weekend', 300): {}}
