
### ✅ Recommended DQN Improvements

- Increase hidden units in DQN network to 128
- Increase replay memory size to 100,000
- Increase batch size to 128
- Slow down epsilon decay to 0.999
- Increase max episodes to 5000
- Expand discrete action space to better approximate continuous controls
- Reduce learning rate to 5e-4
- Update target network every 1 episode


# DDQN

In [None]:
from unityagents import UnityEnvironment
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import os
import matplotlib.pyplot as plt

## Environment Setup

In [None]:
# Launch the Unity Tennis build; the same `env` handle is reused by the training and testing cells below.
env = UnityEnvironment(file_name="Tennis.x86") # This is how the Unity environment is initialized
# Use the first brain the environment exposes for every reset()/step() lookup.
brain_name = env.brain_names[0]
brain = env.brains[brain_name]
# Reset once in training mode to obtain the initial environment info.
env_info = env.reset(train_mode=True)[brain_name]
# Lookup table from a discrete action index (0-3) to the 2-component action sent to env.step().
# NOTE(review): components are presumably (horizontal move, jump) — confirm against the Tennis environment.
discrete_actions = [(-1,0),(1,0),(0,1),(0,-1)]

## DQN Agent Class

In [None]:
class DQN(nn.Module):
    """Fully connected Q-network: observation vector in, one Q-value per action out.

    The architecture is two ReLU hidden layers followed by a linear output
    layer. Layer attribute names (``fc1``/``fc2``/``fc3``) are part of the
    ``state_dict`` keys, so they are kept stable for checkpoint compatibility.

    Args:
        state_size: Length of the input observation vector.
        action_size: Number of discrete actions (size of the output layer).
        hidden_units: Width of each of the two hidden layers.
    """

    def __init__(self, state_size=24, action_size=4, hidden_units=64):
        super().__init__()
        # Layers are created in the same order as before so that seeded
        # initialisation consumes the RNG identically for the default sizes.
        self.fc1 = nn.Linear(state_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, action_size)

    def forward(self, x):
        """Return the Q-values for ``x`` (last dimension must be ``state_size``)."""
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        # No activation on the output: Q-values are unbounded.
        return self.fc3(x)

class DDQNAgent:
    """Double-DQN agent with an epsilon-greedy policy, replay buffer and target network.

    The online network (``q_network``) picks the best next action and the
    target network (``target_network``) evaluates it when TD targets are built.

    Args:
        seed: Seed applied to the global ``torch`` and ``random`` generators
            *before* the networks are built, so weight initialisation is
            reproducible. Note that this reseeds the process-wide generators.
        load_path: Optional path to a ``state_dict`` checkpoint for the online
            network. If the file does not exist a message is printed and the
            agent starts from freshly initialised weights.
    """

    def __init__(self, seed=0, load_path=None):
        # Seed first: previously the networks were created before seeding, so
        # their initial weights differed from run to run.
        torch.manual_seed(seed)
        random.seed(seed)

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.q_network = DQN().to(self.device)
        self.target_network = DQN().to(self.device)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=0.0005)
        if load_path:
            if os.path.exists(load_path):
                # map_location lets a checkpoint saved on GPU load on a CPU-only host.
                checkpoint = torch.load(load_path, map_location=self.device)
                self.q_network.load_state_dict(checkpoint)
            else:
                print(f"Checkpoint {load_path} not found; starting from random weights")
        # Start with identical online and target weights (also covers the loaded case).
        self._sync_target()

        self.replay_mem = deque(maxlen=100000)  # experience replay buffer
        self.bSize = 128            # minibatch size
        self.gamma = 0.99           # discount factor
        self.max_ep = 1.0           # current epsilon (exploration probability)
        self.minimum_ep = 0.01      # epsilon floor
        self.decay_ep = 0.999       # multiplicative epsilon decay, applied once per training update
        self.target_up = 1          # sync the target network every `target_up` environment steps
        self.step_count = 0         # number of step() calls so far

    def _sync_target(self):
        """Hard-copy the online network's weights into the target network."""
        self.target_network.load_state_dict(self.q_network.state_dict())

    def do_action(self, state):
        """Choose an action index for ``state`` with an epsilon-greedy policy.

        Args:
            state: Observation for this agent (array-like of floats).

        Returns:
            int: A random index in ``[0, 3]`` with probability ``self.max_ep``,
            otherwise the index of the largest Q-value from the online network.
        """
        # Decide on exploration first so no forward pass is wasted on a random move.
        if random.random() < self.max_ep:
            return random.randint(0, 3)

        state_t = torch.as_tensor(state, dtype=torch.float32, device=self.device).unsqueeze(0)
        self.q_network.eval()
        with torch.no_grad():
            action_values = self.q_network(state_t)
        self.q_network.train()
        # Plain int so both branches return the same type.
        return int(np.argmax(action_values.cpu().numpy()))

    def step(self, state, action, reward, next_state, done):
        """Store one transition, train once the buffer is large enough, and sync the target.

        Training runs on every call once ``bSize`` transitions are stored.
        The target network is synced every ``target_up`` calls; with the
        default ``target_up = 1`` this is after every training update.
        """
        self.replay_mem.append((state, action, reward, next_state, done))
        self.step_count += 1
        if len(self.replay_mem) < self.bSize:
            return
        self.train()
        # `target_up` now controls the target-sync interval rather than gating training.
        if self.step_count % self.target_up == 0:
            self._sync_target()

    def train(self):
        """Run one Double-DQN gradient update on a random minibatch and decay epsilon.

        Does nothing if fewer than ``bSize`` transitions are stored.
        """
        if len(self.replay_mem) < self.bSize:
            return

        experiences = random.sample(self.replay_mem, self.bSize)
        states, actions, rewards, next_states, dones = zip(*experiences)
        # Stack into single ndarrays first; building tensors from tuples of
        # ndarrays element by element is very slow.
        states = torch.as_tensor(np.array(states, dtype=np.float32), device=self.device)
        actions = torch.as_tensor(np.array(actions, dtype=np.int64), device=self.device)
        rewards = torch.as_tensor(np.array(rewards, dtype=np.float32), device=self.device)
        next_states = torch.as_tensor(np.array(next_states, dtype=np.float32), device=self.device)
        dones = torch.as_tensor(np.array(dones, dtype=np.float32), device=self.device)

        # Q(s, a) for the actions that were actually taken.
        q_values = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Double DQN: online network selects the next action, target network scores it.
        with torch.no_grad():
            next_actions = self.q_network(next_states).max(1)[1].unsqueeze(1)
            next_q_values = self.target_network(next_states).gather(1, next_actions).squeeze(1)

        # Terminal transitions (done == 1) contribute only the immediate reward.
        targets = rewards + (self.gamma * next_q_values * (1 - dones))

        loss = nn.MSELoss()(q_values, targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.max_ep = max(self.minimum_ep, self.max_ep * self.decay_ep)

    def save_model(self, path):
        """Write the online network's ``state_dict`` to ``path`` and report it."""
        torch.save(self.q_network.state_dict(), path)
        print(f"Saved model to {path}")


## Training and Testing Code

In [None]:
def train_dqn(episodes=5000, load_paths=None, save_interval=2000):
    """Train two independent DDQN agents on the shared Unity environment.

    Each agent sees only its own observation and reward. Training stops early
    once the mean episode score over the last 100 episodes reaches 0.5.

    Args:
        episodes: Maximum number of training episodes.
        load_paths: Optional sequence of exactly two checkpoint paths, one per
            agent. Any other value (including a wrong length) is ignored and
            both agents start from fresh weights.
        save_interval: Save a checkpoint for each agent every this many episodes.

    Returns:
        tuple: ``(dqn_agents, final_scores)`` where ``final_scores`` holds the
        per-episode score averaged over the two agents.
    """
    if load_paths and len(load_paths) == 2:
        dqn_agents = [DDQNAgent(seed=i, load_path=load_paths[i]) for i in range(2)]
    else:
        dqn_agents = [DDQNAgent(seed=i) for i in range(2)]
    scores_window = deque(maxlen=100)
    final_scores = []

    # Main train loop
    for episode in range(episodes):
        env_info = env.reset(train_mode=True)[brain_name]
        states = env_info.vector_observations
        scores = np.zeros(2)
        for step in range(1000):
            actions_idx = [agent.do_action(states[i]) for i, agent in enumerate(dqn_agents)]
            # Translate each discrete index into the action vector the environment expects.
            actions = np.array([discrete_actions[idx] for idx in actions_idx])

            env_info = env.step(actions)[brain_name]
            next_states = env_info.vector_observations
            rewards = env_info.rewards
            dones = env_info.local_done

            for i, agent in enumerate(dqn_agents):
                agent.step(states[i], actions_idx[i], rewards[i], next_states[i], dones[i])
            states = next_states
            scores += rewards
            if np.any(dones):
                break

        avg_score = np.mean(scores)
        scores_window.append(avg_score)
        final_scores.append(avg_score)
        print(f"Episode {episode+1}/{episodes}, Avg Score: {avg_score}, Window Avg: {np.mean(scores_window)}")
        if (episode + 1) % save_interval == 0:
            for i, agent in enumerate(dqn_agents):
                agent.save_model(f"ddqn_agent_{i}_ep{episode +1}.pth")

        # Environment Solved condition: only judge once the 100-episode window is full.
        # (The old `episode >= 100` check needed 101 episodes.)
        if len(scores_window) == scores_window.maxlen and np.mean(scores_window) >= 0.5:
            # Report the episode at which it was solved, not the configured maximum.
            print(f"Environment solved. Episodes: {episode+1} | Avg Score: {np.mean(scores_window):.3f}")
            for i, agent in enumerate(dqn_agents):
                agent.save_model(f"ddqn_agent_{i}_solved.pth")
            break

    plt.figure(figsize=(10, 6))
    plt.plot(range(1, len(final_scores)+1), final_scores, label='Average Reward')
    plt.xlabel("Episode")
    plt.ylabel("Average Reward")
    plt.title("Training Reward per Episode")
    plt.legend()
    plt.show()
    return dqn_agents, final_scores

def test_dqn(agents):
    """Play a single non-training episode with ``agents`` and print the mean score.

    The episode is capped at 1000 steps and ends early as soon as any agent
    reports ``done``. Actions come from each agent's ``do_action``.
    """
    info = env.reset(train_mode=False)[brain_name]
    observations = info.vector_observations
    scores = np.zeros(2)
    steps_left = 1000
    while steps_left > 0:
        steps_left -= 1
        # One discrete choice per agent, each based on that agent's own observation.
        chosen = []
        for slot, agent in enumerate(agents):
            chosen.append(agent.do_action(observations[slot]))
        joint_action = np.array([discrete_actions[choice] for choice in chosen])
        info = env.step(joint_action)[brain_name]
        observations = info.vector_observations
        scores += info.rewards
        if np.any(info.local_done):
            break
    print(f"Test Avg Score: {np.mean(scores):.3f}")


## Training

In [None]:
# Start fresh training: both agents begin from newly initialised weights.
agents, scores = train_dqn(episodes=10000)
    
# Alternative: resume from saved checkpoints by passing exactly one path per agent.
# The file names below are examples only; use the paths of checkpoints you actually saved.
# load_paths = ["dqn_agent_0_ep160000.pth","dqn_agent_1_ep160000.pth"]
# agents, scores = train_dqn(episodes=1000, load_paths=load_paths)

## Testing

In [None]:
# Run one evaluation episode, then always release the Unity environment,
# even if the evaluation raises, so the simulator process is not left running.
try:
    test_dqn(agents)
finally:
    env.close()