In [None]:
import torch
import gymnasium as gym
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from collections import deque
import random
import csv

class DQN(torch.nn.Module):
    """Fully connected Q-network: two ReLU hidden layers and a linear output layer.

    Args:
        state_size: Number of input features per state.
        action_size: Number of outputs (one value per action).
        hidden_size: Width of both hidden layers.
    """

    def __init__(self, state_size=8, action_size=4, hidden_size=64):
        super().__init__()
        # The attribute names layer1..layer3 become the state_dict keys, so
        # they are kept stable for saved checkpoints.
        self.layer1 = torch.nn.Linear(in_features=state_size, out_features=hidden_size)
        self.layer2 = torch.nn.Linear(in_features=hidden_size, out_features=hidden_size)
        self.layer3 = torch.nn.Linear(in_features=hidden_size, out_features=action_size)

    def forward(self, state: torch.Tensor) -> torch.Tensor:
        """Run ``state`` through both hidden layers and return the raw outputs.

        No activation is applied to the output of the last layer.
        """
        hidden = state
        for hidden_layer in (self.layer1, self.layer2):
            hidden = torch.nn.functional.relu(hidden_layer(hidden))
        return self.layer3(hidden)



In [None]:
class ReplayBuffer:
    """Bounded store of transitions with uniform random sampling.

    Args:
        max_size: Maximum number of transitions kept; once the deque is full,
            appending a new transition drops the oldest one.
    """

    def __init__(self, max_size: int = 50_000):
        self.max_size = max_size
        self.memory = deque(maxlen=max_size)

    def store_transition(self, state, action, reward, next_state, done):
        """Append one ``(state, action, reward, next_state, done)`` tuple."""
        self.memory.append((state, action, reward, next_state, done))

    def get_batch(self, batch_size: int):
        """Sample ``batch_size`` stored transitions without replacement.

        Returns:
            A 5-tuple of numpy arrays ``(states, actions, rewards,
            next_states, dones)``, each built from the sampled transitions.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions.
        """
        stored = len(self.memory)
        if stored < batch_size:
            raise ValueError(f"Not enough samples in memory: {stored}")

        sampled = random.sample(self.memory, batch_size)
        # Transpose the list of transitions into one column per field.
        state_col, action_col, reward_col, next_state_col, done_col = zip(*sampled)
        columns = (state_col, action_col, reward_col, next_state_col, done_col)
        return tuple(np.array(column) for column in columns)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

    @property
    def is_ready(self):
        """``True`` once at least one transition has been stored."""
        return bool(self.memory)



In [None]:
class DQNAgent:
    """DQN agent holding an online Q-network, a target network, an Adam
    optimizer and a replay buffer.

    The target network is only updated when ``_sync_target_network`` is
    called (at construction, after ``load``, and whenever the caller decides).

    Args:
        state_size: Length of the state vector fed to the networks.
        action_size: Number of discrete actions.
        hidden_size: Hidden-layer width passed to ``DQN``.
        learning_rate: Learning rate of the Adam optimizer.
        gamma: Discount factor used in the bootstrapped target.
        buffer_size: Capacity passed to ``ReplayBuffer``.
        batch_size: Number of transitions sampled per learning step.
    """

    def __init__(self, state_size=8, action_size=4, hidden_size=64, 
                 learning_rate=1e-3, gamma=0.99, buffer_size=50000, batch_size=64):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.action_size = action_size
        self.gamma = gamma
        self.batch_size = batch_size
        
        self.q_network = self._init_network(state_size, action_size, hidden_size)
        self.target_network = self._init_network(state_size, action_size, hidden_size)
        self._sync_target_network()
        
        # Only the online network is optimized; the target network is updated
        # by copying weights.
        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=learning_rate)
        self.memory = ReplayBuffer(buffer_size)

    def _init_network(self, state_size: int, action_size: int, hidden_size: int) -> torch.nn.Module:
        """Build a ``DQN`` and move it to ``self.device``."""
        return DQN(state_size, action_size, hidden_size).to(self.device)

    def _sync_target_network(self):
        """Copy the online network's weights into the target network and put
        the target network in eval mode."""
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.target_network.eval()

    def act(self, state: np.ndarray, epsilon: float = 0.0) -> int:
        """Choose an action epsilon-greedily.

        Args:
            state: Current observation as a numpy array.
            epsilon: Probability of picking a uniformly random action.

        Returns:
            The chosen action index as a plain Python ``int``.
        """
        # random.random() lies in [0, 1), so `< epsilon` never explores for
        # epsilon == 0 and always explores for epsilon == 1.
        if random.random() < epsilon:
            return random.randrange(self.action_size)
        return self._get_best_action(state)

    def _get_best_action(self, state: np.ndarray) -> int:
        """Return the index of the highest online-network output for ``state``."""
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)
        
        self.q_network.eval()
        with torch.no_grad():
            q_values = self.q_network(state)
        self.q_network.train()
        
        # np.argmax yields a numpy integer; convert so the annotated return
        # type holds.
        return int(np.argmax(q_values.cpu().numpy()))

    def step(self, state: np.ndarray, action: int, reward: float, next_state: np.ndarray, done: bool):
        """Store one transition and run a learning step once the buffer holds
        more than ``batch_size`` transitions.

        ``done`` is used as the mask that disables bootstrapping from
        ``next_state`` in the learning target.
        """
        self.memory.store_transition(state, action, reward, next_state, done)
        
        if len(self.memory) > self.batch_size:
            self._learn()

    def _learn(self):
        """Run one optimizer step on a sampled batch.

        Minimizes the MSE between the online network's value for the taken
        action and ``reward + gamma * max(target_network(next_state)) * (1 - done)``.
        """
        batch = self._get_batch()
        states, actions, rewards, next_states, dones = batch
        
        # Pick, per row, the online-network output of the action actually taken.
        current_q = self.q_network(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)
        
        # The target is a constant with respect to the loss.
        with torch.no_grad():
            next_q = self.target_network(next_states).max(1)[0]
            target_q = rewards + self.gamma * next_q * (1 - dones)
        
        loss = torch.nn.functional.mse_loss(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def _get_batch(self):
        """Sample ``batch_size`` transitions and convert them to tensors on
        ``self.device`` (float states/rewards/dones, long actions)."""
        states, actions, rewards, next_states, dones = self.memory.get_batch(self.batch_size)
        
        return (
            torch.from_numpy(states).float().to(self.device),
            torch.tensor(actions).long().to(self.device),
            torch.tensor(rewards).float().to(self.device),
            torch.from_numpy(next_states).float().to(self.device),
            torch.tensor(dones).float().to(self.device)
        )

    def save(self, path):
        """Write the online network's state_dict to ``path``."""
        torch.save(self.q_network.state_dict(), path)
    
    def load(self, path):
        """Restore the online network from ``path`` and resync the target network.

        ``map_location`` remaps the stored tensors onto ``self.device``, so a
        checkpoint written on a GPU can be read on a CPU-only machine.
        ``weights_only=True`` restricts unpickling to tensors and plain
        containers, which is all a state_dict contains.
        """
        state_dict = torch.load(path, map_location=self.device, weights_only=True)
        self.q_network.load_state_dict(state_dict)
        self.q_network.to(self.device)
        # Without this the target network would keep its pre-load weights.
        self._sync_target_network()



In [None]:
def evaluate(agent, env, num_episodes=100, max_steps=1000,
             csv_path='dqn_evaluation_results.csv'):
    """Run the agent greedily for several episodes and log per-episode results.

    Args:
        agent: Object exposing ``act(state, epsilon=...)``; it is always
            called with ``epsilon=0.0``.
        env: Object whose ``reset()`` returns ``(state, info)`` and whose
            ``step(action)`` returns ``(next_state, reward, terminated,
            truncated, info)``.
        num_episodes: Number of episodes to run.
        max_steps: Upper bound on steps per episode.
        csv_path: File that receives one ``Episode, Reward, Steps`` row per
            episode; it is overwritten.

    Returns:
        ``(avg_reward, avg_steps)`` averaged over all episodes.
    """
    all_rewards = []
    all_steps = []

    # Open the log once for the whole run instead of reopening it per episode.
    with open(csv_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Episode', 'Reward', 'Steps'])

        for i_episode in range(num_episodes):
            state, _ = env.reset()
            episode_reward = 0.0
            # Explicit counter: stays defined (as 0) even when max_steps is 0,
            # where a loop variable would be unbound.
            steps_taken = 0

            for _step in range(max_steps):
                action = agent.act(state, epsilon=0.0)
                next_state, reward, terminated, truncated, _ = env.step(action)
                episode_reward += reward
                steps_taken += 1

                if terminated or truncated:
                    break
                state = next_state

            all_rewards.append(episode_reward)
            all_steps.append(steps_taken)

            writer.writerow([i_episode + 1, episode_reward, steps_taken])
            # Flush so finished episodes are on disk while evaluation continues.
            csvfile.flush()

    avg_reward = np.mean(all_rewards)
    avg_steps = np.mean(all_steps)
    std_reward = np.std(all_rewards)

    print(f"\nEvaluation over {num_episodes} episodes:")
    print(f"Average Reward: {avg_reward:.2f} ± {std_reward:.2f}")
    print(f"Average Episode Length: {avg_steps:.2f}")

    return avg_reward, avg_steps



In [None]:
def train(num_episodes=3000, target_update=10, eps_start=1.0, eps_end=0.01,
          eps_decay=0.995, env_id='LunarLander-v3', seed=None):
    """Train a ``DQNAgent`` on a Gymnasium environment, then evaluate it.

    Per-episode results are written to ``dqn_training_rewards.csv``. The agent
    is saved to ``best_dqn_model.pt`` whenever the 100-episode mean reward
    improves, and to ``final_dqn_model.pt`` when training ends.

    Args:
        num_episodes: Number of training episodes.
        target_update: Sync the target network every this many episodes
            (must be a positive integer).
        eps_start: Initial exploration rate.
        eps_end: Lower bound of the exploration rate.
        eps_decay: Multiplicative decay applied to epsilon after each episode.
        env_id: Environment id passed to ``gym.make``.
        seed: Optional seed applied to ``random``, numpy, torch and the first
            environment reset; ``None`` leaves everything unseeded.

    Returns:
        The agent that was evaluated: the best checkpoint if one was saved,
        otherwise the agent as it stands after the last episode.
    """
    if seed is not None:
        # Seed before the agent is built so network initialization is covered.
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)

    env = gym.make(env_id)
    # try/finally guarantees the environment is closed even if training fails.
    try:
        state_size = env.observation_space.shape[0]
        action_size = env.action_space.n

        agent = DQNAgent(
            state_size=state_size,
            action_size=action_size,
            hidden_size=64,
            learning_rate=1e-3
        )

        epsilon = eps_start
        recent_rewards = deque(maxlen=100)
        best_mean_reward = float('-inf')
        best_model_saved = False

        # Open the log once for the whole run instead of reopening it per episode.
        with open('dqn_training_rewards.csv', 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['Episode', 'Reward', 'Mean_100', 'Epsilon'])

            for episode in range(num_episodes):
                # Passing the seed on the first reset only seeds the
                # environment once for the whole run.
                if episode == 0 and seed is not None:
                    state, _ = env.reset(seed=seed)
                else:
                    state, _ = env.reset()
                total_reward = 0.0
                done = False

                while not done:
                    action = agent.act(state, epsilon)
                    next_state, reward, terminated, truncated, _ = env.step(action)
                    done = terminated or truncated

                    # Store `terminated`, not `done`: a time-limit truncation
                    # is not a terminal state, so the learning target must
                    # still bootstrap from next_state in that case.
                    agent.step(state, action, reward, next_state, terminated)
                    state = next_state
                    total_reward += reward

                epsilon = max(eps_end, epsilon * eps_decay)

                if episode % target_update == 0:
                    agent._sync_target_network()

                recent_rewards.append(total_reward)
                mean_reward = np.mean(list(recent_rewards))

                writer.writerow([episode + 1, total_reward, mean_reward, epsilon])
                # Flush so progress is on disk while training continues.
                csvfile.flush()

                # Only compare means once the window holds a full 100 episodes.
                if mean_reward > best_mean_reward and len(recent_rewards) == 100:
                    best_mean_reward = mean_reward
                    agent.save('best_dqn_model.pt')
                    best_model_saved = True
                    print(f"\nNew best model saved with mean reward: {best_mean_reward:.2f}")

                if (episode + 1) % 10 == 0:
                    print(f"Episode {episode + 1}/{num_episodes}, Reward: {total_reward:.2f}, Mean (100): {mean_reward:.2f}, Epsilon: {epsilon:.3f}")

        agent.save('final_dqn_model.pt')
        print("\nTraining completed. Final model saved.")

        if best_model_saved:
            print("\nEvaluating best model...")
            agent.load('best_dqn_model.pt')
        else:
            # Runs shorter than 100 episodes never save a best checkpoint;
            # loading here would fail or pick up a stale file from an
            # earlier run.
            print("\nNo best model was saved during this run; evaluating final model...")
        evaluate(agent, env)

        return agent
    finally:
        env.close()

In [None]:
# Entry point: start the full training run only when this code is executed as
# the main module, not when it is imported. The agent returned by train() is
# discarded here.
if __name__ == "__main__":
    train()