## DDQN in Simpler Environments

Base DQN implementation adapted from HW7

In [None]:
import os
import sys

import numpy as np
import gymnasium as gym
from gymnasium import spaces
from matplotlib import pyplot as plt
from tqdm.notebook import tqdm

# Add the directory two levels above the current working directory to sys.path so the DDQN package can be imported
root_dir = os.path.dirname(os.path.abspath("../"))
if root_dir not in sys.path:
    sys.path.append(root_dir)
    
from DDQN.DQN import DQNAgent, TargetDQNAgent, DoubleDQNAgent
from DDQN.DDQN import DuelingDQNAgent

In [None]:
def running_mean(x, N):
    """Return the moving average of `x` over a sliding window of length `N`.

    Each window sum is obtained as the difference of two prefix sums, so for
    ``1 <= N <= len(x)`` the result has ``len(x) - N + 1`` entries.
    """
    prefix_sums = np.cumsum(np.insert(x, 0, 0))
    window_sums = prefix_sums[N:] - prefix_sums[:-N]
    return window_sums / float(N)


class DiscreteActionWrapper(gym.ActionWrapper):
    """Action wrapper exposing a continuous Box action space as `bins` discrete actions."""

    def __init__(self, env: gym.Env, bins = 5):
        """Wrap `env` so that it accepts discrete action indices.

        Args:
            env: The environment to wrap; its action space must be a `spaces.Box`.
            bins: Number of discrete actions.
        """
        assert isinstance(env.action_space, spaces.Box)
        super().__init__(env)
        # Keep the continuous space so that `action` can map indices back onto it.
        self.orig_action_space = env.action_space
        self.bins = bins
        self.action_space = spaces.Discrete(self.bins)

    def action(self, action):
        """Map a discrete action index onto the original continuous range.

        Index 0 maps to the lower bound and index ``bins - 1`` to the upper
        bound of the original action space; indices in between are spaced evenly.

        Args:
            action: The discrete action
        Returns:
            continuous action
        """
        low = self.orig_action_space.low
        high = self.orig_action_space.high
        fraction = action / (self.bins - 1.0)
        return low + fraction * (high - low)


def create_env(env_name, render=False, discrete_wrapper=None):
    """Create a gymnasium environment, discretizing a continuous action space.

    Args:
        env_name: Environment id passed to `gym.make`.
        render: If True, the environment is created with ``render_mode='human'``.
        discrete_wrapper: Only consulted when the environment's action space is
            a `spaces.Box`. One of:
            - None: wrap the environment in `DiscreteActionWrapper` with 5 bins;
            - a callable (wrapper class or factory): it is called with the newly
              created environment and its result is used;
            - any other object: used as the environment as-is (an already
              constructed wrapper), replacing the newly created one.

    Returns:
        Tuple ``(env, action_space, observation_space)`` of the final environment.
    """
    env = gym.make(env_name, render_mode='human') if render else gym.make(env_name)

    if isinstance(env.action_space, spaces.Box):
        if discrete_wrapper is None:
            env = DiscreteActionWrapper(env, 5)
        elif callable(discrete_wrapper):
            # A wrapper class/factory has to be applied to the new environment;
            # returning the callable itself would hand back a non-environment.
            env = discrete_wrapper(env)
        else:
            # Backward compatible path: a pre-built environment takes the place
            # of the one just created, which is closed since nothing else
            # holds a reference to it.
            env.close()
            env = discrete_wrapper

    ac_space = env.action_space
    o_space = env.observation_space
    print("Env. action space:")
    print(ac_space)
    print("Env. observation space:")
    print(o_space)

    return env, ac_space, o_space


def train_dqn(q_agent, env, max_episodes=600, max_steps=500, print_freq=100):
    """Run the agent in `env` and train it after every episode.

    Args:
        q_agent: Agent providing `act(ob)`, `store_transition(transition)` and
            `train(n)`; the result of `train` is appended to the loss list.
        env: Environment following the gymnasium `reset`/`step` API.
        max_episodes: Number of episodes to run.
        max_steps: Upper bound on the number of steps per episode.
        print_freq: A progress line is printed for every episode `i` with
            ``(i - 1) % print_freq == 0``.

    Returns:
        Tuple ``(stats, losses)``: `stats` holds one
        ``[episode index, total reward, number of steps]`` entry per episode,
        `losses` collects everything returned by `q_agent.train`.
    """
    stats = []
    losses = []

    for i in tqdm(range(max_episodes)):
        total_reward = 0
        steps = 0  # explicit counter so the stats stay defined even if max_steps == 0
        ob, _info = env.reset()
        for _ in range(max_steps):
            a = q_agent.act(ob)
            (ob_new, reward, done, trunc, _info) = env.step(a)
            total_reward += reward
            steps += 1
            # Only `done` (termination) is stored with the transition; `trunc`
            # is used solely to end the episode below.
            q_agent.store_transition((ob, a, reward, ob_new, done))
            ob = ob_new
            # End the episode on truncation as well: the environment must be
            # reset before it is stepped again.
            if done or trunc:
                break
        # NOTE(review): 32 is presumably the number of fit iterations per
        # episode; confirm against the agent's `train` signature.
        losses.extend(q_agent.train(32))
        stats.append([i, total_reward, steps])

        if (i-1) % print_freq == 0:
            print("{}: Done after {} steps. Reward: {}".format(i, steps, total_reward))

    return stats, losses


def plot_training(stats, losses):
    """Plot the training progress in two matplotlib figures.

    Args:
        stats: Per-episode entries whose second column (index 1) is the
            episode return, as produced by `train_dqn`.
        losses: Sequence of loss values.
    """
    stats_arr = np.asarray(stats)
    loss_arr = np.asarray(losses)

    # Figure 1: raw returns together with a 20-episode running mean.
    plt.figure(figsize=(6,3.8))
    episode_returns = stats_arr[:,1]
    plt.plot(episode_returns, label="return")
    smoothed = running_mean(stats_arr[:,1], 20)
    plt.plot(smoothed, label="smoothed-return")
    plt.legend()

    # Figure 2: loss curve.
    plt.figure()
    plt.plot(loss_arr)


def display_env(q_agent, env, max_steps=500):
    """Display trained agent's performance. Human-mode rendering recommended for demonstration.

    Runs a single episode and prints the accumulated reward.

    Args:
        q_agent: Agent providing `act(ob)`.
        env: Environment following the gymnasium `reset`/`step` API. If its
            action space is a `spaces.Box`, it is wrapped in
            `DiscreteActionWrapper` with 5 bins first.
        max_steps: Upper bound on the number of steps (default 500).
    """
    if isinstance(env.action_space, spaces.Box):
        env = DiscreteActionWrapper(env,5)

    # Reset exactly once, after wrapping, so that the first action is chosen
    # from the initial observation of the episode that is actually played.
    ob, _info = env.reset()
    total_reward = 0
    for _ in range(max_steps):
        a = q_agent.act(ob)
        (ob, reward, done, trunc, _info) = env.step(a)
        total_reward += reward
        if done or trunc:
            break

    print("Total reward:", total_reward)

## DQN
Test DQN implementation on simple environments

### Pendulum

In [None]:
# Create the Pendulum-v1 environment (create_env discretizes the action space
# only if it is a continuous Box) and a DQNAgent on its spaces.
# discount / eps: presumably discount factor and exploration rate; see DQNAgent.
env, ac_space, o_space = create_env("Pendulum-v1")

q_agent = DQNAgent(
    o_space,
    ac_space,
    discount=0.95,
    eps=0.2
)

In [None]:
# Train for 600 episodes of at most 500 steps; keep per-episode stats and losses for plotting.
stats, losses = train_dqn(q_agent, env, max_episodes=600, max_steps=500, print_freq=100)

In [None]:
# Close the training environment now that training is finished.
env.close()

In [None]:
# Plot raw and smoothed episode returns, and the losses in a second figure.
plot_training(stats, losses)

In [None]:
# Display trained agent's performance
# A fresh environment with human-mode rendering is created; display_env applies
# the discrete-action wrapper itself if the action space is a continuous Box.
env = gym.make("Pendulum-v1", render_mode='human')

display_env(q_agent, env)

In [None]:
# Close the rendering environment.
env.close()

### LunarLander

In [None]:
# Create the LunarLander-v3 environment (create_env discretizes the action space
# only if it is a continuous Box) and a DQNAgent on its spaces.
# discount / eps: presumably discount factor and exploration rate; see DQNAgent.
env, ac_space, o_space = create_env("LunarLander-v3")

q_agent = DQNAgent(
    o_space,
    ac_space,
    discount=0.95,
    eps=0.2
)

In [None]:
# Train for 1000 episodes of at most 600 steps; keep per-episode stats and losses for plotting.
stats, losses = train_dqn(q_agent, env, max_episodes=1000, max_steps=600, print_freq=100)

In [None]:
# Close the training environment now that training is finished.
env.close()

In [None]:
# Plot raw and smoothed episode returns, and the losses in a second figure.
plot_training(stats, losses)

In [None]:
# Display trained agent's performance
# A fresh environment with human-mode rendering is created; display_env applies
# the discrete-action wrapper itself if the action space is a continuous Box.
env = gym.make("LunarLander-v3", render_mode='human')

display_env(q_agent, env)

In [None]:
# Close the rendering environment.
env.close()

## DQN with Target Network

Test Target-DQN implementation on simple environments

### Pendulum

In [None]:
# Create the Pendulum-v1 environment (create_env discretizes the action space
# only if it is a continuous Box) and a TargetDQNAgent on its spaces.
# update_target_every / tau: presumably the target-network update period and
# soft-update rate; confirm in TargetDQNAgent.
env, ac_space, o_space = create_env("Pendulum-v1")

q_agent = TargetDQNAgent(
    o_space,
    ac_space,
    discount=0.95,
    eps=0.2,
    update_target_every=20,
    tau=1e-3
)

In [None]:
# Train for 600 episodes of at most 500 steps; keep per-episode stats and losses for plotting.
stats, losses = train_dqn(q_agent, env, max_episodes=600, max_steps=500, print_freq=100)

In [None]:
# Close the training environment now that training is finished.
env.close()

In [None]:
# Plot raw and smoothed episode returns, and the losses in a second figure.
plot_training(stats, losses)

In [None]:
# Display trained agent's performance
# A fresh environment with human-mode rendering is created; display_env applies
# the discrete-action wrapper itself if the action space is a continuous Box.
env = gym.make("Pendulum-v1", render_mode='human')

display_env(q_agent, env)

In [None]:
# Close the rendering environment.
env.close()

### LunarLander

In [None]:
# Create the LunarLander-v3 environment (create_env discretizes the action space
# only if it is a continuous Box) and a TargetDQNAgent on its spaces.
# update_target_every / tau: presumably the target-network update period and
# soft-update rate; confirm in TargetDQNAgent.
env, ac_space, o_space = create_env("LunarLander-v3")

q_agent = TargetDQNAgent(
    o_space,
    ac_space,
    discount=0.95,
    eps=0.2,
    update_target_every=20,
    tau=1e-3
)

In [None]:
# Train for 1000 episodes of at most 600 steps; keep per-episode stats and losses for plotting.
stats, losses = train_dqn(q_agent, env, max_episodes=1000, max_steps=600, print_freq=100)

In [None]:
# Close the training environment now that training is finished.
env.close()

In [None]:
# Plot raw and smoothed episode returns, and the losses in a second figure.
plot_training(stats, losses)

In [None]:
# Display trained agent's performance
# A fresh environment with human-mode rendering is created; display_env applies
# the discrete-action wrapper itself if the action space is a continuous Box.
env = gym.make("LunarLander-v3", render_mode='human')

display_env(q_agent, env)

In [None]:
# Close the rendering environment.
env.close()

## Double DQN

### Pendulum

In [None]:
# Create the Pendulum-v1 environment (create_env discretizes the action space
# only if it is a continuous Box) and a DoubleDQNAgent on its spaces.
# update_target_every / tau: presumably the target-network update period and
# soft-update rate; confirm in DoubleDQNAgent.
env, ac_space, o_space = create_env("Pendulum-v1")

q_agent = DoubleDQNAgent(
    o_space,
    ac_space,
    discount=0.95,
    eps=0.2,
    update_target_every=20,
    tau=1e-3
)

In [None]:
# Train for 600 episodes of at most 500 steps; keep per-episode stats and losses for plotting.
stats, losses = train_dqn(q_agent, env, max_episodes=600, max_steps=500, print_freq=100)

In [None]:
# Close the training environment now that training is finished.
env.close()

In [None]:
# Plot raw and smoothed episode returns, and the losses in a second figure.
plot_training(stats, losses)

In [None]:
# Display trained agent's performance
# A fresh environment with human-mode rendering is created; display_env applies
# the discrete-action wrapper itself if the action space is a continuous Box.
env = gym.make("Pendulum-v1", render_mode='human')

display_env(q_agent, env)

In [None]:
# Close the rendering environment.
env.close()

### LunarLander

In [None]:
# Create the LunarLander-v3 environment (create_env discretizes the action space
# only if it is a continuous Box) and a DoubleDQNAgent on its spaces.
# update_target_every / tau: presumably the target-network update period and
# soft-update rate; confirm in DoubleDQNAgent.
env, ac_space, o_space = create_env("LunarLander-v3")

q_agent = DoubleDQNAgent(
    o_space,
    ac_space,
    discount=0.95,
    eps=0.2,
    update_target_every=20,
    tau=1e-3
)

In [None]:
# Train for 1000 episodes of at most 600 steps; keep per-episode stats and losses for plotting.
stats, losses = train_dqn(q_agent, env, max_episodes=1000, max_steps=600, print_freq=100)

In [None]:
# Close the training environment now that training is finished.
env.close()

In [None]:
# Plot raw and smoothed episode returns, and the losses in a second figure.
plot_training(stats, losses)

In [None]:
# Display trained agent's performance
# A fresh environment with human-mode rendering is created; display_env applies
# the discrete-action wrapper itself if the action space is a continuous Box.
env = gym.make("LunarLander-v3", render_mode='human')

display_env(q_agent, env)

In [None]:
# Close the rendering environment.
env.close()

## Dueling DQN

### Pendulum

In [None]:
# Create the Pendulum-v1 environment (create_env discretizes the action space
# only if it is a continuous Box) and a DuelingDQNAgent (from DDQN.DDQN) on its spaces.
# update_target_every / tau: presumably the target-network update period and
# soft-update rate; confirm in DuelingDQNAgent.
env, ac_space, o_space = create_env("Pendulum-v1")

q_agent = DuelingDQNAgent(
    o_space,
    ac_space,
    discount=0.95,
    eps=0.2,
    update_target_every=20,
    tau=1e-3
)

In [None]:
# Train for 600 episodes of at most 500 steps; keep per-episode stats and losses for plotting.
stats, losses = train_dqn(q_agent, env, max_episodes=600, max_steps=500, print_freq=100)

In [None]:
# Close the training environment now that training is finished.
env.close()

In [None]:
# Plot raw and smoothed episode returns, and the losses in a second figure.
plot_training(stats, losses)

In [None]:
# Display trained agent's performance
# A fresh environment with human-mode rendering is created; display_env applies
# the discrete-action wrapper itself if the action space is a continuous Box.
env = gym.make("Pendulum-v1", render_mode='human')

display_env(q_agent, env)

In [None]:
# Close the rendering environment.
env.close()

### LunarLander

In [None]:
# Create the LunarLander-v3 environment (create_env discretizes the action space
# only if it is a continuous Box) and a DuelingDQNAgent (from DDQN.DDQN) on its spaces.
# update_target_every / tau: presumably the target-network update period and
# soft-update rate; confirm in DuelingDQNAgent.
env, ac_space, o_space = create_env("LunarLander-v3")

q_agent = DuelingDQNAgent(
    o_space,
    ac_space,
    discount=0.95,
    eps=0.2,
    update_target_every=20,
    tau=1e-3
)

In [None]:
# Train for 1000 episodes of at most 600 steps; keep per-episode stats and losses for plotting.
stats, losses = train_dqn(q_agent, env, max_episodes=1000, max_steps=600, print_freq=100)

In [None]:
# Close the training environment now that training is finished.
env.close()

In [None]:
# Plot raw and smoothed episode returns, and the losses in a second figure.
plot_training(stats, losses)

In [None]:
# Display trained agent's performance
# A fresh environment with human-mode rendering is created; display_env applies
# the discrete-action wrapper itself if the action space is a continuous Box.
env = gym.make("LunarLander-v3", render_mode='human')

display_env(q_agent, env)

In [None]:
# Close the rendering environment.
env.close()