# Laboratory 1 (1 pt)

The goal of the first laboratory is to get familiar with passive learning algorithms and to implement them. The implemented algorithms are tested on a prepared environment, shown in the diagram below:
![MDP, Markov Decision Process](assets/images/mdp.png)

Importing the library with the environment

In [19]:
from env.simpleMDP import simpleMDP
import numpy as np

Getting familiar with the prepared environment

In [20]:
mdp = simpleMDP()

states = mdp.get_all_states()
print(states)

for s in states:
    actions = mdp.get_possible_actions(s)
    for a in actions:
        next_states = mdp.get_next_states(s, a)
        print("State: " + s + " action: " + a + " " + "list of possible next states: ", next_states)


['s0', 's1', 's2']
State: s0 action: a0 list of possible next states:  {'s0': 0.5, 's2': 0.5}
State: s0 action: a1 list of possible next states:  {'s2': 1}
State: s1 action: a0 list of possible next states:  {'s0': 0.7, 's1': 0.1, 's2': 0.2}
State: s1 action: a1 list of possible next states:  {'s1': 0.95, 's2': 0.05}
State: s2 action: a0 list of possible next states:  {'s0': 0.4, 's2': 0.6}
State: s2 action: a1 list of possible next states:  {'s0': 0.3, 's1': 0.3, 's2': 0.4}
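
The printed transition table can be checked for consistency: for every state-action pair, the probabilities of the next states should sum to 1. A minimal sketch, assuming the transitions are copied by hand from the output above into a plain dictionary (the name `transitions` is illustrative and not part of `simpleMDP`):

```python
import math

# Transition probabilities copied from the printed output above.
transitions = {
    ("s0", "a0"): {"s0": 0.5, "s2": 0.5},
    ("s0", "a1"): {"s2": 1},
    ("s1", "a0"): {"s0": 0.7, "s1": 0.1, "s2": 0.2},
    ("s1", "a1"): {"s1": 0.95, "s2": 0.05},
    ("s2", "a0"): {"s0": 0.4, "s2": 0.6},
    ("s2", "a1"): {"s0": 0.3, "s1": 0.3, "s2": 0.4},
}

# Sum the next-state probabilities for each (state, action) pair.
totals = {pair: sum(probs.values()) for pair, probs in transitions.items()}

for (state, action), total in totals.items():
    # isclose guards against floating-point rounding in the sums.
    print(state, action, math.isclose(total, 1.0))
```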


## Task 1 - Policy evaluation (0.3 pt)

<p style='text-align: justify;'>
The goal of this exercise is to implement the policy evaluation algorithm. The algorithm computes the value function of a policy using the Bellman equation. It proceeds iteratively according to the formula:
\begin{equation}
	        V_{k + 1}(s) = \sum_a \pi(a | s) \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V_k(s')]
\end{equation}
</p>
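
A single application of this update for one state can be worked out by hand. The sketch below uses the transition probabilities of `s0` printed earlier and a uniform policy. The rewards and the current estimates `V_k` are made-up numbers for illustration only, not those of `simpleMDP`, and the reward is assumed to be a fixed number for each triple (state, action, next state), so the sum over `r` collapses to a single term.

```python
gamma = 0.9

# Transition probabilities of s0, as printed for the environment above.
next_states = {"a0": {"s0": 0.5, "s2": 0.5}, "a1": {"s2": 1.0}}
# Uniform stochastic policy in s0.
pi = {"a0": 0.5, "a1": 0.5}

# ASSUMPTIONS: illustrative rewards r(s0, a, s') and current estimates V_k.
reward = {("a0", "s0"): 0.0, ("a0", "s2"): 1.0, ("a1", "s2"): 2.0}
V_k = {"s0": 0.0, "s1": 0.0, "s2": 10.0}

new_value = 0.0
for action, action_prob in pi.items():
    # Expected return of taking `action` in s0: sum over next states.
    q = sum(p * (reward[(action, s_next)] + gamma * V_k[s_next])
            for s_next, p in next_states[action].items())
    # Weight by the probability that the policy picks this action.
    new_value += action_prob * q

print(new_value)  # 0.5 * 5.0 + 0.5 * 11.0 = 8.0
```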

The first step is to initialize the policy that will be evaluated. In this task we evaluate a stochastic policy in which every available action is chosen with the same probability.

In [21]:
policy = dict()

for s in states:
    actions = mdp.get_possible_actions(s)
    action_prob = 1 / len(actions)
    policy[s] = dict()
    for a in actions:
        policy[s][a] = action_prob

print(list(policy.values()))

[{'a0': 0.5, 'a1': 0.5}, {'a0': 0.5, 'a1': 0.5}, {'a0': 0.5, 'a1': 0.5}]


In [22]:
for state in policy:
    for action in policy[state]:
        print(policy[state][action])

0.5
0.5
0.5
0.5
0.5
0.5


In [23]:
list(mdp.get_next_states('s0', 'a0').values())[0]

0.5

In [24]:
V_one = {'s0': 0, 's1': 0, 's2': 0}
V_two = {'s0': 0, 's1': 0, 's2': 0}

list(V_two.values())[-1]

0

In [25]:
next_states = mdp.get_next_states('s0', 'a0')
for state in next_states:
    print(type(state))

<class 'str'>
<class 'str'>


Implementation of the policy evaluation algorithm - two-array approach

In [None]:
def policy_eval_two_arrays(mdp, policy, gamma, theta):
    """
    This function uses the two-array approach to evaluate the specified policy for the specified MDP:

    'mdp' - model of the environment, use following functions:
        get_all_states - return list of all states available in the environment
        get_possible_actions - return list of possible actions for the given state
        get_next_states - return a dictionary mapping each possible next state to the probability of the
                          transition from state by taking action into next_state
        get_reward - return the reward after taking action in state and landing on next_state


    'policy' - the stochastic policy (action probability for each state), for the given mdp, to evaluate
    'gamma' - discount factor for MDP
    'theta' - convergence threshold: the algorithm stops when the difference between the previous and the
              current evaluation of the policy is smaller than theta
    """
    states = list(mdp.get_all_states())
    # Only the last state's value is compared between sweeps (see the stopping test below).
    last_state = states[-1]

    # Two arrays: V_old holds the previous sweep, V_new is filled by the current one.
    V_old = {state: 0 for state in states}

    while True:
        V_new = dict()

        for state in states:
            state_value = 0

            for action in policy[state]:
                next_states = mdp.get_next_states(state, action)
                action_value = 0

                # Expected return of the action, computed from the previous sweep only.
                for next_state, prob in next_states.items():
                    reward = mdp.get_reward(state, action, next_state)
                    action_value += prob * (reward + gamma * V_old[next_state])

                # Weight the action value by the probability of choosing the action.
                state_value += policy[state][action] * action_value

            V_new[state] = state_value

        # Stop once the value of the last state changes by less than theta.
        if abs(V_new[last_state] - V_old[last_state]) < theta:
            return V_new

        V_old = V_new

Checking the correctness of the implemented algorithm

In [None]:
V = policy_eval_two_arrays(mdp, policy, 0.9, 0.0001)

assert np.isclose(V['s0'], 1.46785443374683)
assert np.isclose(V['s1'], 4.55336594491180)
assert np.isclose(V['s2'], 1.68544141660991)
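
The stopping rule is governed by `theta`. `policy_eval_two_arrays` as written here compares only the value of the last state between consecutive sweeps; textbook formulations usually track the largest change over all states instead. A minimal sketch of that variant, assuming a hypothetical helper `backup(state, V)` that returns the new value of one state (the helper, the function name and the toy example are illustrative, not part of the lab code). Because it may stop at a different sweep, its results need not match the assertions above digit for digit.

```python
def evaluate_until_stable(states, backup, theta):
    """Synchronous sweeps until the largest per-state change drops below theta."""
    V = {s: 0.0 for s in states}
    while True:
        # Compute every new value from the previous sweep's estimates.
        V_new = {s: backup(s, V) for s in states}
        # Largest absolute change over all states in this sweep.
        delta = max(abs(V_new[s] - V[s]) for s in states)
        V = V_new
        if delta < theta:
            return V


# Toy example (assumption): each state earns reward 1 and loops to itself,
# so the exact value is 1 / (1 - gamma) = 10 for gamma = 0.9.
gamma = 0.9
values = evaluate_until_stable(["x", "y"], lambda s, V: 1.0 + gamma * V[s], 1e-6)
print(values)
```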

Implementation of the policy evaluation algorithm - in-place computation

In [None]:
def policy_eval_in_place(mdp, policy, gamma, theta):
    """
    This function uses the in-place approach to evaluate the specified policy for the specified MDP:

    'mdp' - model of the environment, use following functions:
        get_all_states - return list of all states available in the environment
        get_possible_actions - return list of possible actions for the given state
        get_next_states - return a dictionary mapping each possible next state to the probability of the
                          transition from state by taking action into next_state
        get_reward - return the reward after taking action in state and landing on next_state


    'policy' - the stochastic policy (action probability for each state), for the given mdp, to evaluate
    'gamma' - discount factor for MDP
    'theta' - convergence threshold: the algorithm stops when the difference between the previous and the
              current evaluation of the policy is smaller than theta
    """
    states = list(mdp.get_all_states())
    last_state = states[-1]

    V = {state: 0 for state in states}

    while True:
        # Remember the last state's value before the sweep; it is the only one used in the stopping test.
        last_value = V[last_state]

        for state in states:
            state_value = 0

            for action in policy[state]:
                next_states = mdp.get_next_states(state, action)
                action_value = 0

                # V is read and written in the same sweep, so fresh values are reused immediately.
                for next_state, prob in next_states.items():
                    reward = mdp.get_reward(state, action, next_state)
                    action_value += prob * (reward + gamma * V[next_state])

                state_value += policy[state][action] * action_value

            V[state] = state_value

        # Stop once the value of the last state changes by less than theta.
        if abs(V[last_state] - last_value) < theta:
            return V

Checking the correctness of the implemented algorithm

In [None]:
V = policy_eval_in_place(mdp, policy, 0.9, 0.0001)
print(V)
assert np.isclose(V['s0'], 1.4681508097651)
assert np.isclose(V['s1'], 4.5536768132712)
assert np.isclose(V['s2'], 1.6857723431614)
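
The expected values in the two checks differ slightly. One likely reason is that the two variants propagate information at different speeds, so they cross the `theta` threshold at different points: the in-place version reuses values that were updated earlier in the same sweep. A minimal sketch on a hypothetical three-state chain (deterministic moves `a -> b -> c`, reward 1 per step, `c` terminal with value 0; none of this is part of `simpleMDP`), with the sweep deliberately running from the terminal end backwards:

```python
gamma = 0.9
order = ["c", "b", "a"]                      # sweep order: from the end backwards
successor = {"a": "b", "b": "c", "c": None}  # None marks the terminal state


def backup(state, V):
    """Value of `state` after one deterministic step with reward 1."""
    nxt = successor[state]
    return 0.0 if nxt is None else 1.0 + gamma * V[nxt]


# Two-array sweep: every update reads the old estimates only.
V_old = {s: 0.0 for s in order}
V_two_arrays = {s: backup(s, V_old) for s in order}

# In-place sweep: later updates already see values written in this sweep.
V_in_place = {s: 0.0 for s in order}
for s in order:
    V_in_place[s] = backup(s, V_in_place)

print(V_two_arrays)  # a and b both get 1.0
print(V_in_place)    # a already gets 1.0 + 0.9 * 1.0
```

After one sweep the in-place version has already pushed the reward information two steps back along the chain, while the two-array version needs a second sweep for that.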

## Task 2 - Policy iteration (0.3 pt)

<p style='text-align: justify;'>
The goal of this exercise is to implement the policy iteration algorithm, which finds an optimal policy for a given environment. The algorithm first initializes the policy with random actions, then evaluates it and improves it according to the formula:
\begin{equation}
    \pi(s) \leftarrow  \text{argmax}_a \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V(s')]
\end{equation}
If the improved policy differs from the previous one, it has to be evaluated and improved again. The algorithm terminates when the improved policy is exactly the same as the policy that was being improved.
</p>
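
The improvement step picks, in each state, the action with the highest one-step lookahead value. A minimal sketch for a single state, assuming the action values have already been computed (the numbers in `q_values` and the third action `a2` are made up for illustration):

```python
# ASSUMPTION: illustrative one-step lookahead values for the actions of one state.
q_values = {"a0": 1.25, "a1": 3.5, "a2": 3.0}

old_action = "a0"
# argmax over actions: the dictionary key with the largest value.
best_action = max(q_values, key=q_values.get)

# The policy counts as stable only if no state changes its action.
policy_stable = best_action == old_action

print(best_action, policy_stable)  # a1 False
```

When several actions tie, `max` returns the first maximal key in insertion order, so tie-breaking depends on the order in which the actions were added to the dictionary.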

Implement the policy evaluation function again, this time assuming that the policy passed as input is deterministic

In [None]:
def deterministic_policy_eval_in_place(mdp, policy, gamma, theta):
    """
    This function uses the in-place approach to evaluate the specified deterministic policy for the specified MDP:

        'mdp' - model of the environment, use following functions:
        get_all_states - return list of all states available in the environment
        get_possible_actions - return list of possible actions for the given state
        get_next_states - return list of possible next states with a probability for transition from state by taking
                          action into next_state
        get_reward - return the reward after taking action in state and landing on next_state


        'policy' - the deterministic policy (one action for each state), for the given mdp, to evaluate
        'gamma' - discount factor for MDP
        'theta' - algorithm should stop when minimal difference between previous evaluation of policy and current is
                  smaller than theta
    """
    V = dict()

    for state in mdp.get_all_states():
        V[state] = 0

    while True:
        last_value = list(V.values())[-1]
        for state in mdp.get_all_states():

            starting_value = 0
            action = policy[state]
            next_states = mdp.get_next_states(state, action)

            for next_state in next_states:
                starting_value += next_states[next_state] * (mdp.get_reward(state, action, next_state) + gamma * V[next_state])

            V[state] = starting_value


        if abs(list(V.values())[-1] - last_value) < theta:
            return V

Implement a function that improves the policy `policy` based on the value function `value_function` computed for it

In [None]:
def policy_improvement(mdp, policy, value_function, gamma):
    """
            This function improves specified deterministic policy for the specified MDP using value_function:

           'mdp' - model of the environment, use following functions:
                get_all_states - return list of all states available in the environment
                get_possible_actions - return list of possible actions for the given state
                get_next_states - return list of possible next states with a probability for transition from state by taking
                                  action into next_state
                
                get_reward - return the reward after taking action in state and landing on next_state


           'policy' - the deterministic policy (action for each state), for the given mdp, to improve.
           'value_function' - the value function, for the given policy.
           'gamma' - discount factor for MDP

           The policy is updated in place. Function returns True if the policy is stable (no action changed)
           or False if it was improved
       """

    policy_stable = True


    for state in mdp.get_all_states():

        state_action_values = dict()
        actions = mdp.get_possible_actions(state)

        for action in actions:

            state_action_values[action] = 0
            next_states = mdp.get_next_states(state, action)

            for next_state in next_states:
                state_action_values[action] += next_states[next_state] * (mdp.get_reward(state, action, next_state) + gamma * value_function[next_state])


        max_value_action = max(state_action_values, key=state_action_values.get)

        if policy[state] != max_value_action:
            policy_stable = False
            policy[state] = max_value_action


    return policy_stable

Implement the policy iteration function

In [None]:
def policy_iteration(mdp, gamma, theta):

    """
            This function calculates the optimal policy for the specified MDP:

           'mdp' - model of the environment, use following functions:
                get_all_states - return list of all states available in the environment
                get_possible_actions - return list of possible actions for the given state
                get_next_states - return list of possible next states with a probability for transition from state by taking
                                  action into next_state
                
                get_reward - return the reward after taking action in state and landing on next_state


           'gamma' - discount factor for MDP
           'theta' - algorithm should stop when minimal difference between previous evaluation of policy and current is smaller
                      than theta
           Function returns optimal policy and value function for the policy
       """

    policy = dict()

    # Use the states of the mdp passed in, not the notebook-level variable `states`.
    for s in mdp.get_all_states():
        actions = mdp.get_possible_actions(s)
        policy[s] = actions[0]

    V = deterministic_policy_eval_in_place(mdp, policy, gamma, theta)
    policy_stable = False

    while not policy_stable:
        policy_stable = policy_improvement(mdp, policy, V, gamma)
        V = deterministic_policy_eval_in_place(mdp, policy, gamma, theta)

    return policy, V

Checking the correctness of the implemented algorithm

In [None]:
optimal_policy, optimal_value = policy_iteration(mdp, 0.9, 0.001)
assert optimal_policy['s0'] == 'a1'
assert optimal_policy['s1'] == 'a0'
assert optimal_policy['s2'] == 'a1'

assert np.isclose(optimal_value['s0'], 3.78536612814300)
assert np.isclose(optimal_value['s1'], 7.29865364527343)
assert np.isclose(optimal_value['s2'], 4.20683179007964)

## Task 3 - Value iteration (0.4 pt)

<p style='text-align: justify;'>
The goal of this exercise is to implement the value iteration algorithm. It combines the two approaches described above: policy evaluation and finding an optimal policy. First, the optimal state-value function is computed according to the formula:
\begin{equation}
    V(s) \leftarrow  \max_a \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V(s')].
\end{equation}
Once the optimal value function has been computed for every state, the action to take in each state is determined according to the formula:
\begin{equation}
    \pi(s) = \text{argmax}_a \sum_{s'} \sum_r p(s', r|s, a)[r + \gamma V(s')].
\end{equation}
</p>
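
The two formulas can be traced on a very small example. A minimal sketch, assuming a hypothetical deterministic two-state MDP stored in a plain dictionary (not `simpleMDP`), so that the sum over next states collapses to a single term. The loop here stops when the largest change in a sweep falls below `theta`, which is one common choice of stopping rule.

```python
# ASSUMPTION: a hypothetical deterministic toy MDP, not simpleMDP.
# model[state][action] = (next_state, reward)
model = {
    "A": {"stay": ("A", 0.0), "go": ("B", 1.0)},
    "B": {"stay": ("B", 2.0), "back": ("A", 0.0)},
}
gamma, theta = 0.5, 1e-8


def lookahead(state, V):
    """One-step lookahead value of every action available in `state`."""
    return {a: r + gamma * V[s_next] for a, (s_next, r) in model[state].items()}


V = {s: 0.0 for s in model}
while True:
    delta = 0.0
    for s in model:
        best = max(lookahead(s, V).values())  # max over actions
        delta = max(delta, abs(best - V[s]))
        V[s] = best                           # in-place update
    if delta < theta:
        break

# Greedy policy with respect to the converged value function.
policy = {}
for s in model:
    q = lookahead(s, V)
    policy[s] = max(q, key=q.get)

print(V, policy)
```

For this toy model the fixed point can be verified by hand: staying in `B` forever is worth 2 / (1 - 0.5) = 4, and going from `A` to `B` is worth 1 + 0.5 * 4 = 3.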

Implementation of the value iteration algorithm

In [None]:
def value_iteration(mdp, gamma, theta):
    """
            This function calculates the optimal policy for the specified MDP using the Value Iteration approach:

            'mdp' - model of the environment, use following functions:
                get_all_states - return list of all states available in the environment
                get_possible_actions - return list of possible actions for the given state
                get_next_states - return list of possible next states with a probability for transition from state by taking
                                  action into next_state
                get_reward - return the reward after taking action in state and landing on next_state


            'gamma' - discount factor for MDP
            'theta' - algorithm should stop when minimal difference between previous evaluation of policy and current is
                      smaller than theta
            Function returns optimal policy and value function for the policy
       """

    V = dict()
    for state in mdp.get_all_states():
        V[state] = 0

    # init with a policy with first avail action for each state
    policy = dict()
    for current_state in mdp.get_all_states():
        policy[current_state] = mdp.get_possible_actions(current_state)[0]

    while True:
        last_value = list(V.values())[-1]
        for current_state in mdp.get_all_states():
            actions = mdp.get_possible_actions(current_state)
            state_action_values = dict()

            for action in actions:

                state_action_values[action] = 0
                next_states = mdp.get_next_states(current_state, action)

                for next_state in next_states:
                    state_action_values[action] += next_states[next_state] * (mdp.get_reward(current_state, action, next_state) + gamma * V[next_state])

            V[current_state] = max(list(state_action_values.values()))
        if abs(list(V.values())[-1] - last_value) < theta:
            break

    for current_state in mdp.get_all_states():

        state_action_values = dict()
        actions = mdp.get_possible_actions(current_state)

        for action in actions:

            state_action_values[action] = 0
            next_states = mdp.get_next_states(current_state, action)

            for next_state in next_states:
                state_action_values[action] += next_states[next_state] * (mdp.get_reward(current_state, action, next_state) + gamma * V[next_state])

        max_value_action = max(state_action_values, key=state_action_values.get)

        policy[current_state] = max_value_action

    return policy, V

Checking the correctness of the implemented algorithm

In [36]:
optimal_policy, optimal_value = value_iteration(mdp, 0.9, 0.001)

assert optimal_policy['s0'] == 'a1'
assert optimal_policy['s1'] == 'a0'
assert optimal_policy['s2'] == 'a1'

assert np.isclose(optimal_value['s0'], 3.78536612814300)
assert np.isclose(optimal_value['s1'], 7.29865364527343)
assert np.isclose(optimal_value['s2'], 4.20683179007964)