## Przykład - dostawa towarów do food-trucka

Problem: ile każdego dnia właściciel food trucka powinien kupić kotletów do burgera, aby wypełnić zapotrzebowanie. Food truck działa jedynie od poniedziałku do piątku.
- Każdego ranka właściciel podejmuje decyzję o kupnie $A=\{0, 100, 200, 300, 400\}$ burgerów. Koszt pojedynczego wynosi $c=4$.
- Pojemność lodówki wynosi $C=400$ burgerów. Każdy burger niewykorzystany w piątek zostaje wyrzucony.
- Każde zakupione burgery, które przekraczają aktualną pojemność lodówki, zostają wyrzucone.
- Codzienne zapotrzebowanie na burgery jest zmienną losową $D$ o następującym rozkładzie:
![](img/D_prob_mass_fn.png)
- Zarobek netto per burger wynosi $b=7$
- Liczba sprzedanych dziennie burgerów wynosi $min(zapotrzebowanie, dostępne-burgery)$

Cel: maksymalizacja tygodniowych zarobków $(b-c)$

In [1]:
import numpy as np
import gym

### Definicja środowiska

In [2]:
# Środowisko rozszerza klasę Env z biblioteki OpenAI Gym
class FoodTruck(gym.Env):
    def __init__(self):
        self.v_demand = [100, 200, 300, 400]
        self.p_demand = [0.3, 0.4, 0.2, 0.1]
        self.capacity = self.v_demand[-1]
        self.days = ['Mon', 'Tue', 'Wed',
                     'Thu', 'Fri', "Weekend"]
        self.unit_cost = 4
        self.net_revenue = 7
        self.action_space = [0, 100, 200, 300, 400]
        # stan środowiska - (dzień tygodnia, zapas burgerów na początku dnia)
        # stan środowiska = obserwacja agenta (środowisko w pełni obserwowalne)
        self.state_space = [("Mon", 0)] \
                           + [(d, i) for d in self.days[1:]
                              for i in [0, 100, 200, 300]]

    # metoda obliczająca następny stan środowiska oraz nagrodę
    def get_next_state_reward(self, state, action, demand):
        day, inventory = state
        result = {}
        result['next_day'] = self.days[self.days.index(day)
                                       + 1]
        result['starting_inventory'] = min(self.capacity, inventory + action)
        result['cost'] = self.unit_cost * action
        result['sales'] = min(result['starting_inventory'], demand)
        result['revenue'] = self.net_revenue * result['sales']
        result['next_inventory'] = result['starting_inventory'] - result['sales']
        result['reward'] = result['revenue'] - result['cost']
        return result

    def get_transition_prob(self, state, action):
        next_s_r_prob = {}
        for ix, demand in enumerate(self.v_demand):
            result = self.get_next_state_reward(state,
                                                action,
                                                demand)
            next_s = (result['next_day'], result['next_inventory'])
            reward = result['reward']
            prob = self.p_demand[ix]
            if (next_s, reward) not in next_s_r_prob:
                next_s_r_prob[next_s, reward] = prob
            else:
                next_s_r_prob[next_s, reward] += prob
        return next_s_r_prob

    # metody potrzebne do symulacji środowiska
    def reset(self):
        self.day = "Mon"
        self.inventory = 0
        state = (self.day, self.inventory)
        return state

    def is_terminal(self, state):
        day, inventory = state
        if day == "Weekend":
            return True
        else:
            return False

    def step(self, action):
        demand = np.random.choice(self.v_demand, p=self.p_demand)
        result = self.get_next_state_reward((self.day, self.inventory),
                                            action,
                                            demand)
        self.day = result['next_day']
        self.inventory = result['next_inventory']
        state = (self.day, self.inventory)
        reward = result['reward']
        done = self.is_terminal(state)
        info = {'demand': demand, 'sales': result['sales']}
        return state, reward, done, info

### Ewaluacja strategii (policy evaluation)

Rozpatrzmy prostą strategię: na początku dnia, właściciel kupuje tyle burgerów, aby zapas był równy 200 lub 300 burgerów (każdą z opcji wybiera z równym prawdopodobieństwem ($0.5$)).
Na przykład, jeżeli na początku dnia w lodówce jest 100 burgerów, zakupi on 100 lub 200 sztuk.

Dokonajmy ewaluacji tej strategii.

In [3]:
# metoda zwracająca strategię działania
def base_policy(states):
    policy = {}
    for s in states:
        day, inventory = s
        prob_a = {}
        if inventory >= 300:
            prob_a[0] = 1
        else:
            prob_a[200 - inventory] = 0.5
            prob_a[300 - inventory] = 0.5
        policy[s] = prob_a
    return policy  # dict: stan -> {akcja: prawdopodobieństwo}

In [4]:
# metoda zwracająca zaktualizowaną wartość dla stanu s
def expected_update(env, v, s, prob_a, gamma):
    expected_value = 0
    for a in prob_a:
        prob_next_s_r = env.get_transition_prob(s, a)
        for next_s, r in prob_next_s_r:
            expected_value += prob_a[a] * prob_next_s_r[next_s, r] * (r + gamma * v[next_s])
    return expected_value

In [5]:
# implementacja algorytmu 'ewaluacji strategii'
def policy_evaluation(env, policy, max_iter=100, v = None, eps=0.1, gamma=1):
    if not v:
        v = {s: 0 for s in env.state_space}
    k = 0
    while True:
        max_delta = 0
        for s in v:
            if not env.is_terminal(s):
                v_old = v[s]
                prob_a = policy[s]
                v[s] = expected_update(env, v, s, prob_a, gamma)
                max_delta = max(max_delta, abs(v[s] - v_old))
        k += 1
        if max_delta < eps:
            print("Converged in", k, "iterations.")
            break
        elif k == max_iter:
            print("Terminating after", k, "iterations.")
            break
    return v

In [7]:
foodtruck = FoodTruck()
policy = base_policy(foodtruck.state_space)
v = policy_evaluation(foodtruck, policy)
print("Expected weekly profit:", v["Mon", 0])
print("The state values:", v)

Converged in 6 iterations.
Expected weekly profit: 2515.0
The state values: {('Mon', 0): 2515.0, ('Tue', 0): 1960.0, ('Tue', 100): 2360.0, ('Tue', 200): 2760.0, ('Tue', 300): 3205.0, ('Wed', 0): 1405.0, ('Wed', 100): 1805.0, ('Wed', 200): 2205.0, ('Wed', 300): 2650.0, ('Thu', 0): 850.0000000000001, ('Thu', 100): 1250.0, ('Thu', 200): 1650.0, ('Thu', 300): 2095.0, ('Fri', 0): 295.00000000000006, ('Fri', 100): 695.0000000000001, ('Fri', 200): 1095.0, ('Fri', 300): 1400.0, ('Weekend', 0): 0, ('Weekend', 100): 0, ('Weekend', 200): 0, ('Weekend', 300): 0}


Sprawdźmy, czy symulacja środowiska dla powyższej strategii da nam podobną wartość nagrody.

In [8]:
def choose_action(state, policy):
    prob_a = policy[state]
    action = np.random.choice(a=list(prob_a.keys()), p=list(prob_a.values()))
    return action

def simulate_policy(policy, n_episodes):
    np.random.seed(0)
    foodtruck = FoodTruck()
    rewards = []
    for i_episode in range(n_episodes):
        state = foodtruck.reset()
        done = False
        ep_reward = 0
        while not done:
            action = choose_action(state, policy)
            state, reward, done, info = foodtruck.step(action)
            ep_reward += reward
        rewards.append(ep_reward)
    print("Expected weekly profit:", np.mean(rewards))

In [9]:
simulate_policy(policy, 1000)

Expected weekly profit: 2518.1


Otrzymana wartość jest bliska wartości wyznaczonej analitycznie!

### Iteracja strategii (policy iteration)

In [26]:
def policy_improvement(env, v, s, actions, gamma):
    prob_a = {}
    if not env.is_terminal(s):
        max_q = np.NINF
        best_a = None
        for a in actions:
            q_sa = expected_update(env, v, s, {a: 1}, gamma) # Aktualizacja wartości
            if q_sa >= max_q:
                max_q = q_sa
                best_a = a
        prob_a[best_a] = 1
    else:
        max_q = 0
    print(prob_a, max_q, "\n")
    return prob_a, max_q

In [27]:
def policy_iteration(env,  eps=0.1, gamma=1):
    np.random.seed(1)
    states = env.state_space
    actions = env.action_space
    policy = {s: {np.random.choice(actions): 1}
              for s in states} # 1. Inicjalizacja strategii
    v = {s: 0 for s in states}
    while True:
        v = policy_evaluation(env, policy, v=v,
                              eps=eps, gamma=gamma) # 2. Ewaluacja strategii
        old_policy = policy
        policy = {}
        for s in states:
            # 3. Aktualizacja wartości strategii
            policy[s], _ = policy_improvement(env, v, s,
                                              actions, gamma)
        if old_policy == policy:
            break
    print("Optimal policy found!")
    return policy, v

In [28]:
policy, v = policy_iteration(foodtruck)
print("Expected weekly profit:", v["Mon", 0])
print(policy)

Converged in 6 iterations.
{300: 1} 1205.844 

{300: 1} 932.64 

{200: 1} 1332.64 

{100: 1} 1732.64 

{0: 1} 2132.6400000000003 

{300: 1} 820.8000000000001 

{200: 1} 1220.8000000000002 

{100: 1} 1620.8000000000002 

{0: 1} 2020.8000000000002 

{100: 1} 689.9999999999999 

{0: 1} 1089.9999999999998 

{0: 1} 1424.0000000000002 

{0: 1} 1546.0 

{200: 1} 390.00000000000006 

{100: 1} 790.0000000000001 

{0: 1} 1190.0 

{0: 1} 1400.0 

{} 0 

{} 0 

{} 0 

{} 0 

Converged in 6 iterations.
{400: 1} 2583.0 

{400: 1} 1982.9999999999998 

{300: 1} 2383.0 

{200: 1} 2783.0 

{100: 1} 3183.0 

{400: 1} 1494.0 

{300: 1} 1894.0 

{200: 1} 2294.0 

{100: 1} 2694.0 

{300: 1} 990.0 

{200: 1} 1390.0 

{100: 1} 1790.0 

{0: 1} 2190.0 

{200: 1} 390.00000000000006 

{100: 1} 790.0000000000001 

{0: 1} 1190.0 

{0: 1} 1400.0 

{} 0 

{} 0 

{} 0 

{} 0 

Converged in 5 iterations.
{400: 1} 2880.0 

{400: 1} 2250.0 

{300: 1} 2650.0 

{200: 1} 3050.0 

{100: 1} 3450.0 

{400: 1} 1620.0 

{300: 1}

### Value iteration

In [14]:
def value_iteration(env, max_iter=100, eps=0.1, gamma=1):
    states = env.state_space
    actions = env.action_space
    v = {s: 0 for s in states}
    policy = {}
    k = 0
    while True:
        max_delta = 0
        for s in states:
            old_v = v[s]
            policy[s], v[s] = policy_improvement(env,
                                                 v,
                                                 s,
                                                 actions,
                                                 gamma)
            max_delta = max(max_delta, abs(v[s] - old_v))
        k += 1
        if max_delta < eps:
            print("Converged in", k, "iterations.")
            break
        elif k == max_iter:
            print("Terminating after", k, "iterations.")
            break
    return policy, v

In [15]:
policy, v = value_iteration(foodtruck)
print("Expected weekly profit:", v["Mon", 0])

Converged in 6 iterations.
Expected weekly profit: 2880.0


## Zadanie

1. Dokończ implementację metody *iteracja strategii (policy iteration)*.
2. Opisz, na czym polega metoda *iteracja wartości (value iteration)*. Czym różni się ona od metody iteracja strategii?
3. Podaj dwa powody, dla których zastosowanie programowania dynamicznego w praktyce jest trudne lub nawet niemożliwe.