In [2]:
# Sizes of one state vector and one action; ReplayBuffer reads these
# module-level values to work out the width of each row in its memory array.
state_dim, action_dim = 2, 1

In [3]:



from collections import deque
import random

class ReplayBuffer:
    """Fixed-capacity store of environment transitions with uniform sampling.

    Every transition is kept twice:

    * as a ``(state, action, reward, next_state, done)`` tuple in a bounded
      ``deque`` -- this is what ``sample`` reads from;
    * as a flat ``float32`` row of ``memory`` laid out as
      ``[state | action | reward | next_state | done]``.

    Row widths are taken from the module-level ``state_dim`` and
    ``action_dim``, so transitions passed to ``add`` must match those sizes.
    """

    def __init__(self, capacity):
        """Allocate storage for at most ``capacity`` transitions."""
        self.capacity = capacity
        self.buffer = deque(maxlen=capacity)
        # One extra column for the reward and one for the done flag, so the
        # array mirror carries the same five fields as the tuples.
        row_width = state_dim + action_dim + 1 + state_dim + 1
        self.memory = np.zeros((capacity, row_width), dtype=np.float32)
        # Index of the next row of ``memory`` to (over)write.
        self.position = 0

    def add(self, state, action, reward, next_state, done):
        """Record one transition, evicting the oldest one when full."""
        self.buffer.append((state, action, reward, next_state, done))
        # np.ravel lets both scalar and vector actions fill the action
        # columns; the done flag is stored as 0.0 / 1.0.
        self.memory[self.position] = np.concatenate((
            np.ravel(state),
            np.ravel(action),
            [reward],
            np.ravel(next_state),
            [float(done)],
        ))
        # Wrap around so ``memory`` overwrites its oldest row once full.
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            Five lists ``(states, actions, rewards, next_states, dones)``,
            each of length ``batch_size`` and aligned by position.

        Raises:
            ValueError: if ``batch_size`` is negative or larger than the
                number of stored transitions (raised by ``random.sample``).
        """
        indices = random.sample(range(len(self.buffer)), batch_size)
        batch = [self.buffer[index] for index in indices]
        if not batch:
            # zip(*[]) yields nothing, so handle the empty batch explicitly.
            return [], [], [], [], []
        states, actions, rewards, next_states, dones = (
            list(column) for column in zip(*batch)
        )
        return states, actions, rewards, next_states, dones

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.buffer)






In this implementation, the replay buffer is initialized with a fixed capacity. Each transition is stored as a (state, action, reward, next_state, done) tuple in a bounded deque, and is also mirrored as a flat row in a NumPy array that is preallocated with zeros. Whenever a new transition is added, it is appended to the deque and the row at the current write position in the array is updated. Once the replay buffer is full, new entries overwrite the oldest ones, so it behaves as a circular buffer.

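The sample method is used to retrieve a batch of transitions from the replay buffer. It randomly selects batch_size distinct transitions (sampling without replacement, so batch_size must not exceed the number of stored transitions) and returns the states, actions, rewards, next_states, and dones as five separate lists.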

Note that this implementation reads the state_dim and action_dim variables defined at module level in the earlier cell. NumPy must also be imported as np before a ReplayBuffer is instantiated.


In [4]:

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import gym

# Define the Q-Network
class QNetwork(nn.Module):
    """Three-layer fully connected network mapping a state to one value per action.

    Two hidden layers of width ``hidden_dim`` with ReLU activations are
    followed by a linear output layer of width ``action_dim``.
    """

    def __init__(self, state_dim, action_dim, hidden_dim):
        super().__init__()
        # Layers are created input-to-output so parameter initialization
        # consumes the random generator in a fixed order.
        self.linear1 = nn.Linear(state_dim, hidden_dim)
        self.linear2 = nn.Linear(hidden_dim, hidden_dim)
        self.linear3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        """Return the output-layer activations for ``state``."""
        hidden = state
        for layer in (self.linear1, self.linear2):
            hidden = torch.relu(layer(hidden))
        # No activation on the output layer.
        return self.linear3(hidden)

# Define the DQN Agent
class DQNAgent:
    """Epsilon-greedy DQN agent with a target network and a replay buffer.

    Args:
        env: Gym-style environment with a discrete action space
            (``action_space.n``) and a vector observation space
            (``observation_space.shape[0]``).
        hidden_dim: Width of the two hidden layers of each Q-network.
        lr: Learning rate for the Adam optimizer.
        gamma: Discount factor applied to the bootstrapped target.
        epsilon: Probability of taking a random action in ``act``.
        replay_buffer: Optional object providing ``add``, ``sample`` and
            ``__len__``. When omitted, a ``ReplayBuffer(buffer_capacity)``
            is created.
        buffer_capacity: Capacity of the default replay buffer; ignored when
            ``replay_buffer`` is given.
    """

    def __init__(self, env, hidden_dim, lr, gamma, epsilon,
                 replay_buffer=None, buffer_capacity=10000):
        self.env = env
        self.q_net = QNetwork(env.observation_space.shape[0], env.action_space.n, hidden_dim)
        self.target_q_net = QNetwork(env.observation_space.shape[0], env.action_space.n, hidden_dim)
        # Start the target network as an exact copy of the online network.
        self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr)
        self.gamma = gamma
        self.epsilon = epsilon
        # `train` and `update` read self.replay_buffer, so it must always be
        # set here. Compare against None explicitly: an empty buffer has
        # length 0 and would be falsy.
        # NOTE: the default ReplayBuffer sizes its storage from the
        # module-level state_dim / action_dim, which must match `env`.
        if replay_buffer is None:
            replay_buffer = ReplayBuffer(buffer_capacity)
        self.replay_buffer = replay_buffer

    def act(self, state):
        """Return a random action with probability epsilon, else the greedy one."""
        if np.random.uniform() < self.epsilon:
            return self.env.action_space.sample()
        with torch.no_grad():
            q_values = self.q_net(torch.as_tensor(np.asarray(state), dtype=torch.float32))
            return q_values.argmax().item()

    def update(self, batch_size):
        """Run one gradient step on a batch sampled from the replay buffer."""
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)

        # The buffer returns plain Python lists. Stack them through NumPy and
        # give the per-sample scalars shape (batch, 1) so they line up with
        # the keepdim'd max below and with the index `gather` expects.
        states = torch.as_tensor(np.asarray(states), dtype=torch.float32)
        next_states = torch.as_tensor(np.asarray(next_states), dtype=torch.float32)
        actions = torch.as_tensor(np.asarray(actions), dtype=torch.int64).reshape(-1, 1)
        rewards = torch.as_tensor(np.asarray(rewards), dtype=torch.float32).reshape(-1, 1)
        dones = torch.as_tensor(np.asarray(dones), dtype=torch.float32).reshape(-1, 1)

        with torch.no_grad():
            next_q_values = self.target_q_net(next_states).max(dim=1, keepdim=True)[0]
            # (1 - done) removes the bootstrap term for terminal transitions.
            target_q_values = rewards + self.gamma * next_q_values * (1.0 - dones)

        # Q-value of the action that was actually taken in each transition.
        q_values = self.q_net(states).gather(1, actions)

        loss = nn.functional.mse_loss(q_values, target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self, num_episodes, batch_size, target_update_interval=10):
        """Interact with the environment and learn for ``num_episodes`` episodes.

        Args:
            num_episodes: Number of episodes to run.
            batch_size: Batch size for each update; updates start once the
                replay buffer holds at least this many transitions.
            target_update_interval: The target network is synchronized with
                the online network after every episode whose index is a
                multiple of this value.

        Raises:
            ValueError: if ``target_update_interval`` is smaller than 1.
        """
        if target_update_interval < 1:
            raise ValueError("target_update_interval must be at least 1")

        for episode in range(num_episodes):
            state = self._reset_env()
            done = False
            while not done:
                action = self.act(state)
                next_state, reward, terminal, done = self._step_env(action)
                self.replay_buffer.add(state, action, reward, next_state, terminal)
                state = next_state
                if len(self.replay_buffer) >= batch_size:
                    self.update(batch_size)
            if episode % target_update_interval == 0:
                self.target_q_net.load_state_dict(self.q_net.state_dict())

    def _reset_env(self):
        """Reset the environment and return only the initial observation."""
        result = self.env.reset()
        # Newer Gym versions return (observation, info) instead of the bare
        # observation.
        if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
            return result[0]
        return result

    def _step_env(self, action):
        """Step the environment; return (next_state, reward, terminal, episode_over)."""
        result = self.env.step(action)
        if len(result) == 5:
            # Newer Gym API: only `terminated` marks a true terminal state,
            # while the episode loop must also stop on `truncated`.
            next_state, reward, terminated, truncated, _ = result
            return next_state, reward, terminated, (terminated or truncated)
        next_state, reward, done, _ = result
        return next_state, reward, done, done



This code defines a Q-network and a DQN agent, and includes the main training loop for the agent. The Gym environment is not created here; an already constructed environment must be passed to the DQNAgent constructor.

Note that the agent relies on a replay buffer object, accessed as self.replay_buffer, that provides add, sample, and __len__ methods. The ReplayBuffer class defined earlier in this file provides that interface.

Also, note that this is a relatively basic implementation of a DQN algorithm and may not be optimal for all problems.