In [None]:
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Dueling DQN Network
class DuelingDQN(nn.Module):
    """Dueling Q-network: a shared feature trunk feeding separate value and advantage heads.

    The heads are recombined as ``Q = V + (A - mean(A))`` where the mean is
    taken over the action dimension, so the mean of the returned Q-values
    over actions equals the value head's output.

    Args:
        input_dim: Size of the observation vector.
        output_dim: Number of discrete actions.
        hidden_dim: Width of the trunk and of each head's hidden layer
            (defaults to 128, the previously hard-coded width).
    """

    def __init__(self, input_dim, output_dim, hidden_dim=128):
        super().__init__()
        # Layers are created in the same order as before (trunk, advantage,
        # value) so seeded initialisation is unchanged.
        self.feature_layer = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU()
        )
        self.advantage_layer = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )
        self.value_layer = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, x):
        """Return Q-values with shape ``(..., output_dim)`` for input ``(..., input_dim)``."""
        features = self.feature_layer(x)
        advantage = self.advantage_layer(features)
        value = self.value_layer(features)
        # Average over the last (action) dimension rather than dim=1 so a
        # single unbatched state works as well as a batch.
        return value + advantage - advantage.mean(dim=-1, keepdim=True)

In [None]:
# Replay Buffer
class ReplayBuffer:
    """Bounded FIFO store of transitions with uniform sampling without replacement."""

    def __init__(self, capacity):
        # A deque with maxlen drops the oldest transition once capacity is reached.
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` transition."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions and return them as five stacked arrays.

        Rewards are returned as float32 and done flags as uint8; the other
        arrays take whatever dtype numpy infers from the stored values.
        """
        picked = np.random.choice(len(self.buffer), batch_size, replace=False)
        transitions = [self.buffer[position] for position in picked]
        # Transpose the list of 5-tuples into five per-field columns.
        state_col, action_col, reward_col, next_state_col, done_col = zip(*transitions)
        return (
            np.array(state_col),
            np.array(action_col),
            np.array(reward_col, dtype=np.float32),
            np.array(next_state_col),
            np.array(done_col, dtype=np.uint8),
        )

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

In [None]:
# Dueling DQN Agent
class DuelingDQNAgent:
    """Epsilon-greedy agent that trains a DuelingDQN from replayed transitions.

    An online network (``Q_net``) is optimised with Adam against bootstrap
    targets produced by a periodically synced copy (``target_net``).

    Args:
        env: Environment exposing ``observation_space.shape``,
            ``action_space.n``, ``action_space.sample()``, ``reset()`` and
            ``step()``.
        gamma: Discount factor applied to the bootstrap term.
        lr: Adam learning rate.
        update_rule: How next-state Q-values are reduced for the target:
            ``'max'`` uses the maximum over actions, ``'average'`` uses the
            mean over actions.
        replay_buffer_capacity: Maximum number of stored transitions.

    Raises:
        ValueError: If ``update_rule`` is not ``'average'`` or ``'max'``.

    NOTE(review): ``'average'`` bootstraps from the mean over actions, which
    is not the standard greedy Q-learning target -- confirm this is the
    intended rule and not a stand-in for a different dueling aggregation.
    """

    # Bootstrap rules understood by update_network.
    _UPDATE_RULES = ('average', 'max')

    def __init__(self, env, gamma=0.99, lr=0.001, update_rule='average', replay_buffer_capacity=10000):
        # Fail fast: an unknown rule previously only surfaced later, as an
        # UnboundLocalError in the middle of update_network.
        if update_rule not in self._UPDATE_RULES:
            raise ValueError(
                f"update_rule must be one of {self._UPDATE_RULES}, got {update_rule!r}"
            )

        self.env = env
        self.gamma = gamma
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.input_dim = env.observation_space.shape[0]
        self.output_dim = env.action_space.n

        self.Q_net = DuelingDQN(self.input_dim, self.output_dim).to(self.device)
        self.target_net = DuelingDQN(self.input_dim, self.output_dim).to(self.device)
        # Start the target network as an exact copy; it is never trained directly.
        self.target_net.load_state_dict(self.Q_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.Adam(self.Q_net.parameters(), lr=lr)

        self.update_rule = update_rule

        self.replay_buffer = ReplayBuffer(replay_buffer_capacity)

    def select_action(self, state, epsilon=0.1):
        """Return a random action with probability ``epsilon``, else the greedy one."""
        if np.random.rand() < epsilon:
            return self.env.action_space.sample()

        state = torch.tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
        with torch.no_grad():
            Q_values = self.Q_net(state)
        return Q_values.argmax().item()

    def update_network(self, batch_size):
        """Run one gradient step on a sampled minibatch.

        Does nothing until the replay buffer holds at least ``batch_size``
        transitions.
        """
        if len(self.replay_buffer) < batch_size:
            return

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)
        states = torch.tensor(states, dtype=torch.float32, device=self.device)
        # gather() requires int64 indices; numpy may hand back int32.
        actions = torch.tensor(actions, dtype=torch.long, device=self.device)
        rewards = torch.tensor(rewards, dtype=torch.float32, device=self.device)
        next_states = torch.tensor(next_states, dtype=torch.float32, device=self.device)
        dones = torch.tensor(dones, dtype=torch.float32, device=self.device)

        # Q(s, a) for the actions that were actually taken.
        Q_value = self.Q_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # The target is a constant with respect to the optimised parameters.
        with torch.no_grad():
            next_Q_values = self.target_net(next_states)
            if self.update_rule == 'average':
                next_Q_value = next_Q_values.mean(1)
            elif self.update_rule == 'max':
                next_Q_value = next_Q_values.max(1)[0]
            else:
                raise ValueError(f"Unknown update_rule: {self.update_rule!r}")
            # (1 - dones) removes the bootstrap term for terminal transitions.
            target_Q_value = rewards + self.gamma * next_Q_value * (1 - dones)

        loss = nn.MSELoss()(Q_value, target_Q_value)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_net.load_state_dict(self.Q_net.state_dict())

    def _reset_env(self):
        """Reset the environment and return only the initial observation.

        Accepts both ``reset() -> obs`` and ``reset() -> (obs, info)``.
        """
        result = self.env.reset()
        if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
            return result[0]
        return result

    def _step_env(self, action):
        """Step the environment and return ``(next_state, reward, terminal, episode_over)``.

        Accepts both the 4-tuple ``(obs, reward, done, info)`` and the 5-tuple
        ``(obs, reward, terminated, truncated, info)`` step results. With the
        5-tuple form only ``terminated`` is reported as terminal, while
        ``episode_over`` is also set on truncation.
        """
        result = self.env.step(action)
        if len(result) == 5:
            next_state, reward, terminated, truncated, _ = result
            return next_state, reward, bool(terminated), bool(terminated or truncated)
        next_state, reward, done, _ = result
        return next_state, reward, done, done

    def train(self, episodes=1000, max_steps=500, batch_size=64, epsilon_start=1.0, epsilon_end=0.1, epsilon_decay=0.99, target_update_freq=100):
        """Train for ``episodes`` episodes and return the list of episodic returns.

        Args:
            episodes: Number of episodes to run.
            max_steps: Step cap per episode.
            batch_size: Minibatch size passed to ``update_network``.
            epsilon_start: Exploration rate for the first episode.
            epsilon_end: Lower bound on the exploration rate.
            epsilon_decay: Multiplicative decay applied after every episode.
            target_update_freq: Number of environment steps between target
                network syncs, counted across episodes.

        Raises:
            ValueError: If ``target_update_freq`` is smaller than 1.
        """
        if target_update_freq < 1:
            raise ValueError("target_update_freq must be at least 1")

        episodic_returns = []
        epsilon = epsilon_start
        # Counted across episodes: keying the sync to the per-episode step
        # index made it fire at the start of every episode.
        total_steps = 0
        for _ in tqdm(range(episodes)):
            state = self._reset_env()
            episodic_return = 0
            for _ in range(max_steps):
                action = self.select_action(state, epsilon)
                next_state, reward, terminal, episode_over = self._step_env(action)
                self.replay_buffer.push(state, action, reward, next_state, terminal)
                episodic_return += reward

                self.update_network(batch_size)
                total_steps += 1
                if total_steps % target_update_freq == 0:
                    self.update_target_network()

                state = next_state
                if episode_over:
                    break

            episodic_returns.append(episodic_return)
            epsilon = max(epsilon_end, epsilon_decay * epsilon)

        return episodic_returns

In [None]:
# Training function to train the agent on the environment and update rule
def train_agent(env_name, update_rule, gamma=0.99, lr=0.001, replay_buffer_capacity=10000, episodes=1000, max_steps=500, batch_size=64, epsilon_start=1.0, epsilon_end=0.1, epsilon_decay=0.99, num_runs=5):
    """Train independent agents on ``env_name`` and aggregate their learning curves.

    Each run builds its own environment and its own ``DuelingDQNAgent``, so
    every run starts from freshly initialised weights and an empty replay
    buffer. Reusing one agent across runs would let later runs continue from
    an already-trained network, which makes the across-run statistics
    meaningless.

    Args:
        env_name: Gym environment id passed to ``gym.make``.
        update_rule: Forwarded to ``DuelingDQNAgent``.
        gamma, lr, replay_buffer_capacity: Forwarded to ``DuelingDQNAgent``.
        episodes, max_steps, batch_size, epsilon_start, epsilon_end,
            epsilon_decay: Forwarded to ``DuelingDQNAgent.train``.
        num_runs: Number of independent runs to average over.

    Returns:
        ``(mean_returns, std_returns)``: per-episode mean and standard
        deviation of the episodic return across runs.

    Raises:
        ValueError: If ``num_runs`` is smaller than 1.
    """
    if num_runs < 1:
        raise ValueError("num_runs must be at least 1")

    all_returns = []
    for run in range(num_runs):
        print(f'Experiment Run: {run+1}')
        env = gym.make(env_name)
        try:
            agent = DuelingDQNAgent(env, gamma=gamma, lr=lr, update_rule=update_rule, replay_buffer_capacity=replay_buffer_capacity)
            returns = agent.train(episodes=episodes, max_steps=max_steps, batch_size=batch_size, epsilon_start=epsilon_start, epsilon_end=epsilon_end, epsilon_decay=epsilon_decay)
        finally:
            # Release the environment even if training fails part-way.
            env.close()
        all_returns.append(returns)

    # Axis 0 indexes runs, so the statistics are taken per episode.
    mean_returns = np.mean(all_returns, axis=0)
    std_returns = np.std(all_returns, axis=0)

    return mean_returns, std_returns

In [None]:
def plot_episodic_returns(mean_returns_list, std_returns_list, env_name, update_rules):
    """Plot one mean-return curve per update rule with a +/- one-std shaded band.

    Args:
        mean_returns_list: Per-rule arrays of mean episodic return.
        std_returns_list: Per-rule arrays of the matching standard deviation.
        env_name: Environment name shown in the figure title.
        update_rules: Rule names (``"average"`` / ``"max"``) in the same
            order as the curves; they select the legend labels.
    """
    legend_names = {"average": "Type-1", "max": "Type-2"}

    for idx, mean_curve in enumerate(mean_returns_list):
        plt.plot(mean_curve, label=legend_names[update_rules[idx]])
        spread = std_returns_list[idx]
        episode_axis = range(len(mean_curve))
        plt.fill_between(episode_axis, mean_curve - spread, mean_curve + spread, alpha=0.2)

    plt.xlabel('Episodes')
    plt.ylabel('Episodic Return')
    plt.title(f'Dueling DQN - {env_name}')
    plt.legend()
    plt.show()

In [None]:
# Settings shared by the experiments below; unpacked into train_agent(**hyperparameters).
hyperparameters = dict(
    gamma=0.99,                    # discount factor
    lr=0.001,                      # Adam learning rate
    replay_buffer_capacity=10000,  # maximum stored transitions
    episodes=500,                  # episodes per run
    max_steps=500,                 # step cap per episode
    batch_size=128,                # minibatch size for each update
    epsilon_start=1.0,             # initial exploration rate
    epsilon_end=0.1,               # exploration floor
    epsilon_decay=0.99,            # per-episode multiplicative decay
)


In [None]:
# CartPole-v1: train once per update rule, then compare the learning curves.
env_name = 'CartPole-v1'
update_rules = ['average', 'max']
mean_returns_list, std_returns_list = [], []

for update_rule in update_rules:
    print(f"Training {env_name} with {update_rule} update rule".upper())
    mean_returns, std_returns = train_agent(
        env_name=env_name,
        update_rule=update_rule,
        **hyperparameters,
    )
    # Curves are stored in the same order as update_rules so the legend lines up.
    mean_returns_list.append(mean_returns)
    std_returns_list.append(std_returns)

plot_episodic_returns(
    mean_returns_list=mean_returns_list,
    std_returns_list=std_returns_list,
    env_name=env_name,
    update_rules=update_rules,
)

In [None]:
# Acrobot-v1: train once per update rule, then compare the learning curves.
env_name = 'Acrobot-v1'
update_rules = ['average', 'max']
mean_returns_list, std_returns_list = [], []

for update_rule in update_rules:
    print(f"Training {env_name} with {update_rule} update rule".upper())
    mean_returns, std_returns = train_agent(
        env_name=env_name,
        update_rule=update_rule,
        **hyperparameters,
    )
    # Curves are stored in the same order as update_rules so the legend lines up.
    mean_returns_list.append(mean_returns)
    std_returns_list.append(std_returns)

plot_episodic_returns(
    mean_returns_list=mean_returns_list,
    std_returns_list=std_returns_list,
    env_name=env_name,
    update_rules=update_rules,
)