# Laboratory 1

The goal of the first laboratory is to become familiar with passive learning algorithms and to implement them. The implemented algorithms are tested on a previously prepared environment, shown in the diagram below:
![MDP, Markov Decision Process](assets/images/mdp.png)

Importing the library with the environment

In [2]:
from env.simpleMDP import simpleMDP
import numpy as np

Getting to know the prepared environment

In [3]:
mdp = simpleMDP()

states = mdp.get_all_states()
print(states)

for s in states:
    actions = mdp.get_possible_actions(s)
    for a in actions:
        next_states = mdp.get_next_states(s, a)
        print("State: " + s + " action: " + a + " " + "list of possible next states: ", next_states)


['s0', 's1', 's2']
State: s0 action: a0 list of possible next states:  {'s0': 0.5, 's2': 0.5}
State: s0 action: a1 list of possible next states:  {'s2': 1}
State: s1 action: a0 list of possible next states:  {'s0': 0.7, 's1': 0.1, 's2': 0.2}
State: s1 action: a1 list of possible next states:  {'s1': 0.95, 's2': 0.05}
State: s2 action: a0 list of possible next states:  {'s0': 0.4, 's2': 0.6}
State: s2 action: a1 list of possible next states:  {'s0': 0.3, 's1': 0.3, 's2': 0.4}


## Task 1 - Policy evaluation

<p style='text-align: justify;'>
The goal of this exercise is to implement the policy evaluation algorithm. The algorithm computes the value function of a policy using the Bellman equation. It does so iteratively, according to the formula:
\begin{equation}
	        V_{k + 1}(s) = \sum_a \pi(a | s) \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V_k(s')]
\end{equation}
</p>
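
To make the update rule concrete, here is a minimal sketch of a single backup for one state. It does not use `env.simpleMDP`. Instead it assumes a dictionary stand-in: `P` copies the transition probabilities printed by the environment above, while the rewards in `R` are an assumption (the notebook never prints them), and `expected_backup` is a hypothetical helper name.

```python
# Transition probabilities P[s][a][s'], copied from the printed environment summary.
P = {
    "s0": {"a0": {"s0": 0.5, "s2": 0.5}, "a1": {"s2": 1.0}},
    "s1": {"a0": {"s0": 0.7, "s1": 0.1, "s2": 0.2}, "a1": {"s1": 0.95, "s2": 0.05}},
    "s2": {"a0": {"s0": 0.4, "s2": 0.6}, "a1": {"s0": 0.3, "s1": 0.3, "s2": 0.4}},
}
# ASSUMED rewards R[(s, a, s')]; every transition not listed here yields 0.
R = {("s1", "a0", "s0"): 5.0, ("s2", "a1", "s0"): -1.0}


def expected_backup(P, R, policy, V, s, gamma):
    """One Bellman expectation backup: sum over actions and next states."""
    total = 0.0
    for a, pi_a in policy[s].items():
        for s_next, p in P[s][a].items():
            r = R.get((s, a, s_next), 0.0)
            total += pi_a * p * (r + gamma * V[s_next])
    return total


# Uniform random policy and an all-zero initial value function.
policy = {s: {a: 1.0 / len(P[s]) for a in P[s]} for s in P}
V0 = {s: 0.0 for s in P}

v_s1 = expected_backup(P, R, policy, V0, "s1", gamma=0.9)
print(v_s1)
```

With all values initialized to zero, only the immediate rewards contribute, so under the assumed rewards the backup for `s1` is approximately `0.5 * 0.7 * 5 = 1.75`.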

The first step is to initialize the policy that will be evaluated. In this task we evaluate a stochastic policy in which every available action is chosen with the same probability.

In [24]:
policy = dict()

for s in states:
    actions = mdp.get_possible_actions(s)
    action_prob = 1 / len(actions)
    policy[s] = dict()

    
    for a in actions:
        policy[s][a] = action_prob

# print(policy)
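
A quick sanity check can catch a malformed policy before it is evaluated. The sketch below is self-contained: the `actions_per_state` dictionary is a hypothetical stand-in for what `mdp.get_possible_actions` returns, and `uniform_policy` and `is_valid_policy` are illustrative helper names, not part of the lab environment.

```python
import math


def uniform_policy(actions_per_state):
    """Give every available action in a state the same probability."""
    return {
        s: {a: 1.0 / len(actions) for a in actions}
        for s, actions in actions_per_state.items()
    }


def is_valid_policy(policy):
    """Check that probabilities are non-negative and sum to 1 in every state."""
    return all(
        all(p >= 0 for p in probs.values())
        and math.isclose(sum(probs.values()), 1.0)
        for probs in policy.values()
    )


# Hypothetical action sets, standing in for mdp.get_possible_actions(s).
actions_per_state = {"s0": ("a0", "a1"), "s1": ("a0", "a1"), "s2": ("a0", "a1")}
example_policy = uniform_policy(actions_per_state)
print(is_valid_policy(example_policy))
```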

Implementation of the policy evaluation algorithm - two-array approach

In [14]:
def policy_eval_two_arrays(mdp, policy, gamma, theta):
    """
    This function uses the two-array approach to evaluate the specified policy for the specified MDP:

    'mdp' - model of the environment, use following functions:
        get_all_states - return list of all states available in the environment
        get_possible_actions - return list of possible actions for the given state
        get_next_states - return list of possible next states with a probability for transition from state by taking
                          action into next_state
        get_reward - return the reward after taking action in state and landing on next_state


    'policy' - the stochastic policy (action probability for each state), for the given mdp, to evaluate
    'gamma' - discount factor for MDP
    'theta' - algorithm should stop when the maximal difference between the previous evaluation of the policy and the
              current one is smaller than theta
    """
    V = dict()
    states = mdp.get_all_states()

    for state in states:
        V[state] = 0
    
    while True:
        dt = 0
        V_pr = V.copy()
        for state in states:
            v_od_s = 0
            possible_actions = mdp.get_possible_actions(state)
            for action in possible_actions:
                action_probability = policy[state][action]
                action_val = 0
                s_prim_coll = mdp.get_next_states(state, action) 
                for s_prim_single in s_prim_coll.keys():
                    state_probability = s_prim_coll[s_prim_single]
                    reward = mdp.get_reward(state, action, s_prim_single)
                    next_state_values = state_probability * (reward + gamma * V_pr[s_prim_single])
                    action_val += next_state_values
                v_od_s += action_probability * action_val
            V[state] = v_od_s
            print(V)
            dt = max(dt, abs(v_od_s - V_pr[state]))
        if  dt < theta:
            break

    return V

Checking the correctness of the implemented algorithm

In [15]:
V = policy_eval_two_arrays(mdp, policy, 0.9, 0.0001)
# print(V['s0'])
# print(V)
assert np.isclose(V['s0'], 1.46785443374683)
assert np.isclose(V['s1'], 4.55336594491180)
assert np.isclose(V['s2'], 1.68544141660991)

{'s0': 0.0, 's1': 0, 's2': 0}
{'s0': 0.0, 's1': 1.75, 's2': 0}
{'s0': 0.0, 's1': 1.75, 's2': -0.15}
{'s0': -0.10125, 's1': 1.75, 's2': -0.15}
{'s0': -0.10125, 's1': 2.56, 's2': -0.15}
{'s0': -0.10125, 's1': 2.56, 's2': 0.01874999999999999}
{'s0': -0.01012500000000001, 's1': 2.56, 's2': 0.01874999999999999}
{'s0': -0.01012500000000001, 's1': 2.929815625, 's2': 0.01874999999999999}
{'s0': -0.01012500000000001, 's1': 2.929815625, 's2': 0.17214375000000004}
{'s0': 0.11391890625000002, 's1': 2.929815625, 's2': 0.17214375000000004}
{'s0': 0.11391890625000002, 's1': 3.1505146796874994, 's2': 0.17214375000000004}
{'s0': 0.11391890625000002, 's1': 3.1505146796874994, 's2': 0.31980042187499996}
{'s0': 0.24149703867187497, 's1': 3.1505146796874994, 's2': 0.31980042187499996}
{'s0': 0.24149703867187497, 's1': 3.3104801890820306, 's2': 0.31980042187499996}
{'s0': 0.24149703867187497, 's1': 3.3104801890820306, 's2': 0.4551141270703124}
{'s0': 0.3615388694736328, 's1': 3.3104801890820306, 's2': 0.455

Implementation of the policy evaluation algorithm - in-place computation

In [21]:
def policy_eval_in_place(mdp, policy, gamma, theta):
    """
    This function uses the in-place approach to evaluate the specified policy for the specified MDP:

    'mdp' - model of the environment, use following functions:
        get_all_states - return list of all states available in the environment
        get_possible_actions - return list of possible actions for the given state
        get_next_states - return list of possible next states with a probability for transition from state by taking
                          action into next_state
        get_reward - return the reward after taking action in state and landing on next_state


    'policy' - the stochastic policy (action probability for each state), for the given mdp, to evaluate.
    'gamma' - discount factor for MDP
    'theta' - algorithm should stop when the maximal difference between the previous evaluation of the policy and the
              current one is smaller than theta
    """
    V = dict()
    all_states = mdp.get_all_states()

    for s in all_states:
        V[s] = 0

    while True:
        dt = 0
        for state in all_states:
            v_od_s = 0
            possible_actions = mdp.get_possible_actions(state)
            for action in possible_actions:
                action_probability = policy[state][action]
                action_val = 0
                s_prim_coll = mdp.get_next_states(state, action) 
                for s_prim_single in s_prim_coll.keys():
                    next_state_probability = s_prim_coll[s_prim_single]
                    reward = mdp.get_reward(state, action, s_prim_single)
                    next_state_values = next_state_probability * ( reward + gamma * V[s_prim_single])
                    action_val += next_state_values
                v_od_s += action_probability * action_val
            dt = max(dt, abs(v_od_s - V[state]))
            V[state] = v_od_s
            print(V)
        if dt < theta:
            break

    return V
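
The two variants differ only in which values a backup reads: the two-array version reads a frozen copy from the previous sweep, while the in-place version immediately reuses values updated earlier in the same sweep. The sketch below runs one sweep of each on a dictionary stand-in for the environment. The transition probabilities in `P` are copied from the printed environment summary; the rewards in `R` are an assumption, and `backup`, `sweep_two_arrays` and `sweep_in_place` are hypothetical helper names.

```python
P = {
    "s0": {"a0": {"s0": 0.5, "s2": 0.5}, "a1": {"s2": 1.0}},
    "s1": {"a0": {"s0": 0.7, "s1": 0.1, "s2": 0.2}, "a1": {"s1": 0.95, "s2": 0.05}},
    "s2": {"a0": {"s0": 0.4, "s2": 0.6}, "a1": {"s0": 0.3, "s1": 0.3, "s2": 0.4}},
}
# ASSUMED rewards; unlisted transitions yield 0.
R = {("s1", "a0", "s0"): 5.0, ("s2", "a1", "s0"): -1.0}


def backup(P, R, policy, V, s, gamma):
    """Bellman expectation backup for state s, reading values from V."""
    return sum(
        pi_a * p * (R.get((s, a, s_next), 0.0) + gamma * V[s_next])
        for a, pi_a in policy[s].items()
        for s_next, p in P[s][a].items()
    )


def sweep_two_arrays(P, R, policy, V, gamma):
    # Every backup reads the frozen values from the previous sweep.
    return {s: backup(P, R, policy, V, s, gamma) for s in P}


def sweep_in_place(P, R, policy, V, gamma):
    # Backups later in the sweep already see values updated earlier in it.
    V = dict(V)
    for s in P:
        V[s] = backup(P, R, policy, V, s, gamma)
    return V


policy = {s: {a: 1.0 / len(P[s]) for a in P[s]} for s in P}
V0 = {s: 0.0 for s in P}

V_two = sweep_two_arrays(P, R, policy, V0, 0.9)
V_in = sweep_in_place(P, R, policy, V0, 0.9)
print(V_two["s2"], V_in["s2"])
```

After one sweep both variants agree on `s0` and `s1`, but they disagree on `s2`: the in-place sweep already uses the fresh value of `s1` when it reaches `s2`. Both variants converge to the same value function; the in-place one typically needs fewer sweeps.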

Checking the correctness of the implemented algorithm

In [22]:
V = policy_eval_in_place(mdp, policy, 0.9, 0.0001)

# print(V)
assert np.isclose(V['s0'], 1.4681508097651)
assert np.isclose(V['s1'], 4.5536768132712)
assert np.isclose(V['s2'], 1.6857723431614)

V[s_prim_single] 0
V[s_prim_single] 0
V[s_prim_single] 0
{'s0': 0.0, 's1': 0, 's2': 0}
V[s_prim_single] 0.0
V[s_prim_single] 0
V[s_prim_single] 0
V[s_prim_single] 0
V[s_prim_single] 0
{'s0': 0.0, 's1': 1.75, 's2': 0}
V[s_prim_single] 0.0
V[s_prim_single] 0
V[s_prim_single] 0.0
V[s_prim_single] 1.75
V[s_prim_single] 0
{'s0': 0.0, 's1': 1.75, 's2': 0.08625}
V[s_prim_single] 0.0
V[s_prim_single] 0.08625
V[s_prim_single] 0.08625
{'s0': 0.05821875, 's1': 1.75, 's2': 0.08625}
V[s_prim_single] 0.05821875
V[s_prim_single] 1.75
V[s_prim_single] 0.08625
V[s_prim_single] 1.75
V[s_prim_single] 0.08625
{'s0': 0.05821875, 's1': 2.60491703125, 's2': 0.08625}
V[s_prim_single] 0.05821875
V[s_prim_single] 0.08625
V[s_prim_single] 0.05821875
V[s_prim_single] 2.60491703125
V[s_prim_single] 0.08625
{'s0': 0.05821875, 's1': 2.60491703125, 's2': 0.25881520546875}
V[s_prim_single] 0.05821875
V[s_prim_single] 0.25881520546875
V[s_prim_single] 0.25881520546875
{'s0': 0.18779948244140626, 's1': 2.60491703125, 's

## Task 2 - Policy iteration

<p style='text-align: justify;'>
The goal of this exercise is to implement the policy iteration algorithm. The algorithm finds the optimal policy for a given environment. It first initializes the policy with random actions, then evaluates it and improves it according to the formula:
\begin{equation}
    \pi(s) \leftarrow  \text{argmax}_a \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V(s')]
\end{equation}
If the improved policy differs from the previous one, it has to be evaluated and improved again. The algorithm terminates when the improved policy is exactly the same as the policy that was being improved.
</p>
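
The improvement step can be sketched as picking, in every state, the action with the highest one-step lookahead value. The example below is a standalone illustration on a dictionary stand-in for the environment: `P` copies the printed transition probabilities, the rewards in `R` are an assumption, `q_value` and `greedy_policy` are hypothetical helper names, and `V` uses the rounded optimal values that the correctness check later in this task expects.

```python
P = {
    "s0": {"a0": {"s0": 0.5, "s2": 0.5}, "a1": {"s2": 1.0}},
    "s1": {"a0": {"s0": 0.7, "s1": 0.1, "s2": 0.2}, "a1": {"s1": 0.95, "s2": 0.05}},
    "s2": {"a0": {"s0": 0.4, "s2": 0.6}, "a1": {"s0": 0.3, "s1": 0.3, "s2": 0.4}},
}
# ASSUMED rewards; unlisted transitions yield 0.
R = {("s1", "a0", "s0"): 5.0, ("s2", "a1", "s0"): -1.0}


def q_value(P, R, V, s, a, gamma):
    """Expected return of taking action a in state s, then following V."""
    return sum(
        p * (R.get((s, a, s_next), 0.0) + gamma * V[s_next])
        for s_next, p in P[s][a].items()
    )


def greedy_policy(P, R, V, gamma):
    """Deterministic policy that is greedy with respect to V."""
    return {
        s: max(P[s], key=lambda a: q_value(P, R, V, s, a, gamma))
        for s in P
    }


# Rounded values taken from the asserts in the correctness check.
V = {"s0": 3.785, "s1": 7.299, "s2": 4.207}
improved = greedy_policy(P, R, V, gamma=0.9)
print(improved)
```

Because `max` returns the first maximal element, ties are broken in favour of the action listed first. This matters for the stopping test, since a policy that flips between equally good actions would never be reported as stable.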

Implement a function for policy evaluation, this time assuming that the policy passed as input is deterministic.

In [25]:
def deterministic_policy_eval_in_place(mdp, policy, gamma, theta):
    """
    This function uses the in-place approach to evaluate the specified deterministic policy for the specified MDP:

        'mdp' - model of the environment, use following functions:
        get_all_states - return list of all states available in the environment
        get_possible_actions - return list of possible actions for the given state
        get_next_states - return list of possible next states with a probability for transition from state by taking
                          action into next_state
        get_reward - return the reward after taking action in state and landing on next_state


        'policy' - the deterministic policy (action for each state), for the given mdp, to evaluate
        'gamma' - discount factor for MDP
        'theta' - algorithm should stop when the maximal difference between the previous evaluation of the policy and
                  the current one is smaller than theta
    """
    V = dict()
    all_states = mdp.get_all_states()

    for s in all_states:
        V[s] = 0

    while True:
        dt = 0
        for state in all_states:
            v_od_s = 0
            possible_actions = mdp.get_possible_actions(state)
            for action in possible_actions:
                action_probability = 1 if policy[state] == action else 0
                action_val = 0
                s_prim_coll = mdp.get_next_states(state, action) 
                for s_prim_single in s_prim_coll.keys():
                    next_state_probability = s_prim_coll[s_prim_single]
                    reward = mdp.get_reward(state, action, s_prim_single)
                    next_state_values = next_state_probability * ( reward + gamma * V[s_prim_single] )
                    action_val += next_state_values
                v_od_s += action_probability * action_val
            dt = max(dt, abs(v_od_s - V[state]))
            V[state] = v_od_s
        if dt < theta:
            break

    return V
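
A deterministic policy is a special case of a stochastic one in which the chosen action has probability 1 and all others have probability 0. The sketch below makes that correspondence explicit; `to_stochastic` is a hypothetical helper and `actions_per_state` is an assumed stand-in for what `mdp.get_possible_actions` returns. Action names are compared by value with `==`, since identity checks on strings are not guaranteed to behave the same way.

```python
def to_stochastic(det_policy, actions_per_state):
    """Expand {state: action} into {state: {action: probability}} (one-hot)."""
    return {
        s: {a: (1.0 if a == det_policy[s] else 0.0) for a in actions_per_state[s]}
        for s in det_policy
    }


# Hypothetical action sets and a deterministic policy over them.
actions_per_state = {"s0": ("a0", "a1"), "s1": ("a0", "a1"), "s2": ("a0", "a1")}
det_policy = {"s0": "a1", "s1": "a0", "s2": "a1"}

one_hot = to_stochastic(det_policy, actions_per_state)
print(one_hot["s0"])
```

With such a conversion, a stochastic evaluation routine could be reused unchanged for deterministic policies; writing a dedicated function merely skips the actions whose probability is zero.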


Implement a function that improves the policy `policy` based on the value function `value_function` computed for it.

In [26]:
# for every state get the best action
def policy_improvement(mdp, policy, value_function, gamma):
    """
            This function improves specified deterministic policy for the specified MDP using value_function:

           'mdp' - model of the environment, use following functions:
                get_all_states - return list of all states available in the environment
                get_possible_actions - return list of possible actions for the given state
                get_next_states - return list of possible next states with a probability for transition from state by taking
                                  action into next_state
                
                get_reward - return the reward after taking action in state and landing on next_state


           'policy' - the deterministic policy (action for each state), for the given mdp, to improve.
           'value_function' - the value function, for the given policy.
           'gamma' - discount factor for MDP

           Function returns True if the policy is unchanged (stable) and False if it was changed
       """
    policy_stable = True

    for state in mdp.get_all_states():
        best_action = None
        best_value = float('-inf')
        for action in mdp.get_possible_actions(state):
            # expected return of taking `action` in `state`, then following value_function
            next_states = mdp.get_next_states(state, action)
            action_value = 0
            for next_state, probability in next_states.items():
                reward = mdp.get_reward(state, action, next_state)
                action_value += probability * (reward + gamma * value_function[next_state])
            # strict '>' keeps the first action in case of ties
            if action_value > best_value:
                best_value = action_value
                best_action = action
        # compare action names by value, not by identity
        if best_action != policy[state]:
            policy_stable = False
        policy[state] = best_action

    print('Finish improv')
    return policy_stable

Implement a function for policy iteration.

In [27]:
def policy_iteration(mdp, gamma, theta):

    """
            This function calculates the optimal policy for the specified MDP:

           'mdp' - model of the environment, use following functions:
                get_all_states - return list of all states available in the environment
                get_possible_actions - return list of possible actions for the given state
                get_next_states - return list of possible next states with a probability for transition from state by taking
                                  action into next_state
                
                get_reward - return the reward after taking action in state and landing on next_state


           'gamma' - discount factor for MDP
           'theta' - policy evaluation should stop when the maximal difference between the previous evaluation of the
                      policy and the current one is smaller than theta
           Function returns optimal policy and value function for the policy
       """

    policy = dict()
    print(1)

    for s in mdp.get_all_states():
        print(2)
        actions = mdp.get_possible_actions(s)
        policy[s] = actions[0]

    V = deterministic_policy_eval_in_place(mdp, policy, gamma, theta)

    print(3)
    policy_stable = False

    while not policy_stable:
        print(4)
        policy_stable = policy_improvement(mdp, policy, V, gamma)
        V = deterministic_policy_eval_in_place(mdp, policy, gamma, theta)

    return policy, V

Checking the correctness of the implemented algorithm

In [28]:
optimal_policy, optimal_value = policy_iteration(mdp, 0.9, 0.001)

print(mdp.get_all_states())
for s in mdp.get_all_states():
    print(mdp.get_possible_actions(s))
print(optimal_policy)
assert optimal_policy['s0'] == 'a1'
assert optimal_policy['s1'] == 'a0'
assert optimal_policy['s2'] == 'a1'

print(optimal_value['s0'])

assert np.isclose(optimal_value['s0'], 3.78536612814300)
assert np.isclose(optimal_value['s1'], 7.29865364527343)
assert np.isclose(optimal_value['s2'], 4.20683179007964)

1
2
2
2
3
4
Finish improv
4
Finish improv
4
Finish improv
['s0', 's1', 's2']
('a0', 'a1')
('a0', 'a1')
('a0', 'a1')
{'s0': 'a1', 's1': 'a0', 's2': 'a1'}
3.785366128142998


## Task 3 - Value iteration

<p style='text-align: justify;'>
The goal of this exercise is to implement the value iteration algorithm. This algorithm combines the two approaches described above: policy evaluation and finding the optimal policy. First, the optimal state-value function is computed according to the formula:
\begin{equation}
    V(s) \leftarrow  \max_a \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V(s')].
\end{equation}
Once the optimal value function has been computed for every state, the policy for each possible state is determined according to the formula:
\begin{equation}
    \pi(s) = \text{argmax}_a \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V(s')].
\end{equation}
</p>
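
The first formula replaces the policy-weighted sum over actions with a maximum. A minimal sketch of that backup, and of repeating it until the values stop changing, is shown below on a dictionary stand-in for the environment. `P` copies the printed transition probabilities, the rewards in `R` are an assumption, and `optimality_backup` and `value_iteration_sketch` are hypothetical names rather than the functions this task asks for.

```python
P = {
    "s0": {"a0": {"s0": 0.5, "s2": 0.5}, "a1": {"s2": 1.0}},
    "s1": {"a0": {"s0": 0.7, "s1": 0.1, "s2": 0.2}, "a1": {"s1": 0.95, "s2": 0.05}},
    "s2": {"a0": {"s0": 0.4, "s2": 0.6}, "a1": {"s0": 0.3, "s1": 0.3, "s2": 0.4}},
}
# ASSUMED rewards; unlisted transitions yield 0.
R = {("s1", "a0", "s0"): 5.0, ("s2", "a1", "s0"): -1.0}


def optimality_backup(P, R, V, s, gamma):
    """Bellman optimality backup: best one-step lookahead value in state s."""
    return max(
        sum(
            p * (R.get((s, a, s_next), 0.0) + gamma * V[s_next])
            for s_next, p in P[s][a].items()
        )
        for a in P[s]
    )


def value_iteration_sketch(P, R, gamma, theta):
    """Repeat in-place optimality backups until the largest change is below theta."""
    V = {s: 0.0 for s in P}
    while True:
        delta = 0.0
        for s in P:
            new_v = optimality_backup(P, R, V, s, gamma)
            delta = max(delta, abs(new_v - V[s]))
            V[s] = new_v
        if delta < theta:
            return V


first_backup_s1 = optimality_backup(P, R, {s: 0.0 for s in P}, "s1", 0.9)
V_star = value_iteration_sketch(P, R, gamma=0.9, theta=1e-8)
print(first_backup_s1, V_star)
```

The policy is read off only once, after the values have converged, by taking the argmax of the same lookahead expression in every state.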

Implementation of the value iteration algorithm

In [29]:
def value_iteration(mdp, gamma, theta):
    """
            This function calculates the optimal policy for the specified MDP using the Value Iteration approach:

            'mdp' - model of the environment, use following functions:
                get_all_states - return list of all states available in the environment
                get_possible_actions - return list of possible actions for the given state
                get_next_states - return list of possible next states with a probability for transition from state by taking
                                  action into next_state
                get_reward - return the reward after taking action in state and landing on next_state


            'gamma' - discount factor for MDP
            'theta' - algorithm should stop when the maximal difference between the previous value function and the
                      current one is smaller than theta
            Function returns optimal policy and value function for the policy
       """   
    V = dict()
    policy = dict()

    # init with a policy with first avail action for each state
    for current_state in mdp.get_all_states():
        V[current_state] = 0
        policy[current_state] = mdp.get_possible_actions(current_state)[0]
        
        
    while True:
        dt = 0
        for state in mdp.get_all_states():
            v_od_s = 0
            possible_actions = mdp.get_possible_actions(state)
            v_od_s_sum = []
            for action in possible_actions:
                action_val = 0
                s_prim_coll = mdp.get_next_states(state, action) 
                for s_prim_single in s_prim_coll.keys():
                    next_state_probability = s_prim_coll[s_prim_single]
                    reward = mdp.get_reward(state, action, s_prim_single)
                    next_state_values = next_state_probability * ( reward + gamma * V[s_prim_single] )
                    action_val += next_state_values
                v_od_s_sum.append(action_val)
            v_od_s = max(v_od_s_sum)
            dt = max(dt, abs(v_od_s - V[state]))
            V[state] = v_od_s
        if dt < theta:
            break
    
    # policy extraction: in every state pick the action that is greedy with respect to the converged V
    for state in mdp.get_all_states():
        best_value = float('-inf')
        for action in mdp.get_possible_actions(state):
            action_val = 0
            s_prim_coll = mdp.get_next_states(state, action)
            for s_prim_single in s_prim_coll.keys():
                next_state_probability = s_prim_coll[s_prim_single]
                reward = mdp.get_reward(state, action, s_prim_single)
                action_val += next_state_probability * (reward + gamma * V[s_prim_single])
            # strict '>' keeps the first action in case of ties
            if action_val > best_value:
                best_value = action_val
                policy[state] = action
        

    return policy, V

Checking the correctness of the implemented algorithm

In [27]:
optimal_policy, optimal_value = value_iteration(mdp, 0.9, 0.001)

print(optimal_policy)
assert optimal_policy['s0'] == 'a1'
assert optimal_policy['s1'] == 'a0'
assert optimal_policy['s2'] == 'a1'


print(optimal_value['s0'])
print(optimal_value['s1'])
print(optimal_value['s2'])
assert np.isclose(optimal_value['s0'], 3.78536612814300)
assert np.isclose(optimal_value['s1'], 7.29865364527343)
assert np.isclose(optimal_value['s2'], 4.20683179007964)

{'s0': 'a1', 's1': 'a0', 's2': 'a1'}
3.785366128142998
7.298653645273432
4.206831790079636
