<a href="https://colab.research.google.com/github/KillerX629/TPIA/blob/master/DeepQLearning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class DQN(nn.Module):
    """Recurrent Q-network: an LSTM followed by a linear layer over actions.

    Args:
        input_size: Number of features in each time step of the input.
        hidden_size: Number of features in the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        num_actions: Number of outputs produced by the final linear layer
            (one value per action).
    """

    def __init__(self, input_size, hidden_size, num_layers, num_actions):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_actions)

    def forward(self, x):
        """Compute one value per action from the last time step of ``x``.

        Args:
            x: Tensor of shape (batch_size, seq_length, input_size).

        Returns:
            Tensor of shape (batch_size, num_actions).
        """
        # Zero initial hidden/cell states, created on the same device and with
        # the same dtype as the input so the module does not depend on any
        # module-level ``device`` variable being defined.
        h0 = torch.zeros(
            self.num_layers, x.size(0), self.hidden_size,
            device=x.device, dtype=x.dtype,
        )
        c0 = torch.zeros_like(h0)

        # out: tensor of shape (batch_size, seq_length, hidden_size)
        out, _ = self.lstm(x, (h0, c0))

        # Decode the hidden state of the last time step
        return self.fc(out[:, -1, :])

# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Placeholder network dimensions -- replace each value with the one that
# matches your problem (input size, hidden size, number of layers, number of actions)
input_size, hidden_size, num_layers, num_actions = 1, 64, 2, 4

# Build the DQN agent and move it to the selected device
agent = DQN(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    num_actions=num_actions,
).to(device)


In [2]:
import random
from collections import namedtuple

# A single experience record: the state seen, the action taken, the state
# that followed, and the reward received
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])

class ReplayBuffer(object):
    """Fixed-capacity ring buffer of ``Transition`` records.

    Transitions are appended until ``capacity`` is reached; after that each
    new transition overwrites the oldest one.

    This is a simplified version of a replay buffer. In a real-world scenario,
    you might want to add more functionality, such as prioritized experience
    replay.
    """

    def __init__(self, capacity):
        """Create an empty buffer.

        Args:
            capacity: Maximum number of transitions kept; must be positive.

        Raises:
            ValueError: If ``capacity`` is not positive.
        """
        if capacity <= 0:
            raise ValueError(f"capacity must be positive, got {capacity!r}")
        self.capacity = capacity
        self.memory = []
        self.position = 0  # index the next transition will be written to

    def push(self, *args):
        """Saves a transition.

        Args:
            *args: The fields of a ``Transition``, in order
                (state, action, next_state, reward).
        """
        # Build the record before touching the buffer, so that a call with the
        # wrong number of fields raises without leaving a placeholder behind.
        transition = Transition(*args)
        if len(self.memory) < self.capacity:
            self.memory.append(transition)
        else:
            self.memory[self.position] = transition
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions chosen at random.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

def optimize_model(agent, memory, optimizer, batch_size, gamma):
    """Run one Q-learning gradient step on a minibatch drawn from ``memory``.

    Does nothing (returns ``None``) while ``memory`` holds fewer than
    ``batch_size`` transitions.

    Args:
        agent: Module mapping a batch of states to one value per action.
        memory: Replay buffer supporting ``len()`` and ``sample(batch_size)``.
            Each sampled item must unpack as
            (state, action, next_state, reward), where ``next_state`` is
            ``None`` for transitions that ended the episode.
        optimizer: Optimizer over ``agent``'s parameters.
        batch_size: Number of transitions per update.
        gamma: Discount factor applied to the next-state value.
    """
    if len(memory) < batch_size:
        return
    transitions = memory.sample(batch_size)
    # Transpose the batch: a list of (state, action, next_state, reward)
    # records becomes one tuple per field
    # (see https://stackoverflow.com/a/19343/3343043 for detailed explanation).
    states, actions, next_states, rewards = zip(*transitions)

    state_batch = torch.cat(states)
    action_batch = torch.cat(actions)
    reward_batch = torch.cat(rewards)
    # Work on whatever device the sampled tensors live on rather than relying
    # on a module-level ``device`` variable.
    batch_device = state_batch.device

    # Mask of transitions that have a successor state, and those successors
    non_final_mask = torch.tensor(
        [s is not None for s in next_states],
        device=batch_device, dtype=torch.bool,
    )
    non_final_next_states = [s for s in next_states if s is not None]

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the columns of actions taken
    state_action_values = agent(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states; it stays 0 where the episode ended.
    next_state_values = torch.zeros(
        batch_size, device=batch_device, dtype=state_action_values.dtype
    )
    # torch.cat raises on an empty list, so skip the bootstrap pass when every
    # sampled transition is terminal.
    if non_final_next_states:
        # The target is treated as a constant, so no graph is needed here.
        with torch.no_grad():
            next_q = agent(torch.cat(non_final_next_states))
            next_state_values[non_final_mask] = next_q.max(1)[0]
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * gamma) + reward_batch

    # Compute Huber loss
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # Clamp every gradient element to [-1, 1]; parameters that received no
    # gradient are skipped instead of raising.
    torch.nn.utils.clip_grad_value_(agent.parameters(), 1.0)
    optimizer.step()


In [5]:
%pip install gymnasium

Collecting gymnasium
  Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.1


In [8]:
from itertools import count
import random
import math

# Running count of actions selected so far (starts at zero)
steps_done = 0

# Epsilon schedule parameters: starting value, final value, and decay constant
EPS_START, EPS_END, EPS_DECAY = 0.9, 0.05, 200



def select_action(state):
    """Pick an action for ``state`` using an epsilon-greedy rule.

    The exploration threshold decays exponentially from ``EPS_START`` toward
    ``EPS_END`` as the global ``steps_done`` counter grows; the counter is
    incremented on every call.

    Returns:
        A ``(1, 1)`` tensor holding the chosen action: either a random sample
        from ``env.action_space`` or the index of the largest value the global
        ``agent`` produces for ``state``.
    """
    global steps_done
    draw = random.random()
    decay = math.exp(-1. * steps_done / EPS_DECAY)
    eps_threshold = EPS_END + (EPS_START - EPS_END) * decay
    steps_done += 1

    # Explore: take a random action from the environment's action space
    if draw <= eps_threshold:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)

    # Exploit: take the action with the highest predicted value
    with torch.no_grad():
        _, best_action = agent(state).max(1)
        return best_action.view(1, 1)

In [9]:
import gymnasium as gym
import torch.optim as optim
import os

# Initialize the environment
env = gym.make('CartPole-v1')

# Initialize the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the DQN agent, sized from the environment's observation and action spaces
input_size = env.observation_space.shape[0]
hidden_size = 64
num_layers = 2
num_actions = env.action_space.n
agent = DQN(input_size, hidden_size, num_layers, num_actions).to(device)

# Initialize the optimizer
optimizer = optim.Adam(agent.parameters())

# Initialize the replay buffer
memory = ReplayBuffer(10000)

# Set the number of episodes
num_episodes = 500

# Load the model if it exists.
# map_location remaps the saved tensors onto the current device, so a
# checkpoint written on a GPU runtime still loads on a CPU-only one.
# NOTE(review): torch.load unpickles the file -- only load checkpoints you trust.
if os.path.isfile('cartpole_dqn.pt'):
    agent.load_state_dict(torch.load('cartpole_dqn.pt', map_location=device))

# Hyperparameters for each optimization step
BATCH_SIZE = 128  # transitions sampled from replay memory per update
GAMMA = 0.999     # discount factor applied to the next-state value

# Training loop
try:
    for i_episode in range(num_episodes):
        # Initialize the environment and get its state.
        # The two unsqueeze calls add leading batch and sequence dimensions of size 1.
        state, info = env.reset()
        state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0).unsqueeze(0)
        for t in count():
            # Select and perform an action
            action = select_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action.item())
            reward = torch.tensor([reward], dtype=torch.float32, device=device)

            # Only a true termination has no successor state. A truncated
            # episode (time limit) was merely cut short, so its next state is
            # kept and the update can still bootstrap from it.
            if terminated:
                next_state = None
            else:
                next_state = torch.tensor(next_state, dtype=torch.float32, device=device).unsqueeze(0).unsqueeze(0)

            # Store the transition in memory
            memory.push(state, action, next_state, reward)

            # Move to the next state
            state = next_state

            # Perform one step of the optimization on the agent network
            optimize_model(agent, memory, optimizer, BATCH_SIZE, GAMMA)

            if terminated or truncated:
                break

        # Save the model
        torch.save(agent.state_dict(), 'cartpole_dqn.pt')

        # Print the episode number and the number of timesteps
        print(f"Episode {i_episode + 1}/{num_episodes}, Timesteps: {t + 1}")
finally:
    # Release the environment even if training is interrupted or fails
    env.close()


Episode 1/500, Timesteps: 20
Episode 2/500, Timesteps: 30
Episode 3/500, Timesteps: 13
Episode 4/500, Timesteps: 13
Episode 5/500, Timesteps: 10
Episode 6/500, Timesteps: 14
Episode 7/500, Timesteps: 18
Episode 8/500, Timesteps: 9
Episode 9/500, Timesteps: 13
Episode 10/500, Timesteps: 10
Episode 11/500, Timesteps: 14
Episode 12/500, Timesteps: 13
Episode 13/500, Timesteps: 12
Episode 14/500, Timesteps: 10
Episode 15/500, Timesteps: 10
Episode 16/500, Timesteps: 14
Episode 17/500, Timesteps: 12
Episode 18/500, Timesteps: 13
Episode 19/500, Timesteps: 10
Episode 20/500, Timesteps: 9
Episode 21/500, Timesteps: 12
Episode 22/500, Timesteps: 8
Episode 23/500, Timesteps: 14
Episode 24/500, Timesteps: 10
Episode 25/500, Timesteps: 8
Episode 26/500, Timesteps: 22
Episode 27/500, Timesteps: 10
Episode 28/500, Timesteps: 11
Episode 29/500, Timesteps: 11
Episode 30/500, Timesteps: 10
Episode 31/500, Timesteps: 11
Episode 32/500, Timesteps: 9
Episode 33/500, Timesteps: 11
Episode 34/500, Timestep