In [45]:
import numpy as np
from gym.envs.toy_text import discrete

states = ["high", "low"]
actions = ["wait", "search", "recharge"]

P = {}

P[0] = {}
P[1] = {}

alpha = 0.9
beta = 0.9
r_wait = 2.0
r_search = 4.0

# definimos un ambiente discreto con las transiciones según el gráfico

P[0][0] = [(1.0, 0, r_wait, False)]
P[0][1] = [(alpha, 0, r_search, False),
           (1-alpha, 1, r_search, False)]
P[0][2] = [(1.0, 0, 0.0, False)]

P[1][0] = [(1.0, 0, r_wait, False)]
P[1][1] = [(beta, 1, r_search, False), 
           (1-beta, 0, -3.0, False)]
P[1][2] = [(1.0, 0, 0.0, False)]

env = discrete.DiscreteEnv(2, 3, P, [0.0, 1.0])


In [46]:
# Del ejercicio de evaluación de política

def policy_eval(policy, env, discount_factor=1.0, theta=0.00001):
    """
    Evaluar una política dado un ambiente y una descripción completa
    de la dinámica del ambiente.
    
    Argumentos:
        política: matriz de tamaño [S, A] representando la política.
        env: ambiente de OpenAI representadno las probabilidades de transición
        del ambiente. 
        env.P[s][a] es una lista de vectores (probabilidad, próximo_estado, recompensa, done)
        env.nS es el número de estados en el ambiente
        env.nA es el número de acciones en el ambiente
        theta: para la evaluación de la política una vez que la función de valor cambia menos que
        theta para todos los estados
        discount_factor: factor de descuento gama.
        
    Retorna:
        Vector de longitud env.nS que representa la función de valor.
    """
    # Comenzar con función de valor aleatoria
    V = np.zeros(env.nS)
    while True:
        delta = 0
        # Para cada estado realizar un "full backup"
        for s in range(env.nS):
            v = 0
            # Fijarse en las posibles próximas acciones
            for a, action_prob in enumerate(policy[s]):
                # Para cada acción, fijarse en los próximos estados
                for prob, next_state, reward, done in env.P[s][a]:
                    # Calcular el valor esperado
                    v += action_prob * prob * (reward + discount_factor * V[next_state])
            # Cuál fue el máximo cambio de la función de valor
            delta = max(delta, np.abs(v - V[s]))
            V[s] = v
        # Para de evaluar una vez que estamos debajo de un cierto umbral
        if delta < theta:
            break
    return np.array(V)

In [47]:
def policy_improvement(env, policy_eval_fn=policy_eval, discount_factor=1.0):
    """
    Algoritmo de mejora de una política. Evalúa iterativamente y mejora una política 
    hasta que encuentra la política óptima.
    
    Args:
        env: ambiente de OpenAI.
        policy_eval_fn: función de evaluación de política que toma 3 argumentos: policy, env, discount_factor
        discount_factor: factor de descuento gama
        
    Retorna:
        Un tuple (policy, V)
        A tuple (policy, V). 
        policy es la política óptima, una matriz de tamaño [S, A] en que cada estado s contiene una distribución de probabilidad 
        valida sobre el espacio de acciones.
        V es la función de valor para la política óptima.
        
    """

    policy = np.ones([env.nS, env.nA]) / env.nA
    
    while True:
        policy_stable = True
        V = policy_eval_fn(policy, env, discount_factor)
        
        for i in range(env.nS):
            old_action = np.argmax(policy[i])
            action_value_function_list = []
            
            for j in range(env.nA):
                action_value_function = 0
                
                for  prob, next_state, reward, done in env.P[i][j]:
                    action_value_function += prob * (reward + discount_factor * V[next_state])
                    
                action_value_function_list.append(action_value_function)

            best_action = np.argmax(action_value_function_list)
            policy[i, :] = 0
            policy[i, best_action] = 1
                    
            if old_action != best_action:
                policy_stable = False
                            
        if policy_stable:
            break
            
    return policy, V

In [59]:
env.reset()

observation = 0

for _ in range(20):
    
    policy, v = policy_improvement(env, policy_eval_fn=policy_eval, discount_factor=0.25)
    action = policy[observation].argmax()
    observation, reward, done, info = env.step(action)
    
    print('value: %s' % v)
    print('observation: %s' % observation)
    print('reward: %s' % reward)
    print('------------------------------')


value: [5.30416578 4.42916616]
observation: 1
reward: 4.0
------------------------------
value: [5.30416578 4.42916616]
observation: 1
reward: 4.0
------------------------------
value: [5.30416578 4.42916616]
observation: 1
reward: 4.0
------------------------------
value: [5.30416578 4.42916616]
observation: 1
reward: 4.0
------------------------------
value: [5.30416578 4.42916616]
observation: 1
reward: 4.0
------------------------------
value: [5.30416578 4.42916616]
observation: 1
reward: 4.0
------------------------------
value: [5.30416578 4.42916616]
observation: 1
reward: 4.0
------------------------------
value: [5.30416578 4.42916616]
observation: 1
reward: 4.0
------------------------------
value: [5.30416578 4.42916616]
observation: 1
reward: 4.0
------------------------------
value: [5.30416578 4.42916616]
observation: 1
reward: 4.0
------------------------------
value: [5.30416578 4.42916616]
observation: 1
reward: 4.0
------------------------------
value: [5.30416578 4.