# Laboratory 1 (1 pt)

The goal of the first laboratory is to get familiar with passive learning algorithms and to implement them. The implemented algorithms will be tested using a previously prepared environment, shown in the diagram below:
![MDP, Markov Decision Process](assets/images/mdp.png)

Importing the environment library

In [18]:
from env.simpleMDP import simpleMDP
import numpy as np

Getting familiar with the prepared environment

In [3]:
mdp = simpleMDP()

states = mdp.get_all_states()
print(states)

for s in states:
    actions = mdp.get_possible_actions(s)
    for a in actions:
        next_states = mdp.get_next_states(s, a)
        print("\nState: " + s + " action: " + a + " " + "list of possible next states: ", next_states)
        for next_state, probability in next_states.items():
            reward = mdp.get_reward(s, a, next_state)
            print("from state: " + s + ", action: " + a + ", next state: " + next_state + ", probability: " + str(probability) + ", reward: " + str(reward))

['s0', 's1', 's2']

State: s0 action: a0 list of possible next states:  {'s0': 0.5, 's2': 0.5}
from state: s0, action: a0, next state: s0, probability: 0.5, reward: 0.0
from state: s0, action: a0, next state: s2, probability: 0.5, reward: 0.0

State: s0 action: a1 list of possible next states:  {'s2': 1}
from state: s0, action: a1, next state: s2, probability: 1, reward: 0.0

State: s1 action: a0 list of possible next states:  {'s0': 0.7, 's1': 0.1, 's2': 0.2}
from state: s1, action: a0, next state: s0, probability: 0.7, reward: 5
from state: s1, action: a0, next state: s1, probability: 0.1, reward: 0.0
from state: s1, action: a0, next state: s2, probability: 0.2, reward: 0.0

State: s1 action: a1 list of possible next states:  {'s1': 0.95, 's2': 0.05}
from state: s1, action: a1, next state: s1, probability: 0.95, reward: 0.0
from state: s1, action: a1, next state: s2, probability: 0.05, reward: 0.0

State: s2 action: a0 list of possible next states:  {'s0': 0.4, 's2': 0.6}
from state: s2, act
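
A quick sanity check on the listing above: for every state-action pair, the transition probabilities should sum to 1. The sketch below hard-codes only the transitions that are fully visible in the printed output (the `transitions` dictionary is an illustrative copy, not part of `simpleMDP`) and compares with a tolerance, because sums such as 0.7 + 0.1 + 0.2 are not exactly 1.0 in floating point.

```python
import math

# Transitions copied from the printed output above (illustrative copy only;
# entries that are cut off in the listing are left out).
transitions = {
    ("s0", "a0"): {"s0": 0.5, "s2": 0.5},
    ("s0", "a1"): {"s2": 1},
    ("s1", "a0"): {"s0": 0.7, "s1": 0.1, "s2": 0.2},
    ("s1", "a1"): {"s1": 0.95, "s2": 0.05},
    ("s2", "a0"): {"s0": 0.4, "s2": 0.6},
}

# Each distribution over next states must sum to 1 (up to rounding error).
sums_to_one = {
    pair: math.isclose(sum(next_states.values()), 1.0)
    for pair, next_states in transitions.items()
}
print(sums_to_one)
```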

## Task 1 - Policy evaluation (0.3 pt)

<p style='text-align: justify;'>
The goal of this exercise is to implement the policy evaluation algorithm. The algorithm computes the value function of a policy using the Bellman equation. This is done iteratively according to the formula:
\begin{equation}
	        V_{k + 1}(s) = \sum_a \pi(a | s) \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V_k(s')]
\end{equation}
</p>
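
To make the update concrete, the sketch below applies it once to state `s1`, starting from $V_0 = 0$. It assumes $\gamma = 0.9$ and a uniform random policy over the two actions, and it uses the transitions and rewards of `s1` as printed in the environment listing; the dictionaries are illustrative copies, not the `simpleMDP` API.

```python
gamma = 0.9  # discount factor (assumed for this example)

# (probability, reward) for each next state of s1, copied from the printed listing.
s1_model = {
    "a0": {"s0": (0.7, 5.0), "s1": (0.1, 0.0), "s2": (0.2, 0.0)},
    "a1": {"s1": (0.95, 0.0), "s2": (0.05, 0.0)},
}
pi_s1 = {"a0": 0.5, "a1": 0.5}           # uniform random policy in s1 (assumed)
V0 = {"s0": 0.0, "s1": 0.0, "s2": 0.0}   # initial value estimates

# V_1(s1) = sum_a pi(a|s1) * sum_s' p(s'|s1,a) * (r + gamma * V_0(s'))
V1_s1 = sum(
    pi_s1[a] * p * (r + gamma * V0[s_next])
    for a, outcomes in s1_model.items()
    for s_next, (p, r) in outcomes.items()
)
# Only the s1 -> s0 transition under a0 has a non-zero reward,
# so the result is 0.5 * 0.7 * 5 = 1.75 (up to rounding).
print(V1_s1)
```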

The first step is to initialize the policy that will be evaluated. In this task we evaluate a stochastic policy in which every available action is chosen with the same probability.

In [32]:
policy = dict()

for s in states:
    actions = mdp.get_possible_actions(s)
    action_prob = 1 / len(actions)
    policy[s] = dict()
    for a in actions:
        policy[s][a] = action_prob

print(policy)

{'s0': {'a0': 0.5, 'a1': 0.5}, 's1': {'a0': 0.5, 'a1': 0.5}, 's2': {'a0': 0.5, 'a1': 0.5}}
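
A stochastic policy in this form can be used directly to draw actions. The sketch below re-creates the printed dictionary by hand so that it runs without the environment; `sample_action` is a hypothetical helper introduced only for this illustration, built on the standard-library `random.choices`.

```python
import random

# The uniform random policy exactly as printed above.
policy = {
    "s0": {"a0": 0.5, "a1": 0.5},
    "s1": {"a0": 0.5, "a1": 0.5},
    "s2": {"a0": 0.5, "a1": 0.5},
}

def sample_action(policy, state, rng):
    """Draw one action for `state` according to the policy's probabilities."""
    actions = list(policy[state].keys())
    weights = list(policy[state].values())
    return rng.choices(actions, weights=weights, k=1)[0]

rng = random.Random(0)  # fixed seed so that repeated runs give the same draws
samples = [sample_action(policy, "s0", rng) for _ in range(1000)]

# The empirical frequency of a0 should be close to its probability of 0.5.
freq_a0 = samples.count("a0") / len(samples)
print(freq_a0)
```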


Implementation of the policy evaluation algorithm - two-array approach

In [33]:
def policy_eval_two_arrays(mdp, policy, gamma, theta):
    """
    This function uses the two-array approach to evaluate the specified policy for the specified MDP:

    'mdp' - model of the environment; use the following functions:
        get_all_states - returns the list of all states available in the environment
        get_possible_actions - returns the list of possible actions for the given state
        get_next_states - returns a dict mapping each possible next state to the probability of reaching it
                          from the given state by taking the given action
        get_reward - returns the reward for taking the action in the state and landing in next_state


    'policy' - the stochastic policy (action probabilities for each state) for the given mdp, to evaluate
    'gamma' - discount factor for the MDP
    'theta' - the algorithm stops when the largest change of any state value between the previous and the
              current evaluation is smaller than theta

    Returns the number of sweeps performed and the value function (dict: state -> value).
    """
    V = dict()
    V_prior = dict()

    for s in mdp.get_all_states():
        V[s] = 0
        V_prior[s] = 0
    
        
    #
    # INSERT CODE HERE to evaluate the policy using the 2 array approach
    
    cycles = 0
   
    while True:
        delta = 0
        cycles += 1
        
        for s in mdp.get_all_states():  # use the mdp argument, not the global `states`
            actions = mdp.get_possible_actions(s)
            V_s = 0
            
            for a in actions:
                transitions = mdp.get_next_states(s, a)
                
                for next_state, probability in transitions.items():
                    reward = mdp.get_reward(s, a, next_state)
                    V_s += policy[s][a] * probability * (reward + gamma * V_prior[next_state])
            delta = max(delta, abs(V_prior[s] - V_s))
            V[s]=V_s

        V_prior = V.copy()
        
        if delta < theta:
            break
    return cycles, V

Checking the correctness of the implemented algorithm

In [34]:
cycles, V = policy_eval_two_arrays(mdp, policy, 0.9, 0.0001)


print(cycles, V)
assert np.isclose(V['s0'], 1.46785443374683)
assert np.isclose(V['s1'], 4.55336594491180)
assert np.isclose(V['s2'], 1.68544141660991)

74 {'s0': 1.46785443374683, 's1': 4.5533659449117945, 's2': 1.6854414166099074}


Implementation of the policy evaluation algorithm - in-place computation

In [35]:
def policy_eval_in_place(mdp, policy, gamma, theta):
    """
    This function uses the in-place approach to evaluate the specified policy for the specified MDP:

    'mdp' - model of the environment; use the following functions:
        get_all_states - returns the list of all states available in the environment
        get_possible_actions - returns the list of possible actions for the given state
        get_next_states - returns a dict mapping each possible next state to the probability of reaching it
                          from the given state by taking the given action
        get_reward - returns the reward for taking the action in the state and landing in next_state


    'policy' - the stochastic policy (action probabilities for each state) for the given mdp, to evaluate
    'gamma' - discount factor for the MDP
    'theta' - the algorithm stops when the largest change of any state value within one sweep is smaller
              than theta

    Returns the number of sweeps performed and the value function (dict: state -> value).
    """
    V = dict()

    for s in mdp.get_all_states():
        V[s] = 0

    # INSERT CODE HERE to evaluate the policy using the in-place approach
    
    cycles = 0
   
    while True:
        delta = 0
        cycles += 1
        
        for s in mdp.get_all_states():  # use the mdp argument, not the global `states`
            actions = mdp.get_possible_actions(s)
            V_s = 0
            
            for a in actions:
                transitions = mdp.get_next_states(s, a)
                
                for next_state, probability in transitions.items():
                    reward = mdp.get_reward(s, a, next_state)
                    V_s += policy[s][a] * probability * (reward + gamma * V[next_state])  
            delta = max(delta, abs(V[s] - V_s))
            V[s]=V_s 
        
        if delta < theta:
            break
    return cycles, V

Checking the correctness of the implemented algorithm

In [36]:
cycles, V = policy_eval_in_place(mdp, policy, 0.9, 0.0001)

print(cycles, V)

assert np.isclose(V['s0'], 1.4681508097651)
assert np.isclose(V['s1'], 4.5536768132712)
assert np.isclose(V['s2'], 1.6857723431614)

54 {'s0': 1.4681508097650982, 's1': 4.553676813271177, 's2': 1.6857723431613716}
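
Both variants converge to the same fixed point $V^\pi$; they differ only in how quickly new estimates propagate between states, which is why the sweep counts (74 vs. 54) and the trailing digits of the results above differ. As a rough illustration, the sketch below uses a hypothetical two-state Markov reward process (the matrix `P`, the vector `R`, and the helper `evaluate` are made up for this example and unrelated to `simpleMDP`) and compares both update schemes with the exact solution of $V = R + \gamma P V$.

```python
import numpy as np

# Hypothetical 2-state Markov reward process (policy already folded in).
P = np.array([[0.5, 0.5],    # P[s, s'] = transition probability
              [0.2, 0.8]])
R = np.array([1.0, 0.0])     # expected immediate reward in each state
gamma, theta = 0.9, 1e-8

def evaluate(in_place):
    """Iterative policy evaluation; returns (number of sweeps, values)."""
    V = np.zeros(2)
    sweeps = 0
    while True:
        sweeps += 1
        # Two-array variant reads from a frozen copy; in-place reads V itself.
        source = V if in_place else V.copy()
        delta = 0.0
        for s in range(2):
            new_value = R[s] + gamma * P[s] @ source
            delta = max(delta, abs(new_value - V[s]))
            V[s] = new_value
        if delta < theta:
            return sweeps, V

# Exact solution of the linear system (I - gamma * P) V = R.
exact = np.linalg.solve(np.eye(2) - gamma * P, R)
sweeps_two, V_two = evaluate(in_place=False)
sweeps_in, V_in = evaluate(in_place=True)
print(sweeps_two, sweeps_in, exact, V_two, V_in)
```

The exact solve is practical only for small state spaces, but it gives a reference value that does not depend on `theta`.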


## Task 2 - Policy iteration (0.3 pt)

<p style='text-align: justify;'>
The goal of this exercise is to implement the policy iteration algorithm. The algorithm finds an optimal policy for the given environment. Its first step is to initialize the policy with random actions; the policy is then evaluated and improved according to the formula:
\begin{equation}
    \pi(s) \leftarrow  \text{argmax}_a \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V(s')]
\end{equation}
If the improved policy differs from the previous one, it has to be evaluated and improved again. The algorithm terminates when the improved policy is exactly the same as the policy that was being improved.
</p>
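
As a concrete instance of the improvement step, the sketch below computes the action values of `s1` from the value function of the uniform random policy (rounded from the in-place evaluation result printed earlier) and picks the greedy action. The transition data is copied from the printed environment listing and $\gamma = 0.9$ is assumed; the dictionaries are illustrative, not the `simpleMDP` API.

```python
gamma = 0.9

# Value function of the uniform random policy (rounded from the in-place result).
V = {"s0": 1.46815, "s1": 4.55368, "s2": 1.68577}

# (probability, reward) per next state of s1, copied from the printed listing.
s1_model = {
    "a0": {"s0": (0.7, 5.0), "s1": (0.1, 0.0), "s2": (0.2, 0.0)},
    "a1": {"s1": (0.95, 0.0), "s2": (0.05, 0.0)},
}

# q(s1, a) = sum_s' p(s'|s1,a) * (r + gamma * V(s'))
q = {
    a: sum(p * (r + gamma * V[s_next]) for s_next, (p, r) in outcomes.items())
    for a, outcomes in s1_model.items()
}
greedy_action = max(q, key=q.get)  # argmax over actions
print(q, greedy_action)
```

Here `a0` has the larger action value (about 5.14 against about 3.97), so the improved policy selects `a0` in `s1`.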

Implement a policy evaluation function, this time assuming that the policy passed as input is deterministic.

In [37]:
def deterministic_policy_eval_in_place(mdp, policy, gamma, theta):
    """
    This function uses the in-place approach to evaluate the specified deterministic policy for the specified MDP:

    'mdp' - model of the environment; use the following functions:
        get_all_states - returns the list of all states available in the environment
        get_possible_actions - returns the list of possible actions for the given state
        get_next_states - returns a dict mapping each possible next state to the probability of reaching it
                          from the given state by taking the given action
        get_reward - returns the reward for taking the action in the state and landing in next_state


    'policy' - the deterministic policy (one action for each state) for the given mdp, to evaluate
    'gamma' - discount factor for the MDP
    'theta' - the algorithm stops when the largest change of any state value within one sweep is smaller
              than theta

    Returns the value function (dict: state -> value).
    """
    V = dict()

    for s in mdp.get_all_states():
        V[s] = 0
    #
    # INSERT CODE HERE to evaluate the deterministic policy using the in-place approach
    
       
    while True:
        delta = 0
        
        for s in mdp.get_all_states():  # use the mdp argument, not the global `states`
            a = policy[s]
            V_s = 0

            transitions = mdp.get_next_states(s, a)
            for next_state, probability in transitions.items():
                reward = mdp.get_reward(s, a, next_state)
                V_s += probability * (reward + gamma * V[next_state])  
            delta = max(delta, abs(V[s] - V_s))     
            V[s]=V_s
            
        if delta < theta:
            break
    return V

Implement a function that improves the policy `policy` based on the value function `value_function` computed for it.

In [38]:
def policy_improvement(mdp, policy, value_function, gamma):
    """
    This function improves the specified deterministic policy for the specified MDP using value_function:

    'mdp' - model of the environment; use the following functions:
        get_all_states - returns the list of all states available in the environment
        get_possible_actions - returns the list of possible actions for the given state
        get_next_states - returns a dict mapping each possible next state to the probability of reaching it
                          from the given state by taking the given action
        get_reward - returns the reward for taking the action in the state and landing in next_state


    'policy' - the deterministic policy (one action for each state) for the given mdp, to improve;
               it is modified in place
    'value_function' - the value function of the given policy
    'gamma' - discount factor for the MDP

    Returns False if the policy was changed (improved) and True if it stayed the same.
       """

    policy_stable = True

    #
    # INSERT CODE HERE to evaluate the improved policy
    #

    for s in mdp.get_all_states():
        prev_action = policy[s]
        actions = mdp.get_possible_actions(s)
        expected_rewards = []
        for a in actions:
            # expected return of taking action a in state s, then following value_function
            value = 0
            for next_state, probability in mdp.get_next_states(s, a).items():
                reward = mdp.get_reward(s, a, next_state)
                value += probability * (reward + gamma * value_function[next_state])
            expected_rewards.append(value)
        # greedy choice; on ties the first maximal action is kept
        policy[s] = actions[expected_rewards.index(max(expected_rewards))]
        if prev_action != policy[s]:
            policy_stable = False
    
    return policy_stable

Implement the policy iteration function

In [39]:
def policy_iteration(mdp, gamma, theta):

    """
    This function calculates the optimal policy for the specified MDP:

    'mdp' - model of the environment; use the following functions:
        get_all_states - returns the list of all states available in the environment
        get_possible_actions - returns the list of possible actions for the given state
        get_next_states - returns a dict mapping each possible next state to the probability of reaching it
                          from the given state by taking the given action
        get_reward - returns the reward for taking the action in the state and landing in next_state


    'gamma' - discount factor for the MDP
    'theta' - policy evaluation stops when the largest change of any state value within one sweep is
              smaller than theta

    Returns the optimal policy and the value function of that policy.
       """

    policy = dict()

    for s in mdp.get_all_states():  # use the mdp argument, not the global `states`
        actions = mdp.get_possible_actions(s)
        policy[s] = actions[0]

    V = deterministic_policy_eval_in_place(mdp, policy, gamma, theta)

    policy_stable = False

    while not policy_stable:
        policy_stable = policy_improvement(mdp, policy, V, gamma)
        V = deterministic_policy_eval_in_place(mdp, policy, gamma, theta)

    return policy, V

Checking the correctness of the implemented algorithm

In [40]:
optimal_policy, optimal_value = policy_iteration(mdp, 0.9, 0.001)

print(optimal_policy)
print(optimal_value)

assert optimal_policy['s0'] == 'a1'
assert optimal_policy['s1'] == 'a0'
assert optimal_policy['s2'] == 'a1'

assert np.isclose(optimal_value['s0'], 3.78536612814300)
assert np.isclose(optimal_value['s1'], 7.29865364527343)
assert np.isclose(optimal_value['s2'], 4.20683179007964)

{'s0': 'a1', 's1': 'a0', 's2': 'a1'}
{'s0': 3.785366128142998, 's1': 7.298653645273432, 's2': 4.206831790079636}
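
One way to double-check the printed result is to plug it back into the Bellman equation: for the returned policy, $V(s)$ should match $\sum_{s'} p(s'|s,a)[r + \gamma V(s')]$ up to a small error on the order of `theta`. The sketch below does this for `s0` and `s1` only, because the transitions of `s2` under `a1` are not visible in the environment listing; the `model` dictionary is an illustrative copy, not the `simpleMDP` API.

```python
gamma, theta = 0.9, 0.001

# Values and greedy actions as printed by the policy iteration check.
V = {"s0": 3.785366128142998, "s1": 7.298653645273432, "s2": 4.206831790079636}
policy = {"s0": "a1", "s1": "a0"}

# (probability, reward) per next state, copied from the printed listing.
model = {
    ("s0", "a1"): {"s2": (1.0, 0.0)},
    ("s1", "a0"): {"s0": (0.7, 5.0), "s1": (0.1, 0.0), "s2": (0.2, 0.0)},
}

# Bellman residual: |V(s) - one-step backup of V under the policy's action|.
residuals = {}
for s, a in policy.items():
    backup = sum(
        p * (r + gamma * V[s_next]) for s_next, (p, r) in model[(s, a)].items()
    )
    residuals[s] = abs(V[s] - backup)
print(residuals)
```

Both residuals come out below `theta`, which is consistent with the stopping rule used during evaluation.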


## Task 3 - Value iteration (0.4 pt)

<p style='text-align: justify;'>
The goal of this exercise is to implement the value iteration algorithm. This algorithm combines the two approaches described above: policy evaluation and the search for an optimal policy. First, the optimal state-value function is computed according to the formula:
\begin{equation}
    V(s) \leftarrow  \max_a \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V(s')].
\end{equation}
Once the optimal value function has been computed for every state, the policy for each possible state is determined according to the formula:
\begin{equation}
    \pi(s) = \text{argmax}_a \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V(s')].
\end{equation}
</p>
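
The only difference from policy evaluation is that the expectation over actions is replaced by a maximum. As a minimal sketch, the snippet below performs one such backup for `s1` from $V = 0$, assuming $\gamma = 0.9$ and using the transitions of `s1` from the printed environment listing (illustrative dictionaries, not the `simpleMDP` API).

```python
gamma = 0.9
V = {"s0": 0.0, "s1": 0.0, "s2": 0.0}  # initial value estimates

# (probability, reward) per next state of s1, copied from the printed listing.
s1_model = {
    "a0": {"s0": (0.7, 5.0), "s1": (0.1, 0.0), "s2": (0.2, 0.0)},
    "a1": {"s1": (0.95, 0.0), "s2": (0.05, 0.0)},
}

# One-step lookahead value of each action in s1.
q = {
    a: sum(p * (r + gamma * V[s_next]) for s_next, (p, r) in outcomes.items())
    for a, outcomes in s1_model.items()
}
new_V_s1 = max(q.values())      # value iteration update: max over actions
best_action = max(q, key=q.get)  # greedy policy extraction: argmax over actions
print(new_V_s1, best_action)
```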

Implementation of the value iteration algorithm

In [41]:
def value_iteration(mdp, gamma, theta):
    """
    This function calculates the optimal policy for the specified MDP using the value iteration approach:

    'mdp' - model of the environment; use the following functions:
        get_all_states - returns the list of all states available in the environment
        get_possible_actions - returns the list of possible actions for the given state
        get_next_states - returns a dict mapping each possible next state to the probability of reaching it
                          from the given state by taking the given action
        get_reward - returns the reward for taking the action in the state and landing in next_state


    'gamma' - discount factor for the MDP
    'theta' - the algorithm stops when the largest change of any state value within one sweep is smaller
              than theta

    Returns the optimal policy and the value function of that policy.
       """



    V = dict()
    policy = dict()

    # init with a policy with first avail action for each state
    for current_state in mdp.get_all_states():
        V[current_state] = 0
        actions = mdp.get_possible_actions(current_state)
        policy[current_state] = actions[0]

    #
    # INSERT CODE HERE to evaluate the best policy and value function for the given mdp
    #
    
    # value function update with a greedy (max over actions) backup
    while True:
        delta = 0
        for s in mdp.get_all_states():
            V_prev = V[s]
            expected_rewards = []
            for a in mdp.get_possible_actions(s):
                value = 0
                for next_state, probability in mdp.get_next_states(s, a).items():
                    reward = mdp.get_reward(s, a, next_state)
                    value += probability * (reward + gamma * V[next_state])
                expected_rewards.append(value)
            V[s] = max(expected_rewards)
            delta = max(delta, abs(V_prev - V[s]))
        if delta < theta:
            break

    # policy extraction: act greedily with respect to the converged V
    for s in mdp.get_all_states():
        actions = mdp.get_possible_actions(s)
        expected_rewards = []
        for a in actions:
            value = 0
            for next_state, probability in mdp.get_next_states(s, a).items():
                reward = mdp.get_reward(s, a, next_state)
                value += probability * (reward + gamma * V[next_state])
            expected_rewards.append(value)
        policy[s] = actions[expected_rewards.index(max(expected_rewards))]

    return policy, V

Checking the correctness of the implemented algorithm

In [42]:
optimal_policy, optimal_value = value_iteration(mdp, 0.9, 0.001)

print(optimal_value)
print(optimal_policy)

assert optimal_policy['s0'] == 'a1'
assert optimal_policy['s1'] == 'a0'
assert optimal_policy['s2'] == 'a1'

assert np.isclose(optimal_value['s0'], 3.78536612814300)
assert np.isclose(optimal_value['s1'], 7.29865364527343)
assert np.isclose(optimal_value['s2'], 4.20683179007964)

{'s0': 3.785366128142998, 's1': 7.298653645273432, 's2': 4.206831790079636}
{'s0': 'a1', 's1': 'a0', 's2': 'a1'}
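
For models stored as arrays rather than behind accessor methods, the same algorithm can be written compactly with NumPy. The sketch below is a synchronous (two-array) variant on a hypothetical MDP with two states and two actions; the arrays `P` and `R` are invented for illustration and do not describe the `simpleMDP` environment.

```python
import numpy as np

# Hypothetical MDP with 2 states and 2 actions (not the simpleMDP environment).
# P[a, s, s'] is the transition probability, R[a, s, s'] the reward.
P = np.array([[[0.9, 0.1], [0.0, 1.0]],    # action 0
              [[0.2, 0.8], [0.5, 0.5]]])   # action 1
R = np.array([[[0.0, 0.0], [0.0, 1.0]],    # action 0
              [[0.0, 2.0], [0.0, 0.0]]])   # action 1
gamma, theta = 0.9, 1e-8

V = np.zeros(2)
while True:
    # Q[a, s] = sum_s' P[a, s, s'] * (R[a, s, s'] + gamma * V[s'])
    Q = np.sum(P * (R + gamma * V), axis=2)
    V_new = Q.max(axis=0)                  # Bellman optimality backup
    delta = np.max(np.abs(V_new - V))
    V = V_new
    if delta < theta:
        break

policy = Q.argmax(axis=0)  # greedy action index for each state
print(V, policy)
```

Broadcasting adds `gamma * V` along the last axis (the next-state axis), so no explicit loops over states or actions are needed.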
