In [32]:
import gymnasium as gym
import numpy as np
import random
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque

In [16]:
!pip install swig
!pip install gymnasium[box2d]

Collecting swig
  Using cached swig-4.4.0-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl.metadata (3.5 kB)
Using cached swig-4.4.0-py3-none-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.9 MB)
Installing collected packages: swig
Successfully installed swig-4.4.0
Collecting box2d-py==2.3.5 (from gymnasium[box2d])
  Using cached box2d-py-2.3.5.tar.gz (374 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: box2d-py
  Building wheel for box2d-py (setup.py) ... [?25l[?25hdone
  Created wheel for box2d-py: filename=box2d_py-2.3.5-cp312-cp312-linux_x86_64.whl size=2398996 sha256=f82d78825876486408bb742e323e35e5ace465da57b9eeae04b86516fcb8d068
  Stored in directory: /root/.cache/pip/wheels/2a/e9/60/774da0bcd07f7dc7761a8590fa2d065e4069568e78dcdc3318
Successfully built box2d-py
Installing collected packages: box2d-py
Successfully installed box2d-py-2.3.5


In [40]:
NUM_EPISODES = 500
BATCH_SIZE = 128
GAMMA = 0.99
EPSILON_START = 1.0
EPSILON_END = 0.01
EPSILON_DECAY = 0.995
TARGET_UPDATE = 10

In [41]:
class DQN(nn.Module):
    def __init__(self, input_size, output_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)

In [42]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)
    
    def __len__(self):
        return len(self.memory)
    

In [43]:
class DQNAgent:
    def __init__(self, action_space, observation_space):
        self.action_space = action_space
        self.policy = DQN(observation_space, action_space)
        self.target = DQN(observation_space, action_space)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-3)
        self.memory = ReplayBuffer(10000)
        self.steps_done = 0

    def select_action(self, state, epsilon):
        if np.random.rand() < epsilon:
            return torch.tensor([[np.random.randint(self.action_space)]], dtype=torch.long)
        else:
            with torch.no_grad():
                state = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
                q_values = self.policy(state)
                return q_values.max(1)[1].view(1, 1)
    
    def optimize_model(self, batch_size, gamma):
        if len(self.memory) < batch_size:
            return
        transitions = self.memory.sample(batch_size)
        batch = list(zip(*transitions))

        state_batch = torch.cat(batch[0])
        action_batch = torch.cat(batch[1])
        reward_batch = torch.cat(batch[2])
        next_state_batch = torch.cat(batch[3])

        current_q_values = self.policy(state_batch).gather(1, action_batch)
        next_q_values = self.target(next_state_batch).max(1)[0].detach()
        expected_q_values = reward_batch + (gamma * next_q_values)
        loss = F.mse_loss(current_q_values.squeeze(), expected_q_values)
        self.optimizer.step()


    def update_target(self):
        self.target.load_state_dict(self.policy.state_dict())
            
        

In [44]:
def train():
    env = gym.make('LunarLander-v3')
    agent = DQNAgent(env.action_space.n, env.observation_space.shape[0])

    episode_durations = []
    episode_rewards = []
    epsilon = 1.0

    for episode in range(NUM_EPISODES):
        state, _ = env.reset()
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        total_reward = 0
        for t in range(1000):
            action = agent.select_action(state, epsilon)
            next_state, reward, done, trunc, _ = env.step(action.item())
            total_reward += reward
            reward = torch.tensor([reward], dtype=torch.float32)
            done = done or trunc

            next_state = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)
            agent.memory.push((state, action, reward, next_state))
            state = next_state
            agent.optimize_model(BATCH_SIZE, GAMMA)
            if done:
                episode_durations.append(t + 1)
                episode_rewards.append(total_reward)
                break
            epsilon = max(EPSILON_END, EPSILON_START * (EPSILON_DECAY ** episode))
        if episode % TARGET_UPDATE == 0:
            agent.update_target()
        if episode % 100 == 0:
            print(f"Episode {episode}, Total Reward: {total_reward}, AVG Reward: {np.mean(episode_rewards[-100:]):.2f} Epsilon: {epsilon:.2f}")

    env.close()
    return agent, episode_rewards
        

In [45]:
agent, rewards = train()

plt.plot(rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN Training Performance')
plt.show()

Episode 0, Total Reward: -131.10576728486748, AVG Reward: -131.11 Epsilon: 1.00


RuntimeError: shape '[1, 1]' is invalid for input of size 4