[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/eirasf/GCED-AA3/blob/main/lab4/lab4.ipynb)

# Lab4: Aprendizaje por refuerzo - Programación dinámica

En este laboratorio nos familizarizaremos con [OpenGym](https://gym.openai.com/), una librería de Python desarrollada por [OpenAI](https://openai.com/) para simular problemas que se pueden resolver utilizando aprendizaje por refuerzo.
Además, desarrollaremos algunos métodos de control basados en programación dinámica.

## OpenGym
OpenGym es una herramienta para desarrollar y comparar algoritmos de aprendizaje por refuerzo. Facilita la simulación de interacciones de un agente con entornos muy diversos.

Para utilizar OpenGym, el primer paso es importar la librería. Una vez hecho, podremos crear un entorno que nos permita simular el problema que deseemos de entre los [muchos disponibles](https://www.gymlibrary.dev/).

En esta práctica utilizaremos un entorno muy sencillo que simula un *GridWorld*, es decir, un mundo compuesto por casillas por las que el agente podrá desplazarse. En particular, cargaremos el entorno `MiniGrid-Empty-5x5-v0`.

**(Si instalas los paquetes y al hacer `gym.make` te muestra un error indicando que no se encuentra el environment MiniGrid-Empty-5x5-v0, es posible que tengas que reiniciar el entorno y volver a ejecutar)**

In [None]:
# Si los paquetes no están instalados, hay que ejecutar estas líneas:
#!pip install gym
#!pip install gym-minigrid 
import gym
import gym_minigrid
import numpy as np
env = gym.make('MiniGrid-Empty-5x5-v0', render_mode='rgb_array')

El método `gym.make` devuelve un objeto de tipo entorno que nos ofrece, entre otros, los siguientes métodos y propiedades:
 - `reset`: Devuelve el entorno a su estado original
 - `actions`: Muestra una lista de las acciones disponibles
 - `max_steps`: Fija el número máximo de acciones que puede realizar el agente en cada episodio
 - `render`: Devuelve una imagen donde aparece representada la situación actual
 - `step(accion)`: Ejecuta una acción y actualiza el entorno en consecuencia
 
El objetivo del agente en este problema es alcanzar la meta, representada por una casilla verde. El agente aparece representado como una flecha roja que refleja su posición y orientación.

In [None]:
# Muestra las acciones disponibles
acciones = env.actions
print([a.name for a in acciones])

# Resetea el entorno
env.reset()

# Muestra el entorno en su estado inicial
import matplotlib.pyplot as plt

def muestra_entorno(env):
    im = plt.imshow(env.render())
    plt.show()
muestra_entorno(env)

# TODO: Ejecuta la acción forward (puedes referirte a la acción i-ésima como acciones(i) o como acciones.nombre) y muestra el entorno de nuevo
...

# TODO: Prueba a ver cómo afectan las distintas acciones al entorno
...

El método `step(acción)` devuelve cuatro valores:
 - Observación: Representa lo que puede percibir el agente del entorno en su estado actual
 - Recompensa: Indica el valor de la señal de recompensa para la ejecución de la acción aplicada en el estado en que se aplicó.
 - Completado: Valor booleano que indica si se ha terminado el episodio
 - Info: Información adicional
 
De cara a aprender una política, debemos buscar una manera de representar los estados. Habitualmente, el agente se basaría en sus observaciones para generar una representación del estado. Sin embargo, para este problema en particular, vamos a utilizar una representación del estado más sencilla que la observación que puede hacer el agente. Esta labor de representación del estado la llevaría a cabo el **intérprete** en el esquema habitual de un problema de aprendizaje por refuerzo.
![Esquema de aprendizaje por refuerzo](./img/Reinforcement_learning_diagram.svg)

El intérprete identificará el estado con un vector de tres componentes que indicarán, respectivamente, la columna, fila y orientación del agente. Las filas/columnas se numerarán del 0 al 2 (de arriba a abajo y derecha a izquierda) y la orientación podrá tomar los siguientes valores:
 - 0 $\rightarrow$ derecha
 - 1 $\rightarrow$ abajo
 - 2 $\rightarrow$ izquierda
 - 3 $\rightarrow$ arriba
 
El entorno de este problema nos da acceso directo a dos variables que serán de utilidad:
 - `agent_pos`: Indica la casilla en la que está el agente. Las casillas del tablero están numeradas del 1 al 3, por lo que será necesario adaptar dicha numeración.
 - `agent_dir`: Indica la orientación del agente usando el código descrito en el párrafo anterior.
 
Creemos una función que haga las labores de intérprete y devuelva la codificación del estado actual del entorno:

In [None]:
def get_estado(env):
    # TODO - Completa la función
    ...

env.reset()

# COMPROBACIÓN
assert(get_estado(env)==[0,0,0])

### Simulación de un episodio con política aleatoria

Ahora que sabemos manejar el entorno, vamos a probar qué tal funciona una política aleatoria.

Crea un bucle que simule un episodio completo siguiendo una política aleatoria, es decir, que aplique una acción aleatoria hasta que el entorno nos indique que el episodio ha acabado. Puedes obtener una acción aleatoria llamando a `env.action_space.sample()`.

Muestra el entorno tras aplicar cada acción. Además muestra un mensaje que indique:
 - Qué número de paso se ha ejecutado
 - El nombre de la acción aplicada
 - El estado resultante
 - La recompensa obtenida en ese paso
 - Si ha terminado la ejecución

In [None]:
# Eliminamos el límite de pasos por episodio. Solo acabará el episodio al alcanzar la meta.
env.max_steps = float('inf')
print(f'Max steps modificado: {env.max_steps}')


done = False
contador = 0
while not done:
    # TODO - Completa el bucle
    ...

### Simplificación del espacio de acciones y definición de políticas
Habrás comprobado que hay acciones que no son útiles para este problema. Vamos a simplificar la búsqueda reduciendo el espacio de acciones a tres:
 1. left
 2. right
 3. forward
 
Al hacer esto, ya no podemos generar una acción aleatoria utilizando `env.action_space.sample()`. Además, dicha función elige una acción al azar, lo cual no nos interesa. Nuestro objetivo es obtener una **política** que determine la mejor acción en cada situación, así que vamos a aprovechar el momento para definir una estructura de datos que nos permita hacer eso.

La política debe indicar una probabilidad $\pi(a|s)$ de elegir la acción $a$ estando en el estado $s$ (así que $\sum_{a'}\pi(a'|s)=1$, con $s\in \mathcal{S}$ y $a\in \mathcal{A}$). Declararemos una política como un `numpy.array` que asocie a cada estado $s$ una distribución de probabilidad sobre las distintas acciones.

La política aleatoria otorgará la misma probabilidad a todas las acciones para cada estado (como hay tres acciones, para cualquier par estado-acción la probabilidad será 1/3).

In [None]:
# Seleccionamos solo las tres acciones indicadas
ACCIONES_UTILES = [acciones.left, acciones.right, acciones.forward]


# TODO - Indica el shape de la policy
policy_aleatoria = np.zeros(...)
# Inicializamos todos los valores a 1/3
policy_aleatoria[:] = 1.0/3

Una vez hemos definido la política, podemos definir una función para sustituir a `env.action_space.sample()`. Esta función recibirá la política que debe seguir y el estado para el que seleccionar la acción. Devolverá la acción muestreada siguiendo la distribución descrita por la política para ese estado.

In [None]:
def sample_policy(state, policy):
    # Obtenemos la distribución de probabilidad
    probs = policy[state[0],state[1],state[2],:]
    # Generamos un número aleatorio entre 0 y 1
    r = np.random.random()
    # Para muestrear la acción, trocearemos el intervalo [0-1] en trozos, uno para cada acción.
    # El tamaño del trozo que corresponde a cada acción se corresponde con la probabilidad que asigna la política a dicha acción para el estado en cuestión
    #   Ej: Si la distribución para el estado s es [0.1, 0.5, 0.4], se generarán 3 intervalos: [0-0.1], [0.1-0.6] y [0.6-1], correspondientes a las acciones 0, 1 y 2 respectivamente.
    # El resultado del muestreo será la acción correspondiente al intervalo en que esté contenido el valor aleatorio (uniforme) r.
    p_acumulada = probs[0]
    i=0
    while p_acumulada<r:
        i+=1
        p_acumulada+=probs[i]
    return ACCIONES_UTILES[i]
    
# TODO - Obtén la acción muestreada de la policy_aleatoria para el estado en que el agente está en la casilla central mirando hacia abajo
accion = ...

print(accion)

### Simulación de episodios siguiendo una política prefijada
Al disponer ya de una definición para políticas y de una función de muestreo sobre cualquier política, podemos adaptar nuestro bucle que simula un episodio para que lo haga siguiendo una política concreta.

Definiremos una función llamada `simula_episodio` para ello. La función debe recibir la política a seguir y dos parámetros que nos indicarán, respectivamente, si se mostrarán los *renders* de los pasos del episodio y si se mostrarán mensajes de texto.

La función deberá devolver una tupla con el **retorno** obtenido, es decir, la suma de las recompensas recibidas en cada paso y el **número de pasos** necesarios para completar el episodio.

In [None]:
def simula_episodio(politica, muestra_renders=False, imprime=False):
    env.reset()
    # TODO - Adapta el bucle que escribiste 3 celdas de código más arriba para que use la política para decidir qué acción tomar en cada paso
    ...
    return (ret, contador)

retorno,num_pasos = simula_episodio(policy_aleatoria, muestra_renders=False, imprime=True)
print(f'Simulado un episodio con retorno {retorno} ({num_pasos})')

### Comprobación del rendimiento de la política

Hagamos un experimento para evaluar la eficacia de la política aleatoria. Simularemos 200 episodios, anotando el número de pasos necesarios para completar cada uno. No es necesario que llevemos cuenta de los retornos obtenidos porque en este problema siempre será 1 (dado que solo la última acción da recompensa, con valor 1).

Mostraremos estadísticas y gráficos que nos puedan ayudar a tener una idea de cómo de bien funciona.

In [None]:
def comprueba_politica(politica):
    episodios_pasos = []
    
    # TODO - Simula 200 episodios, añadiendo sus respectivos números de pasos a episodios_pasos y mostrando por cada uno un mensaje de la forma '{iteracion} - Simulado un episodio con retorno {retorno} ({num_pasos})'
    ...

    assert(len(episodios_pasos)==200)
    
    # Mostramos un histograma para la duración de los episodios
    plt.hist(episodios_pasos)
    plt.show()

    # Mostramos también un diagrama de caja
    plt.boxplot(episodios_pasos)
    plt.show()

    # Por último, escribimos un par de estadísticas
    print('Se han necesitado de media',np.mean(episodios_pasos),'pasos (+-',np.std(episodios_pasos),')')
    print('El episodio más corto duró',np.min(episodios_pasos),'pasos y el episodio más largo duró',np.max(episodios_pasos),'pasos')
    
comprueba_politica(policy_aleatoria)

# Obtención de políticas óptimas
## Métodos de programación dinámica

A la hora de encontrar una política óptima que guíe las acciones de nuestro agente, los métodos basados en programación dinámica son una buena opción cuando disponemos de conocimento respecto a las dinámicas del entorno.

En particular, estos métodos requieren conocer $p(s',r | s,a)$, es decir, la probabilidad de acabar en un estado $s'$ recibiendo una recompensa $r$ si aplicamos la acción $a$ al estado $s$.

OpenGym no nos proporciona esta información, pero en este problema sencillo podemos reconstruirla. Este entorno es determinista, lo que quiere decir que si aplicamos la acción $a$ al estado $s$ siempre obtendremos el mismo estado $s'$ y la misma recompensa $r$ con probabilidad 1 (cualquier otro estado $s_j$ o recompensa $r_j$ tendrán $p(s_j,r_j|s,a)=0$).

Podemos, por tanto, representar el modelo con dos variables:
 - `transition_model`: Para cada combinación $s,a$, almacena el estado $s'$ al que conduce aplicar la acción $a$ a $s$.
 - `reward_model`: Para cada combinación $s,a$, almacena el valor de la recompensa obtenida al aplicar la acción $a$ a $s$.
 
Sabiendo cómo se comporta el entorno, definimos estas dos variables con los valores adecuados.

In [None]:
# DEFINICIÓN DEL MODELO

# Modelo de transiciones - Almacena a qué estado lleva aplicar cada acción a cada estado
# Lo almacenamos en un array numpy en el que para cada estado almacenemos, para cada acción, el estado al que lleva.
transition_model = np.zeros((3,3,4,3,3),dtype=np.int16)
# Recorremos cada estado para completar el estado al que lleva cada una de las tres acciones posibles
for i in range(3): # Columnas
    for j in range(3): # Filas
        for k in range(4): # Orientaciones
            # Aplicar la acción left gira el agente a la izquierda, así que el estado resultante tendrá el mismo valor para filas y columnas pero variará la orientación
            transition_model[i,j,k,0]=[i,j,(k+4-1)%4]
            # Aplicar la acción right gira el agente a la derecha, así que el estado resultante tendrá el mismo valor para filas y columnas pero variará la orientación
            transition_model[i,j,k,1]=[i,j,(k+1)%4]
        # Aplicar la acción forward mantiene la orientación, pero cambia filas o columnas en función de hacia dónde esté orientado
        if i<2:
            transition_model[i,j,0,2]=[i+1,j,0] # Derecha
        if j<2:
            transition_model[i,j,1,2]=[i,j+1,1] # Abajo
        if i>0:
            transition_model[i,j,2,2]=[i-1,j,2] # Izquierda
        if j>0:
            transition_model[i,j,3,2]=[i,j-1,3] # Arriba

# Modelo de transiciones - Almacena qué recompensa proporciona aplicar cada acción a cada estado
# Lo almacenamos en un array numpy en el que para cada estado almacenemos, para cada acción, la recompensa.
reward_model = np.zeros((3,3,4,3))
# Será 0 siempre menos en dos casos:
# 1 - Está en la segunda columna de la tercera fila, orientado a la derecha y se usa la acción forward
reward_model[1,2,0,2] = 1
# 2 - Está en la tercera columna de la segunda fila, orientado hacia abajo y se usa la acción forward
reward_model[2,1,1,2] = 1

# Función auxiliar para consultar los estados a los que lleva cada acción para un estado dado
def get_action_results(state):
    return transition_model[state[0],state[1],state[2]]

# Función auxiliar para consultar las recompensas de aplicar cada acción para un estado dado
def get_action_rewards(state):
    # TODO - Completa la función
    return reward_model[state[0],state[1],state[2]]

# TODO - Muestra los estados resultantes y las recompensas de aplicar las distintas acciones cuando el agente está en la tercera casilla de la segunda fila, orientado hacia abajo y se ejecuta la acción forward
...
...

## Algoritmo 1: Evaluación de políticas iterativa
El algoritmo de evaluación de políticas iterativa calcula el valor $v_\pi(s)$ para todo estado $s$ siguiendo la política $\pi$. La fórmula que utiliza, que está basada en la ecuación de Bellman para $v_\pi(s)$, es esta:

$$v_\pi(s)=\mathbb{E}_\pi [R_{t+1}+\gamma v_k(S_{t+1}) | S_t=s]$$

$$=\sum_a\pi(a|s)\sum_{s',r}p(s',r|s,a)[r+\gamma v_k(s')]$$

Al ser este un problema determinista, la expresión se simplifica, dado que $p(s',r|s,a)$ será 0 en todos los casos menos en 1 (porque aplicar la acción $a$ al estado $s$ conduce siempre a un estado $s'$ fijo y único y otorga siempre una recompensa $r$ fija y única; ambos valores los podemos obtener del modelo).

Puedes ver el algoritmo al completo aquí (y un ejemplo en la transparencia 19 del Tema 3 de teoría):
![Evaluación de políticas iterativa](./img/iterative-policy-evaluation.png)

Implementemos el algoritmo para obtener los valores asignados a cada estado siguiendo una política fija.

**CONSEJO: Para asegurar la convergencia, actualiza todos los valores de $V_{t+1}(s)$ a partir de los valores $V_t(s)$, es decir, crea una variable `new_state_values` donde almacenes todos los calculados en una iteración y al completar la iteración establece `state_values = new_state_values`**

In [None]:
GAMMA = 0.1

def iterative_policy_evaluation(policy):
    THETA = 0.00001
    
    # En esta variable almacenaremos los valores calculados para los estados
    # TODO - Indica el shape adecuado. Debes almacenar un valor por cada estado posible
    state_values = np.zeros(...)
    
    # TODO - Completa el algoritmo según lo descrito arriba
    ...
    return state_values

state_values = iterative_policy_evaluation(policy_aleatoria)
print(state_values)

# COMPROBACIÓN
np.testing.assert_almost_equal(state_values[0,0,1],4.11522634e-07)

### Optimización de políticas

En el apartado anterior hemos obtenido un valor para cada estado del problema cuando se sigue la política $\pi$. Según lo visto en teoría, podemos utilizar estos valores para encontrar una nueva política $\pi'$ que sea igual o mejor que $\pi$.

Cuando disponemos de $v_\pi(s)$ y del modelo que describe el funcionamiento del entorno, podemos obtener una política $\pi'\geq\pi$ simplemente seleccionando, para cada estado, la acción que nos da un mayor retorno, que podemos calcular como la recompensa inmediata más el valor del estado al que llegamos (descontado por $\gamma$).

$$\pi'(s)=\arg\max_a\sum_{s',r}p(s',r|s,a)\left[r+\gamma v_\pi(s')\right]$$

Escribamos un algoritmo que obtenga $\pi'$ a partir de `state_values`.

In [None]:
def greedify_policy(state_values):
    new_shape = list(state_values.shape)
    new_shape.append(3)
    # TODO - Indica la shape apropiada. Por cada par estado-acción debemos almacenar la probabilidad que da esta política de tomar esa acción estando en dicho estado
    # Como la política es determinista, para un estado concreto todas las acciones tendrán probabilidad 0 menos una que tendrá probabilidad 1
    policy = np.zeros(...)
    # TODO - Haz bucles anidados para recorrer todos los estados
    ...
                # Ahora para este estado debemos calcular, para todas las acciones a, G=(r + GAMMA*state_values[s[0],s[1],s[2]]) donde r es la recompensa de aplicar a y s es el estado resultante.
                # De todos esos valores, tomaremos el mejor (supongamos que es el i-ésimo) y pondremos probabilidad 1 en la acción i-ésima para dicho estado.
                # TODO - Recorre las acciones, calculando su G. Selecciona el mejor y pon a 1 la probabilidad de la acción correspondiente.
                ...
    return policy

policy_mejorada = greedify_policy(state_values)

# COMPROBACIÓN
assert(np.array_equal(policy_mejorada[2,1,1],[0., 0., 1.]))

## Obtención de la política óptima - Policy iteration
Ahora que podemos evaluar una política y, a partir de esos valores, obtener una política mejorada, podemos repetir el proceso para seguir mejorando nuestra política hasta que no cambie. Este procedimiento se denomina *policy iteration* y lo tienes aquí descrito:

![Policy iteration](./img/policy-iteration.png)

Implementémoslo (la mayoría ya lo tenemos hecho).

In [None]:
def policy_iteration(initial_policy):
    current_policy = initial_policy
    policy_stable = False
    # Llevaremos cuenta del paso en que estamos
    step = 0
    while not policy_stable:
        # TODO - Calcula los valores para current_policy
        current_values = ...
        # TODO - Obtén la nueva política mejorada a partir de los valores calculados
        new_policy = ...
        # Comprobamos si la política ha cambiado
        policy_stable = np.array_equal(current_policy, new_policy)
        # Preparamos la siguiente iteración
        current_policy = new_policy
        step+=1
        print('Paso #',step)
    # Al terminar el bucle habremos obtenido una política que ya no mejora
    return current_policy
        
policy_optima = policy_iteration(policy_aleatoria)
print(policy_optima)

# COMPROBACIÓN
assert(np.array_equal(policy_optima[2,0,1],[0., 0., 1.]))

Comprobemos que la política obtenida funciona bien simulando un episodio siguiéndola.

In [None]:
simula_episodio(policy_optima, muestra_renders=True, imprime=True)

## Obtención de la política óptima - Iteración de valores
El proceso de iteración de políticas es costoso porque requiere hacer una estimación iterativa de $v_\pi(s)$ para todas las políticas por las que se pasa. Este proceso se puede simplificar y obtener una política óptima siguiendo el algoritmo de iteración de valores:

![Value iteration](./img/value-iteration.png)

El algoritmo es muy similar al de evaluación de políticas iterativas. La diferencia es que, en lugar de calcular $\mathbb{E}_\pi [R_{t+1}+\gamma v_k(S_{t+1}) | S_t=s]$, se toma el valor de la mejor acción. Al final, se devuelve la política greedy derivada de los valores calculados. Implementémoslo.

In [None]:
def value_iteration(): # No recibe ningún parámetro. Calcula valores y política a partir del modelo del entorno.
    # TODO - Repite el algoritmo de evaluación de políticas iterativa pero tomando el G de la mejor acción para cada estado
    # Una vez tengas los state_values, devuelve la política derivada de ellos
    ...

otra_optima = value_iteration()
print(otra_optima)

# COMPROBACIÓN
assert(np.array_equal(otra_optima[2,0,1],[0., 0., 1.]))

Para terminar, comprobemos que la política obtenida así también funciona bien.

In [None]:
simula_episodio(otra_optima, muestra_renders=True, imprime=True)
env.close() # Cerramos, además, el entorno

# ¡Enhorabuena!
Has terminado este laboratorio. Ahora sabes cómo interactuar con OpenGym y cómo funcionan los principales algoritmos de programación dinámica para aprendizaje por refuerzo.