In [1]:
import numpy as np
import gym

In [2]:
class FoodTruck(gym.Env):
    def __init__(self):
        """
        Simulation environment of a food truck.
        """        
        self.demand_values = [100, 200, 300, 400]
        self.demand_probabilities = [0.3, 0.4, 0.2, 0.1]
        self.capacity = 400
        self.days = ['Mon', 'Tue', 'Wed',
                    'Thu', 'Fri', 'Weekend']
        self.unit_cost = 4
        self.net_revenue = 7
        self.action_space = [0, 100, 200, 300, 400]
        self.state_space = [("Mon", 0)] \
                            + [(day, inventory) for day in self.days[1:] for inventory in [0, 100, 200, 300]]

    def get_next_state_reward(self, state, action, demand):
        """
        Get the next state based on the current statue, action and demand.

        Parameters
        ----------
        state : tuple
            The state is defined through the day and inventory: state = (day, inventory)
        action : int
            Number of burger patties bought to refill the inventory
        demand : int
            Number of burgers requested 

        Returns
        -------
        dict
            Resulting consequences for the next state
        """        
        day, inventory = state
        result = dict()
        result['next_day'] = self.days[self.days.index(day) + 1]
        result['starting_inventory'] = min(self.capacity, inventory + action)
        result['cost'] = self.unit_cost * action
        result['sales'] = min(result['starting_inventory'], demand)
        result['revenue'] = self.net_revenue * result['sales']
        result['next_inventory'] = result['starting_inventory'] - result['sales']
        result['reward'] = result['revenue'] - result['cost']
        return result

    def get_transition_probability(self, state, action):
        """
        Get the transition probability values for the given state and action.
        Rate the next state and the according reward with the probability of the event.
        The probability is derived from the assumed probability of the demand. 

        Parameters
        ----------
        state : tuple
            The state is defined through the day and inventory: state = (day, inventory)
        action : int
            Number of burger patties bought to refill the inventory

        Returns
        -------
        dict
            State and reward tuples with corresponding probability
        """        
        next_state_and_reward_probability = dict()
        for index, demand in enumerate(self.demand_values):
            result = self.get_next_state_reward(state, action, demand)
            next_state = (result['next_day'], result['next_inventory'])
            reward = result['reward']
            probability = self.demand_probabilities[index]
            if (next_state, reward) not in next_state_and_reward_probability:
                next_state_and_reward_probability[next_state, reward] = probability
            else:
                next_state_and_reward_probability[next_state, reward] += probability
        return next_state_and_reward_probability
    
    def is_terminal(self, state):
        day, _ = state
        if day == 'Weekend':
            return True
        else:
            return False
        
    def reset(self):
        self.day = "Mon"
        self.inventory = 0
        return (self.day, self.inventory)
    
    def step(self, action):
        """
        Simulate the environment for one time step for the current state and action.

        Parameters
        ----------
        action : int
            Action at the current state. Number of burger patties to buy for refilling the inventory.
        """        
        demand = np.random.choice(a=self.demand_values, p=self.demand_probabilities)
        result = self.get_next_state_reward(
            state=(self.day, self.inventory),
            demand=demand,
            action=action)
        self.day = result['next_day']
        self.inventory = result['next_inventory']
        state = (self.day, self.inventory)
        reward = result['reward']
        done = self.is_terminal(state)
        info = {'demand': demand, 'sales': result['sales']}
        return state, reward, done, info
        

# Policy evaluation

In [3]:
def base_policy(states):
    """
    Defining a base policy. States are mapped to actions.

    Parameters
    ----------
    states : list
        List of environment states

    Returns
    -------
    dict
        The resulting policy. Keys are states while the values are itself dictionaries composed 
        of action probability pairs.
    """    
    policy = {}
    for state in states:
        action_probability_pairs = dict()
        day, inventory = state
        if inventory >= 300:
            action_probability_pairs[0] = 1.0
        else:
            action_probability_pairs[400 - inventory] = 0.5
            action_probability_pairs[300 - inventory] = 0.5
        policy[state] = action_probability_pairs
    return policy

In [4]:
def expected_update(env, value, state, action_probability_pairs, gamma):
    """
    Update the expected value.
    For a given current state, the transition probabilities are calculated. To this end,
    the probabilities for the next state with the corresponding reward are available. From this
    events, the expected value is calculated. The expected value is the summed over all events. 
    
    Converting the Bellman equation into an update rule.
    

    Parameters
    ----------
    env : _type_
        _description_
    value : _type_
        _description_
    state : _type_
        _description_
    action_probability_pairs : _type_
        _description_
    gamma : _type_
        _description_

    Returns
    -------
    _type_
        _description_
    """    
    expected_value = 0
    for action in action_probability_pairs:
        probability_next_state_and_reward = env.get_transition_probability(state, action)
        for next_state, reward in probability_next_state_and_reward:
            expected_value += action_probability_pairs[action] \
                            * probability_next_state_and_reward[next_state, reward] \
                            * (reward + gamma * value[next_state])
    return expected_value

In [5]:
def policy_evaluation(env, policy, max_iter=100,
                      value=None, eps=0.1, gamma=1):
    """
    Policy evaluation executes expected updates for all states until the state value converges or 
    it reaches the maximum number of iterations.

    Parameters
    ----------
    env : class object
        simulation environment
    policy : dict
        A policy that maps states to actions. Keys are states while the values are itself dictionaries composed 
        of action probability pairs.
    max_iter : int, optional
        Maximum number of iterations if algorithm does not converge earlier, by default 100
    value : _type_, optional
        Expected discounted cumulative reward starting in state s and following the policy, by default None
    eps : float, optional
        Acceptance criteria for the convergence of the algorithm, by default 0.1
    gamma : int, optional
        Discount factor, by default 1

    Returns
    -------
    dict
        Converged expected discounted cumulative rewards
    """    
    if not value:
        value = {state: 0 for state in env.state_space}
    k = 0
    while True:
        max_delta = 0
        for state in value:
            if not env.is_terminal(state):
                value_old = value[state]
                action_probability_pairs = policy[state]
                value[state] = expected_update(env, value, state, action_probability_pairs, gamma)
                max_delta = max(max_delta, abs(value[state]- value_old))
        k += 1
        if max_delta < eps:
            print(f"Converged in {k} iterations.")
            break
        elif k == max_iter:
            print(f"Terminating after {k} iterations.")
            break
    return value

In [6]:
def choose_action(state, policy):
    action_probability = policy[state]
    action = np.random.choice(
        a=list(action_probability.keys()),
        p=list(action_probability.values())
        )
    return action


In [7]:
def simulate_policy(policy, num_episodes):
    np.random.seed(0)
    foodTruck = FoodTruck()
    rewards = []
    for _ in range(num_episodes):
        state = foodTruck.reset()
        done = False
        episode_reward = 0
        while not done:
            action = choose_action(state, policy=policy)
            state, reward, done, info = foodTruck.step(action)
            episode_reward += reward
        rewards.append(episode_reward)
    print(f"Expected weekly profit: {np.mean(rewards)}")
        

In [33]:
foodTruck = FoodTruck()
policy = base_policy(foodTruck.state_space)

In [35]:
value = policy_evaluation(env=foodTruck, policy=policy)
print(f"Expected weekly profit: {value['Mon', 0]}")

Converged in 6 iterations.
Expected weekly profit: 2510.3852187500006


## Simulate the environment

In [36]:
simulate_policy(policy=policy, num_episodes=100)

Expected weekly profit: 2575.0


# Monte Carlo estimation of state values

In [10]:
def first_visit_return(returns, trajectory, gamma):
    """
    Calculate the returns from the first visit on for each state that appears in the trajectory.

    Parameters
    ----------
    returns : dict
        Returns corresponding to trajectory. Keys are states and values are list of returns calculated from some other trajectory.
    trajectory : list
        List of state, action, reward tuples
    gamma : float
        Discount factor

    Returns
    -------
    dict
        Updated returns
    """    
    G = 0 # cumulated discounted reward
    T = len(trajectory) - 1
    for t, state_action_reward in enumerate(reversed(trajectory)):
        state, action, reward = state_action_reward
        G = reward + gamma * G
        first_visit = True
        for j in range(T - t):
            if state == trajectory[j][0]:
                first_visit = False
            if first_visit:
                if state in returns:
                    returns[state].append(G)
                else:
                    returns[state] = [G]
    return returns

In [11]:
def get_trajectory(env, policy):
    trajectory = []
    state = env.reset()
    done = False
    state_action_reward = [state]
    while not done:
        action = choose_action(state, policy)
        state, reward, done, info = env.step(action)
        state_action_reward.append(action)
        state_action_reward.append(reward)
        trajectory.append(state_action_reward)
        state_action_reward = [state]
    return trajectory

In [12]:
def first_visit_mc(env, policy, gamma, num_trajectories):
    np.random.seed(0)
    returns = {}
    values = {}
    for _ in range(num_trajectories):
        trajectory = get_trajectory(env, policy)
        returns = first_visit_return(returns, trajectory, gamma)
    for state in env.state_space:
        if state in returns:
            values[state] = np.round(np.mean(returns[state]), 1)
    return values

In [13]:
def first_visit_mc(env, policy, gamma, num_trajectories):
    np.random.seed(0)
    returns = {}
    values = {}
    for _ in range(num_trajectories):
        trajectory = get_trajectory(env, policy)
        returns = first_visit_return(returns, trajectory, gamma)
    for state in env.state_space:
        if state in returns:
            values[state] = np.round(np.mean(returns[state]), 1)
    return values

In [14]:
foodTruck = FoodTruck()
policy = base_policy(foodTruck.state_space)

In [18]:
foodTruck.state_space

[('Mon', 0),
 ('Tue', 0),
 ('Tue', 100),
 ('Tue', 200),
 ('Tue', 300),
 ('Wed', 0),
 ('Wed', 100),
 ('Wed', 200),
 ('Wed', 300),
 ('Thu', 0),
 ('Thu', 100),
 ('Thu', 200),
 ('Thu', 300),
 ('Fri', 0),
 ('Fri', 100),
 ('Fri', 200),
 ('Fri', 300),
 ('Weekend', 0),
 ('Weekend', 100),
 ('Weekend', 200),
 ('Weekend', 300)]

In [15]:
policy

{('Mon', 0): {400: 0.5, 300: 0.5},
 ('Tue', 0): {400: 0.5, 300: 0.5},
 ('Tue', 100): {300: 0.5, 200: 0.5},
 ('Tue', 200): {200: 0.5, 100: 0.5},
 ('Tue', 300): {0: 1.0},
 ('Wed', 0): {400: 0.5, 300: 0.5},
 ('Wed', 100): {300: 0.5, 200: 0.5},
 ('Wed', 200): {200: 0.5, 100: 0.5},
 ('Wed', 300): {0: 1.0},
 ('Thu', 0): {400: 0.5, 300: 0.5},
 ('Thu', 100): {300: 0.5, 200: 0.5},
 ('Thu', 200): {200: 0.5, 100: 0.5},
 ('Thu', 300): {0: 1.0},
 ('Fri', 0): {400: 0.5, 300: 0.5},
 ('Fri', 100): {300: 0.5, 200: 0.5},
 ('Fri', 200): {200: 0.5, 100: 0.5},
 ('Fri', 300): {0: 1.0},
 ('Weekend', 0): {400: 0.5, 300: 0.5},
 ('Weekend', 100): {300: 0.5, 200: 0.5},
 ('Weekend', 200): {200: 0.5, 100: 0.5},
 ('Weekend', 300): {0: 1.0}}

In [16]:
estimated_values = first_visit_mc(env=foodTruck, policy=policy, gamma=1.0, num_trajectories=1000)

In [17]:
estimated_values

{('Tue', 0): 1902.2,
 ('Tue', 100): 2335.6,
 ('Tue', 200): 2715.9,
 ('Tue', 300): 3086.8,
 ('Wed', 0): 1341.1,
 ('Wed', 100): 1729.8,
 ('Wed', 200): 2092.1,
 ('Wed', 300): 2389.6,
 ('Thu', 0): 634.4,
 ('Thu', 100): 1043.6,
 ('Thu', 200): 1557.2,
 ('Thu', 300): 1867.5,
 ('Fri', 0): 119.8,
 ('Fri', 100): 384.2,
 ('Fri', 200): 842.6,
 ('Fri', 300): 1444.7}

In [19]:
true_values = policy_evaluation(env=foodTruck, policy=policy)

Converged in 6 iterations.


In [20]:
true_values

{('Mon', 0): 2510.3852187500006,
 ('Tue', 0): 1897.4318750000002,
 ('Tue', 100): 2297.431875,
 ('Tue', 200): 2697.431875,
 ('Tue', 300): 3083.7875000000004,
 ('Wed', 0): 1283.7875000000001,
 ('Wed', 100): 1683.7875000000004,
 ('Wed', 200): 2083.7875,
 ('Wed', 300): 2474.75,
 ('Thu', 0): 674.75,
 ('Thu', 100): 1074.75,
 ('Thu', 200): 1474.75,
 ('Thu', 300): 1835.0,
 ('Fri', 0): 35.00000000000003,
 ('Fri', 100): 435.0,
 ('Fri', 200): 835.0,
 ('Fri', 300): 1400.0,
 ('Weekend', 0): 0,
 ('Weekend', 100): 0,
 ('Weekend', 200): 0,
 ('Weekend', 300): 0}

In [21]:
def get_eps_greedy(actions, eps, best_action):
    probabilities_actions = {}
    num_actions = len(actions)
    for action in actions:
        if action == best_action:
            probabilities_actions[action] = 1 - eps + eps / num_actions
        else:
            probabilities_actions[action] = eps / num_actions
    return probabilities_actions

## On policy Monte Carlo control

In [22]:
# create a function that generates a random policy, where all actions are equally likely to be taken
def get_random_policy(states, actions):
    policy = {}
    num_actions = len(actions)
    for state in states:
        policy[state] = {action: 1 / num_actions for action in actions}
    return policy

In [23]:
# on policy first-visit MC control algorithm
import operator

In [24]:
def on_policy_first_visit_mc(env, num_iterations, eps, gamma):
    np.random.seed(0)
    states = env.state_space
    actions = env.action_space
    policy = get_random_policy(states, actions)
    Q = {state: {action: 0 for action in actions} for state in states}
    Q_n = {state: {action: 0 for action in actions} for state in states}
    for iteration in range(num_iterations):
        if iteration % 10000 == 0:
            print(f"Iteration: {iteration}")
        trajectory = get_trajectory(env, policy)
        G = 0
        T = len(trajectory) - 1
        for t, state_action_reward in enumerate(reversed(trajectory)):
            state, action, reward = state_action_reward
            G = reward + gamma * G
            first_visit = True
            for j in range(T - t):
                state_j = trajectory[j][0]
                action_j = trajectory[j][1]
                if (state, action) == (state_j, action_j):
                    first_visit = False
                if first_visit:
                    Q[state][action] = Q_n[state][action] * Q_n[state][action] + G
                    Q_n[state][action] += 1
                    Q[state][action] /= Q_n[state][action]
                    best_action = max(Q[state].items(), key=operator.itemgetter(1))[0]
                    policy[state] = get_eps_greedy(actions, eps, best_action)
    return policy, Q, Q_n
            


In [25]:
# use the policy iteration
policy, Q, Q_n = on_policy_first_visit_mc(foodTruck, 300000, 0.05, 1)

Iteration: 0
Iteration: 10000
Iteration: 20000
Iteration: 30000
Iteration: 40000
Iteration: 50000
Iteration: 60000
Iteration: 70000
Iteration: 80000
Iteration: 90000
Iteration: 100000
Iteration: 110000
Iteration: 120000
Iteration: 130000
Iteration: 140000
Iteration: 150000
Iteration: 160000
Iteration: 170000
Iteration: 180000
Iteration: 190000
Iteration: 200000
Iteration: 210000
Iteration: 220000
Iteration: 230000
Iteration: 240000
Iteration: 250000
Iteration: 260000
Iteration: 270000
Iteration: 280000
Iteration: 290000


In [28]:
policy

{('Mon', 0): {0: 0.2, 100: 0.2, 200: 0.2, 300: 0.2, 400: 0.2},
 ('Tue', 0): {0: 0.01, 100: 0.01, 200: 0.01, 300: 0.01, 400: 0.96},
 ('Tue', 100): {0: 0.01, 100: 0.01, 200: 0.96, 300: 0.01, 400: 0.01},
 ('Tue', 200): {0: 0.01, 100: 0.96, 200: 0.01, 300: 0.01, 400: 0.01},
 ('Tue', 300): {0: 0.01, 100: 0.96, 200: 0.01, 300: 0.01, 400: 0.01},
 ('Wed', 0): {0: 0.01, 100: 0.01, 200: 0.96, 300: 0.01, 400: 0.01},
 ('Wed', 100): {0: 0.01, 100: 0.01, 200: 0.96, 300: 0.01, 400: 0.01},
 ('Wed', 200): {0: 0.01, 100: 0.01, 200: 0.96, 300: 0.01, 400: 0.01},
 ('Wed', 300): {0: 0.96, 100: 0.01, 200: 0.01, 300: 0.01, 400: 0.01},
 ('Thu', 0): {0: 0.96, 100: 0.01, 200: 0.01, 300: 0.01, 400: 0.01},
 ('Thu', 100): {0: 0.01, 100: 0.96, 200: 0.01, 300: 0.01, 400: 0.01},
 ('Thu', 200): {0: 0.01, 100: 0.96, 200: 0.01, 300: 0.01, 400: 0.01},
 ('Thu', 300): {0: 0.96, 100: 0.01, 200: 0.01, 300: 0.01, 400: 0.01},
 ('Fri', 0): {0: 0.96, 100: 0.01, 200: 0.01, 300: 0.01, 400: 0.01},
 ('Fri', 100): {0: 0.96, 100: 0.01,