**Autor:** Diego Mendieta Bustos

**Fecha:** 01 de julio de 2021

# Funciones

In [None]:
from copy import deepcopy


def valueDifference(v1, v2):
    """
    Return the L1 distance between two equally long sequences, i.e. the sum
    of the absolute element-wise differences.
    """
    assert len(v1) == len(v2)

    return sum(abs(left - right) for left, right in zip(v1, v2))


def valueIteration(V, env, gamma=0.99, epsilon=0.1):
    """
    Run Value Iteration on an environment with discrete observation and
    action spaces (written for FrozenLake8x8).

    V is the initial state-value table; it is updated in place, one state at
    a time, and returned together with the action-value table Q of the last
    sweep. Sweeps stop once the summed absolute change of V between two
    consecutive sweeps is no longer greater than epsilon.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    P = env.P

    Q = [[None] * n_actions for _ in range(n_states)]

    previous_V = deepcopy(V)
    converged = False
    while not converged:
        for s in range(n_states):
            for a in range(n_actions):
                # Expected one-step return; terminal outcomes contribute
                # only their immediate reward (no bootstrapped value).
                Q[s][a] = sum(
                    p * (r) if terminal else p * (r + gamma * V[s_next])
                    for p, s_next, r, terminal in P[s][a]
                )

            V[s] = max(Q[s])

        if valueDifference(V, previous_V) > epsilon:
            previous_V = deepcopy(V)
        else:
            converged = True
    return Q, V

In [None]:
# https://stackoverflow.com/questions/16945518/finding-the-index-of-the-value-which-is-the-min-or-max-in-python
def argmax(iterable):
    """
    Return the index of the largest element of an iterable (the first such
    index when the maximum is repeated).
    """
    def value_of(indexed_item):
        return indexed_item[1]

    best_index, _ = max(enumerate(iterable), key=value_of)
    return best_index


def retrievePolicy(Q):
    """
    Build the greedy policy from a Q table: for every state (row of Q) pick
    the index of its highest action value.
    """
    return [argmax(action_values) for action_values in Q]

In [None]:
def policyIteration(env, policy=None, gamma=0.99, epsilon=0.01):
    """
    Run Policy Iteration on the environment (written for FrozenLake).

    Alternates policy evaluation and greedy policy improvement until the
    improved policy equals the one that was just evaluated. When no initial
    policy is given, a random action is drawn for every state.

    Returns the tuple (V, policy).
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n

    if policy is None:
        policy = [np.random.randint(n_actions) for _ in range(n_states)]

    # The first evaluation starts without a value table; later evaluations
    # are warm-started from the previous one.
    V = policyEvaluation(env, policy, gamma, epsilon, None)
    improved = policyImprovement(env, V, gamma)
    while improved != policy:
        policy = improved
        V = policyEvaluation(env, policy, gamma, epsilon, V)
        improved = policyImprovement(env, V, gamma)
    return V, policy
        
        

def policyEvaluation(env, policy, gamma=0.99, epsilon=0.01, V=None):
    """
    Iteratively estimate the state values of a fixed policy.

    policy[state] gives the action taken in each state. V is the starting
    value table; when omitted it is drawn with np.random.randn. The table is
    updated in place, state by state, until the summed absolute change
    between two consecutive sweeps is no longer greater than epsilon, and is
    then returned.
    """
    n_states = env.observation_space.n
    P = env.P

    if V is None:
        V = list(np.random.randn(n_states))

    snapshot = deepcopy(V)
    while True:
        for s in range(n_states):
            chosen = policy[s]
            # The (1 - terminal) factor drops the bootstrapped value for
            # outcomes that end the episode.
            V[s] = sum(
                p * (r + gamma * V[s_next] * (1 - terminal))
                for p, s_next, r, terminal in P[s][chosen]
            )

        if not (valueDifference(V, snapshot) > epsilon):
            return V
        snapshot = deepcopy(V)


def policyImprovement(env, V, gamma=0.99):
    """
    Return the policy that is greedy with respect to the value table V.

    For every state, each action's expected one-step return is computed from
    env.P and the action with the highest return is kept (the lowest action
    index wins ties). The result is a list indexed by state.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    P = env.P

    def backup(state, action):
        # Expected return of one action; terminal outcomes do not bootstrap.
        return sum(
            p * (r + gamma * V[s_next] * (1 - terminal))
            for p, s_next, r, terminal in P[state][action]
        )

    greedy = []
    for state in range(n_states):
        chosen, top = None, -float("inf")
        for action in range(n_actions):
            candidate = backup(state, action)
            if candidate > top:
                chosen, top = action, candidate
        greedy.append(chosen)
    return greedy

In [None]:
def visualizePolicy(env, policy, sleep_time=0.5, final_state=63):
    """
    Play one episode following the policy, rendering the environment after
    every step and pausing sleep_time seconds between steps.

    Returns True when the episode ends on final_state and False when it ends
    anywhere else; a short summary is printed in both cases.
    """
    current = env.reset()

    step_count = 0
    finished = False
    while not finished:
        current, _reward, finished, _info = env.step(policy[current])
        step_count += 1

        env.render()
        print()
        sleep(sleep_time)

    if current == final_state:
        print(f"Has ganado en {step_count} pasos! :)")
        return True

    print("Has perdido :(")
    print(f"Pasos realizados: {step_count}")
    return False

In [None]:
# Discount factor handed to the value-iteration and policy-iteration calls.
gamma = 0.99
# Convergence tolerance handed to both algorithms as their `epsilon` argument.
epsilon = 0.0001

# Implementación

## Visualización inicial

In [None]:
import numpy as np
import gym


# Gym environment id used for every experiment in this notebook.
game = "FrozenLake8x8-v0"
# Forwarded to gym.make; False is the setting used for the initial inspection.
is_slippery = False

env = gym.make(game, is_slippery=is_slippery)

In [None]:
# Show the action and observation spaces exposed by the environment.
print(f"Action space:   {env.action_space}")
print(f"Obs space:      {env.observation_space}")

Action space:   Discrete(4)
Obs space:      Discrete(64)


In [None]:
# Draw the grid in text form to inspect the map layout.
env.render()


[41mS[0mFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG


In [None]:
# State indices laid out row by row on the 8x8 grid.
positions = np.arange(64).reshape(8, 8)
print(positions)

[[ 0  1  2  3  4  5  6  7]
 [ 8  9 10 11 12 13 14 15]
 [16 17 18 19 20 21 22 23]
 [24 25 26 27 28 29 30 31]
 [32 33 34 35 36 37 38 39]
 [40 41 42 43 44 45 46 47]
 [48 49 50 51 52 53 54 55]
 [56 57 58 59 60 61 62 63]]


In [None]:
# Take every action once, in index order, and render after each step to see
# which move each action index produces. The environment is not reset between
# actions, so the moves accumulate.
env.reset()
for action in range(env.action_space.n):
    env.step(action)
    env.render()
    print()

  (Left)
[41mS[0mFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Down)
SFFFFFFF
[41mF[0mFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
F[41mF[0mFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
S[41mF[0mFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG



In [None]:
# Action name -> action index, following the labels env.render() printed when
# actions 0..3 were taken in order.
actions = {
    "Left": 0, 
    "Down": 1, 
    "Right": 2, 
    "Up": 3
}

In [None]:
# env.P[s][a] lists the possible outcomes of taking action a in state s as
# (probability, next_state, reward, done) tuples.

print(f"0: {env.P[0]}")
print(f"1: {env.P[1]}")
print(f"2: {env.P[2]}")

0: {0: [(1.0, 0, 0.0, False)], 1: [(1.0, 8, 0.0, False)], 2: [(1.0, 1, 0.0, False)], 3: [(1.0, 0, 0.0, False)]}
1: {0: [(1.0, 0, 0.0, False)], 1: [(1.0, 9, 0.0, False)], 2: [(1.0, 2, 0.0, False)], 3: [(1.0, 1, 0.0, False)]}
2: {0: [(1.0, 1, 0.0, False)], 1: [(1.0, 10, 0.0, False)], 2: [(1.0, 3, 0.0, False)], 3: [(1.0, 2, 0.0, False)]}


## Aplicación determinística

In [None]:
# Rebuild the environment with is_slippery disabled for this section.
is_slippery = False
env = gym.make(game, is_slippery=is_slippery)

### Value iteration

In [None]:
from time import time


# Fix NumPy's RNG so the random initial value table is the same on every run.
np.random.seed(0)

# One starting value per state, drawn with np.random.randn and rounded to
# two decimals.
V = [round(np.random.randn(), 2) for _ in range(env.observation_space.n)]

# The timed span covers value iteration plus the greedy policy extraction.
tic = time()
Q, V = valueIteration(V, env, gamma, epsilon)
policy = retrievePolicy(Q)
toc = time()

print(f"Value iteration finalizó en {toc - tic:.2f} segundos.")

Value iteration finalizó en 0.03 segundos.


In [None]:
from time import sleep

# Pause, in seconds, between rendered steps.
sleep_time = 0.5

# Switch for the step-by-step replay of the value-iteration policy.
visualize = True
if visualize:
    visualizePolicy(env, policy, sleep_time);

  (Down)
SFFFFFFF
[41mF[0mFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Down)
SFFFFFFF
FFFFFFFF
[41mF[0mFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
[41mF[0mFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
FFFFFFFF
FFFHFFFF
F[41mF[0mFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FF[41mF[0mFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFF[41mF[0mFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFF[41mF[0mHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFH[41mF[0mFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHF[41mF[0mFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFF[41mF[0mHF
FHFFHFHF
FFFHFFFG

  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFH[41mF[0mHF
FFFH

### Policy iteration

In [None]:
from time import time

# No initial policy is passed, so policyIteration starts from a random one.
tic = time()
V, policy = policyIteration(env, gamma=gamma, epsilon=epsilon)
toc = time()

print(f"Policy iteration finalizó en {toc - tic:.2f} segundos.")

Policy iteration finalizó en 0.22 segundos.


In [None]:
from time import sleep

# Pause, in seconds, between rendered steps.
sleep_time = 0.5

# Switch for the step-by-step replay of the policy-iteration policy.
visualize = True
if visualize:
    visualizePolicy(env, policy, sleep_time);

  (Down)
SFFFFFFF
[41mF[0mFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Down)
SFFFFFFF
FFFFFFFF
[41mF[0mFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
[41mF[0mFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
FFFFFFFF
FFFHFFFF
F[41mF[0mFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FF[41mF[0mFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFF[41mF[0mFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFF[41mF[0mHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFH[41mF[0mFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHF[41mF[0mFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFF[41mF[0mHF
FHFFHFHF
FFFHFFFG

  (Down)
SFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFH[41mF[0mHF
FFFH

## Aplicación estocástica

In [None]:
# Rebuild the environment with is_slippery enabled for this section.
is_slippery = True
env = gym.make(game, is_slippery=is_slippery)

### Value iteration

In [None]:
from time import time


# Same seed as the earlier value-iteration run, so the initial value table
# drawn below is identical.
np.random.seed(0)

# One starting value per state, drawn with np.random.randn and rounded to
# two decimals.
V = [round(np.random.randn(), 2) for _ in range(env.observation_space.n)]

# The timed span covers value iteration plus the greedy policy extraction.
tic = time()
Q, V = valueIteration(V, env, gamma, epsilon)
policy = retrievePolicy(Q)
toc = time()

print(f"Value iteration finalizó en {toc - tic:.2f} segundos.")

Value iteration finalizó en 0.08 segundos.


In [None]:
from time import sleep

# Seed the environment before replaying episodes.
seed = 0
env.seed(seed)

# Pause, in seconds, between rendered steps.
sleep_time = 0.2
# Keep replaying episodes until visualizePolicy reports a win (returns True).
while True:
    victory = visualizePolicy(env, policy, sleep_time)
    if victory:
        break
    # Print a separator and wait before starting the next attempt.
    print("\n\n\n", '-'*20, "\n\n\n")
    sleep(2)

  (Up)
[41mS[0mFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
[41mS[0mFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
S[41mF[0mFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
F[41mF[0mFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
SFFFFFFF
[41mF[0mFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
[41mS[0mFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
[41mS[0mFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
S[41mF[0mFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SF[41mF[0mFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
FF[41mF[0mFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
SFFFFFFF
F[41mF[0mFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
S[41

### Policy iteration

In [None]:
from time import time 


# gamma is not passed here, so policyIteration falls back to its default
# (0.99); the initial policy is random because none is given.
tic = time()
V, policy = policyIteration(env, epsilon=epsilon)
toc = time()


print(f"Policy iteration finalizó en {toc - tic:.2f} segundos.")

Policy iteration finalizó en 0.32 segundos.


In [None]:
from time import sleep

# Seed the environment before replaying episodes.
seed = 0
env.seed(seed)

# Pause, in seconds, between rendered steps.
sleep_time = 0.2
# Keep replaying episodes until visualizePolicy reports a win (returns True).
while True:
    victory = visualizePolicy(env, policy, sleep_time)
    if victory:
        break
    # Print a separator and wait before starting the next attempt.
    print("\n\n\n", '-'*20, "\n\n\n")
    sleep(3)

  (Up)
[41mS[0mFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
[41mS[0mFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
S[41mF[0mFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
F[41mF[0mFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
SFFFFFFF
[41mF[0mFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
[41mS[0mFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
[41mS[0mFFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
S[41mF[0mFFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SF[41mF[0mFFFFF
FFFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Right)
SFFFFFFF
FF[41mF[0mFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
SFFFFFFF
F[41mF[0mFFFFFF
FFFHFFFF
FFFFFHFF
FFFHFFFF
FHHFFFHF
FHFFHFHF
FFFHFFFG

  (Up)
S[41