In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import gym
import matplotlib.pyplot as plt
from itertools import product
import random

In [None]:
# Create environment
env = gym.make('Acrobot-v1')

# Dueling DQN Network - Type 1
class DuelingDQN_Type1(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DuelingDQN_Type1, self).__init__()
        self.state_dim = state_dim
        self.action_dim = action_dim

        self.feature_layer = nn.Sequential(
            nn.Linear(state_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 32),
            nn.ReLU()
        )

        self.value_stream = nn.Sequential(
            nn.Linear(32, 1)
        )

        self.advantage_stream = nn.Sequential(
            nn.Linear(32, action_dim)
        )

    def forward(self, x):
        x = self.feature_layer(x)
        values = self.value_stream(x)
        advantages = self.advantage_stream(x)
        q_values = values + (advantages - advantages.mean(dim=-1, keepdim=True))
        return q_values

# Dueling DQN Network - Type 2
class DuelingDQN_Type2(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DuelingDQN_Type2, self).__init__()
        self.state_dim = state_dim
        self.action_dim = action_dim

        self.feature_layer = nn.Sequential(
            nn.Linear(state_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 32),
            nn.ReLU()
        )

        self.value_stream = nn.Sequential(
            nn.Linear(32, 1)
        )

        self.advantage_stream = nn.Sequential(
            nn.Linear(32, action_dim)
        )

    def forward(self, x):
        x = self.feature_layer(x)
        values = self.value_stream(x)
        advantages = self.advantage_stream(x)
        q_values = values + torch.maximum(advantages - advantages.mean(dim=-1, keepdim=True), torch.tensor(0.))
        return q_values



# Experience Replay Buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, transition):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = transition
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

# Function to select actions using epsilon-greedy policy
def select_action(state, epsilon, model):
    if np.random.rand() < epsilon:
        return np.random.randint(model.action_dim)
    else:
        with torch.no_grad():
            state = torch.FloatTensor(state).unsqueeze(0)
            q_values = model(state)
            return q_values.argmax().item()



# Define training function
def train_dqn(model, target_model, optimizer, loss_fn, replay_buffer, num_episodes, batch_size, gamma, eps_start, eps_end, eps_decay, target_update):
    returns = []
    best_return = float('-inf')
    best_hyperparams = None

    for episode in range(num_episodes):
        state = env.reset()
        done = False
        total_reward = 0

        while not done:
            epsilon = max(eps_end, eps_start * np.exp(-eps_decay * episode))
            action = select_action(state, epsilon, model)
            next_state, reward, done, _ = env.step(action)
            total_reward += reward

            replay_buffer.push((state, action, reward, next_state, done))

            if len(replay_buffer) > batch_size:
                train_dqn_from_replay(replay_buffer, model, target_model, optimizer, loss_fn, batch_size, gamma)

            state = next_state

        returns.append(total_reward)

        if episode % target_update == 0:
            target_model.load_state_dict(model.state_dict())

        avg_return = np.mean(returns)
        if avg_return > best_return:
            best_return = avg_return
            best_hyperparams = episode

    return returns, best_hyperparams


# Function to train DQN from experience replay
def train_dqn_from_replay(replay_buffer, model, target_model, optimizer, loss_fn, batch_size, gamma):
    transitions = replay_buffer.sample(batch_size)
    batch = list(zip(*transitions))
    state_batch = torch.FloatTensor(batch[0])
    action_batch = torch.LongTensor(batch[1])
    reward_batch = torch.FloatTensor(batch[2])
    next_state_batch = torch.FloatTensor(batch[3])
    done_batch = torch.FloatTensor(batch[4])

    q_values = model(state_batch)
    next_q_values = target_model(next_state_batch)

    q_value = q_values.gather(1, action_batch.unsqueeze(1)).squeeze(1)
    next_q_value = next_q_values.max(1)[0]
    expected_q_value = reward_batch + gamma * next_q_value * (1 - done_batch)

    loss = loss_fn(q_value, expected_q_value.detach())

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

# Function to select actions using epsilon-greedy policy
def select_action(state, epsilon, model):
    if np.random.rand() < epsilon:
        return np.random.randint(model.action_dim)
    else:
        with torch.no_grad():
            state = torch.FloatTensor(state).unsqueeze(0)
            q_values = model(state)
            return q_values.argmax().item()

# Set hyperparameters grid
learning_rates = [0.001, 0.01]
batch_sizes = [32, 64, 128]
gammas = [0.99]
eps_starts = [1.0, 0.01]
eps_ends = [0.01, 0.995]
eps_decays = [0.99, 0.995]
target_updates = [10, 20, 50]

hyperparameter_grid = product(learning_rates, batch_sizes, gammas, eps_starts, eps_ends, eps_decays, target_updates)

# Fixed hyperparameters
num_episodes = 100
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
capacity = 10000

# Initialize replay buffer
replay_buffer = ReplayBuffer(capacity)

# Train Dueling DQN - Type 1
best_return_type1 = float('-inf')
best_hyperparams_type1 = None
for hyperparams in hyperparameter_grid:
    model_type1 = DuelingDQN_Type1(state_dim, action_dim)
    target_model_type1 = DuelingDQN_Type1(state_dim, action_dim)
    optimizer_type1 = optim.Adam(model_type1.parameters(), lr=hyperparams[0])
    loss_fn_type1 = nn.MSELoss()
    returns_type1, best_episode_type1 = train_dqn(model_type1, target_model_type1, optimizer_type1, loss_fn_type1, replay_buffer, num_episodes, hyperparams[1], hyperparams[2], hyperparams[3], hyperparams[4], hyperparams[5], hyperparams[6])
    if np.mean(returns_type1) > best_return_type1:
        best_return_type1 = np.mean(returns_type1)
        best_hyperparams_type1 = hyperparams
print("Best hyperparameters Type 1:", best_hyperparams_type1)

# Train Dueling DQN - Type 2
best_return_type2 = float('-inf')  # Initialize with a very low value
best_hyperparams_type2 = None
for hyperparams in hyperparameter_grid:
    model_type2 = DuelingDQN_Type2(state_dim, action_dim)
    target_model_type2 = DuelingDQN_Type2(state_dim, action_dim)
    optimizer_type2 = optim.Adam(model_type2.parameters(), lr=hyperparams[0])
    loss_fn_type2 = nn.MSELoss()
    returns_type2, best_episode_type2 = train_dqn(model_type2, target_model_type2, optimizer_type2, loss_fn_type2, replay_buffer, num_episodes, hyperparams[1], hyperparams[2], hyperparams[3], hyperparams[4], hyperparams[5], hyperparams[6])
    if np.mean(returns_type2) > best_return_type2:
        best_return_type2 = np.mean(returns_type2)
        best_hyperparams_type2 = hyperparams
print("Best hyperparameters Type 2:", best_hyperparams_type2)


# Train Dueling DQN - Type 1 with best hyperparameters
model_type1 = DuelingDQN_Type1(state_dim, action_dim)
target_model_type1 = DuelingDQN_Type1(state_dim, action_dim)
optimizer_type1 = optim.Adam(model_type1.parameters(), lr=best_hyperparams_type1[0])
loss_fn_type1 = nn.MSELoss()
returns_type1, best_episode_type1 = train_dqn(model_type1, target_model_type1, optimizer_type1, loss_fn_type1, replay_buffer, num_episodes, best_hyperparams_type1[1], best_hyperparams_type1[2], best_hyperparams_type1[3], best_hyperparams_type1[4], best_hyperparams_type1[5], best_hyperparams_type1[6])

# Train Dueling DQN - Type 2 with best hyperparameters
model_type2 = DuelingDQN_Type2(state_dim, action_dim)
target_model_type2 = DuelingDQN_Type2(state_dim, action_dim)
optimizer_type2 = optim.Adam(model_type2.parameters(), lr=best_hyperparams_type2[0])
loss_fn_type2 = nn.MSELoss()
returns_type2, best_episode_type2 = train_dqn(model_type2, target_model_type2, optimizer_type2, loss_fn_type2, replay_buffer, num_episodes, best_hyperparams_type2[1], best_hyperparams_type2[2], best_hyperparams_type2[3], best_hyperparams_type2[4], best_hyperparams_type2[5], best_hyperparams_type2[6])

# Plot results
plt.plot(returns_type1, label='Type 1 Dueling DQN')
plt.plot(returns_type2, label='Type 2 Dueling DQN')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Dueling DQN Training')
plt.legend()
plt.show()


  deprecation(
  deprecation(
  if not isinstance(terminated, (bool, np.bool8)):
  state_batch = torch.FloatTensor(batch[0])


In [5]:
best_hyperparams_type1

(0.001, 64, 0.99, 0.01, 0.01, 0.99, 10)

In [3]:
best_hyperparams_type2

(0.01, 128, 0.99, 0.01, 0.995, 0.995, 50)