<a href="https://colab.research.google.com/github/OneFineStarstuff/State-of-the-Art/blob/main/DQN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import gym
import random
import numpy as np
from collections import deque

# Define the neural network
class DQN(nn.Module):
    """Fully connected network with two 128-unit ReLU hidden layers.

    Maps an input of width ``state_dim`` to ``action_dim`` outputs; the
    training code in this file treats those outputs as per-action Q-values.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        hidden_units = 128
        # Layers are created in fc1 -> fc2 -> fc3 order so that seeded
        # initialisation draws random numbers in the same sequence.
        self.fc1 = nn.Linear(in_features=state_dim, out_features=hidden_units)
        self.fc2 = nn.Linear(in_features=hidden_units, out_features=hidden_units)
        self.fc3 = nn.Linear(in_features=hidden_units, out_features=action_dim)

    def forward(self, x):
        """Return the raw (un-activated) output of the last linear layer."""
        hidden = self.fc1(x).relu()
        hidden = self.fc2(hidden).relu()
        return self.fc3(hidden)

# Replay memory to store transitions
class ReplayMemory:
    """Fixed-capacity FIFO buffer of experience transitions.

    Each transition is a 5-tuple ``(state, action, reward, next_state, done)``.
    Once ``capacity`` transitions are stored, appending another discards the
    oldest one.

    Args:
        capacity: Maximum number of transitions kept.
        state_shape: Shape every ``state`` / ``next_state`` array must have to
            be accepted by :meth:`push`. Defaults to ``(4,)``, the shape that
            was previously hard-coded.
    """

    def __init__(self, capacity, state_shape=(4,)):
        self.memory = deque(maxlen=capacity)
        # Normalised to a tuple so it compares equal to ``ndarray.shape``.
        self.state_shape = tuple(state_shape)

    def _is_valid_state(self, candidate):
        """Return True if ``candidate`` is an ndarray of the expected shape."""
        return isinstance(candidate, np.ndarray) and candidate.shape == self.state_shape

    def push(self, transition):
        """Store ``transition`` if both of its states pass validation.

        Invalid transitions are reported on stdout and dropped rather than
        raising, so a single malformed observation does not abort training.
        """
        state, _action, _reward, next_state, _done = transition
        if self._is_valid_state(state) and self._is_valid_state(next_state):
            self.memory.append(transition)
        else:
            print(f"Invalid transition: {transition}")

    def sample(self, batch_size):
        """Return ``batch_size`` distinct transitions chosen uniformly at random.

        Raises:
            ValueError: If fewer than ``batch_size`` transitions are stored
                (raised by ``random.sample``).
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

# Build the environment, the Q-network, and the training utilities.
# NOTE(review): `new_step_api=True` is a gym-version-specific keyword; the
# loops in this cell unpack five values from `env.step()` and a single value
# from `env.reset()` -- confirm the installed gym matches both expectations.
env = gym.make('CartPole-v1', new_step_api=True)

# Network input/output widths are read from the environment's spaces.
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

model = DQN(state_dim=state_dim, action_dim=action_dim)
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
memory = ReplayMemory(capacity=10000)

# Hyperparameters
batch_size, gamma = 64, 0.99
# Exploration schedule: start value, floor value, and exponential time constant.
epsilon_start, epsilon_end, epsilon_decay = 1.0, 0.01, 500


def epsilon_by_frame(frame_idx):
    """Return the exploration rate for step ``frame_idx``.

    The value decays exponentially from ``epsilon_start`` towards
    ``epsilon_end`` with time constant ``epsilon_decay``.
    """
    exploration_range = epsilon_start - epsilon_end
    return epsilon_end + exploration_range * np.exp(-frame_idx / epsilon_decay)

# Training loop
#
# Runs `num_episodes` epsilon-greedy episodes of at most 200 steps each. Every
# transition is pushed to `memory`; once the buffer holds more than
# `batch_size` transitions, each non-terminal step also performs one
# optimisation step on a uniformly sampled mini-batch.
num_episodes = 500
frame_idx = 0  # environment steps taken across ALL episodes
for episode in range(num_episodes):
    # Assumes this gym version's reset() returns only the observation.
    state = env.reset()
    state = np.array(state)  # Ensure state is a NumPy array
    total_reward = 0
    for t in range(200):
        # Epsilon is driven by the global step counter. Using the per-episode
        # step `t` would restart exploration at epsilon_start every episode,
        # so the policy would stay mostly random for the whole run.
        epsilon = epsilon_by_frame(frame_idx)
        frame_idx += 1
        if random.random() > epsilon:
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            with torch.no_grad():  # action selection needs no gradients
                action = model(state_tensor).max(1)[1].item()
        else:
            action = env.action_space.sample()

        next_state, reward, done, truncated, _ = env.step(action)
        next_state = np.array(next_state)  # Ensure next_state is a NumPy array
        # Store only the termination flag. Under the Gym step API `truncated`
        # marks a time-limit cutoff, where the target should still bootstrap
        # from next_state; storing it as terminal would zero that term.
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        if done or truncated:
            break

        if len(memory) > batch_size:
            transitions = memory.sample(batch_size)
            batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(*transitions)

            # Stack into single arrays with explicit dtypes before handing
            # them to the typed tensor constructors.
            batch_state = torch.FloatTensor(np.array(batch_state, dtype=np.float32))
            batch_action = torch.LongTensor(np.array(batch_action, dtype=np.int64)).unsqueeze(1)
            batch_reward = torch.FloatTensor(np.array(batch_reward, dtype=np.float32))
            batch_next_state = torch.FloatTensor(np.array(batch_next_state, dtype=np.float32))
            batch_done = torch.FloatTensor(np.array(batch_done, dtype=np.float32))

            current_q_values = model(batch_state).gather(1, batch_action)
            # The target is a fixed regression label: compute it without
            # tracking gradients so backward() only flows through
            # current_q_values.
            with torch.no_grad():
                max_next_q_values = model(batch_next_state).max(1)[0]
                expected_q_values = batch_reward + (gamma * max_next_q_values * (1 - batch_done))

            loss = criterion(current_q_values, expected_q_values.unsqueeze(1))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    print(f"Episode {episode + 1}, Total Reward: {total_reward}")

# Evaluation loop
#
# Plays one greedy episode (no exploration) of at most 200 steps with the
# trained model and prints the accumulated reward.
model.eval()
# Assumes this gym version's reset() returns only the observation.
state = env.reset()
state = np.array(state)  # Ensure state is a NumPy array
total_reward = 0
with torch.no_grad():  # inference only: skip building the autograd graph
    for t in range(200):
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action = model(state_tensor).max(1)[1].item()  # index of the largest output
        next_state, reward, done, truncated, _ = env.step(action)
        next_state = np.array(next_state)  # Ensure next_state is a NumPy array
        state = next_state
        total_reward += reward
        if done or truncated:
            break
print(f"Total Reward (Evaluation): {total_reward}")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import gym
import random
import numpy as np
from collections import deque

# Define the neural network
class DQN(nn.Module):
    """Multilayer perceptron: state_dim -> 128 -> 128 -> action_dim.

    ReLU follows each of the first two linear layers; the final layer's
    output is returned without an activation.
    """

    def __init__(self, state_dim, action_dim):
        super().__init__()
        width = 128
        self.fc1 = nn.Linear(state_dim, width)
        self.fc2 = nn.Linear(width, width)
        self.fc3 = nn.Linear(width, action_dim)

    def forward(self, x):
        """Run ``x`` through both hidden layers and the linear output layer."""
        for hidden_layer in (self.fc1, self.fc2):
            x = torch.relu(hidden_layer(x))
        return self.fc3(x)

# Replay memory to store transitions
class ReplayMemory:
    """Bounded FIFO store for transitions.

    Backed by a ``deque`` with ``maxlen=capacity``, so adding to a full
    buffer drops the oldest entry. Transitions are stored as given, with no
    validation.
    """

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, transition):
        """Append one transition to the newest end of the buffer."""
        buffer = self.memory
        buffer.append(transition)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions, chosen at random."""
        return random.sample(self.memory, k=batch_size)

    def __len__(self):
        """Return how many transitions are currently held."""
        stored = self.memory
        return len(stored)

# Create the CartPole environment and everything the training loop needs.
# NOTE(review): the loops in this cell unpack four values from `env.step()`
# and use `env.reset()`'s return value directly as the observation -- confirm
# the installed gym version behaves that way.
env = gym.make('CartPole-v1')

# Size the network from the environment's observation and action spaces.
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

model = DQN(state_dim=state_dim, action_dim=action_dim)
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
memory = ReplayMemory(capacity=10000)

# Hyperparameters
batch_size = 64        # transitions per optimisation step
gamma = 0.99           # multiplier on the next-state value in the target
epsilon_start = 1.0    # exploration rate at frame 0
epsilon_end = 0.01     # value the exploration rate decays towards
epsilon_decay = 500    # time constant of the exponential decay


def epsilon_by_frame(frame_idx):
    """Return the exponentially decayed exploration rate at ``frame_idx``."""
    decay_factor = np.exp(-1. * frame_idx / epsilon_decay)
    return epsilon_end + (epsilon_start - epsilon_end) * decay_factor

# Training loop
#
# Runs up to `num_episodes` epsilon-greedy episodes of at most 200 steps each.
# Every transition is stored in `memory`; once the buffer holds more than
# `batch_size` transitions, each non-terminal step also performs one
# optimisation step. Training stops early when an episode's total reward
# exceeds 195.
num_episodes = 500
frame_idx = 0  # environment steps taken across ALL episodes
loss = None    # most recent training loss; None until the first update runs
for episode in range(num_episodes):
    state = env.reset()
    total_reward = 0
    for t in range(200):
        # Epsilon is driven by the global step counter. Using the per-episode
        # step `t` would restart exploration at epsilon_start every episode.
        epsilon = epsilon_by_frame(frame_idx)
        frame_idx += 1
        if random.random() > epsilon:
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            with torch.no_grad():  # action selection needs no gradients
                action = model(state_tensor).max(1)[1].item()
        else:
            action = env.action_space.sample()

        next_state, reward, done, _ = env.step(action)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        if done:
            break

        if len(memory) > batch_size:
            # Sample a batch of transitions from the replay memory
            transitions = memory.sample(batch_size)
            batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(*transitions)

            # Convert to PyTorch tensors. Each field is first stacked into one
            # NumPy array with an explicit dtype instead of passing a tuple of
            # per-transition values to the tensor constructor.
            batch_state = torch.FloatTensor(np.array(batch_state, dtype=np.float32))
            batch_action = torch.LongTensor(np.array(batch_action, dtype=np.int64)).unsqueeze(1)
            batch_reward = torch.FloatTensor(np.array(batch_reward, dtype=np.float32)).unsqueeze(1)
            batch_next_state = torch.FloatTensor(np.array(batch_next_state, dtype=np.float32))
            batch_done = torch.FloatTensor(np.array(batch_done, dtype=np.float32)).unsqueeze(1)

            # Compute the Q values for the current state-action pairs
            q_values = model(batch_state).gather(1, batch_action)

            # Compute the target Q values (detached: targets are fixed labels)
            next_q_values = model(batch_next_state).max(1)[0].detach().unsqueeze(1)
            target_q_values = batch_reward + (gamma * next_q_values * (1 - batch_done))

            # Compute the loss
            loss = criterion(q_values, target_q_values)

            # Optimize the model
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # `loss` is still None while the buffer is filling (early episodes usually
    # end before any update runs), so format it defensively.
    loss_text = f"{loss.item():.4f}" if loss is not None else "n/a"
    print(f"Episode {episode + 1}, Total Reward: {total_reward}, Loss: {loss_text}")

    # If the total reward exceeds a threshold, stop training
    if total_reward > 195:
        print(f"Solved in {episode + 1} episodes!")
        break

# Test the trained model
#
# Plays one greedy episode of at most 200 steps, rendering each step, then
# prints the accumulated reward. The environment is closed even if rendering
# or stepping raises.
model.eval()
state = env.reset()
total_reward = 0
try:
    with torch.no_grad():  # inference only: skip building the autograd graph
        for t in range(200):
            env.render()
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            action = model(state_tensor).max(1)[1].item()  # index of the largest output
            next_state, reward, done, _ = env.step(action)
            total_reward += reward
            state = next_state
            if done:
                break

    print(f"Test Reward: {total_reward}")
finally:
    env.close()