## Zaawansowane Metody Inteligencji Obliczeniowej

## Wprowadzenie

Całe zadanie jest oparte o różne wersje środowiska `FrozenLake` ze znanej biblioteki OpenAI Gym (https://gym.openai.com), która agreguje różnego rodzaju środowiska pod postacią jednego zunifikowanego API.

In [None]:
!pip install gym



In [None]:
# Zaimportuj środowisko FrozenLake z OpenAI Gym
from gym.envs.toy_text.frozen_lake import FrozenLakeEnv 

# Stwórzmy deterministyczne (`is_slippper=False`) środowisko w oparciu o jedną z zpredefiniowanych map (`map_name="4x4"`)
env = FrozenLakeEnv(map_name="4x4", is_slippery=False) 

# Po stworzeniu środowiska musimy je zresetować 
env.reset()
# W każdym momencie możemy wyświetlić stan naszego środowiska przy użyciu fukcji `render`
env.render()

In [None]:
from pprint import pprint

print("Przestrzeń akcji: ", env.action_space) # Dyskretne akcje od 0 do 3: LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3
print("Przestrzeń obserwacji: ", env.observation_space) # Dyskretne stany od 0 do 15
print("Opis środowiska (mapa):")
print(env.desc)
print("Model przejść w środowisku:")
pprint(env.P) # gdzie P[s][a] == [(probability, nextstate, reward, done), ...]
print("Aktualny stan: ", env.s)

In [None]:
# Nasz agent może wejść w interakcje ze środowiskiem  poprzez wywołanie funkcji `step(action)`, 
# gdzie `action` to jedna z możliwych akcji (int od 0 do env.action_space.n - 1)
s = env.reset() # `reset()` zwraca początkowy stan
env.render()
for i in range(5):
    # Wybierzmy losową akcje
    random_a = env.action_space.sample() 
    # `step(action)` zwraca nowy stan (`s`), nagrodę (`r`), informację czy stan jest terminalny (`term`) 
    # oraz dodatkowe informacje, które pomijamy
    # w tym wypadku nowy stan to jedynie id, ale dla innych środowisk może być to inny typ reprezentujący obserwację
    s, r, term, _ = env.step(random_a) 
    env.render()
    if term:
        break

## Zad 1 - Policy iteration + value iteration

In [None]:
import numpy as np

def policy_evaluation(policy, P, discount_factor=1.0, delta_min=0.0001):
    V = [0] * len(policy)
    while(True):
        delta = 0
        for state in range(len(policy)):
            v = V[state]
            V[state] = 0
            for probability, next_state, reward, is_terminal_state in P[state][policy[state]]:
                V[state] += probability * (reward + discount_factor * V[next_state])
            delta = max(delta, np.abs(v - V[state]))
        if delta < delta_min:
            break
    return np.array(V)

def policy_iteration(P, gamma, delta=0.001):
    """
    Argumenty:
        P - model przejścia, gdzie P[s][a] == [(prawdopodobieństwo, kolejny stan, nagroda, czy stan terminalny), ...]
        gamma - współczynnik dyskontujący
        delta - tolerancja warunku stopu
    Zwracane wartości:
        V - lista o długości len(P) zawierający oszacowane wartość stanu s: V[s]
        pi - lista o długości len(P) zawierający wyznaczoną deterministyczną politykę - akcję dla stanu s: pi[s]
        i - ilość iteracji algorytmu po wszystkich stanach
    """
    V = [0] * len(P)
    pi = [0] * len(P)
    i = 0
    
    # Miejsce na twoją implementację
    policy = np.zeros(len(P), dtype=int)
    while True:
        V = policy_evaluation(pi, P, discount_factor=gamma)
        policy_stable = True
        for state in range(len(P)):
            chosen_action = pi[state]
            action_values = np.zeros(4)
            for action in P[state]:
                for probability, next_state, reward, is_terminal_state in P[state][action]:
                    action_values[action] += probability * (reward + gamma * V[next_state])
            best_action = np.argmax(action_values)
            if chosen_action != best_action:
                policy_stable = False
            policy[state] = best_action
        i += 1
        if policy_stable:
            break
    return V, pi, i

In [None]:
import numpy as np

def value_iteration(P, gamma, delta=0.0001):
    """
    Argumenty:
        P - model przejścia, gdzie P[s][a] == [(prawdopodobieństwo, kolejny stan, nagroda, czy stan terminalny), ...]
        gamma - współczynnik dyskontujący
        delta - tolerancja warunku stopu
    Zwracane wartości:
        Q - lista o długości len(P) zawierający listy z oszacowanymi wartościami dla stanu s i akcji a: Q[s][a]
        pi - lista o długości len(P) zawierający wyznaczoną deterministyczną politykę - akcję dla stanu s: pi[s]
        i - ilość iteracji algorytmu po wszystkich stanach
    """
    pi = [0] * len(P)
    Q = [[0] * len(P[s]) for s in P.keys()]
    V = [0] * len(P)
    i = 0

    # Miejsce na twoją implementację
    while True:
        delta_temp = 0
        for state in range(len(P)):
            v = V[state]
            for action in P[state]:
                Q[state][action] = 0
                for probablitiy, next_state, reward, is_terminal_state in P[state][action]:
                    Q[state][action] += probablitiy * (reward + gamma * V[next_state])
            V[state] = max(Q[state])
            delta_temp = max(delta_temp, np.abs(v - V[state]))
        i += 1
        if delta_temp < delta:
            break
    
    for state in range(len(P)):
        pi[state] = np.argmax(Q[state])

    return Q, pi, i+1

In [None]:
# Przykładowy kod do testowania zaimplementowanych metod

# Zaimportuj generator map dla środowiska FrozenLake z OpenAI Gym
from gym.envs.toy_text.frozen_lake import generate_random_map

# Wygeneruj losową mapę jeziora o zadanym rozmiarze (`size=`)
lake_map = generate_random_map(size=8)

# Stwórz środowisko w oparciu o wygenerowaną mapę, 
# sprawdz deterministyczną (`is_slippery=False`) jak i stochastyczną wersję środowiska (`is_slippery=True`)
env = FrozenLakeEnv(desc=lake_map, is_slippery=False) 
env.render()

env = FrozenLakeEnv(map_name="8x8", is_slippery=False) 
env.render()

In [None]:
V, pi1, i1 = policy_iteration(env.P, 0.9)
Q, pi2, i2 = value_iteration(env.P, 0.9)

In [None]:
# Wprowadzmy teraz funkcję, które empirycznie zewauluje naszą politykę
# po prostu rozgrywając odpowiednią liczbę episodów zgodnie z naszą polityką.
def evaluate_empirically(env, pi, episodes=1000, max_actions=100):
    mean_r = 0
    for e in range(episodes):
        s = env.reset()
        total_r = 0
        for _ in range(max_actions): # Na wypadek polityki, która nigdy nie dojdzie od stanu terminalnego
            s, r, final, _ = env.step(pi[s])
            total_r += r
            if final:
                break
        mean_r += 1/(e + 1) * (total_r - mean_r)
    return mean_r       

In [None]:
evaluate_empirically(env, pi2)

1.0

## Zad 2 - On-policy Monte Carlo dla polityki epsilon-greedy.

In [None]:
def on_policy_eps_greedy_monte_carlo(env, eps, gamma):
    """
    Argumenty:
        env - środowisko implementujące metody `reset()` oraz `step(action)`
        eps - współczynnik eksploracji
        gamma - współczynnik dyskontujący
    Zwracane wartości:
        Q - lista o długości len(P) zawierający listy z oszacowanymi wartościami dla stanu s i akcji a: Q[s][a]
        pi - lista o długości len(P) zawierający wyznaczoną deterministyczną (zachłanną) politykę - akcję dla stanu s: pi[s]
        i - ilość epizodów wygenerowanych przez algorytm
    """
    # Miejsce na twoją implementację

    pi = np.ones((env.nS, env.nA))
    Q = np.zeros((env.nS, env.nA))
    returns_list = [[[] for _ in range(env.nA)] for s in env.P.keys()]

    i = 0
    while i < 5000:
        i += 1

        state = env.reset()
        episode = []

        while True:
            probability = [pi[state][action] for action in range(env.nA)]
            probability = np.divide(probability, sum(probability))\

            possible_actions = [action for action in range(env.nA)]
            action = np.random.choice(possible_actions, p = probability)

            next_state, reward, term, _ = env.step(action)
            episode.append((state, action, reward))
            state = next_state

            if term:
                break
        
        G = 0
        for index, (state,action,reward) in enumerate(reversed(episode)):
            G = reward + gamma * G

            if (state,action) not in [x[0:2] for x in episode[:-index - 1]]:
                returns_list[state][action].append(G)
                Q[state][action] = np.mean(returns_list[state][action])
                A_star = np.argmax(Q[state])

                for a in range(env.nA):
                    if a == A_star:
                        pi[state][a] = 1 - eps + eps / env.nA
                    else:
                        pi[state][a] = eps / env.nA

    return Q, np.argmax(pi, axis=1), i

def modify_environment(env):
    for state in env.P:
        for action in env.P[state]:
            for prob, next_state, reward, done in env.P[state][action]:
                if reward == 0.0 and done == True:
                    env.P[state][action] = [(prob, next_state, -5.0, done)]
                if reward == 1.0:
                    env.P[state][action] = [(prob, next_state, 5.0, done)]
                if reward == 0.0:
                    env.P[state][action] = [(prob, next_state, -0.1, done)]

Q3, pi3, i3 = on_policy_eps_greedy_monte_carlo(env, 0.9, 0.9)
evaluate_empirically(env, pi3)

1.0