# Setup

In [1]:
import gymnasium as gym
env = gym.make('MountainCar-v0')
import numpy as np
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
import wandb

In [2]:
starting_state, _ = env.reset() 

In [3]:
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [4]:
class QNetwork(nn.Module):
    def __init__(self, state_size, action_size, hidden_size,dropout_rate, seed):
        super(QNetwork, self).__init__()
        self.seed = torch.manual_seed(seed)
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.bn1 = nn.BatchNorm1d(hidden_size)  # BatchNorm1d layer
        self.fc2 = nn.Linear(hidden_size, action_size)
        self.dropout = nn.Dropout(dropout_rate)  # Dropout layer

    def forward(self, state):
        if state.shape[0] == 1:
            state = torch.unsqueeze(state, 0)
            x = F.relu(self.fc1(state))
        else:
            x = F.relu(self.bn1(self.fc1(state))) 
         # Apply BatchNorm1d after fc1
        x = self.dropout(x)  # Apply Dropout after BatchNorm1d
        
        return self.fc2(x)

In [5]:
class ReplayBuffer():
    def __init__(self, replay_size):
        self.state_buffer = None
        self.action_buffer = None
        self.reward_buffer = None
        self.next_state_buffer = None

        self.replay_size = replay_size

    def update(self, state, action, reward, next_state):
        if self.state_buffer is None:
            self.state_buffer = np.array(state)
            self.action_buffer = np.array([action])
            self.reward_buffer = np.array([reward])
            self.next_state_buffer = np.array(next_state)
        else:
            self.state_buffer = np.vstack((self.state_buffer[-self.replay_size:], state))
            self.action_buffer = np.hstack((self.action_buffer[-self.replay_size:], action))
            self.reward_buffer = np.hstack((self.reward_buffer[-self.replay_size:], reward))
            self.next_state_buffer = np.vstack((self.next_state_buffer[-self.replay_size:], next_state))


    def sample(self, batch_size):
        if self.state_buffer.shape[0] < self.replay_size:
            return None
        else:
            idx = np.random.choice(len(self.state_buffer), batch_size, replace=False)
            return [self.state_buffer[idx],self.action_buffer[idx],self.reward_buffer[idx],self.next_state_buffer[idx]]

In [6]:
class DQNAgent:
    def __init__(
        self,
        state_size : int,
        action_size : int,
        hidden_size:int,
        replay_size: int,
        learning_rate: float,
        initial_epsilon: float,
        final_epsilon: float,
        epsilon_decay: float,
        discount_factor: float = 0.95,
        dropout_rate: float = 0.1,
        seed: int = 42,
        scheduler=None,
    ):
        """Initialize a Reinforcement Learning agent with an empty dictionary
        of state-action values (q_values), a learning rate and an epsilon.

        Args:
            learning_rate: The learning rate
            initial_epsilon: The initial epsilon value
            epsilon_decay: The decay for epsilon
            final_epsilon: The final epsilon value
            discount_factor: The discount factor for computing the Q-value
        """
        self.qnetwork = QNetwork(state_size=state_size, action_size=action_size, hidden_size=hidden_size, dropout_rate=dropout_rate,seed=seed).to(DEVICE)
        self.replay_buffer = ReplayBuffer(replay_size=replay_size)
        self.lr = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = initial_epsilon
        self.optimizer = torch.optim.Adam(self.qnetwork.parameters(), lr=learning_rate)
        self.training_error = []
        self.final_epsilon = final_epsilon
        self.epsilon_decay = epsilon_decay
        self.scheduler = scheduler

    def get_action(self, obs) -> int:
        """
        Returns the best action with probability (1 - epsilon)
        otherwise a random action with probability epsilon to ensure exploration.
        """
        # with probability epsilon return a random action to explore the environment
        
        if np.random.random() < self.epsilon:
            return env.action_space.sample()

        # with probability (1 - epsilon) act greedily (exploit)
        else:
            with torch.no_grad():
                return int(torch.argmax(self.qnetwork(torch.tensor(obs,device=DEVICE).unsqueeze(0))))

    def update(
        self,
        obs: tuple[int, int, bool],
        action: int,
        reward: float,
        next_obs: tuple[int, int, bool],
        batch_size: int = 32,
    ):
        
        self.replay_buffer.update(obs, action, reward, next_obs)

        sample_replay = self.replay_buffer.sample(batch_size)
        if sample_replay is None:
            return None, None
        replay_reward = torch.tensor(sample_replay[2]).to(DEVICE)
        actions = torch.tensor(sample_replay[1]).to(DEVICE)
        replay_obs = torch.tensor(sample_replay[0]).to(DEVICE)
        replay_next_obs = torch.tensor(sample_replay[3]).to(DEVICE)

        self.qnetwork.train()
        q_values_next_obs = self.qnetwork(replay_next_obs)
        q_values_obs = self.qnetwork(replay_obs)

        temporal_difference = torch.sum(
            replay_reward + self.discount_factor * torch.max(q_values_next_obs,dim=1).values - torch.gather(q_values_obs, 1, actions.unsqueeze(1)).squeeze(1)
        )

        self.optimizer.zero_grad()

        loss = temporal_difference ** 2

        loss.backward()
        self.optimizer.step()
        if self.scheduler is not None:
            self.scheduler.step()

        self.training_error.append(temporal_difference)
        self.qnetwork.eval()
        return loss.item(), reward


    def decay_epsilon(self):
        if self.replay_buffer.reward_buffer.shape[0] > self.replay_buffer.replay_size:
            self.epsilon = max(self.final_epsilon, self.epsilon*self.epsilon_decay)

In [7]:
# hyperparameters
learning_rate = 1e-3
n_episodes = 3_000
start_epsilon = 0.9
final_epsilon = 0.05
epsilon_decay = 0.999  # reduce the exploration over time
batch_size = 64
discount_factor = 0.95
replay_size = 10_000
logging_interval = 50
hidden_size=128
dropout_rate=0.1

In [8]:


agent = DQNAgent(
    learning_rate=learning_rate,
    state_size=2,
    action_size=3,
    discount_factor=discount_factor,
    final_epsilon=final_epsilon,
    hidden_size=hidden_size,
    epsilon_decay=epsilon_decay,
    initial_epsilon=start_epsilon,
    replay_size=replay_size,
    dropout_rate=dropout_rate,
)

In [9]:
run = wandb.init(project='ANN', config={"learning_rate": learning_rate, "n_episodes": n_episodes, "start_epsilon": start_epsilon, "final_epsilon": final_epsilon, "epsilon_decay": epsilon_decay, "batch_size": batch_size, "discount_factor": discount_factor, "replay_size": replay_size, "hidden_size": hidden_size, "dropout_rate": dropout_rate}, name='DQN')


Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mnreateguir[0m ([33mreategui[0m). Use [1m`wandb login --relogin`[0m to force relogin


In [10]:
env = gym.wrappers.RecordEpisodeStatistics(env, deque_size=n_episodes)
with tqdm(total=n_episodes, desc=f"Episode 0/{n_episodes}") as pbar:
    losses = []
    rewards = []
    for episode in tqdm(range(n_episodes)):
        obs, info = env.reset()
        done = False

        # play one episode
        while not done:
            action = agent.get_action(obs)
            next_obs, reward, terminated, truncated, info = env.step(action)

            # update the agent

            
            loss, reward = agent.update(obs, action, reward, next_obs, batch_size=batch_size)
            if loss is not None:
                rewards.append(reward)
                losses.append(loss)
            # update if the environment is done and the current obs
            done = terminated or truncated
            obs = next_obs

        agent.decay_epsilon()
        pbar.set_description(f"Episode {episode + 1}/{n_episodes}")
        pbar.set_postfix(train_loss=loss, epsilon=agent.epsilon)
        pbar.update(1)
        pbar.refresh() 
        if episode % logging_interval == 0:
            wandb.log({"train_loss": np.mean(losses), "epsilon": agent.epsilon, "mean_reward": np.mean(rewards)})
            losses = []
            rewards = []


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
Episode 105/3000:   4%|▎         | 105/3000 [00:28<22:35,  2.14it/s, epsilon=0.852, train_loss=0.872]

In [None]:
wandb.finish()

NameError: name 'wandb' is not defined