18/05/2023

Q-learning

In [None]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import pickle
import os

#Q-learning

def smooth(y, box_pts):
    """Return the moving average of ``y`` over a window of ``box_pts`` samples.

    The window is a uniform (box) kernel. Only positions where the window
    fully overlaps the data are kept (``mode='valid'``), so the result is
    shorter than the input.
    """
    weights = np.ones(box_pts) / box_pts
    return np.convolve(y, weights, mode='valid')

def plot_policy(q_table):
    """Draw the greedy policy of ``q_table`` as arrows on an 8x8 grid.

    For every state (row of ``q_table``) the action with the largest Q-value
    is looked up and rendered as an arrow in the matching grid cell. The
    figure is saved to ``final_policy_frozen_lake8x8_avg.png`` and then shown.

    Args:
        q_table: 2-D array indexed as ``q_table[state]``. States are placed
            row-major on a grid that is 8 cells wide; action indices 0-3 are
            drawn as left, down, right and up respectively.
    """
    action_arrows = {0: '←', 1: '↓', 2: '→', 3: '↑'}

    # Create a figure and axis
    fig, ax = plt.subplots(figsize=(8, 8))
    ax.set_xlim(0, 8)
    ax.set_ylim(0, 8)

    for state in range(q_table.shape[0]):
        # Get the best action for the current state
        best_action = np.argmax(q_table[state])
        arrow = action_arrows[best_action]

        # Calculate the grid coordinates
        row, col = divmod(state, 8)

        # Row 0 is drawn at the top, hence the vertical flip (7 - row).
        ax.text(col + 0.5, 7 - row + 0.5, arrow, ha='center', va='center', fontsize=16)

    # Draw grid lines
    ax.set_xticks(np.arange(9))
    ax.set_yticks(np.arange(9))
    ax.grid(True)

    # Remove tick labels
    ax.set_xticklabels([])
    ax.set_yticklabels([])

    plt.title('Final policy for FrozenLake-v1 (8x8)')
    # File name was misspelled ('fina_policy_...'); now spelled 'final_policy_...'.
    plt.savefig('final_policy_frozen_lake8x8_avg.png')
    plt.show()

def run(episodes, num_runs=20, is_training=True, render=False, smooth_graph=False):
    """Train or evaluate tabular Q-learning on FrozenLake-v1 (8x8, slippery).

    Training mode runs ``num_runs`` independent learners for ``episodes``
    episodes each with a linearly decaying epsilon-greedy policy, pickles
    every run's Q-table to ``frozen_lake8x8_run<k>.pkl``, plots the
    per-episode reward averaged over the runs and pickles the element-wise
    mean Q-table to ``frozen_lake8x8_avg.pkl``. Evaluation mode loads that
    averaged table, follows it greedily and finally draws the policy with
    ``plot_policy``.

    NOTE: the SARSA and Expected SARSA cells of this file use the same file
    names, so the artefacts on disk belong to whichever cell was trained last.

    Args:
        episodes: Number of episodes per run.
        num_runs: Number of independent runs.
        is_training: ``True`` to learn from scratch, ``False`` to evaluate the
            stored averaged Q-table.
        render: When true the environment is created with
            ``render_mode='human'``.
        smooth_graph: When true a 10-point moving average is overlaid on the
            training curve.

    Returns:
        None. In evaluation mode, if the averaged table is missing, a message
        is printed and the function returns early.
    """
    q = None
    if not is_training:
        # Load the averaged Q-table before creating the environment so that
        # nothing has to be cleaned up when the file is missing.
        if not os.path.exists('frozen_lake8x8_avg.pkl'):
            print("No se encontró el archivo 'frozen_lake8x8_avg.pkl'. Ejecute en modo de entrenamiento primero.")
            return
        # pickle is only safe on files this script produced itself.
        with open('frozen_lake8x8_avg.pkl', 'rb') as f:
            q = pickle.load(f)

    env = gym.make('FrozenLake-v1', map_name="8x8", is_slippery=True, render_mode='human' if render else None)
    n_states = env.observation_space.n
    n_actions = env.action_space.n

    all_rewards = []
    q_tables = []  # per-run tables, kept in memory for the final average

    try:
        for run_idx in range(num_runs):
            # Each training run starts from an all-zero table; evaluation
            # always acts on the loaded, averaged table.
            q_run = np.zeros((n_states, n_actions)) if is_training else q

            learning_rate_a = 0.01
            discount_factor_g = 0.9
            epsilon = 1
            epsilon_decay_rate = 0.0000333
            rng = np.random.default_rng()

            rewards_per_episode = np.zeros(episodes)

            for i in range(episodes):
                state = env.reset()[0]
                terminated = False
                truncated = False
                episode_reward = 0

                while not terminated and not truncated:
                    # Epsilon-greedy while training, purely greedy otherwise.
                    if is_training and rng.random() < epsilon:
                        action = env.action_space.sample()
                    else:
                        action = np.argmax(q_run[state, :])

                    new_state, reward, terminated, truncated, _ = env.step(action)

                    if is_training:
                        # Off-policy target: bootstrap from the best next action.
                        td_target = reward + discount_factor_g * np.max(q_run[new_state, :])
                        q_run[state, action] += learning_rate_a * (td_target - q_run[state, action])

                    state = new_state
                    episode_reward += reward

                if is_training:
                    epsilon = max(epsilon - epsilon_decay_rate, 0)
                    if epsilon == 0:
                        # Once exploration has stopped, use a smaller step size.
                        learning_rate_a = 0.001

                rewards_per_episode[i] = episode_reward

            all_rewards.append(rewards_per_episode)

            if is_training:
                q_tables.append(q_run)
                with open(f"frozen_lake8x8_run{run_idx}.pkl", "wb") as f:
                    pickle.dump(q_run, f)
    finally:
        # Release the environment (and its window) even if a run fails.
        env.close()

    if is_training:
        # Average the learning curves of all runs.
        average_rewards = np.mean(np.array(all_rewards), axis=0)

        plt.figure(figsize=(10, 5))
        plt.plot(average_rewards, label='average rewards')
        if smooth_graph:
            average_rewards_smooth = smooth(average_rewards, 10)
            plt.plot(range(len(average_rewards_smooth)), average_rewards_smooth, label='smoothed average rewards')
        plt.xlabel('episodes')
        plt.ylabel('average reward')
        plt.title('average rewards per episode over training')
        plt.xlim(0, len(average_rewards))
        plt.legend()
        plt.grid(True)
        plt.savefig('training_frozen_lake8x8_avg.png')
        plt.show()

        # Store the averaged Q-table; the in-memory tables are identical to
        # the pickled ones, so nothing has to be read back from disk.
        avg_q = np.mean(q_tables, axis=0)
        with open('frozen_lake8x8_avg.pkl', 'wb') as f:
            pickle.dump(avg_q, f)
    else:
        plot_policy(q)

if __name__ == '__main__':
    is_training = False  # adjust as needed

    # The smoothing question only matters when a training curve is drawn.
    smooth_graph = False
    if is_training:
        smooth_input = input("¿Desea suavizar la gráfica? (y/n): ").strip().lower()
        smooth_graph = (smooth_input == 'y')

    # To produce the average over 20 runs of 30,000 episodes, uncomment the next line:
    #run(30_000, num_runs=20, is_training=is_training, render=False, smooth_graph=smooth_graph)
    # Evaluation using the averaged Q-table.
    run(1, num_runs=1, is_training=is_training, render=True, smooth_graph=smooth_graph)

SARSA

In [None]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import pickle
import os

# SARSA

def smooth(y, box_pts):
    """Box-filter ``y`` with a window of ``box_pts`` equally weighted samples.

    Uses ``mode='valid'``, i.e. only fully overlapping window positions are
    returned.
    """
    return np.convolve(y, np.ones(box_pts) / box_pts, mode='valid')

def plot_policy(q_table):
    """Render the greedy action of every state as an arrow on an 8x8 board.

    The figure is written to ``final_policy_frozen_lake8x8_avg.png`` and then
    displayed.
    """
    # Action index -> arrow glyph (0 left, 1 down, 2 right, 3 up).
    arrow_symbols = ['←', '↓', '→', '↑']
    arrow_for_action = dict(enumerate(arrow_symbols))

    fig, ax = plt.subplots(figsize=(8, 8))
    ax.set_xlim(0, 8)
    ax.set_ylim(0, 8)

    n_states = q_table.shape[0]
    for state in range(n_states):
        # States are numbered row by row, 8 per row.
        row, col = divmod(state, 8)
        greedy_action = np.argmax(q_table[state])

        # Centre the glyph in its cell; row 0 goes to the top of the plot.
        x_centre = col + 0.5
        y_centre = 7 - row + 0.5
        ax.text(x_centre, y_centre, arrow_for_action[greedy_action],
                ha='center', va='center', fontsize=16)

    # One grid line per cell border, with the tick labels hidden.
    cell_borders = np.arange(9)
    ax.set_xticks(cell_borders)
    ax.set_yticks(cell_borders)
    ax.grid(True)
    ax.set_xticklabels([])
    ax.set_yticklabels([])

    plt.title('Final policy for FrozenLake-v1 (8x8)')
    plt.savefig('final_policy_frozen_lake8x8_avg.png')
    plt.show()

def run(episodes, num_runs=20, is_training=True, render=False, smooth_graph=False):
    """Train or evaluate tabular SARSA on FrozenLake-v1 (8x8, non-slippery).

    Training mode runs ``num_runs`` independent learners for ``episodes``
    episodes each with a linearly decaying epsilon-greedy policy, pickles
    every run's Q-table to ``frozen_lake8x8_run<k>.pkl``, plots the
    per-episode reward averaged over the runs and pickles the element-wise
    mean Q-table to ``frozen_lake8x8_avg.pkl``. Evaluation mode loads that
    averaged table, follows it greedily and finally draws the policy with
    ``plot_policy``.

    NOTE: the Q-learning and Expected SARSA cells of this file use the same
    file names, so the artefacts on disk belong to whichever cell was trained
    last.

    Args:
        episodes: Number of episodes per run.
        num_runs: Number of independent runs.
        is_training: ``True`` to learn from scratch, ``False`` to evaluate the
            stored averaged Q-table.
        render: When true the environment is created with
            ``render_mode='human'``.
        smooth_graph: When true a 10-point moving average is overlaid on the
            training curve.

    Returns:
        None. In evaluation mode, if the averaged table is missing, a message
        is printed and the function returns early.
    """
    q = None
    if not is_training:
        # Load the averaged Q-table before creating the environment so that
        # nothing has to be cleaned up when the file is missing.
        if not os.path.exists('frozen_lake8x8_avg.pkl'):
            print("No se encontró el archivo 'frozen_lake8x8_avg.pkl'. Ejecute en modo de entrenamiento primero.")
            return
        # pickle is only safe on files this script produced itself.
        with open('frozen_lake8x8_avg.pkl', 'rb') as f:
            q = pickle.load(f)

    env = gym.make('FrozenLake-v1', map_name="8x8", is_slippery=False, render_mode='human' if render else None)
    n_states = env.observation_space.n
    n_actions = env.action_space.n

    def select_action(q_table, state, epsilon, rng):
        # Epsilon-greedy while training, purely greedy during evaluation.
        if is_training and rng.random() < epsilon:
            return env.action_space.sample()
        return np.argmax(q_table[state, :])

    all_rewards = []
    q_tables = []  # per-run tables, kept in memory for the final average

    try:
        for run_idx in range(num_runs):
            # Each training run starts from an all-zero table; evaluation
            # always acts on the loaded, averaged table.
            q_run = np.zeros((n_states, n_actions)) if is_training else q

            learning_rate_a = 0.01
            discount_factor_g = 0.9
            epsilon = 1
            epsilon_decay_rate = 0.0000333
            rng = np.random.default_rng()

            rewards_per_episode = np.zeros(episodes)

            for i in range(episodes):
                state = env.reset()[0]
                terminated = False
                truncated = False
                episode_reward = 0

                # SARSA needs the first action before entering the loop.
                action = select_action(q_run, state, epsilon, rng)

                while not terminated and not truncated:
                    new_state, reward, terminated, truncated, _ = env.step(action)
                    next_action = select_action(q_run, new_state, epsilon, rng)

                    if is_training:
                        # On-policy target: bootstrap from the action that
                        # will actually be taken next.
                        td_target = reward + discount_factor_g * q_run[new_state, next_action]
                        q_run[state, action] += learning_rate_a * (td_target - q_run[state, action])

                    state = new_state
                    action = next_action
                    episode_reward += reward

                if is_training:
                    epsilon = max(epsilon - epsilon_decay_rate, 0)
                    if epsilon == 0:
                        # Once exploration has stopped, use a smaller step size.
                        learning_rate_a = 0.001

                rewards_per_episode[i] = episode_reward

            all_rewards.append(rewards_per_episode)

            if is_training:
                q_tables.append(q_run)
                with open(f"frozen_lake8x8_run{run_idx}.pkl", "wb") as f:
                    pickle.dump(q_run, f)
    finally:
        # Release the environment (and its window) even if a run fails.
        env.close()

    if is_training:
        # Average the learning curves of all runs.
        average_rewards = np.mean(np.array(all_rewards), axis=0)

        plt.figure(figsize=(10, 5))
        plt.plot(average_rewards, label='average rewards')
        if smooth_graph:
            average_rewards_smooth = smooth(average_rewards, 10)
            plt.plot(range(len(average_rewards_smooth)), average_rewards_smooth, label='smoothed average rewards')
        plt.xlabel('episodes')
        plt.ylabel('average reward')
        plt.title('average rewards per episode over training')
        plt.xlim(0, len(average_rewards))
        plt.legend()
        plt.grid(True)
        plt.savefig('training_frozen_lake8x8_avg.png')
        plt.show()

        # Store the averaged Q-table; the in-memory tables are identical to
        # the pickled ones, so nothing has to be read back from disk.
        avg_q = np.mean(q_tables, axis=0)
        with open('frozen_lake8x8_avg.pkl', 'wb') as f:
            pickle.dump(avg_q, f)
    else:
        plot_policy(q)

if __name__ == '__main__':
    is_training = True  # adjust as needed

    # Ask about smoothing only when a training curve will be plotted.
    smooth_graph = False
    if is_training:
        smooth_input = input("¿Desea suavizar la gráfica? (y/n): ").strip().lower()
        smooth_graph = (smooth_input == 'y')

    # Average over 20 runs of 30,000 episodes each:
    run(30_000, num_runs=20, is_training=is_training, render=False, smooth_graph=smooth_graph)
    # Evaluation using the averaged Q-table (uncomment to use):
    #run(1, num_runs=1,  is_training=is_training, render=True, smooth_graph=smooth_graph)

Expected SARSA

In [None]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import pickle
import os

# Expected SARSA

def smooth(y, box_pts):
    """Smooth ``y`` with a uniform kernel that is ``box_pts`` samples wide.

    The convolution runs in ``'valid'`` mode, so edge positions where the
    kernel would overhang the data are dropped.
    """
    kernel = np.ones(box_pts)
    kernel = kernel / box_pts
    smoothed = np.convolve(y, kernel, mode='valid')
    return smoothed

def plot_policy(q_table):
    """Show which action is greedy in each state of the 8x8 grid.

    One arrow per state is drawn, the plot is saved as
    ``final_policy_frozen_lake8x8_avg.png`` and then shown.
    """
    # Glyph for each action index: 0 left, 1 down, 2 right, 3 up.
    action_arrows = {0: '←', 1: '↓', 2: '→', 3: '↑'}

    fig, ax = plt.subplots(figsize=(8, 8))
    ax.set_xlim(0, 8)
    ax.set_ylim(0, 8)

    for state, action_values in enumerate(q_table):
        glyph = action_arrows[np.argmax(action_values)]

        # Row-major state index -> (row, col) on an 8-wide grid.
        row = state // 8
        col = state % 8

        # Place the glyph in the middle of its cell, with row 0 at the top.
        ax.text(col + 0.5, 7 - row + 0.5, glyph, ha='center', va='center', fontsize=16)

    # Grid lines on the cell borders.
    ax.set_xticks(np.arange(9))
    ax.set_yticks(np.arange(9))
    ax.grid(True)

    # Hide the tick labels.
    for hide_labels in (ax.set_xticklabels, ax.set_yticklabels):
        hide_labels([])

    plt.title('Final policy for FrozenLake-v1 (8x8)')
    plt.savefig('final_policy_frozen_lake8x8_avg.png')
    plt.show()

def run(episodes, num_runs=20, is_training=True, render=False, smooth_graph=False):
    """Train or evaluate tabular Expected SARSA on FrozenLake-v1 (8x8, non-slippery).

    Training mode runs ``num_runs`` independent learners for ``episodes``
    episodes each with a linearly decaying epsilon-greedy policy, pickles
    every run's Q-table to ``frozen_lake8x8_run<k>.pkl``, plots the
    per-episode reward averaged over the runs and pickles the element-wise
    mean Q-table to ``frozen_lake8x8_avg.pkl``. Evaluation mode loads that
    averaged table, follows it greedily and finally draws the policy with
    ``plot_policy``.

    NOTE: the Q-learning and SARSA cells of this file use the same file
    names, so the artefacts on disk belong to whichever cell was trained last.

    Args:
        episodes: Number of episodes per run.
        num_runs: Number of independent runs.
        is_training: ``True`` to learn from scratch, ``False`` to evaluate the
            stored averaged Q-table.
        render: When true the environment is created with
            ``render_mode='human'``.
        smooth_graph: When true a 10-point moving average is overlaid on the
            training curve.

    Returns:
        None. In evaluation mode, if the averaged table is missing, a message
        is printed and the function returns early.
    """
    q = None
    if not is_training:
        # Load the averaged Q-table before creating the environment so that
        # nothing has to be cleaned up when the file is missing.
        if not os.path.exists('frozen_lake8x8_avg.pkl'):
            print("No se encontró el archivo 'frozen_lake8x8_avg.pkl'. Ejecute en modo de entrenamiento primero.")
            return
        # pickle is only safe on files this script produced itself.
        with open('frozen_lake8x8_avg.pkl', 'rb') as f:
            q = pickle.load(f)

    env = gym.make('FrozenLake-v1', map_name="8x8", is_slippery=False, render_mode='human' if render else None)
    n_states = env.observation_space.n
    n_actions = env.action_space.n

    all_rewards = []
    q_tables = []  # per-run tables, kept in memory for the final average

    try:
        for run_idx in range(num_runs):
            # Each training run starts from an all-zero table; evaluation
            # always acts on the loaded, averaged table.
            q_run = np.zeros((n_states, n_actions)) if is_training else q

            learning_rate_a = 0.01
            discount_factor_g = 0.9
            epsilon = 1
            epsilon_decay_rate = 0.0000333
            rng = np.random.default_rng()

            rewards_per_episode = np.zeros(episodes)

            for i in range(episodes):
                state = env.reset()[0]
                terminated = False
                truncated = False
                episode_reward = 0

                while not terminated and not truncated:
                    # Epsilon-greedy while training, purely greedy otherwise.
                    if is_training and rng.random() < epsilon:
                        action = env.action_space.sample()
                    else:
                        action = np.argmax(q_run[state, :])

                    new_state, reward, terminated, truncated, _ = env.step(action)

                    if is_training:
                        # Expected value of Q in the next state under the
                        # current epsilon-greedy policy: every action gets
                        # epsilon / n_actions, the greedy one gets the rest.
                        policy = np.ones(n_actions) * epsilon / n_actions
                        best_action = np.argmax(q_run[new_state, :])
                        policy[best_action] += (1.0 - epsilon)
                        expected_q = np.sum(policy * q_run[new_state, :])

                        td_target = reward + discount_factor_g * expected_q
                        q_run[state, action] += learning_rate_a * (td_target - q_run[state, action])

                    state = new_state
                    episode_reward += reward

                if is_training:
                    epsilon = max(epsilon - epsilon_decay_rate, 0)
                    if epsilon == 0:
                        # Once exploration has stopped, use a smaller step size.
                        learning_rate_a = 0.001

                rewards_per_episode[i] = episode_reward

            all_rewards.append(rewards_per_episode)

            if is_training:
                q_tables.append(q_run)
                with open(f"frozen_lake8x8_run{run_idx}.pkl", "wb") as f:
                    pickle.dump(q_run, f)
    finally:
        # Release the environment (and its window) even if a run fails.
        env.close()

    if is_training:
        # Average the learning curves of all runs.
        average_rewards = np.mean(np.array(all_rewards), axis=0)

        plt.figure(figsize=(10, 5))
        plt.plot(average_rewards, label='average rewards')
        if smooth_graph:
            average_rewards_smooth = smooth(average_rewards, 10)
            plt.plot(range(len(average_rewards_smooth)), average_rewards_smooth, label='smoothed average rewards')
        plt.xlabel('episodes')
        plt.ylabel('average reward')
        plt.title('average rewards per episode over training')
        plt.xlim(0, len(average_rewards))
        plt.legend()
        plt.grid(True)
        plt.savefig('training_frozen_lake8x8_avg.png')
        plt.show()

        # Store the averaged Q-table; the in-memory tables are identical to
        # the pickled ones, so nothing has to be read back from disk.
        avg_q = np.mean(q_tables, axis=0)
        with open('frozen_lake8x8_avg.pkl', 'wb') as f:
            pickle.dump(avg_q, f)
    else:
        plot_policy(q)

if __name__ == '__main__':
    is_training = True  # adjust as needed

    # No prompt is shown in evaluation mode; smoothing stays off there.
    smooth_graph = False
    if is_training:
        smooth_input = input("¿Desea suavizar la gráfica? (y/n): ").strip().lower()
        smooth_graph = (smooth_input == 'y')

    # Average over 20 runs of 30,000 episodes each:
    run(30_000, num_runs=20, is_training=is_training, render=False, smooth_graph=smooth_graph)
    # Evaluation using the averaged Q-table (uncomment to use):
    #run(1, num_runs=1,  is_training=is_training, render=True, smooth_graph=smooth_graph)

Monte Carlo First Visit with Exploring Starts

In [None]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import pickle
import os

def smooth(y, box_pts):
    """Compute a ``box_pts``-sample running mean of ``y``.

    Every sample inside the window has weight ``1 / box_pts``; the output
    covers only the fully overlapping positions (``mode='valid'``).
    """
    averaging_window = np.divide(np.ones(box_pts), box_pts)
    return np.convolve(y, averaging_window, mode='valid')

def plot_policy(q_table):
    """Plot the arg-max action of each state as an arrow in an 8x8 grid.

    Saves the figure to ``final_policy_frozen_lake8x8_avg.png`` and then
    displays it.
    """
    # Arrow glyph per action index (0 left, 1 down, 2 right, 3 up).
    action_arrows = {0: '←', 1: '↓', 2: '→', 3: '↑'}

    fig, ax = plt.subplots(figsize=(8, 8))
    ax.set_xlim(0, 8)
    ax.set_ylim(0, 8)

    # Greedy action for every state, in state order.
    greedy_actions = [np.argmax(q_table[state]) for state in range(q_table.shape[0])]

    for state, best_action in enumerate(greedy_actions):
        # State index -> grid cell; states fill the grid row by row.
        row, col = divmod(state, 8)
        # The y axis points up, so row 0 has to be drawn at the top.
        ax.text(
            col + 0.5,
            7 - row + 0.5,
            action_arrows[best_action],
            ha='center',
            va='center',
            fontsize=16,
        )

    # Grid lines at every cell border.
    border_positions = np.arange(9)
    ax.set_xticks(border_positions)
    ax.set_yticks(border_positions)
    ax.grid(True)

    # Tick labels are not needed.
    ax.set_xticklabels([])
    ax.set_yticklabels([])

    plt.title('Final policy for FrozenLake-v1 (8x8)')
    plt.savefig('final_policy_frozen_lake8x8_avg.png')
    plt.show()

def run(episodes, num_runs=20, is_training=True, render=False, smooth_graph=False):
    """Train or evaluate first-visit Monte Carlo control with exploring starts.

    The environment is FrozenLake-v1 (8x8, non-slippery). Training mode runs
    ``num_runs`` independent learners for ``episodes`` episodes each. Every
    episode starts from a randomly sampled state-action pair and then follows
    a linearly decaying epsilon-greedy policy. After the episode, Q is updated
    with the return that follows the first occurrence of each state-action
    pair. Each run's Q-table is pickled to ``first_visit_mc_es_run<k>.pkl``,
    the run-averaged learning curve is plotted and the element-wise mean
    Q-table is pickled to ``first_visit_mc_es_avg.pkl``. Evaluation mode loads
    that averaged table, follows it greedily and finally draws the policy
    with ``plot_policy``.

    Args:
        episodes: Number of episodes per run.
        num_runs: Number of independent runs.
        is_training: ``True`` to learn from scratch, ``False`` to evaluate the
            stored averaged Q-table.
        render: When true the environment is created with
            ``render_mode='human'``.
        smooth_graph: When true a 10-point moving average is overlaid on the
            training curve.

    Returns:
        None. In evaluation mode, if the averaged table is missing, a message
        is printed and the function returns early.
    """
    q = None
    if not is_training:
        # Load the averaged Q-table before creating the environment so that
        # nothing has to be cleaned up when the file is missing.
        if not os.path.exists('first_visit_mc_es_avg.pkl'):
            print("No se encontró el archivo 'first_visit_mc_es_avg.pkl'. Ejecute en modo de entrenamiento primero.")
            return
        # pickle is only safe on files this script produced itself.
        with open('first_visit_mc_es_avg.pkl', 'rb') as f:
            q = pickle.load(f)

    env = gym.make('FrozenLake-v1', map_name="8x8", is_slippery=False, render_mode='human' if render else None)
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    discount_factor_g = 0.9

    all_rewards = []
    q_tables = []  # per-run tables, kept in memory for the final average

    try:
        for run_idx in range(num_runs):
            if is_training:
                q_run = np.zeros((n_states, n_actions))
                # Number of first-visit returns averaged into each Q entry.
                visit_counts = np.zeros((n_states, n_actions), dtype=np.int64)
            else:
                q_run = q  # evaluation uses the averaged Q-table

            epsilon = 1 if is_training else 0  # epsilon only matters while training
            epsilon_decay_rate = 0.0000333
            rng = np.random.default_rng()

            rewards_per_episode = np.zeros(episodes)

            for i in range(episodes):
                # The environment must be reset before it can be stepped.
                state = env.reset()[0]
                episode = []
                episode_reward = 0
                terminated = False
                truncated = False

                if is_training:
                    # Exploring starts: begin from a random state-action pair.
                    # The sampled state is written into the environment so
                    # that the simulated transition really starts there.
                    # NOTE(review): relies on the FrozenLake implementation
                    # keeping its current state in ``env.unwrapped.s``.
                    state = int(env.observation_space.sample())
                    env.unwrapped.s = state
                    action = env.action_space.sample()
                    new_state, reward, terminated, truncated, _ = env.step(action)
                    episode.append((state, action, reward))
                    episode_reward += reward
                    state = new_state
                    # terminated/truncated from this step are kept: if the
                    # start pair already ended the episode, the loop below
                    # must not run.

                while not terminated and not truncated:
                    if is_training and rng.random() < epsilon:
                        action = env.action_space.sample()
                    else:
                        action = np.argmax(q_run[state, :])

                    new_state, reward, terminated, truncated, _ = env.step(action)
                    episode.append((state, action, reward))
                    state = new_state
                    episode_reward += reward

                if is_training:
                    # Index of the first occurrence of every (state, action).
                    first_visit_step = {}
                    for step, (s, a, _) in enumerate(episode):
                        first_visit_step.setdefault((s, a), step)

                    # Accumulate returns backwards and update Q only at the
                    # step where the pair was visited for the first time.
                    G = 0.0
                    for step in range(len(episode) - 1, -1, -1):
                        s, a, r = episode[step]
                        G = r + discount_factor_g * G
                        if first_visit_step[(s, a)] == step:
                            # Incremental mean of all first-visit returns.
                            visit_counts[s, a] += 1
                            q_run[s, a] += (G - q_run[s, a]) / visit_counts[s, a]

                    epsilon = max(epsilon - epsilon_decay_rate, 0)

                rewards_per_episode[i] = episode_reward

            all_rewards.append(rewards_per_episode)

            if is_training:
                q_tables.append(q_run)
                with open(f"first_visit_mc_es_run{run_idx}.pkl", "wb") as f:
                    pickle.dump(q_run, f)
    finally:
        # Release the environment (and its window) even if a run fails.
        env.close()

    if is_training:
        # Average the learning curves of all runs.
        average_rewards = np.mean(np.array(all_rewards), axis=0)

        plt.figure(figsize=(10, 5))
        plt.plot(average_rewards, label='average rewards')
        if smooth_graph:
            average_rewards_smooth = smooth(average_rewards, 10)
            plt.plot(range(len(average_rewards_smooth)), average_rewards_smooth, label='smoothed average rewards')
        plt.xlabel('episodes')
        plt.ylabel('average reward')
        plt.title('average rewards per episode over training')
        plt.xlim(0, len(average_rewards))
        plt.legend()
        plt.grid(True)
        plt.savefig('training_first_visit_mc_es_avg.png')
        plt.show()

        # Store the averaged Q-table; the in-memory tables are identical to
        # the pickled ones, so nothing has to be read back from disk.
        avg_q = np.mean(q_tables, axis=0)
        with open('first_visit_mc_es_avg.pkl', 'wb') as f:
            pickle.dump(avg_q, f)
    else:
        plot_policy(q)

if __name__ == '__main__':
    is_training = False  # adjust as needed

    # The smoothing prompt is only relevant when a training curve is drawn.
    smooth_graph = False
    if is_training:
        smooth_input = input("¿Desea suavizar la gráfica? (y/n): ").strip().lower()
        smooth_graph = (smooth_input == 'y')

    # To produce the average over 20 runs of 30,000 episodes, uncomment the next line:
    #run(30_000, num_runs=20, is_training=is_training, render=False, smooth_graph=smooth_graph)
    # Evaluation using the averaged Q-table.
    run(1, num_runs=1, is_training=is_training, render=True, smooth_graph=smooth_graph)