# Diplomatura en ciencia de datos, aprendizaje automático y sus aplicaciones - Edición 2023 - FAMAF (UNC)

## Aprendizaje por refuerzos

### Trabajo práctico entregable 1/2 (materia completa)

**Estudiante:**
- [Chevallier-Boutell, Ignacio José.](https://www.linkedin.com/in/nachocheva/)

**Docentes:**
- Palombarini, Jorge (Mercado Libre).
- Barsce, Juan Cruz (Mercado Libre).

---

## Librerías

In [1]:
import itertools
from typing import Any, Callable, Tuple

import numpy as np
import matplotlib.pyplot as plt

import gymnasium as gym

Para ver los videos de las ejecuciones hay que tener instalado ffmpeg (`apt-get install ffmpeg`).

## Funciones predefinidas de ploteo

In [2]:
def plot_reward_per_episode(reward_ep) -> None:
    episode_rewards = np.array(reward_ep)

    # se suaviza la curva de convergencia
    episode_number = np.linspace(
        1, len(episode_rewards) + 1, len(episode_rewards) + 1)
    acumulated_rewards = np.cumsum(episode_rewards)

    reward_per_episode = [
        acumulated_rewards[i] / episode_number[i]
        for i in range(len(acumulated_rewards))
    ]

    plt.plot(reward_per_episode)
    plt.title("Recompensa acumulada por episodio")
    plt.show()

In [3]:
def plot_steps_per_episode(timesteps_ep) -> None:
    # se muestra la curva de aprendizaje de los pasos por episodio
    episode_steps = np.array(timesteps_ep)
    plt.plot(np.array(range(0, len(episode_steps))), episode_steps)
    plt.title("Pasos (timesteps) por episodio")
    plt.show()

In [4]:
def plot_steps_per_episode_smooth(timesteps_ep) -> None:
    episode_steps = np.array(timesteps_ep)

    # se suaviza la curva de aprendizaje
    episode_number = np.linspace(
        1, len(episode_steps) + 1, len(episode_steps) + 1)
    acumulated_steps = np.cumsum(episode_steps)

    steps_per_episode = [
        acumulated_steps[i] / episode_number[i] for i in range(
            len(acumulated_steps))
    ]

    plt.plot(steps_per_episode)
    plt.title("Pasos (timesteps) acumulados por episodio")
    plt.show()

In [5]:
def draw_value_matrix(q) -> None:
    n_rows = 4
    n_columns = 12
    n_actions = 4

    # se procede con los cálculos previos a la graficación de la matriz de valor
    q_value_matrix = np.empty((n_rows, n_columns))
    for row in range(n_rows):
        for column in range(n_columns):
            state_values = []

            for action in range(n_actions):
                state_values.append(
                    q.get((row * n_columns + column, action), -100))

            maximum_value = max(
                state_values
            )  # determinamos la acción que arroja máximo valor

            q_value_matrix[row, column] = maximum_value

    # el valor del estado objetivo se asigna en -1 (reward recibido al llegar)
    # para que se coloree de forma apropiada
    q_value_matrix[3, 11] = -1

    # se grafica la matriz de valor
    plt.imshow(q_value_matrix, cmap=plt.cm.RdYlGn)
    plt.tight_layout()
    plt.colorbar()

    for row, column in itertools.product(
        range(q_value_matrix.shape[0]), range(q_value_matrix.shape[1])
    ):
        left_action = q.get((row * n_columns + column, 3), -1000)
        down_action = q.get((row * n_columns + column, 2), -1000)
        right_action = q.get((row * n_columns + column, 1), -1000)
        up_action = q.get((row * n_columns + column, 0), -1000)

        arrow_direction = "D"
        best_action = down_action

        if best_action < right_action:
            arrow_direction = "R"
            best_action = right_action
        if best_action < left_action:
            arrow_direction = "L"
            best_action = left_action
        if best_action < up_action:
            arrow_direction = "U"
            best_action = up_action
        if best_action == -1:
            arrow_direction = ""

        # notar que column, row están invertidos en orden en la línea de abajo
        # porque representan a x,y del plot
        plt.text(column, row, arrow_direction, horizontalalignment="center")

    plt.xticks([])
    plt.yticks([])
    plt.show()

    print("\n Matriz de mejor acción-valor (en números): \n\n", q_value_matrix)

---
# Cliff walking

## Descripción del problema

Cliff walking es un juego que involucra moverse sobre un mapa (grilla 4x12) desde un punto inicial (Start, S) hasta un punto final (Goal, G), evitando caer por el precipicio (The Cliff).

![](https://github.com/GIDISIA/RLDiplodatos/blob/master/images/cliffwalking.png?raw=1)

Imagen: Sutton y Barto, 2018.

Al comenzar un episodio, el jugador se ubica en el elemento [3, 0] de la grilla y debe llegar hasta el elemento [3, 11] para terminar dicho episodio. El precipio se ubica en [3, 1:11]. Cuando el jugador llega a uno de estos elementos, *se cae por el precipicio* y vuelve al punto inicial [3, 0]. El jugador debe realizar tantos movimientos como sean necesarios para llegar a la meta y finalizar el episodio.

El espacio de acciones $\mathcal{A}$ tiene 4 elementos:
- 0 $\Rightarrow$ Se mueve hacia arriba
- 1 $\Rightarrow$ Se mueve hacia la derecha
- 2 $\Rightarrow$ Se mueve hacia abajo
- 3 $\Rightarrow$ Se mueve hacia la izquierda

A pesar de que la grilla tiene 48 elementos, el jugador no puede estar en el precipicio, ya que vuelve al punto S, y tampoco puede estar en la meta, sino sólo llegar a ésta, ya que es el estado terminal del episodio. Esto resulta en que el espacio de estados $\mathcal{S}$ tiene 37 elementos.

Se cumple que $\mathcal{A}, \mathcal{S} \in \mathbb{N}$, teniendo al cero como primer elemento.

La función de recompensa es tal que el jugador recibe:
- $-100$ cuando cae el precipicio.
- $-1$ en todos los demás casos.

In [6]:
env = gym.make("CliffWalking-v0", render_mode="human")
actions = range(env.action_space.n)

## Elementos del agente

### Heurísticas de selección de acciones

Se definen los métodos para decidir qué acciones tomar, pudiendo ser $\epsilon$-greedy o SoftMax.

In [7]:
def choose_action_e_greedy(state: int, 
                           actions: range, 
                           q: dict,
                           hyperparameters: dict,
                           random_state: np.random.RandomState
                           ) -> int:
    """
    Elije una acción de acuerdo al aprendizaje realizado previamente
    usando una política de exploración épsilon-greedy
    """
    # ej: para 4 acciones inicializa en [0,0,0,0]
    # q_values = [q.get((state, a), 0.0) for a in actions]
    q_values = [q.get(a) for a in actions]
    print(q_values)
    max_q = max(q_values)
    # sorteamos un número: es menor a épsilon?
    if random_state.uniform() < hyperparameters['epsilon']:
        # sí: se selecciona una acción aleatoria
        return random_state.choice(actions)

    count = q_values.count(max_q)

    # hay más de un máximo valor de estado-acción?
    if count > 1:
        # sí: seleccionamos uno de ellos aleatoriamente
        best = [i for i in range(len(actions)) if q_values[i] == max_q]
        i = random_state.choice(best)
    else:
        # no: seleccionamos el máximo valor de estado-acción
        i = q_values.index(max_q)

    return actions[i]

In [10]:
env = gym.make("CliffWalking-v0", render_mode="human")
actions = range(env.action_space.n)

q = {'arriba': 1,
     'derecha': 5,
     'abajo': 99,
     'izquierda': -20}

for a in actions:
    print(q.get(a))

None
None
None
None


: 

In [8]:
q = {'arriba': 1,
     'derecha': 5,
     'abajo': 99,
     'izquierda': -20}
hyperparameters = {"alpha": 0.5, 
                   "gamma": 1, 
                   "epsilon": 0.1, 
                   "tau": 25}
random_state = np.random.RandomState(42) # Semilla aleatoria

state, _ = env.reset()
action = choose_action_e_greedy(state, actions, q, hyperparameters, random_state)
print(action)
env.close()


[None, None, None, None]


TypeError: '>' not supported between instances of 'NoneType' and 'NoneType'

In [8]:
def choose_action_softmax() -> int:
    """
    Elije una acción de acuerdo al aprendizaje realizado previamente
    usando una política softmax
    """

    # TODO: implementar
    pass

### Aprendizaje por diferencias temporales

Se definen los métodos de aprendizaje, los cuales toman una transición y cambian el dict de los valores de Q de acuerdo a Sarsa o Q-learning.

In [9]:
def learn_SARSA(
        state: Any, # COMPLETAR tipo de cada parámetro
        action: Any,
        reward: Any,
        next_state: Any,
        next_action: Any,
        hyperparameters: Any,
        q: Any,
) -> None:
    """
    Realiza una actualización según el algoritmo SARSA, para una transición
    de estado dada
    Args:
        state: COMPLETAR
        action: COMPLETAR
        reward: COMPLETAR
        next_state: COMPLETAR
        next_action: COMPLETAR
        hyperparameters: COMPLETAR
        q: COMPLETAR
    """

    # TODO - completa con tu código aquí

    pass

In [10]:
# completar argumentos de la función para hacer una actualización Q-learning
def learn_Q_learning(
        # COMPLETAR
) -> None:
    """
    Realiza una actualización según el algoritmo Q-learning, para una transición
    de estado dada
    Args: COMPLETAR
    """

    # TODO - completa con tu código aquí

    pass

### Iteraciones

Definimos la manera en la que itera el algoritmo.

In [11]:
def run(learning_function: Callable, 
        hyperparameters: dict, 
        episodes_to_run: int, 
        env: gym.Env, 
        actions: range, 
        q: dict, 
        random_state: np.random.RandomState
        ) -> Tuple[float, np.ndarray, np.ndarray]:

    """
    Corre el algoritmo de RL para un dado entorno.
    Args:
        learning_function: función de actualización de algoritmo de aprendizaje
        hyperparameters: hiperparámetros del algoritmo de aprendizaje
        episodes_to_run: cantidad de episodios a ejecutar
        env: entorno de Gymnasium
        actions: lista de acciones posibles
        q: diccionario de valores de estado-acción
        random_state: generador de números aleatorios
    """

    # Registra la cantidad de pasos de cada episodio
    timesteps_of_episode = []

    # Registra el retorno de cada episodio
    reward_of_episode = []

    # Loop sobre los episodios
    # for _ in range(episodes_to_run):
    for k in range(episodes_to_run):
        print(f'Episodio {k}')
        # Instancea un nuevo agente en cada episodio
        # Fin del episodio: llegar a la salida o superar los 2000 pasos

        # Reinicia el entorno, obteniendo el estado inicial del mismo
        state, _ = env.reset()

        # Contador de recompensa
        episode_reward = 0

        # Contador de pasos temporales
        t = 0

        # Flag de finalización de iteración actual
        done = False

        # elige una acción basado en el estado actual.
        # Filtra el primer elemento de state, que es el estado en sí mismo
        action = choose_action_e_greedy(
            state, actions, q, hyperparameters, random_state)

        while not done:
            # el agente ejecuta la acción elegida y obtiene los resultados
            next_state, reward, terminated, truncated, _ = env.step(action)

            next_action = choose_action_e_greedy(
                next_state, actions, q, hyperparameters, random_state)

            episode_reward += reward
            learning_function(
                state,
                action,
                reward,
                next_state,
                next_action,
                hyperparameters,
                q
            )

            done = terminated or truncated

            # if the algorithm does not converge, it stops after 2000 timesteps
            if not done and t < 10:#2000:
                state = next_state
                action = next_action
            else:
                # el algoritmo no ha podido llegar a la meta antes de dar 2000 pasos
                done = True  # se establece manualmente la bandera done
                timesteps_of_episode = np.append(timesteps_of_episode, [int(t + 1)])
                reward_of_episode = np.append(
                    reward_of_episode, max(episode_reward, -100)
                )

            t += 1

    return reward_of_episode.mean(), timesteps_of_episode, reward_of_episode

## Implementaciones

### Sarsa + $\epsilon$-greedy

In [12]:
learning_function = learn_SARSA

hyperparameters = {"alpha": 0.5, 
                   "gamma": 1, 
                   "epsilon": 0.1, 
                   "tau": 25}

episodes_to_run = 3

q = {} # Contiene los valores de Q para cada tupla (estado, acción)

random_state = np.random.RandomState(42) # Semilla aleatoria

In [13]:
avg_steps_per_episode, timesteps_ep, reward_ep = run(
    learning_function,
    hyperparameters,
    episodes_to_run,
    env,
    actions,
    q,
    random_state
)

[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.0, 0.0]
[0.0, 0.0, 0.

KeyboardInterrupt: 

: 

La interfaz principal de los ambientes de gym es la interfaz Env. La misma posee cinco métodos principales:

* ```reset(self, seed)``` : Reinicia el estado del entorno, a su estado inicial, devolviendo una observación de dicho estado. Opcionalmente, establece la semilla aleatoria del generador de números aleatorios del presente entorno.

* ```step(self, action)``` : "Avanza" un timestep del entorno. Devuelve: ```observation, reward, terminated, truncated, info```.

* ```render(self)``` : Muestra en pantalla una parte del entorno.

* ```close(self)``` : Finaliza con la instancia del agente.


Por otra parte, cada entorno posee los siguientes tres atributos principales:

* ```action_space``` : El objeto de tipo Space correspondiente al espacio de acciones válidas.

* ```observation_space``` : El objeto de tipo Space correspondiente a todos los rangos posibles de observaciones.

* ```reward_range``` : Tupla que contiene los valores mínimo y máximo de recompensa posible.

### Sarsa + SoftMax

### Q-learning + $\epsilon$-greedy

### Q-learning + SoftMax