# Implementation of Deep Q-Network

In [10]:
# Import Libraries
import random
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
from matplotlib import pyplot as plt

## Define Replay Buffer

In [15]:
class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions with uniform random sampling.

    Every pushed item is expected to be a
    ``(state, action, reward, next_state, done)`` tuple; ``extract_samples``
    unzips a random batch of them into five tensors.
    """

    def __init__(self, buffer_size, batch_size=10000):
        """Create an empty buffer.

        Args:
            buffer_size: Maximum number of transitions kept.
            batch_size: Default number of transitions returned by
                ``extract_samples`` when no explicit size is given.
        """
        # deque(maxlen=...) discards the oldest transition once it is full.
        self.storage = deque(maxlen=buffer_size)
        self.batch_size = batch_size

    def push(self, params: tuple):
        """Append one ``(state, action, reward, next_state, done)`` tuple."""
        self.storage.append(params)

    def extract_samples(self, batch_size=None):
        """Draw a uniform random batch (without replacement) as tensors.

        Args:
            batch_size: Number of transitions to draw. Defaults to the
                ``batch_size`` given at construction time.

        Returns:
            ``(states, actions, rewards, next_states, dones)``. ``actions`` is
            ``int64`` so it can be used directly as a ``gather`` index; the
            other four tensors are ``float32``.

        Raises:
            ValueError: If more transitions are requested than are stored.
        """
        if batch_size is None:
            batch_size = self.batch_size

        available = len(self.storage)
        if batch_size > available:
            raise ValueError(
                f"Cannot sample {batch_size} transitions from a buffer "
                f"holding only {available}"
            )

        experiences = random.sample(self.storage, batch_size)
        states, actions, rewards, next_states, dones = zip(*experiences)

        return (
            torch.tensor(states, dtype=torch.float32),
            # Index tensors passed to Tensor.gather must be int64.
            torch.tensor(actions, dtype=torch.int64),
            torch.tensor(rewards, dtype=torch.float32),
            torch.tensor(next_states, dtype=torch.float32),
            torch.tensor(dones, dtype=torch.float32),
        )

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.storage)

## Define Deep Q-Network

In [16]:
class DeepQNetwork(nn.Module):
    """Fully connected network with two 64-unit ReLU hidden layers.

    Takes inputs whose last dimension is ``state_size`` and produces
    ``action_size`` outputs per input row.
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        hidden_units = 64
        # Layers are created in fc1 -> fc2 -> fc3 order so that seeded
        # initialisation and state_dict keys stay stable.
        self.fc1 = nn.Linear(state_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, action_size)

    def forward(self, state):
        """Run ``state`` through both hidden layers and the linear output layer."""
        hidden = torch.relu(self.fc1(state))
        hidden = torch.relu(self.fc2(hidden))
        # No activation on the output layer.
        return self.fc3(hidden)

## Define Deep Q-Learning

In [17]:
class DeepQLearning:
    """DQN agent for environments with discrete observations and actions.

    The network input width is ``env.observation_space.n``, so each discrete
    state index is one-hot encoded before it is fed to the Q-networks.
    """

    def __init__(
        self,
        env,
        gamma=0.9,
        epsilon=0.9,
        epsilon_min=0.01,
        epsilon_decay=0.995,
        alpha=0.001,
        target_network_update_freq=5,
        buffer_size=1000,
        batch_size=64,
    ):
        """Build the online/target networks, optimizer and replay buffer.

        Args:
            env: Environment exposing ``observation_space.n``,
                ``action_space.n``, ``action_space.sample()``, ``reset()``,
                ``step()``, ``render()`` and ``close()``.
            gamma: Discount factor applied to the bootstrapped next-state value.
            epsilon: Initial exploration probability.
            epsilon_min: Lower bound for ``epsilon`` after decay.
            epsilon_decay: Multiplicative decay applied once per episode.
            alpha: Learning rate of the Adam optimizer.
            target_network_update_freq: Number of episodes between copies of
                the online weights into the target network.
            buffer_size: Capacity of the replay buffer.
            batch_size: Number of transitions per training step.
        """
        self.env = env
        self.state_size = env.observation_space.n
        self.action_size = env.action_space.n
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.alpha = alpha
        self.target_network_update_freq = target_network_update_freq
        # Pass batch_size through so the buffer samples the same number of
        # transitions the agent trains on (its own default is far larger).
        self.replay_buffer = ReplayBuffer(buffer_size, batch_size)
        self.batch_size = batch_size
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.q_network = DeepQNetwork(self.state_size, self.action_size).to(self.device)
        self.target_q_network = DeepQNetwork(self.state_size, self.action_size).to(self.device)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=self.alpha)
        self.update_target_network()

    def _encode_states(self, states):
        """One-hot encode one or many discrete state indices.

        Accepts a scalar index or a 1-D collection/tensor of indices and
        returns a float tensor of shape ``(count, state_size)`` on
        ``self.device``.
        """
        indices = torch.as_tensor(states, device=self.device).long().reshape(-1)
        return nn.functional.one_hot(indices, num_classes=self.state_size).float()

    def _greedy_action(self, state):
        """Return the action with the highest online Q-value for ``state``."""
        with torch.no_grad():
            q_values = self.q_network(self._encode_states(state))
        return int(torch.argmax(q_values, dim=1).item())

    def epsilon_greedy_policy(self, state):
        """Pick a random action with probability ``epsilon``, else the greedy one."""
        if random.uniform(0, 1) < self.epsilon:
            return self.env.action_space.sample()
        return self._greedy_action(state)

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_q_network.load_state_dict(self.q_network.state_dict())

    def train_network(self):
        """Run one gradient step on a random batch from the replay buffer."""
        states, actions, rewards, next_states, dones = self.replay_buffer.extract_samples()

        # The buffer returns raw state indices on the CPU; encode them and
        # move everything to the device the networks live on.
        states = self._encode_states(states)
        next_states = self._encode_states(next_states)
        actions = actions.to(self.device).long()  # gather needs int64 indices
        rewards = rewards.to(self.device)
        dones = dones.to(self.device)

        q_values = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Targets are treated as constants: no gradient flows through them.
        with torch.no_grad():
            next_q_values = self.target_q_network(next_states).max(1)[0]
            expected_q_values = rewards + (self.gamma * next_q_values * (1 - dones))

        loss = nn.functional.mse_loss(q_values, expected_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def compute_policy(self, num_of_episodes, max_steps=1000):
        """Train the agent and return the total reward of every episode.

        Args:
            num_of_episodes: Number of episodes to run.
            max_steps: Upper bound on steps per episode.

        Returns:
            List with one summed reward per episode.
        """
        rewards_per_episode = []

        for episode in range(num_of_episodes):
            state, _ = self.env.reset()
            episode_reward = 0

            for _ in range(max_steps):
                action = self.epsilon_greedy_policy(state)

                next_state, reward, terminated, truncated, _ = self.env.step(action)
                episode_reward += reward

                # Only a true terminal state should zero the bootstrap term;
                # a time-limit truncation is not a terminal state.
                self.replay_buffer.push(
                    (int(state), action, reward, int(next_state), terminated)
                )
                state = next_state

                # Train before checking for episode end so the final
                # transition of an episode also triggers an update.
                if len(self.replay_buffer) >= self.batch_size:
                    self.train_network()

                if terminated or truncated:
                    break

            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
            rewards_per_episode.append(episode_reward)

            if episode % self.target_network_update_freq == 0:
                self.update_target_network()

        return rewards_per_episode

    def execute_policy(self, max_steps=100):
        """Run one greedy episode, rendering each step and printing the return.

        Args:
            max_steps: Upper bound on steps taken in the episode.
        """
        state, _ = self.env.reset()
        self.env.render()
        total_reward = 0

        try:
            for _ in range(max_steps):
                action = self._greedy_action(state)
                next_state, reward, terminated, truncated, _ = self.env.step(action)
                total_reward += reward
                self.env.render()
                state = next_state

                if terminated or truncated:
                    break

            print(f"Total reward: {total_reward}")
        finally:
            # Release the environment even if a step fails.
            self.env.close()

In [18]:
# Example usage: train a DQN agent on the slippery 4x4 FrozenLake map,
# plot the per-episode return, then run one greedy rollout.
num_of_timesteps = 100   # per-episode step cap passed to compute_policy
num_of_episodes = 1000

env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=True)
agent = DeepQLearning(env)

rewards = agent.compute_policy(num_of_episodes, max_steps=num_of_timesteps)

# Learning curve: total reward collected in each training episode
plt.plot(rewards)
plt.title("DQN on FrozenLake")
plt.xlabel("Episode")
plt.ylabel("Reward")
plt.show()

# Roll out the learned policy once
agent.execute_policy()