<a href="https://colab.research.google.com/github/Cpt-Shaan/RL-Implementations/blob/main/cartpole_dqn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Importing required libraries
import numpy as np
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
from collections import namedtuple,deque

# Run on the GPU when CUDA is available, otherwise on the CPU; the networks
# and every batch tensor below are moved to this device.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

In [None]:
# Define the Q-Network structure

class DQN(nn.Module):
    """Fully connected Q-network: state vector in, one Q-value per action out.

    The network is two hidden linear layers with ReLU activations followed by
    a linear output layer with no activation.
    """

    def __init__(self, state_size, action_size, seed, fc1_nodes = 64, fc2_nodes = 64):
        """Create the layers and move the module to the global ``device``.

        Args:
            state_size: Number of input features (width of a state vector).
            action_size: Number of outputs (one Q-value per action).
            seed: Value passed to ``torch.manual_seed`` before the layers are
                created, so two networks built with the same seed start with
                identical weights.
            fc1_nodes: Width of the first hidden layer.
            fc2_nodes: Width of the second hidden layer.
        """
        super().__init__()
        # Seeding happens before any nn.Linear is built because layer
        # construction draws the initial weights from the global torch RNG.
        self.seed = torch.manual_seed(seed)
        self.fc1 = nn.Linear(state_size, fc1_nodes)
        self.fc2 = nn.Linear(fc1_nodes, fc2_nodes)
        self.fc3 = nn.Linear(fc2_nodes, action_size)
        self.to(device)

    def forward(self, state):
        """Return the Q-values for ``state`` (a batch of state vectors)."""
        hidden = torch.relu(self.fc1(state))
        hidden = torch.relu(self.fc2(hidden))
        q_values = self.fc3(hidden)
        return q_values


In [None]:
# Replay memory class

class ReplayMemory:
    """Fixed-capacity FIFO buffer of transitions with uniform random sampling.

    Once ``buffer_size`` transitions are stored, adding a new one discards the
    oldest (``deque(maxlen=...)`` semantics).
    """

    def __init__(self, action_size, buffer_size, batch_size, seed):
        """Create an empty buffer.

        Args:
            action_size: Number of discrete actions (stored for reference only).
            buffer_size: Maximum number of transitions kept.
            batch_size: Number of transitions returned by ``sample()``.
            seed: Seed applied to Python's global ``random`` module, which
                ``sample()`` draws from.
        """
        self.action_size = action_size
        self.memory = deque(maxlen = buffer_size)
        self.batch_size = batch_size
        self.experience = namedtuple("Experience", field_names = ["state", "action", "reward", "next_state", "done"])
        # random.seed() returns None, so keep the seed value itself instead of
        # storing the call's result.
        random.seed(seed)
        self.seed = seed

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

    def add(self, state, action, reward, next_state, done):
        """Append one transition, evicting the oldest if the buffer is full."""
        e = self.experience(state, action, reward, next_state, done)
        self.memory.append(e)

    def sample(self):
        """Return ``batch_size`` distinct transitions drawn uniformly at random.

        Returns:
            A list of ``Experience`` namedtuples
            ``(state, action, reward, next_state, done)``. Tensor conversion is
            left to the consumer: ``DQNAgent.learn()`` takes raw transition
            tuples and batches them itself.

        Raises:
            ValueError: If fewer than ``batch_size`` transitions are stored
                (raised by ``random.sample``).
        """
        return random.sample(self.memory, k = self.batch_size)

In [None]:
# Class for the DQN-Agent

class DQNAgent:
    """DQN agent with an online network and a softly updated target network.

    ``localNet`` is trained by gradient descent; ``targetNet`` supplies the
    bootstrap values and tracks ``localNet`` via ``soft_update``.
    """

    def __init__(self, state_size, action_size, seed, lr,
                 buffer_size = int(1e5), batch_size = 64,
                 update_every = 4, gamma = 0.99, tau = 1e-3):
        """Build the networks, optimizer and internal replay memory.

        Args:
            state_size: Number of features in a state vector.
            action_size: Number of discrete actions.
            seed: Seed for Python's ``random`` module and for both networks.
            lr: Learning rate for the Adam optimizer.
            buffer_size: Capacity of the internal replay memory.
            batch_size: Batch size used by ``step()``.
            update_every: ``step()`` trains once every this many calls.
            gamma: Discount factor used by ``step()``.
            tau: Interpolation factor for the soft target update.
        """
        self.state_size = state_size
        self.action_size = action_size
        # random.seed() returns None, so keep the seed value itself.
        random.seed(seed)
        self.seed = seed

        self.batch_size = batch_size
        self.update_every = update_every
        self.gamma = gamma
        self.tau = tau

        self.localNet = DQN(state_size, action_size, seed).to(device)
        self.targetNet = DQN(state_size, action_size, seed).to(device)
        # Both networks are seeded identically; copying the weights makes the
        # "target starts equal to online" invariant explicit.
        self.targetNet.load_state_dict(self.localNet.state_dict())
        self.optimizer = optim.Adam(self.localNet.parameters(), lr)
        self.memory = ReplayMemory(action_size, buffer_size = buffer_size, batch_size = batch_size, seed = seed)
        self.t_step = 0

    def step(self, state, action, reward, next_state, done):
        """Store one transition and train every ``update_every`` calls.

        Training is skipped until the memory holds at least ``batch_size``
        transitions.
        """
        self.memory.add(state, action, reward, next_state, done)
        self.t_step = (self.t_step + 1) % self.update_every
        if self.t_step != 0 or len(self.memory) < self.batch_size:
            return
        # learn() consumes raw transition tuples, so draw them directly from
        # the underlying deque.
        experiences = random.sample(self.memory.memory, k = self.batch_size)
        self.learn(experiences, self.gamma)

    # Choose an action based on the epsilon-greedy policy
    def act(self, state, eps = 0.):
        """Return an action index for ``state`` under an epsilon-greedy policy.

        Args:
            state: NumPy array holding a single state vector.
            eps: Probability of choosing a uniformly random action.

        Returns:
            The chosen action as a Python ``int``.
        """
        # Exploration uses the `random` module, which __init__ seeds. The
        # strict `<` means eps == 0 never explores and eps == 1 always does.
        if random.random() < eps:
            return random.randrange(self.action_size)

        state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(device)

        self.localNet.eval()
        with torch.no_grad():
            action_values = self.localNet(state_tensor)
        self.localNet.train()

        return action_values.argmax(dim = 1).item()

    # Learn from a batch of experiences
    def learn(self, experiences, gamma):
        """Run one gradient step on a batch, then soft-update the target net.

        Args:
            experiences: Sequence of ``(state, action, reward, next_state,
                done)`` tuples.
            gamma: Discount factor applied to the bootstrapped value.
        """
        states, actions, rewards, next_states, dones = zip(*experiences)
        states = self._as_column_tensor(states, torch.float32)
        actions = self._as_column_tensor(actions, torch.int64)  # gather() needs int64 indices
        rewards = self._as_column_tensor(rewards, torch.float32)
        next_states = self._as_column_tensor(next_states, torch.float32)
        dones = self._as_column_tensor(dones, torch.float32)

        # The target values are constants for the loss, so no graph is built.
        with torch.no_grad():
            next_targets = self.targetNet(next_states).max(dim = 1, keepdim = True)[0]
        # (1 - dones) removes the bootstrap term for terminal transitions.
        targets = rewards + (gamma * next_targets * (1 - dones))
        expected_qvalue = self.localNet(states).gather(1, actions)

        loss = F.mse_loss(expected_qvalue, targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.soft_update(self.localNet, self.targetNet, tau = self.tau)

    @staticmethod
    def _as_column_tensor(values, dtype):
        """Stack per-transition values row-wise into a tensor on ``device``."""
        return torch.from_numpy(np.vstack(values)).to(device = device, dtype = dtype)

    def soft_update(self, localNet, targetNet, tau):
        """Blend ``localNet`` into ``targetNet`` in place.

        Each target parameter becomes ``tau * local + (1 - tau) * target``.
        """
        with torch.no_grad():
            for target_param, local_param in zip(targetNet.parameters(), localNet.parameters()):
                target_param.copy_(tau * local_param + (1.0 - tau) * target_param)


In [None]:
# Setup environment
env = gym.make("CartPole-v1")

# Hyperparameters
num_episodes = 3000          # number of training episodes
max_steps = 200              # cap on environment steps per training episode
epsilon_start = 1.0          # initial exploration rate
epsilon_end = 0.2            # floor for the exploration rate
epsilon_decay_rate = 0.99    # per-episode multiplicative decay of epsilon
gamma = 0.9                  # discount factor passed to the agent's learn()
lr = 0.0025                  # learning rate handed to the agent's optimizer
buffer_size = 10000          # capacity of the replay buffer below
# Replay buffer used by the training loop; it stores raw
# (state, action, reward, next_state, done) tuples.
buffer = deque(maxlen=buffer_size)
batch_size = 128             # transitions sampled from `buffer` per update

# Initialize DQN-Agent
input_dim = env.observation_space.shape[0]   # length of the observation vector
output_dim = env.action_space.n              # number of discrete actions
new_agent = DQNAgent(input_dim, output_dim, seed=69691, lr = lr)

In [None]:
# Train the agent: act epsilon-greedily, store each transition in `buffer`,
# and run one learning update per step once a full batch is available.
total_rewards = 0

for episode in range(num_episodes):
    state = env.reset()[0]
    # Exponentially decayed exploration rate, floored at epsilon_end.
    epsilon = max(epsilon_end, epsilon_start * (epsilon_decay_rate ** episode))

    # One episode
    for step in range(max_steps):
        action = new_agent.act(state, epsilon)
        next_state, reward, terminated, truncated, _ = env.step(action)
        total_rewards += reward

        # Only a true termination is stored as `done`. A time-limit truncation
        # is not a terminal state, so it must not zero the bootstrap term.
        done = terminated
        buffer.append((state, action, reward, next_state, done))

        if len(buffer) >= batch_size:
            batch = random.sample(buffer, batch_size)

            # update the agent's knowledge
            new_agent.learn(batch, gamma)

        state = next_state

        # End the episode on either signal so a truncated environment is never
        # stepped again without a reset.
        if terminated or truncated:
            break

    # Report the mean episode return over each block of 20 episodes.
    if (episode + 1) % 20 == 0:
        avg_reward = total_rewards / 20
        print(f"Episode {episode + 1} : Finished Training, Average Rewards over last 20 episodes : {avg_reward:0.2f}")
        total_rewards = 0

In [None]:
# Evaluate the agent's performance
# Runs greedy (eps = 0) episodes and reports the mean episode return.
test_episodes = 100
episode_rewards = []

for episode in range(test_episodes):
    state = env.reset()[0]
    episode_reward = 0
    done = False

    while not done:
        action = new_agent.act(state, eps=0.)
        next_state, reward, terminated, truncated, _ = env.step(action)
        # An episode ends on failure (terminated) or on the environment's
        # time limit (truncated). Checking only `terminated` keeps stepping
        # after the time limit whenever the agent does not fail.
        done = terminated or truncated
        episode_reward += reward
        state = next_state

    episode_rewards.append(episode_reward)

average_reward = sum(episode_rewards) / test_episodes
print(f"Average reward over {test_episodes} test episodes: {average_reward:.2f}")

In [None]:
# Visualize the agent's performance
# Plays one greedy episode in a rendering window.
import time

env = gym.make("CartPole-v1", render_mode = "human")
try:
    state = env.reset()[0]
    done = False

    while not done:
        action = new_agent.act(state, eps=0.)
        next_state, reward, terminated, truncated, _ = env.step(action)
        # Stop on failure or on the time limit; otherwise an agent that never
        # fails would keep this loop running.
        done = terminated or truncated
        state = next_state
        time.sleep(0.1)  # Add a delay to make the visualization easier to follow
finally:
    # Close the environment even if the episode is interrupted or raises.
    env.close()