# IMD1103 - Aprendizado por Reforço

### Professor: Dr. Leonardo Enzo Brito da Silva

### Aluno: João Antonio Costa Paiva Chagas

# Laboratório 7B: Sarsa (CliffWalking)

## Importações

In [None]:
# Instala os pacotes necessários:
# - gymnasium[toy-text]: inclui ambientes simples como FrozenLake, Taxi, etc.
!pip install gymnasium[toy-text]

In [None]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import gymnasium as gym
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from IPython.display import Image
import matplotlib.patches as patches
from typing import Dict, Tuple, List, Optional
from matplotlib.collections import LineCollection

## Armazenamento

In [None]:
output_dir = "resultados_plots"
os.makedirs(output_dir, exist_ok=True)

## Funções auxiliares para visualização

In [None]:
def plotar_comparacao_metricas(
    results: Dict[str, Dict[str, list]],
    titulo_prefixo: str,
    janela: int = 100,
    save_path: Optional[str] = None
) -> None:
    """
    Plota a comparação de métricas (tamanho e retorno) de múltiplos
    experimentos em uma única figura.
    """
    fig, axs = plt.subplots(nrows=2, ncols=1, figsize=(12, 8), sharex=True)
    sns.set_palette("viridis", n_colors=len(results)) # Optional: color palette

    # Plot 1 – Tamanho do episódio
    for label, data in results.items():
        df = pd.DataFrame({'tamanho': data['T']})
        tamanho_ma = df['tamanho'].rolling(window=janela).mean()
        sns.lineplot(x=tamanho_ma.index, y=tamanho_ma, ax=axs[0], label=label)

    axs[0].set_title(f'{titulo_prefixo} - Tamanho do Episódio')
    axs[0].set_ylabel(f'Passos por Episódio (Média Móvel {janela})')
    axs[0].legend()
    axs[0].grid()

    # Plot 2 – Retorno
    for label, data in results.items():
        df = pd.DataFrame({'retorno': data['G']})
        retorno_ma = df['retorno'].rolling(window=janela).mean()
        sns.lineplot(x=retorno_ma.index, y=retorno_ma, ax=axs[1], label=label)

    axs[1].set_title(f'{titulo_prefixo} - Retorno do Episódio')
    axs[1].set_xlabel('Episódio')
    axs[1].set_ylabel(f'Recompensa Total (Média Móvel {janela})')
    axs[1].legend()
    axs[1].grid()

    plt.tight_layout()
    if save_path:
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        fig.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.close(fig)
        print(f"Plot de comparação salvo em: {save_path}")
    else:
        plt.show()

In [None]:
def plotar_metricas_single(
    episodio_len: list[int],
    episodio_return: list[float],
    titulo: str,
    janela: int = 100,
    save_path: Optional[str] = None
) -> None:
    """
    Usa Pandas + Seaborn para plotar as métricas de uma única execução.
    """
    df = pd.DataFrame({
        'episodio': np.arange(len(episodio_len)),
        'tamanho': episodio_len,
        'retorno': episodio_return
    })

    df['tamanho_ma'] = df['tamanho'].rolling(window=janela).mean()
    df['retorno_ma'] = df['retorno'].rolling(window=janela).mean()

    fig, axs = plt.subplots(nrows=2, ncols=1, figsize=(12, 8), sharex=True)
    fig.suptitle(titulo, fontsize=16)

    # Plot 1 – Tamanho do episódio
    sns.lineplot(data=df, x='episodio', y='tamanho', ax=axs[0], label='Tamanho do Episódio', alpha=0.3)
    sns.lineplot(data=df, x='episodio', y='tamanho_ma', ax=axs[0], label=f'Média móvel ({janela})')
    axs[0].set_ylabel('Passos por Episódio')
    axs[0].legend()
    axs[0].grid()

    # Plot 2 – Retorno
    sns.lineplot(data=df, x='episodio', y='retorno', ax=axs[1], label='Retorno do Episódio', color='orange', alpha=0.3)
    sns.lineplot(data=df, x='episodio', y='retorno_ma', ax=axs[1], label=f'Média móvel ({janela})', color='red')
    axs[1].set_xlabel('Episódio')
    axs[1].set_ylabel('Recompensa Total')
    axs[1].legend()
    axs[1].grid()

    plt.tight_layout(rect=[0, 0.03, 1, 0.95]) # Adjust layout to make room for suptitle
    if save_path:
        os.makedirs(os.path.dirname(save_path), exist_ok=True)
        fig.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.close(fig)
        print(f"Plot de métricas salvo em: {save_path}")
    else:
        plt.show()

In [None]:
# helpers de grid/máscaras
def _grid_info_from_env(env_name: str, map_name: str | None = None, is_slippery: bool = False):
    """
    Retorna (n_rows, n_cols, holes_set, cliffs_set, goals_set, start_rc)
    para FrozenLake-v1 ou CliffWalking-v1.
    """
    holes, cliffs, goals = set(), set(), set()
    start_rc = None

    if env_name == "FrozenLake-v1":
        kwargs = {}
        if map_name is not None:
            kwargs["map_name"] = map_name
        kwargs["is_slippery"] = is_slippery

        env = gym.make("FrozenLake-v1", **kwargs)
        desc = env.unwrapped.desc
        # decode se dtype for bytes
        desc = np.array(desc, dtype=str) if desc.dtype.kind != "S" else np.char.decode(desc, "utf-8")
        n_rows, n_cols = desc.shape
        for r in range(n_rows):
            for c in range(n_cols):
                ch = desc[r, c]
                if ch == "H": holes.add((r, c))
                elif ch == "G": goals.add((r, c))
                elif ch == "S": start_rc = (r, c)
        if start_rc is None:
            start_rc = (0, 0)
        env.close()
        return n_rows, n_cols, holes, cliffs, goals, start_rc

    elif env_name == "CliffWalking-v1":
        n_rows, n_cols = 4, 12
        start_rc = (n_rows - 1, 0)
        goal_rc  = (n_rows - 1, n_cols - 1)
        goals.add(goal_rc)
        # penhasco: última linha, colunas 1..10
        for c in range(1, n_cols - 1):
            cliffs.add((n_rows - 1, c))
        return n_rows, n_cols, holes, cliffs, goals, start_rc

    else:
        raise ValueError("env_name deve ser 'FrozenLake-v1' ou 'CliffWalking-v1'.")

def _rc_from_state(s: int, n_cols: int) -> tuple[int, int]:
    return divmod(s, n_cols)  # (r,c)

# simulação de trajetória
def simular_trajetoria_gym(
    Pi: np.ndarray,
    env_name: str,
    *,
    map_name: str | None = None,
    is_slippery: bool = False,
    max_steps: int = 200
):
    """
    Executa uma trajetória determinística (gulosa em Pi) no Gym (FrozenLake/CliffWalking).
    Retorna:
      - estados: lista de (r,c)
      - acoes: lista de ints
      - recompensas: lista de floats
    """
    # cria env
    if env_name == "FrozenLake-v1":
        kwargs = {}
        if map_name is not None:
            kwargs["map_name"] = map_name
        kwargs["is_slippery"] = is_slippery
        env = gym.make("FrozenLake-v1", **kwargs)
    elif env_name == "CliffWalking-v1":
        env = gym.make("CliffWalking-v1")
    else:
        raise ValueError("env_name deve ser 'FrozenLake-v1' ou 'CliffWalking-v1'.")

    n_rows, n_cols, holes, cliffs, goals, start_rc = _grid_info_from_env(env_name, map_name, is_slippery)

    # sanity check de dimensões
    n_states, n_actions = Pi.shape
    assert n_states == n_rows * n_cols, f"Pi.shape[0]={n_states} != n_rows*n_cols={n_rows*n_cols}"
    assert n_actions == env.action_space.n, f"Pi.shape[1]={n_actions} != action_space.n={env.action_space.n}"

    state, _ = env.reset()
    # se o env permitir setar estado diretamente, tenta (FrozenLake/cliff costumam expor .unwrapped.s)
    if hasattr(env.unwrapped, "s"):
        env.unwrapped.s = int(state)

    estados_rc = [_rc_from_state(int(state), n_cols)]
    acoes, recompensas = [], []

    for _ in range(max_steps):
        s = int(state)
        a = int(np.argmax(Pi[s]))
        next_state, reward, terminated, truncated, _ = env.step(a)

        acoes.append(a)
        recompensas.append(float(reward))
        estados_rc.append(_rc_from_state(int(next_state), n_cols))

        state = next_state
        if terminated or truncated:
            break

    env.close()
    return estados_rc, acoes, recompensas

In [None]:
# --------------------------
# plot da trajetória no grid
# --------------------------
def plot_trajetoria_gym(
    env_name: str,
    estados: list[tuple[int, int]],
    *,
    map_name: str | None = None,
    is_slippery: bool = False,
    titulo: str = "Trajetória gulosa (Gym)",
    gradiente_temporal: bool = True,
    mostrar_setas: bool = True,
    save_path: Optional[str] = None
):
    """
    Plota a trajetória sobre um grid para FrozenLake/CliffWalking com destaques de buracos/penhasco/goal.
    """
    n_rows, n_cols, holes, cliffs, goals, start_rc = _grid_info_from_env(env_name, map_name, is_slippery)

    # figura/base do grid
    fig, ax = plt.subplots(figsize=(n_cols, n_rows))
    ax.set_xlim(0, n_cols); ax.set_ylim(0, n_rows)
    ax.set_xticks(np.arange(0, n_cols + 1, 1))
    ax.set_yticks(np.arange(0, n_rows + 1, 1))
    ax.grid(True); ax.set_aspect('equal'); ax.invert_yaxis()

    # células coloridas
    for r in range(n_rows):
        for c in range(n_cols):
            cell = (r, c)
            if env_name == "FrozenLake-v1":
                if cell in holes:
                    color = (1.0, 0.0, 0.0, 0.25)  # vermelho translúcido
                elif cell in goals:
                    color = (0.0, 1.0, 0.0, 0.25)  # verde translúcido
                elif cell == start_rc:
                    color = (1.0, 1.0, 0.0, 0.18)  # amarelo leve
                else:
                    color = 'white'
            else:  # CliffWalking
                if cell in cliffs:
                    color = (1.0, 0.0, 0.0, 0.25)
                elif cell in goals:
                    color = (0.0, 1.0, 0.0, 0.25)
                elif cell == start_rc:
                    color = (1.0, 1.0, 0.0, 0.18)
                else:
                    color = 'white'

            rect = patches.Rectangle((c, r), 1, 1, facecolor=color, edgecolor='gray')
            ax.add_patch(rect)

    # extrai xs, ys (centros)
    xs = [c + 0.5 for (_, c) in estados]
    ys = [r + 0.5 for (r, _) in estados]

    # linha com gradiente temporal
    if gradiente_temporal and len(xs) > 1:
        pontos = np.array([xs, ys]).T
        segmentos = np.stack([pontos[:-1], pontos[1:]], axis=1)
        lc = LineCollection(segmentos, linewidths=2.5)
        cores = np.linspace(0, 1, len(segmentos))
        lc.set_array(cores)
        ax.add_collection(lc)
        cbar = plt.colorbar(lc, ax=ax, fraction=0.046, pad=0.04)
        cbar.set_label("Progresso temporal")
    else:
        ax.plot(xs, ys, '-o', linewidth=2.5, markersize=4)

    # início/fim
    ax.scatter(xs[0], ys[0], marker='*', s=220, edgecolor='black', facecolor='yellow', zorder=5, linewidths=1.2, label='Início')
    ax.scatter(xs[-1], ys[-1], marker='o', s=150, edgecolor='black', facecolor='none', zorder=5, linewidths=1.2, label='Fim')

    # setas entre células
    if mostrar_setas:
        for i in range(len(xs) - 1):
            dx, dy = xs[i+1] - xs[i], ys[i+1] - ys[i]
            ax.arrow(xs[i], ys[i], dx*0.85, dy*0.85, head_width=0.15, head_length=0.15,
                     length_includes_head=True, fc='black', ec='black', alpha=0.8)

    ax.set_title(titulo)
    ax.legend(loc='upper right', frameon=True)
    plt.tight_layout()
    if save_path:
        directory = os.path.dirname(save_path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        fig.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.close(fig)
        print(f"Plot da política salvo em: {save_path}")
    else:
        plt.show()

## Algoritmo: Sarsa

In [None]:
def _atualiza_Pi(Pi: np.ndarray, Q: np.ndarray, s: int, eps: float) -> None:
    """
    Deixa Pi[s, :] ε-suave em torno de argmax_a Q[s, a].
    """
    n_acoes = Q.shape[1]
    a_star = int(np.argmax(Q[s]))
    Pi[s, :] = eps / n_acoes
    Pi[s, a_star] += 1.0 - eps

def sarsa(
    env,
    gamma: float = 0.9,
    N: int = 500,               # episódios
    T: int = 200,               # passos por episódio
    epsilon: float = 0.1,
    alpha: float = 0.1,
    seed: Optional[int] = None
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, int, List[int], List[float]]:
    """
    SARSA(0) on-policy com política ε-gulosa (ε-suave) para ambientes Gymnasium de espaço discreto.

    Parâmetros
    ----------
    env : gymnasium.Env
        Ambiente com observation_space/action_space do tipo Discrete.
    gamma : float
        Fator de desconto.
    N : int
        Número de episódios.
    eps : float
        Parâmetro ε da política ε-gulosa (0 ≤ ε ≤ 1).
    alpha : float
        Taxa de aprendizado.
    seed : int | None
        Semente de aleatoriedade (usada para `np.random.default_rng` e opcionalmente em `env.reset`).

    Retorna
    -------
    Q   : np.ndarray, shape (n_states, n_actions)
        Estimativas finais Q(s,a).
    Pi  : np.ndarray, shape (n_states, n_actions)
        Política ε-suave resultante.
    numero_de_visitas : np.ndarray, shape (n_states, n_actions)
        Contagem de visitas por par (s,a).
    k   : int
        Número de episódios efetivamente executados (== N).
    episodio_T : list[int]
        Comprimento (passos) de cada episódio.
    episodio_G : list[float]
        Retorno (soma de recompensas não-descontadas) de cada episódio.
    """

    rng = np.random.default_rng(seed)

    # Listas de métricas
    episodio_T = []
    episodio_G = []

    # Atalhos
    n_estados  = env.observation_space.n
    n_acoes    = env.action_space.n

    # Inicializações
    Q                 = np.zeros((n_estados, n_acoes), dtype=float)
    numero_de_visitas = np.zeros((n_estados, n_acoes), dtype=float)

    # Política inicial ε-suave: uniforme
    Pi = np.full((n_estados, n_acoes), 1.0 / n_acoes, dtype=float)

    # Loop episódios
    for k in tqdm(range(1, N + 1), desc="Episódios (SARSA)", leave=True):
        # Reset para o estado inicial (s0)
        s, _ = env.reset()

        ############################################################################
        # Implementação aqui
        # Dica:
        # usar
        # próximo estado, recompensa, terminated (flag), truncated (flag), _ = env.step(ação)
        # para fazer a trasição de um estado para o outro dada uma ação do agente

        # Melhoria de politica
        _atualiza_Pi(Pi, Q, s, epsilon)

        # Escolhe ação a_0 ~ Pi(s_0)
        a = int(rng.choice(n_acoes, p=Pi[s]))

        # Acumuladores do episódio
        G = 0.0

        # Flag de controle do laço
        terminated = False
        truncated = False

        # Enquanto não exceder T
        for t in range(T):

            # Número de visitas ao par (s,a)
            numero_de_visitas[s, a] += 1.0

            # Passo no ambiente
            s_next, r, terminated, truncated, _ = env.step(a)

            # Incrementa acumulador
            G += r

            # Escolhe ação a_{t+1} ~ Pi(s_{t+1})
            _atualiza_Pi(Pi, Q, s_next, epsilon)
            a_next = int(rng.choice(n_acoes, p=Pi[s_next]))

            if terminated:
                td_target = r
            else:
                td_target = r + gamma * Q[s_next, a_next]

            td_error = Q[s, a] - td_target
            Q[s, a] -= alpha * td_error

            # Melhoria de politica (para o estado 's' que acabamos de deixar)
            _atualiza_Pi(Pi, Q, s, epsilon)

            # Avança
            s, a = s_next, a_next

            # Verifica se o episódio terminou
            if terminated or truncated:
                break

            ############################################################################

        # Registra métricas do episódio
        episodio_T.append(t + 1)
        episodio_G.append(G)

        ############################################################################

    return Q, Pi, numero_de_visitas, N, episodio_T, episodio_G

## Experimento

In [None]:
def executar_experimento(
    nome_experimento: str,
    ambiente: str,
    algoritmo: callable,
    **kwargs
) -> Tuple[np.ndarray, List[int], List[float]]:
    """
    Executa o algoritmo e retorna a política final, T e G.
    """
    print(f"--- Executando: {nome_experimento} ---")
    env = gym.make(ambiente)
    try:
        _, Pi, _, _, T, G = algoritmo(env=env, **kwargs)
        return Pi, T, G
    finally:
        env.close()

In [None]:
def executar_variacao_de_hiperparametro(
    nome_variacao: str,
    param_valores: list,
    algoritmo: callable,
    ambiente: str,
    output_dir: str,
    params_fixos: dict
):
    """
    Executa variações de um hiperparâmetro,
    gerando plots comparativos de métricas e trajetórias individuais.
    """
    print(f"\n==============================================")
    print(f"  Iniciando: Variação de {nome_variacao}")
    print(f"==============================================\n")

    results_metricas = {}
    param_valores_sorted = sorted(list(param_valores))

    # 1. Resultados de todas as execuções
    for valor in param_valores_sorted:
        nome_exp = f"{nome_variacao}={valor}"
        params_atuais = params_fixos.copy()
        chave_param = {"EPISODIOS": "N", "ALPHA": "alpha", "GAMMA": "gamma", "EPSILON": "epsilon"}[nome_variacao]
        params_atuais[chave_param] = valor

        Pi, T, G = executar_experimento(
            nome_experimento=nome_exp,
            ambiente=ambiente,
            algoritmo=algoritmo,
            **params_atuais
        )

        results_metricas[nome_exp] = {'T': T, 'G': G}

        # 2. Plota a trajetória individual para cada execução
        estados, _, _ = simular_trajetoria_gym(Pi, ambiente, max_steps=200)
        plot_trajetoria_gym(
            ambiente,
            estados,
            titulo=f"Trajetória (gulosa) — {nome_exp}",
            save_path=f"{output_dir}/trajetoria_{nome_exp}.pdf"
        )

    # 3. Plota comparação de métricas para a variação do hiperparâmetro
    plotar_comparacao_metricas(
        results=results_metricas,
        titulo_prefixo=f"Variação de {nome_variacao}",
        save_path=f"{output_dir}/comparacao_{nome_variacao}.pdf"
    )
    print(f"\n--- Variação de '{nome_variacao}' concluída. ---")

# Tarefa:

1. Implemente o algoritmo Sarsa para resolver o ambiente `'CliffWalking-v1'` do [gymnasium](https://gymnasium.farama.org/environments/toy_text/cliff_walking/).
2. Considere os 4 hiperparametros (EPISODIOS, ALPHA, GAMMA, EPSILON)
    - Varie um dos hiperparametros (ex.: EPISODIOS) e fixe os demais (ex.: ALPHA, GAMMA, EPSILON).
        - Obs.: 3 valores para cada hiperparâmetro.
    - Para cada estudo de hiperparâmetro plote:
        - A duração do episódio por episódio
        - A recompensa total por episodio
        - Uma trajetória gulosa utilizando `plot_trajetoria_gym`.
    - Observação: as curvas para cada estudo de hiperparâmetro devem estar na mesma figura, isto é, se o hiperparâmetro a ser variado é o EPSILON com 3 valores, então o gráfico de duração do episódio deve mostrar as 3 curvas relativas a cada valor de EPSILON (com legenda) e de maneira similar para a recompensa total por episodio.
3. Repita o procedimento para cada um dos hiperparametros.
4. Reporte suas observações.

**Entregáveis:**

2. **Código** (notebook `*.ipynb`)
1. **Relatório** (`*.pdf`).
- O PDF deve conter:
  - **Setup** (hiperparâmetros usados).
  - **Resultados** (figuras e tabelas organizadas por experimento).
  - **Análises curtas** por experimento.
- O PDF **NÃO** deve conter:
    - Códigos.

## Hiperparâmetros Fixos

In [None]:
params_fixos = {
    "N": 2000,
    "T": 1000,
    "alpha": 0.01,
    "gamma": 0.9,
    "epsilon": 0.3,
    "seed": 42
}

## Baseline

In [None]:
print("\n################# EXPERIMENTO BASELINE #################")
Pi_base, T_base, G_base = executar_experimento(
    nome_experimento="BASELINE",
    ambiente="CliffWalking-v1",
    algoritmo=sarsa,
    **params_fixos
)

plotar_metricas_single(
    episodio_len=T_base,
    episodio_return=G_base,
    titulo="Métricas da Execução Baseline",
    save_path=f"{output_dir}/comparacao_BASELINE.pdf"
)

estados_base, _, _ = simular_trajetoria_gym(Pi_base, "CliffWalking-v1", max_steps=200)
plot_trajetoria_gym(
    "CliffWalking-v1",
    estados_base,
    titulo="Trajetória (gulosa) — BASELINE",
    save_path=f"{output_dir}/trajetoria_BASELINE.pdf"
)

## Experimento 1 - Variação de EPISODIOS

In [None]:
print("\n################# EXPERIMENTO 1 #################")
params_variacao_episodios = {k: v for k, v in params_fixos.items() if k != "N"}
executar_variacao_de_hiperparametro(
    nome_variacao="EPISODIOS",
    param_valores=[1000, 1500, 2500, 3000],
    algoritmo=sarsa,
    ambiente="CliffWalking-v1",
    output_dir=output_dir,
    params_fixos=params_variacao_episodios
)

## Experimento 2 - Variação de ALPHA

In [None]:
print("\n################# EXPERIMENTO 2 #################")
params_variacao_alpha = {k: v for k, v in params_fixos.items() if k != "alpha"}
executar_variacao_de_hiperparametro(
    nome_variacao="ALPHA",
    param_valores=[0.0001, 0.001, 0.05, 0.1],
    algoritmo=sarsa,
    ambiente="CliffWalking-v1",
    output_dir=output_dir,
    params_fixos=params_variacao_alpha
)

## Experimento 3 - Variação de GAMMA

In [None]:
print("\n################# EXPERIMENTO 3 #################")
params_variacao_gamma = {k: v for k, v in params_fixos.items() if k != "gamma"}
executar_variacao_de_hiperparametro(
    nome_variacao="GAMMA",
    param_valores=[0.5, 0.7, 0.95, 0.99],
    algoritmo=sarsa,
    ambiente="CliffWalking-v1",
    output_dir=output_dir,
    params_fixos=params_variacao_gamma
)

## Experimento 4 - Variação de EPSILON

In [None]:
print("\n################# EXPERIMENTO 4 #################")
params_variacao_epsilon = {k: v for k, v in params_fixos.items() if k != "epsilon"}
executar_variacao_de_hiperparametro(
    nome_variacao="EPSILON",
    param_valores=[0.1, 0.2, 0.6, 0.9],
    algoritmo=sarsa,
    ambiente="CliffWalking-v1",
    output_dir=output_dir,
    params_fixos=params_variacao_epsilon
)

In [None]:
!zip -r /content/resultados_plots.zip /content/resultados_plots