In [2]:
import gymnasium as gym
from collections import namedtuple, deque
import random

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import numpy as np

# Training hyperparameters.
learning_rate = 5e-4    # step size handed to the optimizer
gamma = 0.98            # discount factor
buffer_limit = 50_000   # maximum number of stored transitions
batch_size = 32         # transitions drawn per sample

# Run on the GPU when one is visible to PyTorch, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cuda


In [14]:
# One recorded environment step.
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'reward', 'next_state'],
)


class ReplayMemory:
    """Fixed-capacity FIFO store of ``Transition`` records.

    When the store is full, appending a new transition drops the oldest one.
    """

    def __init__(self, capacity):
        """Create an empty memory holding at most ``capacity`` transitions."""
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Pack ``args`` into a ``Transition`` and append it to the memory.

        ``args`` must supply the four fields in order:
        state, action, reward, next_state.
        """
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random.

        Raises ``ValueError`` (from ``random.sample``) when ``batch_size``
        exceeds the number of stored transitions.
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

In [15]:
# Smoke-test the ReplayMemory class with a tiny buffer.
memory = ReplayMemory(5)

# Fill the buffer to capacity with 5 dummy transitions.
for i in range(5):
    transition = ([i], i+1, i+2, [i+3])
    print(f"transition: {transition}")
    memory.push(*transition)

# A bare `len(memory)` in the middle of a cell is evaluated and discarded,
# so verify the size explicitly instead.
assert len(memory) == 5

# Draw 3 distinct transitions at random from the buffer.
transitions = memory.sample(3)
assert len(transitions) == 3

# Show what was sampled. NOTE: the "shape" labels below are historical; the
# values printed are the first element of each dummy state list, not a shape.
for transition in transitions:
    print(f"state shape: {transition.state[0]}")
    print(f"action: {transition.action}")
    print(f"reward: {transition.reward}")
    print(f"next state shape: {transition.next_state[0]}")
    print("")

transition: ([0], 1, 2, [3])
transition: ([1], 2, 3, [4])
transition: ([2], 3, 4, [5])
transition: ([3], 4, 5, [6])
transition: ([4], 5, 6, [7])
state shape: 1
action: 2
reward: 3
next state shape: 4

state shape: 3
action: 4
reward: 5
next state shape: 6

state shape: 0
action: 1
reward: 2
next state shape: 3



In [24]:
class DRQN(nn.Module):
    """Recurrent Q-network: a 2-layer LSTM followed by a linear Q-value head.

    The network is stateful. The LSTM hidden/cell state is kept in
    ``self.hidden`` and carried from one ``forward`` call to the next, so
    call ``self.hidden = self.init_hidden()`` to start from a zero state
    (for example at the beginning of an episode).

    Args:
        n_observations: Size of one observation vector (LSTM input size).
        n_actions: Number of discrete actions (size of the Q-value output).
        n_time_steps: Stored on the instance for callers; the network itself
            does not use it. Optional, defaults to 1.
    """

    def __init__(self, n_observations, n_actions, n_time_steps=1):
        super().__init__()
        self.n_observations = n_observations
        self.n_actions = n_actions
        self.n_time_steps = n_time_steps
        self.hidden_size = 128
        self.num_layers = 2
        # batch_first is left at its default (False), so the LSTM expects
        # input laid out as (seq_len, batch, n_observations).
        self.rnn = nn.LSTM(self.n_observations, self.hidden_size,
                           num_layers=self.num_layers)
        self.fc = nn.Linear(self.hidden_size, self.n_actions)
        self.hidden = self.init_hidden()

    def init_hidden(self):
        """Return a fresh all-zero ``(h_0, c_0)`` pair for a batch of one.

        Each tensor has shape ``(num_layers, 1, hidden_size)`` and is created
        on the same device and with the same dtype as the model parameters.
        """
        ref = next(self.parameters())
        shape = (self.num_layers, 1, self.hidden_size)
        return (torch.zeros(shape, device=ref.device, dtype=ref.dtype),
                torch.zeros(shape, device=ref.device, dtype=ref.dtype))

    def forward(self, x):
        """Advance the recurrent state and return Q-values of the last step.

        Args:
            x: Either a single observation (``n_observations`` elements in
                any shape) or a sequence of observations whose last axis is
                ``n_observations``; every leading axis is flattened into
                time, with a batch size of one.

        Returns:
            Tensor of shape ``(1, n_actions)`` with the Q-values after the
            final time step of ``x``.
        """
        # (T, 1, n_observations): T time steps, batch of one.
        x = x.reshape(-1, 1, self.n_observations)
        # self.hidden is a plain attribute, so it does not follow
        # model.to(...) / model.double(); align it with the input here.
        hidden = tuple(h.to(device=x.device, dtype=x.dtype)
                       for h in self.hidden)
        out, hidden = self.rnn(x, hidden)
        # Keep only the values of the state. Holding on to its autograd graph
        # would make the next backward() walk through an already-freed graph.
        self.hidden = tuple(h.detach() for h in hidden)
        # out[-1] is the top-layer output at the last time step: (1, hidden).
        q_values = self.fc(out[-1])
        return q_values.view(1, self.n_actions)

In [34]:
# Example input: one sequence of 5 time steps with 4 features each.
x_data = torch.tensor([[[1, 2, 3, 4],
                        [2, 3, 4, 5],
                        [3, 4, 5, 5],
                        [5, 6, 7, 8],
                        [6, 7, 8, 9]]], dtype=torch.float32)

# Pass all three sizes explicitly: the original `DRQN(4, 2)` call left out
# n_time_steps. Derive the sizes from the example so they stay in sync.
drqn = DRQN(n_observations=x_data.shape[-1],
            n_actions=2,
            n_time_steps=x_data.shape[1])
x_data.shape

torch.Size([1, 5, 4])