# Dynamic Programming
Consideriamo il MDP in figura, tratto dalla [pagina di Wikipedia](https://en.wikipedia.org/wiki/Markov_decision_process) sui **Markov Decision Process**.

![img](img/MDP.png)

$S_0, S_1, S_2$ sono gli **stati**.
<br>
$a_0, a_1$ sono le **azioni**.
<br>
Le frecce nere rappresentano le possibili **transizioni** con le relative **probabilità**.
<br>
Le frecce rosse rappresentano i **reward** non nulli.

Importiamo le **librerie** che useremo nel notebook

In [1]:
import random
import numpy as np

Creaimo una classe per rappresentare un **Markov Decision Process**.
<br>
In particolare 
* `n_steps` permette di simulare n **transizioni** del processo data una policy
<br>
* `estimate_state_value` permette di effettuare una **valutazione empirica** del valore degli stati in funzione della policy scelta.

In [2]:
class Mdp:
    '''Markov Decision Process Class'''
    
    def __init__(self, mdp_dict, gamma, initial_state=None, seed=None):
        '''
        Parametri
        ----------
        mdp_dict : la struttura del MDP (vedi sotto)
        gamma : il discount factor
        initial_state : lo stato iniziale. Se è None viene scelto casualmente
        seed : è il seme per la generazione dei valori pseudo-casuali
        '''
        self.mdp_dict = mdp_dict
        self.gamma = gamma
        self.set_seed(seed)
        self.set_state(initial_state)
    
    def get_states(self):
        '''Ritorna tutti gli stati del MDP
        
        Ritorno
        ----------
        Una lista con gli stati
        '''
        return list(self.mdp_dict.keys())
    
    def get_actions(self, state):
        '''Ritorna tutte le azioni possibili di uno stato
        
        Parametri
        ----------        
        state : lo stato di cui ritornare le azioni
        
        Ritorno
        ----------
        Una lista con le azioni
        '''
        return list(self.mdp_dict[state].keys())
        
    def set_state(self, state=None):
        '''Imposta lo stato del MDP
        
        Parametri
        ----------        
        state : lo stato da impostare. Se None viene scelto casualmente
        '''
        if state is None:
            # scegle uno stato a caso
            self.state = random.choice(self.get_states())
        else:
            # sceglie lo stato indicato
            self.state = state
            
    def set_seed(self, n):
        '''Imposta il seed per la generazione dei numeri casuali
        
        Parametri
        ----------
        n : un intero che rappresenta il seed
        '''
        random.seed(n)
        
    def get_transition_prob(self, state, action, next_state):
        '''Ritorna la probabilità di una transizione
        
        Parametri
        ----------
        state : uno stato
        action : azione compiuta nello stato state
        next_state : stato successivo

        Ritorno
        ----------
        La probabilità di transire nello stato next_state se,
        trovandosi nello stato state, si compie l'azione action
        '''
        if next_state in self.mdp_dict[state][action]:
            prob, _ = self.mdp_dict[state][action][next_state]
            return prob
        else:
            return 0
        
    def get_transition_reward(self, state, action, next_state):
        '''Ritorna il reward associato ad una transizione
        
        Parametri
        ----------
        state : uno stato
        action : azione compiuta nello stato state
        next_state : stato successivo

        Ritorno
        ----------
        Il reward che si ottiene se, trovandosi nello stato state 
        e compiendo l'azione action, si transisce in next_state
        '''
        # NOTE se la transizione ha probabilità 0 che succede?
        # La funzione non deve essere chiamata
        _, reward = self.mdp_dict[state][action][next_state]
        return reward
    
    # NOTE sicuro che non venga mai chiamato con lo stato non settato?
    def step(self, policy):
        '''Esegue un'azione a partire dallo stato attuale
        
        Parametri
        ----------
        policy : la policy da seguire
        
        Ritorno
        ----------
        Una tupla composta da:
        elemento 1 : lo stato in cui si trova l'MDP dopo l'azione
        elemento 2 : il reward ottenuto
        '''
        # azione scelta
        action = policy.get_action(self.state)

        # lista dei possibili nuovi stati data l'azione scelta e rispettive probabilità
        next_states, states_probs = zip(*[(next_state, prob) for (next_state, (prob, _)) in self.mdp_dict[self.state][action].items()])

        # nuovo stato
        new_state = random.choices(next_states, weights=states_probs, k=1)[0]
        # reward
        _, reward = self.mdp_dict[self.state][action][new_state]
        # cambia il vecchio stato
        self.state = new_state
        return new_state, reward
        
    def n_steps(self, n, policy):
        '''Esegue n step a partire dallo stato attuale
        
        Parametri
        ----------
        n : numero di step da effettuare
        policy : la policy da seguire
        
        Ritorno
        ----------
        Una tupla composta da:
        elemento 1 : una lista degli n stati visitati
        elemento 2 : una lista degli n reward ottenuti
        '''
        states = []
        rewards = []
        for _ in range(n):
            state, reward = self.step(policy)
            states.append(state)
            rewards.append(reward)
        return states, rewards
    
    def estimate_state_value(self, state, n_steps, n_runs, policy):
        '''Esegue una stima del valore dello stato state
        
        Si eseguono n_steps passi a partire dallo stato state e si ottiene
        una stima del valore dello stato sommando i reward ottenuti.
        Il processo viene ripetuto n_runs volte e le stime vengono mediate

        Parametri
        ----------
        state : stato di partenza
        n_steps : numero di step da effettuare
        n_runs : numero di iterazioni del processo
        policy : la policy da seguire
        
        Ritorno
        ----------
        la media dei ritorni ottenuti
        '''
        discounts =  [self.gamma**n for n in range(n_steps)]
        returns = []
        for _ in range(n_runs):
            self.set_state(state)
            _, rewards = self.n_steps(n_steps, policy)
            return_ = sum([reward*discount for (reward, discount) in zip(rewards, discounts)])
            returns.append(return_)
        return np.mean(returns).tolist()

Creiamo una classe per rappresentare le **policy**.

In [3]:
class Policy:
    '''Policy Class'''
    
    def __init__(self, policy_dict):
        '''
        Parametri
        ----------
        policy_dict : un dizionario che rappresenta la policy (vedi sotto)
        '''
        self.policy_dict = policy_dict
        
    def get_action_prob(self, state, action):
        ''' Ritorna la probabilità di un'azione
        
        Parametri
        ----------
        state : stato
        action: azione
        
        Ritorno
        ----------
        La probabilità di compiere l'azione action
        nello stato state.
        '''
        if action in self.policy_dict[state]:
            return self.policy_dict[state][action]
        else:
            return 0
        
    def get_action(self, state):
        '''Ritorna un'azione
        
        Se la policy è stocastica l'azione viene scelta casualmente
        in accordo con le probabilità specificate
        
        Parametri
        ----------
        state : uno stato
        
        Ritorno
        ----------
        Un'azione
        '''
        # lista delle azioni possibili nello stato state e rispettive probabilità
        actions, action_probs = zip(*[(action, prob) for (action, prob) in self.policy_dict.items()])

        actions = list(self.policy_dict[state].keys())
        # lista delle rispettive probabilità
        action_probs = [self.policy_dict[state][action] for action in actions]
        # azione scelta
        return random.choices(actions, weights=action_probs, k=1)[0]
    
    def is_equal_to(self, other_policy):
        ''' Stabilisce se la policy è uguale a other_policy

        Parametri
        ----------
        other_policy : la policy da confrontare con la policy self
        
        Ritorno
        ----------
        True se le due policy sono uguali, False altrimenti
        '''
        for state in self.policy_dict.keys():
            actions = list(self.policy_dict[state].keys()) + \
                list(other_policy.policy_dict[state].keys())
            for action in actions:
                if self.get_action_prob(state, action) != other_policy.get_action_prob(state, action):
                    return False
        return True

Il dizionario seguente descrive il **Markov Decision Process** illustrato in figura.

In [4]:
# Dizionario con le azioni associate ad ogni stato,
# le probabilità di transizione e i reward associati.
# Ad esempio se nello stato S2 si sceglie l'azione a1
# con probabilità 0.3 si entra nello stato S0 ottenendo
# un reward di -1
mdp_dict = {
    'S0': {
        'a0': {'S0': (0.5, 0), 'S2': (0.5, 0)},
        'a1': {'S2': (1, 0)}
    },
    'S1': {
        'a0': {'S0': (0.7, +5), 'S1': (0.1, 0), 'S2': (0.2, 0)},
        'a1': {'S1': (0.95, 0), 'S2': (0.05, 0)}
    },
    'S2': {
        'a0': {'S0': (0.4, 0), 'S1': (0.6, 0)},
        'a1': {'S0': (0.3, -1), 'S1': (0.3, 0), 'S2': (0.4, 0)}
    }
}


Creaimo l'oggetto corrispondente al nostro MDP con un **discount factor** di $0.8$.

In [5]:
mdp = Mdp(mdp_dict, gamma=0.8, initial_state=None, seed=None)

Il dizionario seguente descrive una **policy stocastica**.

In [6]:
# Una policy stocastica con le probabilità delle varie azioni
policy_dict = {
    'S0': {'a0': 0.3, 'a1': 0.7},
    'S1': {'a0': 0.8, 'a1': 0.2},
    'S2': {'a0': 0.5, 'a1': 0.5}
}

Creiamo l'oggetto corrispondente

In [7]:
policy = Policy(policy_dict)

## Policy Evaluation

La funzione di valore di stato $v_{\pi }$ può essere approssimata iterando la rispettiva **equazione di Bellman**:
<br><br><br>
$v_{k+1}(s) = E_{\pi}\left [ R_{t+1} + \gamma v_{k}\left ( S_{t+1} \right )| 
 \, S_{t+1}\right ] = \sum_{a}\pi\left ( a|s \right )\sum_{s', r}p\left ( s',r|s,a \right ) \left [ r+\gamma v_{k}\left ( s' \right ) \right ]$
<br><br><br>
La **valutazione iniziale** $v_{0}$ può essere scelta arbitrariamente, ma $v_0(s)=0$ se $s$ è uno **stato terminale** (gli stati terminali non vengono aggiornati).
<br> 
$v_{\pi}$ è chiaramente un **punto fisso** dell'iterazione precedente e si può dimostrare che se $v_{\pi}$ esiste l'iterazione converge.
N.B. Ricorda di utilizzare due dizionari distinti per rappresentare $v_{k}$ e $v_{k+1}$. In realtà è anche possibile utilizzare un unico dizionario. Anche questa versione **in place** della policy iteration converge.

In [8]:
def policy_evaluation(mdp, policy, epsilon, V=None):
    '''Valuta la funzione V (valore di stato) relativa ad una policy
    
    Parametri
    ----------
    mdp : il markov decision process a cui applicare la policy
    policy : la policy da valutare
    epsilon : è la soglia che determina la fine delle iterazioni.
              Le iterazioni terminano se max(abs(new_value(S)-old_value(S))) 
              < epsilon per ogni stato S.
    V : è la funzione di valore iniziale. E' un dizionario in cui
        le chiavi sono gli stati (stringhe) e i valori i rispettivi valori (float). 
        Se None la valutazione iniziale assegna il valore 0 ad ogni stato
        
    Ritorno
    ----------
    La funzione di valore della policy. E' un dizionario con lo
    stesso formato di V
    '''
    if V is None:
        V = {state : 0 for state in mdp.get_states()}
    delta = float('inf')
    while delta > epsilon:
        # crea una copia del dizionario
        V_old = V.copy()
        delta = 0
        for s in mdp.get_states():
            # nuovo valore di s
            new_vs = 0
            for a in mdp.get_actions(s):
                acc = 0
                for s1 in mdp.get_states():
                    # aggiorna V utilizzando V_old
                    acc += 0 if mdp.get_transition_prob(s, a, s1) == 0 else \
                        mdp.get_transition_prob(s, a, s1) * (mdp.get_transition_reward(s, a, s1) + mdp.gamma*V_old[s1])
                new_vs += policy.get_action_prob(s, a) * acc
            V[s] = new_vs
            # alla fine del ciclo il delta è pari al massimo aggiornamento che si è avuto
            delta = max(delta, abs(V_old[s]-V[s]))
    return V 

Eeguiamo una stima empirica del valore dello stato $S_0$ ottenuta mediando i valori di $4.000$ simulazioni del MDP con stato di partenza $S_0$, ciascuna di $200$ passi.

In [9]:
approx_value = mdp.estimate_state_value("S0", n_steps=200, n_runs=4000, policy=policy)
print(f"Il valore di S0 è circa {approx_value}")

Il valore di S0 è circa 2.140967807665984


Confrontiamo il valore ottenuto con quello ricavato dall'esecuzione della **policy evaluation**.

In [10]:
v = policy_evaluation(mdp, policy, epsilon=1e-7); v

{'S0': 2.181849290090905, 'S1': 5.30800242662227, 'S2': 2.823569769795702}

Testa la correttezza della tua implementazione.

In [11]:
assert np.allclose(list(v.values()), [2.182, 5.308, 2.824], atol=1e-3, rtol=1e-3)
print("Test superato!")

Test superato!


## Policy Improvement
Dopo avere eseguito la policy iteration per una policy $\pi$ possiamo utilizzare il valore degli stati per generare una policy migliore $\pi'$ nel modo seguente:

$
v_{\pi'}\left ( s \right )=\underset{a}{\operatorname{argmax}}\: q\left ( s,a \right )=\underset{a}{\operatorname{argmax}}\: E\left [ R_{t+1} +\gamma v_{\pi}\left ( S_{t+1} \right )  | S_t=s, A_t=a \right]=\underset{a}{\operatorname{argmax}}\sum_{s',r}p\left ( s',r|s,a \left [ r+\gamma v_\pi\left ( s' \right ) \right ]\right )
$

In [12]:
def policy_improvement(mdp, V):
    '''Ritorna una policy ottimale rispetto alla valutazione V

    Parametri
    ----------
    mdp : un Markov Decision Process
    V : una funzione di valutazione di stato
    
    Ritorno
    ----------
    Una policy ottimale rispetto a V
    '''
    # inizializza il dizionario che rappresenta la policy. Tutte le azioni hanno probabilità 0
    new_policy_dict = {state : {action : 0 for action in mdp.get_actions(state)} for state in mdp.get_states()}
    for s in mdp.get_states():
        # rappresenta il valore q(s,a_max) dell'azione migliore nello stato s 
        # secondo la funzione di valore V
        q_a_max = -float('inf')
        for a in mdp.get_actions(s):
            # è il valore di una specifica azione a compita nello stato s
            q_a = 0
            # calcola il valore di a
            for s1 in mdp.get_states():
                q_a += 0 if mdp.get_transition_prob(s, a, s1) == 0 else \
                    mdp.get_transition_prob(s, a, s1)*(mdp.get_transition_reward(s, a, s1)+mdp.gamma*V[s1])
            if q_a > q_a_max:
                q_a_max = q_a
                a_max = a
        # la probabilità dell'azione migliore nello stato s viene impostata ad 1
        new_policy_dict[s][a_max] = 1
    # ritorna una policy deterministica, ottimale rispetto alla funzione di valutazione V
    return Policy(new_policy_dict)

Testa la correttezza della tua implementazione.

In [13]:
policy = policy_improvement(mdp, V={"S0":0, "S1":5, "S2":2})
assert policy.is_equal_to(Policy({'S0': {'a0': 0, 'a1': 1}, 'S1': {'a0': 1, 'a1': 0}, 'S2': {'a0': 1, 'a1': 0}}))
policy = policy_improvement(mdp, V={"S0":10, "S1":15, "S2":2})
assert policy.is_equal_to(Policy({'S0': {'a0': 1, 'a1': 0}, 'S1': {'a0': 0, 'a1': 1}, 'S2': {'a0': 1, 'a1': 0}}))
print("Test superato!")

Test superato!


## Policy Iteration


$
\pi_0 \xrightarrow[]{\text{evaluation}} v_{\pi_0} \xrightarrow[]{\text{improve}}
\pi_1 \xrightarrow[]{\text{evaluation}} v_{\pi_1} \xrightarrow[]{\text{improve}}
\pi_2 \xrightarrow[]{\text{evaluation}} \dots \xrightarrow[]{\text{improve}}
\pi_* \xrightarrow[]{\text{evaluation}} v_*
$


Nella policy iteration si parte da una policy arbitraria $v_0$ e si effettuano alternatamente una policy evaluation e una policy improvement. L'algoritmo termina quando $v_n(S)=v_{n-1}(S)$ per ogni stato $S$. La policy *v_n* è ottimale.

In [14]:
def policy_iteration(mdp, initial_policy, epsilon):
    ''' Esegue una policy iteration
    
    Parametri
    ----------
    mdp : un Markov Decision Process
    initial_policy : la policy iniziale
    epsilon : il valore di soglia per policy evaluation
    
    Ritorno
    ----------
    Una tupla composta da:
    Elemento 1 : Una policy ottimale per il Markov Decision Process mdp
    Elemento 2 : La funzione di valore di stato V della policy ottimale
    '''
    policy = initial_policy
    V = None
    while True:
        V = policy_evaluation(mdp, policy, epsilon, V)
        new_policy = policy_improvement(mdp, V)
        if new_policy.is_equal_to(policy):
            return (policy, V)
        else:
            policy = new_policy

In [15]:
optimal_policy, optimal_values = policy_iteration(mdp=mdp, initial_policy=policy, epsilon=1e-8)

In [16]:
assert optimal_policy.is_equal_to(Policy({'S0': {'a0': 0, 'a1': 1}, 'S1': {'a0': 1, 'a1': 0}, 'S2': {'a0': 1, 'a1': 0}}))
assert np.allclose(list(optimal_values.values()), [3.423, 6.632, 4.279], atol=1e-3, rtol=1e-3)
print("Test superato!")

Test superato!
