In [None]:
!pip install optuna



In [None]:

import gymnasium as gym
import numpy as np


# Esta função define uma política, em função da tabela Q e do epsilon
# Escolhe a ação gulosa (greedy) com probabilidade 1-epsilon e uma ação aleatória com probabilidade epsilon.
def epsilon_greedy(Q, state, epsilon):
    Q_state = Q[state]
    num_actions = len(Q_state)
    if np.random.random() < epsilon:
        return np.random.randint(0, num_actions)
    else:
        # em caso de empates, retorna sempre o menor índice -- mais eficiente, porém não é bom para alguns ambientes
        return np.argmax(Q_state)


# Esta função define uma política, em função da tabela Q e do epsilon.
# Como a anterior, mas aleatoriza a escolha em caso de haver mais de uma opção gulosa empatada.
def epsilon_greedy_random_tiebreak(qtable, state, epsilon):
    q_state = qtable[state]
    num_actions = len(q_state)
    if np.random.random() < epsilon:
        return np.random.randint(0, num_actions)
    else:
        # retorna uma ação de valor máximo -- aleatoriza em caso de empates
        return np.random.choice(np.where(q_state == q_state.max())[0])


def _delete_files(folder, prefix, suffix):
    import os
    # check if folder exists
    if not os.path.exists(folder):
        return
    # list files and delete all files with the given prefix and the given suffix
    for file in os.listdir(folder):
        if file.startswith(prefix) and file.endswith(suffix):
            os.remove(os.path.join(folder, file))


def record_video_qtable(env_name, qtable, episodes=2, folder='videos/', prefix='rl-video', epsilon=0.0, max_episode_length=500):
    """
    Grava um vídeo a partir de uma política epsilon-greedy definida pela 'qtable' e pelo valor de 'epsilon'.
    - env_name: A string do ambiente cadastrada no gymnasium ou uma instância da classe. Ao final, o ambiente é fechado (função `close()`).
    - qtable: A tabela Q (Q-table) na forma de array bidimensional, com linha representando estados e colunas representando ações.
    - length: Número de passos do ambiente usados no vídeo.
    - prefiz: Prefixo do nome dos arquivos de vídeo.
    - folder: Pasta onde os arquivos de vídeo serão salvos.
    - epsilon: Valor do parâmetro da política "epsilon-greedy" usada para escolher as ações.
    """
    if isinstance(env_name, str):
        env = gym.make(env_name, render_mode="rgb_array")
    else:
        env = env_name

    # delete .mp4 files with the given prefix from the folder
    _delete_files(folder, prefix, ".mp4")

    rec_env = gym.wrappers.RecordVideo(env, folder, episode_trigger=lambda i : True, video_length=max_episode_length, name_prefix=prefix)
    for _ in range(episodes):
        state, _ = rec_env.reset()
        ep_steps = 0
        done = False
        while (not done) and (ep_steps < max_episode_length-1):  # porque o reset conta no tamanho do vídeo
            action = epsilon_greedy_random_tiebreak(qtable, state, epsilon)
            state, _, termi, trunc, _ = rec_env.step(action)
            done = termi or trunc
            ep_steps += 1
    rec_env.close()
    env.close()


def evaluate_qtable_policy(env, qtable, num_episodes=100, epsilon=0.0, verbose=False):
    """
    Avalia a política epsilon-greedy definida implicitamente por uma Q-table.
    Por padrão, executa com epsilon=0.0; ou seja, executa, em todo estado s, escolhe a ação "a = argmax Q(s,_)".
    - env: O ambiente instanciado. Ele não é fechado ao final.
    - qtable: A Q-table (tabela Q) que será usada.
    - num_episodes: Quantidade de episódios a serem executados.
    - epsilon: Valor do parâmetro para a escolha epsilon-greedy da ação.

    Retorna:
    - um par contendo:
       -  o valor escalar do retorno médio por episódio
       -  e a lista de retornos de todos os episódios
    """
    episode_returns = []
    total_steps = 0

    for i in range(num_episodes):
        if verbose:
            print(f"Episódio {i+1}: ", end="")
        state, _ = env.reset()
        done = False
        episode_step = 0
        episode_returns.append(0.0)

        while not done:
            action = epsilon_greedy_random_tiebreak(qtable, state, epsilon)
            state, reward, termi, trunc, _ = env.step(action)
            done = termi or trunc
            episode_step += 1
            total_steps += 1
            episode_returns[-1] += reward
            if episode_step == 1500:
                print(f"Too long episode, truncating at step {episode_step}.")
                break
        if verbose:
            print(episode_returns[-1])

    mean_return = np.mean(episode_returns)
    print(f"Retorno médio (por episódio): {mean_return:.2f}, episódios: {len(episode_returns)}, total de passos: {total_steps}")

    return mean_return, episode_returns


def repeated_exec_qtable_policy(executions, alg_name, qtable, env, num_iterations, epsilon=0.0):
    """
    Executa várias vezes uma política epsilon-greedy definida com a qtable dada.
    Internamente usa o `experiments.repeated_exec()`.
    """

    def run_q_greedy(env, num_steps):
        state, _ = env.reset()
        rewards = []
        for i in range(num_steps):
            a = epsilon_greedy_random_tiebreak(qtable, state, epsilon)
            state, r, terminated, truncated, _ = env.step(a)
            done = terminated or truncated
            rewards.append(r)
            if done:
                state, _ = env.reset()
        return rewards, None

    return repeated_exec(executions, alg_name, run_q_greedy, env, num_iterations)


import time
import os
from tqdm import tqdm

import numpy as np
import gymnasium as gym


def process_returns_linear_interpolation(step_return_list, total_time):
    #assert total_time == step_return_list[-1][0], "The algorithm did not return a final (partial) return at the last time step!"
    partial_returns = [0] * total_time
    X = 0
    t1 = 0
    for i in range(len(step_return_list)):
        t2, Y = step_return_list[i]

        # if t1+1 > total_time, it wont't enter the loop
        # if t2 > total_time, it will calculate up to total_time
        for t in range(t1+1, min(t2, total_time) + 1):
            partial_returns[t - 1] = X + ( (Y - X) * (t - t1) / (t2 - t1) )
            #alt.: partial_returns[t - 1] = ((t2 - t) * X + (t - t1) * Y) / (t2 - t1)

        t1 = t2
        X = Y

    return partial_returns


def repeated_exec(executions, alg_name, algorithm, env, num_iterations, *args, **kwargs):
    '''
    This file runs repeatedly the given algorithm with the given parameters and returns
    results compatible with the functions in 'plot'.

    Parameters:
    - executions: number of times that the 'algorithm' will be run from the start
    - alg_name: a string to represent this setting of algorithm (with the given parameters)
    - algorithm: must be a function that receives 'env' then and integer (corresponding to the 'num_iterations') then the list of arguments (given by'*args' and/or **kwargs)
    - num_iterations: number of steps or episodes
    - *args: list of arguments for 'algorithm'
    - **kwargs: named arguments for 'algorithm'
    - auto_load: to save the results and reload automatically when re-executed with exactly the same parameters (including the number of executions)
    '''
    # gets a string to identify the environment
    if isinstance(env, gym.Env):
        env_name = str(env).replace('<', '_').replace('>', '')
    else:
        env_name = type(env).__name__
    auto_load = False
    if ('auto_load' in kwargs):
        auto_load = kwargs['auto_load']
        kwargs.pop('auto_load')
    result_file_name = f"results/{env_name}-{alg_name}-episodes{num_iterations}-execs{executions}.npy"
    if auto_load and os.path.exists(result_file_name):
        print("Loading results from", result_file_name)
        RESULTS = np.load(result_file_name, allow_pickle=True)
        return RESULTS
    rewards = np.zeros(shape=(executions, num_iterations))
    t = time.time()
    print(f"Executing {alg_name} ({algorithm}):")
    for i in tqdm(range(executions)):
        alg_output = algorithm(env, num_iterations, *args, **kwargs)
        temp_returns = alg_output[0]
        if isinstance(temp_returns[0], tuple):
            # when the algorithm outputs a list of pairs (time, return)
            rewards[i] = process_returns_linear_interpolation(temp_returns, num_iterations)
        else:
            # when the algoritm outputs a simple list of returns
            rewards[i] = temp_returns
    t = time.time() - t
    print(f"  ({executions} executions of {alg_name} finished in {t:.2f} secs)")
    RESULTS = np.array([alg_name, rewards], dtype=object)
    directory = os.path.dirname(result_file_name)
    if auto_load:
        if not os.path.exists(directory):
            os.makedirs(directory)
        np.save(result_file_name, RESULTS, allow_pickle=True)
    return alg_name, rewards


import multiprocessing

def worker(args):
    i, algorithm, env, num_iterations, alg_args, alg_kwargs = args
    try:
        temp_returns, _ = algorithm(env, num_iterations, *alg_args, **alg_kwargs)
        if isinstance(temp_returns[0], tuple):
            # when the algorithm outputs a list of pairs (time, return)
            return process_returns_linear_interpolation(temp_returns, num_iterations)
        else:
            # when the algoritm outputs a simple list of returns
            return temp_returns
    except Exception as e:
        print(f"Error in execution {i} of {algorithm}: {str(e)}")
        return None

def repeated_exec_parallel(executions, num_cpus, alg_name, algorithm, env_factory, num_iterations, args=(), kwargs=dict(), auto_save_load=False):
    env = env_factory()
    assert isinstance(env, gym.Env)
    env_name = str(env).replace('<', '_').replace('>', '')
    result_file_name = f"results/{env_name}-{alg_name}-episodes{num_iterations}-execs{executions}.npy"
    if auto_save_load and os.path.exists(result_file_name):
        print("Loading results from", result_file_name)
        RESULTS = np.load(result_file_name, allow_pickle=True)
        return RESULTS

    rewards = None  # expected final shape: (executions, num_iterations)
    t = time.time()
    print(f"Executing {alg_name} ({algorithm}) in {num_cpus} cpus:")

    with multiprocessing.Pool(num_cpus) as p:
        args_for_worker = [(i, algorithm, env_factory(), num_iterations, args, kwargs) for i in range(executions)]
        rewards_list = p.map(worker, args_for_worker)
        # catches any excetion raised for invalid rewards list
        try:
            rewards = np.array(rewards_list)
        except:
            print("ERROR: invalid rewards list returned by the algorithm!")
            print("rewards_list =", rewards_list)
            raise

    t = time.time() - t
    print(f"  ({executions} executions of {alg_name} finished in {t:.2f} secs)")

    RESULTS = np.array([alg_name, rewards], dtype=object)
    directory = os.path.dirname(result_file_name)
    if auto_save_load:
        if not os.path.exists(directory):
            os.makedirs(directory)
        np.save(result_file_name, RESULTS, allow_pickle=True)

    return alg_name, rewards


import numpy as np
import matplotlib.pyplot as plt



def smooth(data, window):
  data = np.array(data)
  n = len(data)
  y = np.zeros(n)
  for i in range(n):
    start = max(0, i-window+1)
    y[i] = data[start:(i+1)].mean()
  return y

#def moving_average(x, w):
#    return np.convolve(x, np.ones(w), 'valid') / w


# TODO: remover e trocar por plot_single_result
def plot_result(returns, ymax_suggested=None, x_log_scale=False, window=None, x_axis='episode', filename=None, cumulative=False):
    '''Exibe um gráfico "episódio/passo x retorno", fazendo a média a cada `window` retornos, para suavizar.

    Parâmetros:
    - returns: se return_type=='episode', este parâmetro é uma lista de retornos a cada episódio; se return_type=='step', é uma lista de pares (passo,retorno)
    - ymax_suggested (opcional): valor máximo de retorno (eixo y), se tiver um valor máximo conhecido previamente
    - x_log_scale: se for True, mostra o eixo x na escala log (para detalhar mais os resultados iniciais)
    - window: permite fazer a média dos últimos resultados, para suavizar o gráfico
    - return_type: use 'episode' ou 'step' para indicar o que representa o eixo x; também afeta como será lido o parâmetro 'returns'
    - filename: se for fornecida uma string, salva um arquivo de imagem ao invés de exibir.
    '''
    plt.figure(figsize=(12,7))

    # TODO: uniformizar com a outra função
    if cumulative == 'no':
        cumulative = False

    if x_axis == 'episode':
        plt.xlabel('Episódios')
        if cumulative:
            returns = np.array(returns)
            returns = np.cumsum(returns)
            title = "Retorno acumulado"
            if window is not None:
                print("Attention: 'window' is ignored when 'cumulative'==True")
            window = 1
        else:
            if window is None:
                window = 10
            title = f"Retorno médio a cada {window} episódios"
        yvalues = smooth(returns, window)
        xvalues = np.arange(1, len(returns)+1)
        plt.plot(xvalues, yvalues)
        #plt.title(f"Retorno médio a cada {window} episódios")
        plt.title(title)
    #elif x_axis == 'step':
    else:
        print("Attention: 'window' is ignored for 'step' type of returns")
        plt.xlabel('Passos')
        xvalues, yvalues = list(zip(*returns))
        xvalues = np.array(xvalues) + 1
        if cumulative:
            yvalues = np.array(yvalues)
            yvalues = np.cumsum(yvalues)
            title = "Retorno acumulado"
            # window = 1
        else:
            #if window is None:
            #    window = 10
            title = "Retorno"
        #yvalues = smooth(returns, window)
        plt.plot(xvalues, yvalues)
        #plt.title(f"Retorno médio a cada {window} passos")
        plt.title(title)

    if x_log_scale:
        plt.xscale('log')

    plt.ylabel('Retorno')
    if ymax_suggested is not None:
        ymax = np.max([ymax_suggested, np.max(yvalues)])
        plt.ylim(top=ymax)

    if filename is None:
        plt.show()
    else:
        plt.savefig(filename)
        print("Arquivo salvo:", filename)

    plt.close()


def plot_multiple_results(list_returns, cumulative='no', x_log_scale=False, x_axis='episode', window=10, plot_stddev=False, yreference=None, y_min=None):
    '''Exibe um gráfico "episódio/passo x retorno" com vários resultados.

    Parâmetros:
    - list_returns: uma lista de pares (nome do resultado, retorno por episódio/passo)
    - cumulative: indica se as recompensas anteriores devem ser acumuladas, para calcular a soma ou média histórica por episódio
    - x_log_scale: se for True, mostra o eixo x na escala log (para detalhar mais os resultados iniciais)
    - x_axis: use 'episode' ou 'step' para indicar o que representa o eixo x
    - window: permite fazer a média dos últimos resultados, para suavizar o gráfico; só é usado se cumulative='no'
    - plot_stddev: exibe sombra com o desvio padrão, ou seja, entre média-desvio e média+desvio
    - yreference: if not None, should be an integer, where will be plot a horizontal gray dashed line, used for reference
    - y_min: valor mínimo do eixo y; caso os dados tenham valor menor, o gráfico será ajustado para adotar este valor como mínimo
    '''
    # True and False are here for backward compatibility (remove!)
    if cumulative is None:
        cumulative = 'no'
    assert cumulative in ['no', 'sum', 'avg']
    assert x_axis in ['step', 'episode']

    total_steps = list_returns[0][1].shape[1]
    plt.figure(figsize=(12,7))

    for (alg_name, returns) in list_returns:
        xvalues = np.arange(1, total_steps+1)
        # TODO: bug -- isso está errado para cumulative='avg', quando x_axis='step'
        if cumulative == 'sum' or cumulative == 'avg':
            # calculate the cumulative sum along axis 1
            cumreturns = np.cumsum(returns, axis=1)
            if cumulative == 'avg':
                cumreturns = cumreturns / xvalues
            yvalues = cumreturns.mean(axis=0)
            std = cumreturns.std(axis=0)
        else:
            yvalues = smooth(returns.mean(axis=0),window)
            std = returns.std(axis=0)
        plt.plot(xvalues, yvalues, label=alg_name)
        if plot_stddev:
            plt.fill_between(xvalues, yvalues-std, yvalues+std, alpha=0.4)

    if yreference is not None:
        y_ref_line = [ yreference ] * total_steps
        plt.plot(y_ref_line, linestyle="--", color="gray")

    if x_log_scale:
        plt.xscale('log')

    if x_axis == 'episode':
        plt.xlabel('Episódio')
        payoff = 'Retorno'
    else:
        plt.xlabel('Passo')
        payoff = 'Recompensa'

    plt.ylabel('Retorno')

    if cumulative == 'no':
        plt.title(f"{payoff} (média móvel a cada {window})")
    elif cumulative == 'avg':
        gen = payoff[-1]
        plt.title(f"{payoff} acumulad{gen} médi{gen}")
    else:
        gen = payoff[-1]
        plt.title(f"{payoff} acumulad{gen}")

    if y_min is not None:
        min_value = min(np.min(returns) for (_, returns) in list_returns)
        if min_value < y_min:
            plt.ylim(bottom=y_min)

    plt.legend()
    plt.show()


# TODO: corrigir bug no caso cumulative='avg' e x_axis='step'
def plot_single_result(returns, *args, **kwargs):
    '''Exibe um gráfico "episódio/passo x retorno", para um único resultado.

    Parâmetros:
    - cumulative: indica se as recompensas anteriores devem ser acumuladas, para calcular a soma ou média histórica por episódio
    - x_log_scale: se for True, mostra o eixo x na escala log (para detalhar mais os resultados iniciais)
    - window: permite fazer a média dos últimos resultados, para suavizar o gráfico; só é usado se cumulative='no'
    - plot_stddev: exibe sombra com o desvio padrão, ou seja, entre média-desvio e média+desvio
    - yreference: if not None, should be an integer, where will be plot a horizontal gray dashed line, used for reference
    '''
    if isinstance(returns[0], tuple):
        # when the algorithm outputs a list of pairs (time, return)
        x_axis = 'step'
        total_time = returns[-1][0]
        returns = process_returns_linear_interpolation(returns, total_time)
    else:
        # when the algoritm outputs a simple list of returns
        x_axis = 'episode'

    processed_returns = np.array([returns])
    plot_multiple_results([(None, processed_returns)], x_axis=x_axis, *args, **kwargs)


import gymnasium as gym
import numpy as np


# Esta função define uma política, em função da tabela Q e do epsilon
# Escolhe a ação gulosa (greedy) com probabilidade 1-epsilon e uma ação aleatória com probabilidade epsilon.
def epsilon_greedy(Q, state, epsilon):
    Q_state = Q[state]
    num_actions = len(Q_state)
    if np.random.random() < epsilon:
        return np.random.randint(0, num_actions)
    else:
        # em caso de empates, retorna sempre o menor índice -- mais eficiente, porém não é bom para alguns ambientes
        return np.argmax(Q_state)


# Esta função define uma política, em função da tabela Q e do epsilon.
# Como a anterior, mas aleatoriza a escolha em caso de haver mais de uma opção gulosa empatada.
def epsilon_greedy_random_tiebreak(qtable, state, epsilon):
    q_state = qtable[state]
    num_actions = len(q_state)
    if np.random.random() < epsilon:
        return np.random.randint(0, num_actions)
    else:
        # retorna uma ação de valor máximo -- aleatoriza em caso de empates
        return np.random.choice(np.where(q_state == q_state.max())[0])


def _delete_files(folder, prefix, suffix):
    import os
    # check if folder exists
    if not os.path.exists(folder):
        return
    # list files and delete all files with the given prefix and the given suffix
    for file in os.listdir(folder):
        if file.startswith(prefix) and file.endswith(suffix):
            os.remove(os.path.join(folder, file))


def record_video_qtable(env_name, qtable, episodes=2, folder='videos/', prefix='rl-video', epsilon=0.0, max_episode_length=500):
    """
    Grava um vídeo a partir de uma política epsilon-greedy definida pela 'qtable' e pelo valor de 'epsilon'.
    - env_name: A string do ambiente cadastrada no gymnasium ou uma instância da classe. Ao final, o ambiente é fechado (função `close()`).
    - qtable: A tabela Q (Q-table) na forma de array bidimensional, com linha representando estados e colunas representando ações.
    - length: Número de passos do ambiente usados no vídeo.
    - prefiz: Prefixo do nome dos arquivos de vídeo.
    - folder: Pasta onde os arquivos de vídeo serão salvos.
    - epsilon: Valor do parâmetro da política "epsilon-greedy" usada para escolher as ações.
    """
    if isinstance(env_name, str):
        env = gym.make(env_name, render_mode="rgb_array")
    else:
        env = env_name

    # delete .mp4 files with the given prefix from the folder
    _delete_files(folder, prefix, ".mp4")

    rec_env = gym.wrappers.RecordVideo(env, folder, episode_trigger=lambda i : True, video_length=max_episode_length, name_prefix=prefix)
    for _ in range(episodes):
        state, _ = rec_env.reset()
        ep_steps = 0
        done = False
        while (not done) and (ep_steps < max_episode_length-1):  # porque o reset conta no tamanho do vídeo
            action = epsilon_greedy_random_tiebreak(qtable, state, epsilon)
            state, _, termi, trunc, _ = rec_env.step(action)
            done = termi or trunc
            ep_steps += 1
    rec_env.close()
    env.close()


def evaluate_qtable_policy(env, qtable, num_episodes=100, epsilon=0.0, verbose=False):
    """
    Avalia a política epsilon-greedy definida implicitamente por uma Q-table.
    Por padrão, executa com epsilon=0.0; ou seja, executa, em todo estado s, escolhe a ação "a = argmax Q(s,_)".
    - env: O ambiente instanciado. Ele não é fechado ao final.
    - qtable: A Q-table (tabela Q) que será usada.
    - num_episodes: Quantidade de episódios a serem executados.
    - epsilon: Valor do parâmetro para a escolha epsilon-greedy da ação.

    Retorna:
    - um par contendo:
       -  o valor escalar do retorno médio por episódio
       -  e a lista de retornos de todos os episódios
    """
    episode_returns = []
    total_steps = 0

    for i in range(num_episodes):
        if verbose:
            print(f"Episódio {i+1}: ", end="")
        state, _ = env.reset()
        done = False
        episode_step = 0
        episode_returns.append(0.0)

        while not done:
            action = epsilon_greedy_random_tiebreak(qtable, state, epsilon)
            state, reward, termi, trunc, _ = env.step(action)
            done = termi or trunc
            episode_step += 1
            total_steps += 1
            episode_returns[-1] += reward
            if episode_step == 1500:
                print(f"Too long episode, truncating at step {episode_step}.")
                break
        if verbose:
            print(episode_returns[-1])

    mean_return = np.mean(episode_returns)
    print(f"Retorno médio (por episódio): {mean_return:.2f}, episódios: {len(episode_returns)}, total de passos: {total_steps}")

    return mean_return, episode_returns


def repeated_exec_qtable_policy(executions, alg_name, qtable, env, num_iterations, epsilon=0.0):
    """
    Executa várias vezes uma política epsilon-greedy definida com a qtable dada.
    Internamente usa o `experiments.repeated_exec()`.
    """
    from experiments import repeated_exec

    def run_q_greedy(env, num_steps):
        state, _ = env.reset()
        rewards = []
        for i in range(num_steps):
            a = epsilon_greedy_random_tiebreak(qtable, state, epsilon)
            state, r, terminated, truncated, _ = env.step(a)
            done = terminated or truncated
            rewards.append(r)
            if done:
                state, _ = env.reset()
        return rewards, None

    return repeated_exec(executions, alg_name, run_q_greedy, env, num_iterations)

In [None]:
# Cell 1: Importações
import matplotlib.pyplot as plt
import time
import optuna

In [None]:
# Cell 2: Funções Auxiliares e Estratégias de Seleção de Ação

def plot_learning_curve(ax, rewards, window=50, label=None):
    """Plota a média móvel dos retornos no eixo 'ax'."""
    moving_avg = np.convolve(rewards, np.ones(window)/window, mode='valid')
    ax.plot(moving_avg, label=label)
    ax.set_xlabel("Episódios")
    ax.set_ylabel("Retorno médio")

def softmax_probs(Q_state):
    values = Q_state - np.max(Q_state)
    exp_values = np.exp(values)
    return exp_values / np.sum(exp_values)

def softmax_choice(Q, state):
    probs = softmax_probs(Q[state])
    return np.random.choice(len(probs), p=probs)

def epsilon_greedy_probs(Q, state, epsilon):
    Q_state = Q[state]
    num_actions = len(Q_state)
    q_max = np.max(Q_state)
    non_greedy_prob = epsilon / num_actions
    num_greedy = np.sum(Q_state == q_max)
    greedy_prob = ((1 - epsilon) / num_greedy) + non_greedy_prob
    probs = np.where(Q_state == q_max, greedy_prob, non_greedy_prob)
    return probs

def epsilon_greedy_choice(Q, state, epsilon=0.1):
    num_actions = len(Q[state])
    probs = epsilon_greedy_probs(Q, state, epsilon)
    return np.random.choice(num_actions, p=probs)


In [None]:
# Cell 3: Definição dos Algoritmos

def run_expected_sarsa(env, episodes, lr=0.1, gamma=0.95, epsilon=0.1):
    """Algoritmo Expected-SARSA"""
    assert isinstance(env.observation_space, gym.spaces.Discrete)
    assert isinstance(env.action_space, gym.spaces.Discrete)

    num_actions = env.action_space.n
    Q = np.random.uniform(low=-0.01, high=0.01, size=(env.observation_space.n, num_actions))
    episode_rewards = []

    for i in range(episodes):
        done = False
        total_reward = 0
        state, _ = env.reset()

        while not done:
            action = epsilon_greedy_choice(Q, state, epsilon)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            if terminated:
                V_next = 0
            else:
                p_next = epsilon_greedy_probs(Q, next_state, epsilon)
                V_next = np.sum(p_next * Q[next_state])

            Q[state, action] += lr * ((reward + gamma * V_next) - Q[state, action])
            total_reward += reward
            state = next_state

        episode_rewards.append(total_reward)
    return episode_rewards, Q

def run_montecarlo2(env, episodes, lr=0.1, gamma=0.95, epsilon=0.1, render_env=None):
    """Algoritmo Monte Carlo (toda-visita)"""
    assert isinstance(env.observation_space, gym.spaces.Discrete)
    assert isinstance(env.action_space, gym.spaces.Discrete)

    num_actions = env.action_space.n
    Q = np.zeros((env.observation_space.n, num_actions))
    episode_rewards = []

    for i in range(episodes):
        done = False
        total_reward = 0
        trajectory = []

        train_env = render_env if (render_env is not None and ((i+1) % 1000 == 0 or i >= episodes-5)) else env

        state, _ = train_env.reset()
        while not done:
            if np.random.random() < epsilon:
                action = np.random.randint(0, num_actions)
            else:
                action = np.argmax(Q[state])
            next_state, reward, terminated, truncated, _ = train_env.step(action)
            done = terminated or truncated
            trajectory.append((state, action, reward))
            total_reward += reward
            state = next_state

        episode_rewards.append(total_reward)
        G = 0
        for (s, a, r) in reversed(trajectory):
            G = r + gamma * G
            Q[s, a] += lr * (G - Q[s, a])
    return episode_rewards, Q

def run_hybrid_expected_sarsa_mc(env, episodes, lr=0.1, gamma=0.95, epsilon=0.1):

    # Verifica se os espaços do ambiente são discretos (necessário para indexar a Q-table)
    assert isinstance(env.observation_space, gym.spaces.Discrete)
    assert isinstance(env.action_space, gym.spaces.Discrete)

    # Número de ações disponíveis no ambiente
    num_actions = env.action_space.n

    # Inicializa a Q-table com valores pequenos aleatórios para evitar empates
    Q = np.random.uniform(low=-0.01, high=+0.01, size=(env.observation_space.n, num_actions))

    # Lista para armazenar o retorno (soma de recompensas) de cada episódio
    all_episode_rewards = []

    # Loop principal de treinamento: itera sobre cada episódio
    for i in range(episodes):
        done = False               # Flag para controlar quando o episódio termina
        sum_rewards = 0            # Acumulador para a recompensa total do episódio
        episode_trajectory = []    # Armazena a sequência de transições (estado, ação, recompensa)

        # Reinicia o ambiente e obtém o estado inicial
        state, _ = env.reset()

        # -------------------------------
        # Parte Online: Atualização Expected-SARSA
        # -------------------------------
        while not done:
            # Seleciona uma ação utilizando a política epsilon-greedy
            action = epsilon_greedy_choice(Q, state, epsilon)

            # Executa a ação no ambiente e obtém o próximo estado, recompensa e sinal de término
            next_state, reward, terminated, truncated, _ = env.step(action)
            # O episódio termina se houver 'terminated' ou 'truncated'
            done = terminated or truncated

            # Armazena a transição para uso na atualização offline
            episode_trajectory.append((state, action, reward))

            # Se o episódio terminou, o valor do próximo estado é considerado 0
            if terminated:
                V_next_state = 0
            else:
                # Calcula as probabilidades das ações no próximo estado (epsilon-greedy)
                p_next_actions = epsilon_greedy_probs(Q, next_state, epsilon)
                # Valor esperado para o próximo estado é a soma ponderada dos Q-values
                V_next_state = np.sum(p_next_actions * Q[next_state])

            # Calcula o erro temporal (TD error) para a atualização online
            delta = (reward + gamma * V_next_state) - Q[state, action]
            # Atualiza o Q-value para a ação tomada no estado atual
            Q[state, action] += lr * delta

            # Acumula a recompensa obtida neste passo
            sum_rewards += reward
            # Atualiza o estado atual para o próximo estado
            state = next_state

        # Fim do episódio: armazena o total de recompensas obtidas
        all_episode_rewards.append(sum_rewards)

        # -------------------------------
        # Parte Offline: Atualização Monte Carlo
        # -------------------------------
        # Reinicia o retorno acumulado para o episódio (Gt)
        Gt = 0
        # Percorre a trajetória do episódio em ordem reversa (do final para o início)
        for (s, a, r) in reversed(episode_trajectory):
            # Calcula o retorno descontado (G_t = r + gamma * G_{t+1})
            Gt = r + gamma * Gt
            # Calcula a diferença entre o retorno real e a estimativa atual
            delta = Gt - Q[s, a]
            # Atualiza o Q-value usando o learning rate
            Q[s, a] += lr * delta

        # Opcional: a cada 100 episódios, imprime a média dos retornos dos últimos 100 episódios
        if (i + 1) % 100 == 0:
            avg_reward = np.mean(all_episode_rewards[-100:])
            print(f"[Híbrido] Episódio {i+1} - Média últimos 100: {avg_reward:.3f}")

    # Retorna a lista de retornos por episódio e a Q-table final
    return all_episode_rewards, Q


In [None]:
# Cell 4: Otimização dos Hiperparâmetros via Optuna para Cada Algoritmo

def optimize_expected_sarsa_params(env_name, episodes=200):
    """Otimiza os hiperparâmetros do Expected-SARSA para um ambiente dado."""
    def objective(trial):
        epsilon = trial.suggest_uniform("epsilon", 0.01, 0.1)
        lr_opt = trial.suggest_loguniform("lr", 1e-5, 1e-1)
        gamma = trial.suggest_uniform("gamma", 0.8, 1.0)
        if env_name == "CliffWalking-v0":
            env = gym.make(env_name, max_episode_steps=500)
        else:
            env = gym.make(env_name)
        rewards, _ = run_expected_sarsa(env, episodes, lr=lr_opt, gamma=gamma, epsilon=epsilon)
        env.close()
        return np.mean(rewards[-100:])

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=10)
    print(f"Optuna: Melhores parâmetros para Expected-SARSA em {env_name}: {study.best_params}")
    return study.best_params

def optimize_montecarlo_params(env_name, episodes=200):
    """Otimiza os hiperparâmetros do Monte Carlo para um ambiente dado."""
    def objective(trial):
        epsilon = trial.suggest_uniform("epsilon", 0.01, 0.1)
        lr_opt = trial.suggest_loguniform("lr", 1e-5, 1e-1)
        gamma = trial.suggest_uniform("gamma", 0.8, 1.0)
        if env_name == "CliffWalking-v0":
            env = gym.make(env_name, max_episode_steps=500)
        else:
            env = gym.make(env_name)
        rewards, _ = run_montecarlo2(env, episodes, lr=lr_opt, gamma=gamma, epsilon=epsilon)
        env.close()
        return np.mean(rewards[-100:])

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=10)
    print(f"Optuna: Melhores parâmetros para Monte Carlo em {env_name}: {study.best_params}")
    return study.best_params

def optimize_hybrid_params(env_name, episodes=200):
    """Otimiza os hiperparâmetros do algoritmo Híbrido para um ambiente dado."""
    def objective(trial):
        epsilon = trial.suggest_uniform("epsilon", 0.01, 0.1)
        lr_opt = trial.suggest_loguniform("lr", 1e-5, 1e-1)
        gamma = trial.suggest_uniform("gamma", 0.8, 1.0)
        if env_name == "CliffWalking-v0":
            env = gym.make(env_name, max_episode_steps=500)
        else:
            env = gym.make(env_name)
        rewards, _ = run_hybrid_expected_sarsa_mc(env, episodes, lr=lr_opt, gamma=gamma, epsilon=epsilon)
        env.close()
        return np.mean(rewards[-100:])

    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=10)
    print(f"Optuna: Melhores parâmetros para Híbrido em {env_name}: {study.best_params}")
    return study.best_params


In [None]:
# Cell 5: Parâmetros dos Experimentos

# Lista de learning rates para variar (usados em Expected-SARSA e Monte Carlo)
learning_rates = [0.01, 0.05, 0.1, 0.2]

# Definindo os ambientes e o número de episódios para cada um
env_configs = {
    "Taxi-v3": {"episodes": 5000},
    "FrozenLake-v1": {"episodes": 5000},
    "CliffWalking-v0": {"episodes": 5000}
}


In [None]:
# Cell 6: Loop de Experimentos, Otimização e Plotagem

# Para cada ambiente, otimiza os hiperparâmetros de cada algoritmo via Optuna,
# executa o experimento completo com os melhores parâmetros e plota as curvas de aprendizado.
for env_name, config in env_configs.items():
    fig, ax = plt.subplots(figsize=(8, 6))

    # Expected-SARSA
    print(f"\nOtimização do Expected-SARSA para {env_name}...")
    best_params_ES = optimize_expected_sarsa_params(env_name, episodes=200)
    if env_name == "CliffWalking-v0":
        env = gym.make(env_name, max_episode_steps=500)
    else:
        env = gym.make(env_name)
    print(f"Executando Expected-SARSA em {env_name} com parâmetros: {best_params_ES}")
    rewards_ES, _ = run_expected_sarsa(env, episodes=config["episodes"],
                                       lr=best_params_ES["lr"],
                                       gamma=best_params_ES["gamma"],
                                       epsilon=best_params_ES["epsilon"])
    env.close()
    plot_learning_curve(ax, rewards_ES, window=50, label="Expected-SARSA")

    # Monte Carlo
    print(f"\nOtimização do Monte Carlo para {env_name}...")
    best_params_MC = optimize_montecarlo_params(env_name, episodes=200)
    if env_name == "CliffWalking-v0":
        env = gym.make(env_name, max_episode_steps=500)
    else:
        env = gym.make(env_name)
    print(f"Executando Monte Carlo em {env_name} com parâmetros: {best_params_MC}")
    rewards_MC, _ = run_montecarlo2(env, episodes=config["episodes"],
                                    lr=best_params_MC["lr"],
                                    gamma=best_params_MC["gamma"],
                                    epsilon=best_params_MC["epsilon"])
    env.close()
    plot_learning_curve(ax, rewards_MC, window=50, label="Monte Carlo")

    # Híbrido
    print(f"\nOtimização do Híbrido para {env_name}...")
    best_params_HYB = optimize_hybrid_params(env_name, episodes=200)
    if env_name == "CliffWalking-v0":
        env = gym.make(env_name, max_episode_steps=500)
    else:
        env = gym.make(env_name)
    print(f"Executando Híbrido em {env_name} com parâmetros: {best_params_HYB}")
    rewards_HYB, _ = run_hybrid_expected_sarsa_mc(env, episodes=config["episodes"],
                                                  lr=best_params_HYB["lr"],
                                                  gamma=best_params_HYB["gamma"],
                                                  epsilon=best_params_HYB["epsilon"])
    env.close()
    plot_learning_curve(ax, rewards_HYB, window=50, label="Híbrido")

    ax.set_title(f"{env_name} - Comparação dos Algoritmos (com Optuna)")
    ax.legend()
    plt.tight_layout()
    plt.show()


[I 2025-02-21 01:48:43,489] A new study created in memory with name: no-name-5c504672-9271-4b61-804b-1ed2539719f2



Otimização do Expected-SARSA para Taxi-v3...


  epsilon = trial.suggest_uniform("epsilon", 0.01, 0.1)
  lr_opt = trial.suggest_loguniform("lr", 1e-5, 1e-1)
  gamma = trial.suggest_uniform("gamma", 0.8, 1.0)
[I 2025-02-21 01:48:54,091] Trial 0 finished with value: -286.9 and parameters: {'epsilon': 0.04713339470847364, 'lr': 0.0001576871491840037, 'gamma': 0.8239351943205514}. Best is trial 0 with value: -286.9.
[I 2025-02-21 01:49:01,115] Trial 1 finished with value: -261.35 and parameters: {'epsilon': 0.07422009310191259, 'lr': 0.01085996432270055, 'gamma': 0.822968385381648}. Best is trial 1 with value: -261.35.
[I 2025-02-21 01:49:06,477] Trial 2 finished with value: -268.54 and parameters: {'epsilon': 0.0906158218430787, 'lr': 0.008554563190940525, 'gamma': 0.8568565486521351}. Best is trial 1 with value: -261.35.
[I 2025-02-21 01:49:11,427] Trial 3 finished with value: -270.14 and parameters: {'epsilon': 0.03372971788073343, 'lr': 0.00032449956765381535, 'gamma': 0.9626863402435875}. Best is trial 1 with value: -261.35.
[I 20

Optuna: Melhores parâmetros para Expected-SARSA em Taxi-v3: {'epsilon': 0.07422009310191259, 'lr': 0.01085996432270055, 'gamma': 0.822968385381648}
Executando Expected-SARSA em Taxi-v3 com parâmetros: {'epsilon': 0.07422009310191259, 'lr': 0.01085996432270055, 'gamma': 0.822968385381648}


[I 2025-02-21 01:51:09,752] A new study created in memory with name: no-name-41e8ae71-4009-48ca-813d-60aaafa20200



Otimização do Monte Carlo para Taxi-v3...


  epsilon = trial.suggest_uniform("epsilon", 0.01, 0.1)
  lr_opt = trial.suggest_loguniform("lr", 1e-5, 1e-1)
  gamma = trial.suggest_uniform("gamma", 0.8, 1.0)
[I 2025-02-21 01:51:10,925] Trial 0 finished with value: -857.36 and parameters: {'epsilon': 0.035314743110489655, 'lr': 2.286395930349363e-05, 'gamma': 0.8792573074286198}. Best is trial 0 with value: -857.36.
[I 2025-02-21 01:51:12,112] Trial 1 finished with value: -905.23 and parameters: {'epsilon': 0.07606286992670788, 'lr': 0.010868693233837979, 'gamma': 0.8295971836044354}. Best is trial 0 with value: -857.36.
[I 2025-02-21 01:51:13,057] Trial 2 finished with value: -784.73 and parameters: {'epsilon': 0.02698956069614663, 'lr': 0.0011811162059817304, 'gamma': 0.9825395523421797}. Best is trial 2 with value: -784.73.
[I 2025-02-21 01:51:14,003] Trial 3 finished with value: -737.3 and parameters: {'epsilon': 0.024507936348339768, 'lr': 1.1197944455955811e-05, 'gamma': 0.9829857220482494}. Best is trial 3 with value: -737.3.

Optuna: Melhores parâmetros para Monte Carlo em Taxi-v3: {'epsilon': 0.02309892013391228, 'lr': 0.0012608272268817343, 'gamma': 0.942509250543687}
Executando Monte Carlo em Taxi-v3 com parâmetros: {'epsilon': 0.02309892013391228, 'lr': 0.0012608272268817343, 'gamma': 0.942509250543687}


[I 2025-02-21 01:51:40,267] A new study created in memory with name: no-name-2b3d9e0c-60a9-413b-909d-98954c7bf445



Otimização do Híbrido para Taxi-v3...


  epsilon = trial.suggest_uniform("epsilon", 0.01, 0.1)
  lr_opt = trial.suggest_loguniform("lr", 1e-5, 1e-1)
  gamma = trial.suggest_uniform("gamma", 0.8, 1.0)


[Híbrido] Episódio 100 - Média últimos 100: -363.940


[I 2025-02-21 01:51:45,228] Trial 0 finished with value: -313.85 and parameters: {'epsilon': 0.014544694684307403, 'lr': 0.05889262823650827, 'gamma': 0.8990429177788452}. Best is trial 0 with value: -313.85.


[Híbrido] Episódio 200 - Média últimos 100: -313.850
[Híbrido] Episódio 100 - Média últimos 100: -434.240


[I 2025-02-21 01:51:50,789] Trial 1 finished with value: -494.22 and parameters: {'epsilon': 0.07141525964359295, 'lr': 0.0009940448922967178, 'gamma': 0.9188266121349343}. Best is trial 0 with value: -313.85.


[Híbrido] Episódio 200 - Média últimos 100: -494.220
[Híbrido] Episódio 100 - Média últimos 100: -403.850


[I 2025-02-21 01:51:58,523] Trial 2 finished with value: -429.22 and parameters: {'epsilon': 0.019805470832897288, 'lr': 0.0007777855488462078, 'gamma': 0.8926711003270273}. Best is trial 0 with value: -313.85.


[Híbrido] Episódio 200 - Média últimos 100: -429.220
[Híbrido] Episódio 100 - Média últimos 100: -438.380


[I 2025-02-21 01:52:04,245] Trial 3 finished with value: -445.79 and parameters: {'epsilon': 0.09918132024840881, 'lr': 0.002801921585642179, 'gamma': 0.8796824584171996}. Best is trial 0 with value: -313.85.


[Híbrido] Episódio 200 - Média últimos 100: -445.790
[Híbrido] Episódio 100 - Média últimos 100: -465.050


[I 2025-02-21 01:52:09,483] Trial 4 finished with value: -415.37 and parameters: {'epsilon': 0.010244836950964564, 'lr': 1.5426422287519976e-05, 'gamma': 0.9434083463650674}. Best is trial 0 with value: -313.85.


[Híbrido] Episódio 200 - Média últimos 100: -415.370
[Híbrido] Episódio 100 - Média últimos 100: -403.170


[I 2025-02-21 01:52:14,842] Trial 5 finished with value: -480.44 and parameters: {'epsilon': 0.07873344867744825, 'lr': 0.00029280001356382326, 'gamma': 0.9181220614031725}. Best is trial 0 with value: -313.85.


[Híbrido] Episódio 200 - Média últimos 100: -480.440
[Híbrido] Episódio 100 - Média últimos 100: -411.950


[I 2025-02-21 01:52:20,149] Trial 6 finished with value: -563.6 and parameters: {'epsilon': 0.05727481932995039, 'lr': 7.474057627118825e-05, 'gamma': 0.9814191019328273}. Best is trial 0 with value: -313.85.


[Híbrido] Episódio 200 - Média últimos 100: -563.600
[Híbrido] Episódio 100 - Média últimos 100: -489.800


[I 2025-02-21 01:52:25,231] Trial 7 finished with value: -473.6 and parameters: {'epsilon': 0.09858625085592157, 'lr': 0.010140053306812906, 'gamma': 0.9508975101378554}. Best is trial 0 with value: -313.85.


[Híbrido] Episódio 200 - Média últimos 100: -473.600
[Híbrido] Episódio 100 - Média últimos 100: -403.460


[I 2025-02-21 01:52:30,598] Trial 8 finished with value: -409.51 and parameters: {'epsilon': 0.07271053573891635, 'lr': 0.0020323300156581472, 'gamma': 0.8520360954441575}. Best is trial 0 with value: -313.85.


[Híbrido] Episódio 200 - Média últimos 100: -409.510
[Híbrido] Episódio 100 - Média últimos 100: -417.770


[I 2025-02-21 01:52:35,339] Trial 9 finished with value: -384.77 and parameters: {'epsilon': 0.04397601592222559, 'lr': 2.3949912698919786e-05, 'gamma': 0.8140705070265737}. Best is trial 0 with value: -313.85.


[Híbrido] Episódio 200 - Média últimos 100: -384.770
Optuna: Melhores parâmetros para Híbrido em Taxi-v3: {'epsilon': 0.014544694684307403, 'lr': 0.05889262823650827, 'gamma': 0.8990429177788452}
Executando Híbrido em Taxi-v3 com parâmetros: {'epsilon': 0.014544694684307403, 'lr': 0.05889262823650827, 'gamma': 0.8990429177788452}
[Híbrido] Episódio 100 - Média últimos 100: -361.120
[Híbrido] Episódio 200 - Média últimos 100: -327.590
[Híbrido] Episódio 300 - Média últimos 100: -269.880
[Híbrido] Episódio 400 - Média últimos 100: -249.340
[Híbrido] Episódio 500 - Média últimos 100: -217.420
[Híbrido] Episódio 600 - Média últimos 100: -218.120
[Híbrido] Episódio 700 - Média últimos 100: -207.810
[Híbrido] Episódio 800 - Média últimos 100: -202.830
[Híbrido] Episódio 900 - Média últimos 100: -198.830
[Híbrido] Episódio 1000 - Média últimos 100: -205.980
[Híbrido] Episódio 1100 - Média últimos 100: -197.360
[Híbrido] Episódio 1200 - Média últimos 100: -201.120
[Híbrido] Episódio 1300 - Méd

In [None]:
# Cell 7: Experimentos com Learning Rates Fixos e Plotagem

# Para cada ambiente e para cada learning rate fixo, executa os três algoritmos
# (usando gamma=0.95 e epsilon=0.1) e plota as curvas de aprendizado.
for env_name, config in env_configs.items():
    for lr in learning_rates:
        fig, ax = plt.subplots(figsize=(8,6))

        # Expected-SARSA com learning rate fixo
        if env_name == "CliffWalking-v0":
            env = gym.make(env_name, max_episode_steps=500)
        else:
            env = gym.make(env_name)
        print(f"Executando Expected-SARSA em {env_name} com lr fixo = {lr}")
        rewards_ES, _ = run_expected_sarsa(env, episodes=config["episodes"], lr=lr, gamma=0.95, epsilon=0.1)
        env.close()
        plot_learning_curve(ax, rewards_ES, window=50, label="Expected-SARSA")

        # Monte Carlo com learning rate fixo
        if env_name == "CliffWalking-v0":
            env = gym.make(env_name, max_episode_steps=500)
        else:
            env = gym.make(env_name)
        print(f"Executando Monte Carlo em {env_name} com lr fixo = {lr}")
        rewards_MC, _ = run_montecarlo2(env, episodes=config["episodes"], lr=lr, gamma=0.95, epsilon=0.1)
        env.close()
        plot_learning_curve(ax, rewards_MC, window=50, label="Monte Carlo")

        # Híbrido com learning rate fixo
        if env_name == "CliffWalking-v0":
            env = gym.make(env_name, max_episode_steps=500)
        else:
            env = gym.make(env_name)
        print(f"Executando Híbrido em {env_name} com lr fixo = {lr}")
        rewards_HYB, _ = run_hybrid_expected_sarsa_mc(env, episodes=config["episodes"], lr=lr, gamma=0.95, epsilon=0.1)
        env.close()
        plot_learning_curve(ax, rewards_HYB, window=50, label="Híbrido")

        ax.set_title(f"{env_name} - Fixed Learning Rate: {lr}")
        ax.legend()
        plt.tight_layout()
        plt.show()
