In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
from collections import namedtuple, deque
import gymnasium as gym


In [2]:
class QNetwork(nn.Module):
    def __init__(self, state_size, action_size, seed, fc1_units=64, fc2_units=64):
        super(QNetwork, self).__init__()
        self.seed = torch.manual_seed(seed)
        self.fc1 = nn.Linear(state_size, fc1_units)
        self.fc2 = nn.Linear(fc1_units, fc2_units)
        self.fc3 = nn.Linear(fc2_units, action_size)
        
    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
    
class ReplayBuffer:
    def __init__(self, action_size, buffer_size, batch_size, seed):
        self.action_size = action_size
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
        self.seed = random.seed(seed)
        
    def add(self, state, action, reward, next_state, done):
        e = self.experience(state, action, reward, next_state, done)
        self.memory.append(e)
        
    def sample(self):
        experiences = random.sample(self.memory, k=self.batch_size)
        
        states = torch.from_numpy(np.vstack([e.state for e in experiences if e is not None])).float()
        actions = torch.from_numpy(np.vstack([e.action for e in experiences if e is not None])).long()
        rewards = torch.from_numpy(np.vstack([e.reward for e in experiences if e is not None])).float()
        next_states = torch.from_numpy(np.vstack([e.next_state for e in experiences if e is not None])).float()
        dones = torch.from_numpy(np.vstack([e.done for e in experiences if e is not None])).astype(np.uint8).float()

        return (states, actions, rewards, next_states, dones)
    
    def __len__(self):
        return len(self.memory)
    

In [3]:
class DQNAgent:
    def __init__(self, state_size, action_size, seed, lr):
        self.state_size = state_size
        self.action_size = action_size
        self.seed = random.seed(seed)
        
        self.qnetwork_local = QNetwork(state_size, action_size, seed)
        self.qnetwork_target = QNetwork(state_size, action_size, seed)
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr)
        
        self.memory = ReplayBuffer(action_size, buffer_size=int(1e5), batch_size=64, seed = seed)
        self.t_step = 0
    
    def step(self, state, action, reward, next_state, done):
        self.memory.add(state, action, reward, next_state, done)

        self.t_step = (self.t_step + 1) % 4
        if self.t_step == 0:
            if len(self.memory) > 64:
                experiences = self.memory.sample()
                self.learn(experiences, gamma=0.99)
    
    def act(self, state, eps=0.):
        state_tensor = torch.from_numpy(state).float().unsqueeze(0)
        print(state_tensor.type())
        self.qnetwork_local.eval()
        with torch.no_grad():
            action_values = self.qnetwork_local(state_tensor)
        self.qnetwork_local.train()

        if np.random.random() > eps:
            return action_values.argmax(dim=1).item()
        else:
            return np.random.randint(self.action_size)
    
    def learn(self, experiences, gamma):
        states, actions, rewards, next_states, dones = zip(*experiences)
        states = torch.from_numpy(np.vstack(states)).float()
        actions = torch.from_numpy(np.vstack(actions)).long()
        rewards = torch.from_numpy(np.vstack(rewards)).float()
        next_states = torch.from_numpy(np.vstack(next_states)).float()
        dones = torch.from_numpy(np.vstack(dones).astype(np.uint8)).float()

        Q_targets_next = self.qnetwork_target(next_states).detach().max(1)[0].unsqueeze(1)
        Q_targets = rewards + (gamma * Q_targets_next * (1 - dones))

        Q_expected = self.qnetwork_local(states).gather(1, actions)

        loss = F.mse_loss(Q_expected, Q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.soft_update(self.qnetwork_local, self.qnetwork_target, tau=1e-3)

    def soft_update(self, local_model, target_model, tau):
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(tau * local_param.data + (1.0 - tau) * target_param.data)

In [4]:
env = gym.make("CartPole-v1")

# Define training parameters
num_episodes = 250
max_steps_per_episode = 200
epsilon_start = 1.0
epsilon_end = 0.2
epsilon_decay_rate = 0.99
gamma = 0.9
lr = 0.0025
buffer_size = 10000
buffer = deque(maxlen=buffer_size)
batch_size = 128
update_frequency = 10


# Initialize the DQNAgent
input_dim = env.observation_space.shape[0]
output_dim = env.action_space.n
new_agent = DQNAgent(input_dim, output_dim, seed=170715, lr = lr)

In [5]:
for episode in range(num_episodes):
    # Reset the environment
    state = env.reset()
    epsilon = max(epsilon_end, epsilon_start * (epsilon_decay_rate ** episode))

    # Run one episode
    for step in range(max_steps_per_episode):
        print(state)
        # Choose and perform an action
        state = state[0]
        state = np.array(state)
        print(state)
        action = new_agent.act(state, epsilon)
        print(env.step(action))
        next_state, reward, done, _ = env.step(action)[0]
        
        buffer.append((state, action, reward, next_state, done))
        
        if len(buffer) >= batch_size:
            batch = random.sample(buffer, batch_size)
            # Update the agent's knowledge
            new_agent.learn(batch, gamma)
        
        state = next_state
        
        # Check if the episode has ended
        if done:
            break
    
    if (episode + 1) % update_frequency == 0:
        print(f"Episode {episode + 1}: Finished training")


(array([-0.00111257, -0.01993169, -0.01443949, -0.00084542], dtype=float32), {})
[-0.00111257 -0.01993169 -0.01443949 -0.00084542]
torch.FloatTensor
(array([-0.0015112 , -0.21484362, -0.0144564 ,  0.2872469 ], dtype=float32), 1.0, False, False, {})
(array([ 0.00504925, -0.03751699,  0.03893349, -0.01428052], dtype=float32), {})
[ 0.00504925 -0.03751699  0.03893349 -0.01428052]
torch.FloatTensor
(array([ 0.00429891, -0.23317504,  0.03864788,  0.2904277 ], dtype=float32), 1.0, False, False, {})
(array([-0.00493403, -0.04527399,  0.01826592, -0.0194621 ], dtype=float32), {})
[-0.00493403 -0.04527399  0.01826592 -0.0194621 ]
torch.FloatTensor
(array([-0.00583951, -0.24065307,  0.01787668,  0.2789275 ], dtype=float32), 1.0, False, False, {})
(array([-0.04212321,  0.02118369, -0.03138705,  0.03360751], dtype=float32), {})
[-0.04212321  0.02118369 -0.03138705  0.03360751]
torch.FloatTensor
(array([-0.04169954, -0.17347443, -0.0307149 ,  0.3162246 ], dtype=float32), 1.0, False, False, {})
(arr

  if not isinstance(terminated, (bool, np.bool8)):


RuntimeError: mat1 and mat2 shapes cannot be multiplied (128x1 and 4x64)