In [None]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

import environments.mountain_car as mc
from useful import trees
from agents import dqn_agent
from agents import ddqn_agent
from agents import pddqn_agent

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

# Models

In [None]:
class mountain_car_model_1(nn.Module):
    """
    A fully connected network with one hidden layer of 256 ReLU units.
    -----
    Input
    -----
    n_observations: an integer for the size of the input vector
    n_actions: an integer for the size of the output vector
    """
    def __init__(self, n_observations, n_actions):
        super().__init__()
        # The attribute name and layer order define the state_dict keys
        # ("model.0.*", "model.2.*"), so they must not change.
        layers = [
            nn.Linear(n_observations, 256),
            nn.ReLU(),
            nn.Linear(256, n_actions),
        ]
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Passes x through the network and returns one output per action."""
        output = self.model(x)
        return output

# Functions

In [None]:
def testing(env, agent, n_episodes, file_name, options = None):
    """
    Training agents for the mountain car environment. Provides the mean positions as a graph with the mean rewards. 
    -----
    Input
    -----
    env: gym environment
    agent: training agent
    n_episodes: an integer for the number of episodes to run
    file_name: a string name of the model to import
    options: a dict that changes the environment
    ------
    output
    ------
    reward_list: a list of the total reward per episode
    state_list: a list of states per episode
    starting_positions: a list of starting positions
    """
    agent.main_model.load_state_dict(torch.load(f"model_weights/mountain_car/{file_name}.pth"))
    agent.main_model.eval()
    reward_list, state_list, starting_positions, global_steps, success = [], [], [], 0, 0

    for episode in range(n_episodes):
        total_reward, position_list = 0, []
        state, _ = env.reset(options = options)
        starting_positions.append(state[0])

        for step in range(env.max_steps):
            global_steps += 1
            position_list.append(state)

            action = agent.act(state)
            next_state, reward, termination, truncation, _ = env.step(action)

            state = next_state
            total_reward += reward
            if termination:
                success += 1
                break
            if truncation:
                break

        position_list.append(state)
        reward_list.append(total_reward)
        state_list.append(position_list)
    
    print(f"Success Rate: {100 * success/n_episodes:.2f}% | Mean Reward: {np.mean(reward_list):.2f}")

    return reward_list, state_list, starting_positions

In [None]:
def positions_to_rewards(reward_list, starting_positions, move):
    """
    Groups episode rewards into four bins according to the episode's starting position
    -----
    Input
    -----
    reward_list: a list of rewards, one per episode
    starting_positions: a list of starting positions, aligned with reward_list
    move: a number added to every bin edge to shift the bins
    ------
    Output
    ------
    position_bounds: a dict mapping "bound_1" .. "bound_4" to the rewards that fell in each bin
    """
    # Half-open bins [lower, upper) for the first three keys, in the order they are checked.
    edges = (
        ("bound_1", -0.6 + move, -0.55 + move),
        ("bound_2", -0.55 + move, -0.5 + move),
        ("bound_3", -0.5 + move, -0.45 + move),
    )
    position_bounds = {"bound_1": [], "bound_2": [], "bound_3": [], "bound_4": []}

    for episode_reward, start in zip(reward_list, starting_positions):
        # Anything that misses the first three bins lands in bound_4,
        # including starts below the lowest edge.
        key = "bound_4"
        for name, lower, upper in edges:
            if lower <= start < upper:
                key = name
                break
        position_bounds[key].append(episode_reward)

    return position_bounds

In [None]:
def plot_bars(reward_list, starting_positions, legends, title, moved = None, move = 0, save_path = "_comparison.png"):
    """
    A function that plots a grouped bar plot of the mean reward per starting position bin,
    with the standard deviation as error bars, and saves the figure to save_path
    -----
    Input
    -----
    reward_list: a list with one list of rewards per legend entry
    starting_positions: a list with one list of starting positions per legend entry
    legends: a list of annotations, one per group of bars
    title: a string for the title
    moved: a list of four x labels that replaces the default bin labels
    move: a number that shifts the bin edges used to separate the starting positions
    save_path: a string path for the saved figure; pass a distinct path per call to avoid overwriting earlier figures
    """
    if moved:
        bounds = list(moved)
    else:
        bounds = ["[-0.6, -0.55)", "[-0.55, -0.5)", "[-0.5, -0.45)", "[-0.45, -0.4)"]
    # positions_to_rewards always produces four bins, so the labels must match.
    if len(bounds) != 4:
        raise ValueError(f"moved must contain exactly 4 labels, got {len(bounds)}")

    x = np.arange(len(bounds))
    width = 0.2

    plt.figure(figsize = (4.5, 3))

    for idx, legend in enumerate(legends):
        position_bounds = positions_to_rewards(reward_list = reward_list[idx], starting_positions = starting_positions[idx], move = move)
        # An empty bin has no statistics; use NaN directly instead of letting numpy warn.
        means = [np.mean(rewards) if len(rewards) else np.nan for rewards in position_bounds.values()]
        stds = [np.std(rewards) if len(rewards) else np.nan for rewards in position_bounds.values()]

        # Centre the groups of bars around each tick.
        offset = (idx - (len(legends) - 1)/2) * width
        plt.bar(x + offset, means, width = width, yerr = stds, label = legend)

    plt.xticks(x, bounds)
    plt.title(f"{title}")
    plt.xlabel("Starting Position Bounds")
    plt.ylabel("Average Reward")
    plt.grid(alpha = 0.25)

    plt.legend(fontsize = 8, loc = "upper center")
    plt.tight_layout()
    plt.savefig(save_path)
    plt.show()

In [None]:
def plot_episode(state_list, save_path = "_runs.png"):
    """
    A function that plots the position trajectory of six randomly chosen episodes
    (sampled with replacement, so an episode can appear more than once) and saves the figure
    -----
    Input
    -----
    state_list: a list of episodes, each a list of states whose first entry is the position
    save_path: a string path for the saved figure; pass a distinct path per call to avoid overwriting earlier figures
    """
    if len(state_list) == 0:
        raise ValueError("state_list must contain at least one episode")

    n_panels = 6
    _, axes = plt.subplots(1, n_panels, figsize = (18, 1.5))

    for ax in axes:
        episode_idx = np.random.randint(0, len(state_list))
        positions = [state[0] for state in state_list[episode_idx]]
        min_y, max_y = min(positions), max(positions)
        # The legend shows the starting position of the episode.
        ax.plot(positions, label = f"{positions[0]:.2f}")
        ax.grid(alpha = 0.25)
        # Identical limits make matplotlib warn and expand the axis, so skip them for a flat trajectory.
        if min_y < max_y:
            ax.set_ylim(min_y, max_y)
        ax.set_yticks([min_y, max_y])
        ax.legend(loc = "upper left")

    plt.tight_layout()
    plt.savefig(save_path)
    plt.show()

In [None]:
def plot_terrain(move, x, y, title):
    """
    Draws the terrain curve together with vertical markers for the wall, the flag and the start region
    -----
    Input
    -----
    move: a list of four offsets, added in order to the wall, flag, start-lower and start-upper markers
    x: an array of x values
    y: an array of y values
    title: a string for the title
    """
    # Base x position and line styling for each marker, in the same order as move.
    # Labels are attached, but no legend is drawn.
    markers = [
        (-1.2, {"color": "black", "label": "Wall"}),
        (0.5, {"color": "r", "linestyle": "--", "label": "Flag"}),
        (-0.6, {"color": "g", "linestyle": "--", "label": "Start"}),
        (-0.4, {"color": "g", "linestyle": "--"}),
    ]

    plt.figure(figsize = (3, 2))

    plt.plot(x, y)
    plt.title(f"{title}")
    plt.xlabel("Position")
    plt.ylabel("Height")
    for idx, (base, style) in enumerate(markers):
        plt.axvline(base + move[idx], **style)

    plt.tight_layout()
    plt.show()

# Standard Mountain Car Test

In [None]:
# Standard terrain: no marker is shifted and the curve is sin(3x) on [-1.2, 0.6].
move = [0] * 4
x = np.linspace(-1.2, 0.6, 500)
y = np.sin(3 * x)

plot_terrain(
    move = move,
    x = x,
    y = y,
    title = "Standard Terrain",
)

In [None]:
# Evaluate the "mcv1/mcv1_5" checkpoint for 1000 episodes on the standard environment.
env = mc.mountain_car_discrete_v1()

# No training happens in this cell, so the learning hyperparameters are zeroed;
# epsilon = 0 presumably makes act() purely greedy — confirm in ddqn_agent.
agent = ddqn_agent.ddqn_agent_mse(
    model = mountain_car_model_1, state_dim = 2, action_dim = env.action_space.n,
    gamma = 0, lr = 0, epsilon = 0, epsilon_min = 0,
    decay_steps = 1, buffer_size = 0, batch_size = 0,
    device = device,
)

mcv1_rewards, mcv1_states, mcv1_sp = testing(
    env = env,
    agent = agent,
    n_episodes = 1000,
    file_name = "mcv1/mcv1_5",
)

In [None]:
# Position trajectories of randomly chosen MCV1 episodes on the standard environment.
plot_episode(mcv1_states)

In [None]:
# Evaluate the "mcv2/mcv2_8" checkpoint for 1000 episodes on the standard environment.
env = mc.mountain_car_discrete_v1()

# No training happens in this cell, so the learning hyperparameters are zeroed;
# epsilon = 0 presumably makes act() purely greedy — confirm in ddqn_agent.
agent = ddqn_agent.ddqn_agent_mse(
    model = mountain_car_model_1, state_dim = 2, action_dim = env.action_space.n,
    gamma = 0, lr = 0, epsilon = 0, epsilon_min = 0,
    decay_steps = 1, buffer_size = 0, batch_size = 0,
    device = device,
)

mcv2_rewards, mcv2_states, mcv2_sp = testing(
    env = env,
    agent = agent,
    n_episodes = 1000,
    file_name = "mcv2/mcv2_8",
)

In [None]:
# Position trajectories of randomly chosen MCV2 episodes on the standard environment.
plot_episode(mcv2_states)

In [None]:
# Compare both checkpoints on the standard environment, grouped by starting position.
legends = ["MCV1", "MCV2"]
reward_list = [mcv1_rewards, mcv2_rewards]
starting_positions = [mcv1_sp, mcv2_sp]

plot_bars(
    reward_list = reward_list,
    starting_positions = starting_positions,
    legends = legends,
    title = "Standard Environment",
)

# Changing Constants

In [None]:
# Evaluate the "mcv1/mcv1_5" checkpoint for 1000 episodes with the reset option force = 0.1.
env = mc.mountain_car_discrete_v1()

# No training happens in this cell, so the learning hyperparameters are zeroed;
# epsilon = 0 presumably makes act() purely greedy — confirm in ddqn_agent.
agent = ddqn_agent.ddqn_agent_mse(
    model = mountain_car_model_1, state_dim = 2, action_dim = env.action_space.n,
    gamma = 0, lr = 0, epsilon = 0, epsilon_min = 0,
    decay_steps = 1, buffer_size = 0, batch_size = 0,
    device = device,
)

# The options dict is forwarded to env.reset on every episode.
mcv1_f_rewards, mcv1_f_states, mcv1_f_sp = testing(
    env = env,
    agent = agent,
    n_episodes = 1000,
    file_name = "mcv1/mcv1_5",
    options = {"force": 0.1},
)

In [None]:
# Position trajectories of randomly chosen MCV1 episodes with force = 0.1.
plot_episode(mcv1_f_states)

In [None]:
# Evaluate the "mcv2/mcv2_8" checkpoint for 1000 episodes with the reset option force = 0.1.
env = mc.mountain_car_discrete_v1()

# No training happens in this cell, so the learning hyperparameters are zeroed;
# epsilon = 0 presumably makes act() purely greedy — confirm in ddqn_agent.
agent = ddqn_agent.ddqn_agent_mse(
    model = mountain_car_model_1, state_dim = 2, action_dim = env.action_space.n,
    gamma = 0, lr = 0, epsilon = 0, epsilon_min = 0,
    decay_steps = 1, buffer_size = 0, batch_size = 0,
    device = device,
)

# The options dict is forwarded to env.reset on every episode.
mcv2_f_rewards, mcv2_f_states, mcv2_f_sp = testing(
    env = env,
    agent = agent,
    n_episodes = 1000,
    file_name = "mcv2/mcv2_8",
    options = {"force": 0.1},
)

In [None]:
# Position trajectories of randomly chosen MCV2 episodes with force = 0.1.
plot_episode(mcv2_f_states)

In [None]:
# Compare both checkpoints under force = 0.1, grouped by starting position.
legends = ["MCV1", "MCV2"]
reward_list = [mcv1_f_rewards, mcv2_f_rewards]
starting_positions = [mcv1_f_sp, mcv2_f_sp]

plot_bars(
    reward_list = reward_list,
    starting_positions = starting_positions,
    legends = legends,
    title = "Force: 0.1",
)

# Testing a Steeper Hill

In [None]:
# Steeper terrain: same markers as the standard terrain, with the curve scaled by 1.5.
move = [0] * 4
x = np.linspace(-1.2, 0.6, 500)
y = 1.5 * np.sin(3 * x)

plot_terrain(
    move = move,
    x = x,
    y = y,
    title = "Steeper Terrain",
)

In [None]:
# Evaluate the "mcv1/mcv1_5" checkpoint for 1000 episodes on the steeper-hill environment.
env = mc.steeper_hill()

# No training happens in this cell, so the learning hyperparameters are zeroed;
# epsilon = 0 presumably makes act() purely greedy — confirm in ddqn_agent.
agent = ddqn_agent.ddqn_agent_mse(
    model = mountain_car_model_1, state_dim = 2, action_dim = env.action_space.n,
    gamma = 0, lr = 0, epsilon = 0, epsilon_min = 0,
    decay_steps = 1, buffer_size = 0, batch_size = 0,
    device = device,
)

mcv1_steeper_rewards, mcv1_steeper_states, mcv1_steeper_sp = testing(
    env = env,
    agent = agent,
    n_episodes = 1000,
    file_name = "mcv1/mcv1_5",
)

In [None]:
# Position trajectories of randomly chosen MCV1 episodes on the steeper hill.
plot_episode(mcv1_steeper_states)

In [None]:
# Evaluate the "mcv2/mcv2_8" checkpoint for 1000 episodes on the steeper-hill environment.
env = mc.steeper_hill()

# No training happens in this cell, so the learning hyperparameters are zeroed;
# epsilon = 0 presumably makes act() purely greedy — confirm in ddqn_agent.
agent = ddqn_agent.ddqn_agent_mse(
    model = mountain_car_model_1, state_dim = 2, action_dim = env.action_space.n,
    gamma = 0, lr = 0, epsilon = 0, epsilon_min = 0,
    decay_steps = 1, buffer_size = 0, batch_size = 0,
    device = device,
)

mcv2_steeper_rewards, mcv2_steeper_states, mcv2_steeper_sp = testing(
    env = env,
    agent = agent,
    n_episodes = 1000,
    file_name = "mcv2/mcv2_8",
)

In [None]:
# Position trajectories of randomly chosen MCV2 episodes on the steeper hill.
plot_episode(mcv2_steeper_states)

In [None]:
# Compare both checkpoints on the steeper hill, grouped by starting position.
legends = ["MCV1", "MCV2"]
reward_list = [mcv1_steeper_rewards, mcv2_steeper_rewards]
starting_positions = [mcv1_steeper_sp, mcv2_steeper_sp]

plot_bars(
    reward_list = reward_list,
    starting_positions = starting_positions,
    legends = legends,
    title = "Steeper Hill",
)

# Mirrored Terrain

In [None]:
# Mirrored terrain: the wall marker moves by 3.4 and both start markers by 2.0;
# the flag marker stays at 0.5. The curve is drawn on [0.4, 2.2].
move = [3.4, 0, 2.0, 2.0]
x = np.linspace(0.4, 2.2, 500)
y = np.sin(3 * x)

plot_terrain(
    move = move,
    x = x,
    y = y,
    title = "Mirrored Terrain",
)

In [None]:
# Evaluate the "mcv1/mcv1_5" checkpoint for 1000 episodes on the mirrored-terrain environment.
env = mc.mirrored_terrain()

# No training happens in this cell, so the learning hyperparameters are zeroed;
# epsilon = 0 presumably makes act() purely greedy — confirm in ddqn_agent.
agent = ddqn_agent.ddqn_agent_mse(
    model = mountain_car_model_1, state_dim = 2, action_dim = env.action_space.n,
    gamma = 0, lr = 0, epsilon = 0, epsilon_min = 0,
    decay_steps = 1, buffer_size = 0, batch_size = 0,
    device = device,
)

mcv1_mirrored_rewards, mcv1_mirrored_states, mcv1_mirrored_sp = testing(
    env = env,
    agent = agent,
    n_episodes = 1000,
    file_name = "mcv1/mcv1_5",
)

In [None]:
# Position trajectories of randomly chosen MCV1 episodes on the mirrored terrain.
plot_episode(mcv1_mirrored_states)

In [None]:
# Evaluate the "mcv2/mcv2_8" checkpoint for 1000 episodes on the mirrored-terrain environment.
env = mc.mirrored_terrain()

# No training happens in this cell, so the learning hyperparameters are zeroed;
# epsilon = 0 presumably makes act() purely greedy — confirm in ddqn_agent.
agent = ddqn_agent.ddqn_agent_mse(
    model = mountain_car_model_1, state_dim = 2, action_dim = env.action_space.n,
    gamma = 0, lr = 0, epsilon = 0, epsilon_min = 0,
    decay_steps = 1, buffer_size = 0, batch_size = 0,
    device = device,
)

mcv2_mirrored_rewards, mcv2_mirrored_states, mcv2_mirrored_sp = testing(
    env = env,
    agent = agent,
    n_episodes = 1000,
    file_name = "mcv2/mcv2_8",
)

In [None]:
# Position trajectories of randomly chosen MCV2 episodes on the mirrored terrain.
plot_episode(mcv2_mirrored_states)

In [None]:
# Compare both checkpoints on the mirrored terrain. The bins are shifted by 2.0,
# so the x labels are replaced to match the shifted edges.
legends = ["MCV1", "MCV2"]
reward_list = [mcv1_mirrored_rewards, mcv2_mirrored_rewards]
starting_positions = [mcv1_mirrored_sp, mcv2_mirrored_sp]
move = 2.0
moved = ["[1.4, 1.45)", "[1.45, 1.5)", "[1.5, 1.55)", "[1.55, 1.6)"]

plot_bars(
    reward_list = reward_list,
    starting_positions = starting_positions,
    legends = legends,
    title = "Mirrored Terrain",
    moved = moved,
    move = move,
)

# Extended Track Length

In [None]:
# Extended terrain: the wall and both start markers move left by one period of
# sin(3x), i.e. 2*pi/3; the flag marker stays at 0.5.
value = 2 * np.pi/3
move = [-value, 0, -value, -value]
x = np.linspace(-1.2 - value, 0.6, 500)
y = np.sin(3 * x)

# Title typo fixed: "Exteneded" -> "Extended".
plot_terrain(move = move, x = x, y = y, title = "Extended Terrain")

In [None]:
# Evaluate the "mcv1/mcv1_5" checkpoint for 1000 episodes on the extended-track environment.
env = mc.extended_track()

# No training happens in this cell, so the learning hyperparameters are zeroed;
# epsilon = 0 presumably makes act() purely greedy — confirm in ddqn_agent.
agent = ddqn_agent.ddqn_agent_mse(
    model = mountain_car_model_1, state_dim = 2, action_dim = env.action_space.n,
    gamma = 0, lr = 0, epsilon = 0, epsilon_min = 0,
    decay_steps = 1, buffer_size = 0, batch_size = 0,
    device = device,
)

mcv1_extended_rewards, mcv1_extended_states, mcv1_extended_sp = testing(
    env = env,
    agent = agent,
    n_episodes = 1000,
    file_name = "mcv1/mcv1_5",
)

In [None]:
# Position trajectories of randomly chosen MCV1 episodes on the extended track.
plot_episode(mcv1_extended_states)

In [None]:
# Evaluate the "mcv2/mcv2_8" checkpoint for 1000 episodes on the extended-track environment.
env = mc.extended_track()

# No training happens in this cell, so the learning hyperparameters are zeroed;
# epsilon = 0 presumably makes act() purely greedy — confirm in ddqn_agent.
agent = ddqn_agent.ddqn_agent_mse(
    model = mountain_car_model_1, state_dim = 2, action_dim = env.action_space.n,
    gamma = 0, lr = 0, epsilon = 0, epsilon_min = 0,
    decay_steps = 1, buffer_size = 0, batch_size = 0,
    device = device,
)

mcv2_extended_rewards, mcv2_extended_states, mcv2_extended_sp = testing(
    env = env,
    agent = agent,
    n_episodes = 1000,
    file_name = "mcv2/mcv2_8",
)

In [None]:
# Position trajectories of randomly chosen MCV2 episodes on the extended track.
plot_episode(mcv2_extended_states)

In [None]:
# Compare both checkpoints on the extended track. The bins are shifted left by
# 2*pi/3, so the x labels are replaced with the shifted edges rounded to two decimals.
legends = ["MCV1", "MCV2"]
reward_list = [mcv1_extended_rewards, mcv2_extended_rewards]
starting_positions = [mcv1_extended_sp, mcv2_extended_sp]
move = -2 * np.pi/3
moved = ["[-2.69, -2.64)", "[-2.64, -2.59)", "[-2.59, -2.54)", "[-2.54, -2.49)"]

plot_bars(
    reward_list = reward_list,
    starting_positions = starting_positions,
    legends = legends,
    title = "Extended Track Length",
    moved = moved,
    move = move,
)