In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import random
import gym
from collections import deque
import torch.nn.functional as F
import cv2

In [2]:
# Define the Q-Network
class QNetwork(nn.Module):
    def __init__(self, input_dim, output_dim, hidden_dims = (32,32), activation =nn.ReLU):
        super(QNetwork,self).__init__()
        
        self.input_layer = nn.Linear(input_dim,hidden_dims[0])
        
        self.hidden_layer = nn.ModuleList()
        for i in range(len(hidden_dims)-1):
            self.hidden_layer.append(nn.Linear(hidden_dims[i],hidden_dims[i+1]))
            self.hidden_layer.append(activation())
        
        self.hidden_layer = nn.Sequential(*self.hidden_layer)
        self.output_layer = nn.Linear(hidden_dims[-1], output_dim)
    
    def forward(self, state):
        x= state
        x = self.input_layer(x)
        x = self.hidden_layer(x)
        x= self.output_layer(x)
        return x

In [3]:

# Define Experience Replay Buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
    
    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)
    
    def __len__(self):
        return len(self.buffer)


In [4]:

# Define the DQN Agent
class DQNAgent:
    def __init__(self, state_dim, action_dim, gamma=0.99, lr=1e-3, batch_size=64, buffer_size=10000, epsilon=1.0, epsilon_decay=0.995, epsilon_min=0.1):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        
        self.q_network = QNetwork(state_dim, action_dim)
        self.target_network = QNetwork(state_dim, action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        
        self.memory = ReplayBuffer(buffer_size)
    
    def select_action(self, state):
        if random.random() < self.epsilon:
            return random.randint(0, self.action_dim - 1)
        else:
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            with torch.no_grad():
                return torch.argmax(self.q_network(state_tensor)).item()
    
    def train_step(self):
        if len(self.memory) < self.batch_size:
            return
        
        batch = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions).unsqueeze(1)
        rewards = torch.FloatTensor(rewards).unsqueeze(1)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones).unsqueeze(1)
        
        q_values = self.q_network(states).gather(1, actions)
        with torch.no_grad():
            next_q_values = self.target_network(next_states).max(1, keepdim=True)[0]
            target_q_values = rewards + (1 - dones) * self.gamma * next_q_values
        
        loss = nn.MSELoss()(q_values, target_q_values)
        
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
    
    def update_target_network(self):
        self.target_network.load_state_dict(self.q_network.state_dict())
    
    def decay_epsilon(self):
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

In [7]:
# Train the agent
env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = DQNAgent(state_dim, action_dim)

episodes = 10
update_target_every = 10
rewards_history = []
frames = []

for episode in range(episodes):
    state = env.reset()[0]
    total_reward = 0
    done = False
    
    while not done:
        action = agent.select_action(state)
        next_state, reward, done, _, _ = env.step(action)
        agent.memory.push(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward
        agent.train_step()
        
        # frame = env.render()
        # frames.append(frame)
    
    rewards_history.append(total_reward)
    agent.decay_epsilon()
    if episode % update_target_every == 0:
        agent.update_target_network()
    
    print(f"Episode {episode}: Total Reward = {total_reward}")
print("done")
env.close()
print(rewards_history)
# Plot rewards
# plt.plot(rewards_history)
# print('really??')
# plt.xlabel("Episode")
# plt.ylabel("Total Reward")
# plt.title("DQN Training Progress")
# plt.show()


Episode 0: Total Reward = 13.0
Episode 1: Total Reward = 27.0
Episode 2: Total Reward = 14.0
Episode 3: Total Reward = 14.0
Episode 4: Total Reward = 16.0
Episode 5: Total Reward = 14.0
Episode 6: Total Reward = 15.0
Episode 7: Total Reward = 14.0
Episode 8: Total Reward = 23.0
Episode 9: Total Reward = 15.0
done
[13.0, 27.0, 14.0, 14.0, 16.0, 14.0, 15.0, 14.0, 23.0, 15.0]


In [8]:
plt.plot(rewards_history)

[<matplotlib.lines.Line2D at 0x1b31fdf5820>]

: 

In [6]:

# # Save video of agent performance
# height, width, layers = frames[0].shape
# video = cv2.VideoWriter('dqn_training.avi', cv2.VideoWriter_fourcc(*'XVID'), 30, (width, height))

# for frame in frames:
#     video.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))

# video.release()
# print("Training video saved as dqn_training.avi")