In [1]:
import os
import gym
from tqdm import trange

import matplotlib.pyplot as plt
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.tensorboard import SummaryWriter

# Toggle read by the last cell: when True, that cell launches TensorBoard
# on the `result/dqn` log directory.
open_tensorboard = True

In [2]:
class network(nn.Module):
    """Fully connected network: input_dim -> 64 -> 128 -> 128 -> output_dim.

    A ReLU follows every linear layer except the last one, so the output
    is unbounded. `dqnAgent` instantiates it with the state dimension as
    input and the number of actions as output.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()

        # Widths of the input and the three hidden layers, in order.
        widths = (input_dim, 64, 128, 128)

        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            stack.append(nn.ReLU())
        # Output head: no activation after it.
        stack.append(nn.Linear(widths[-1], output_dim))

        self.lin = nn.Sequential(*stack)

    def forward(self, state):
        """Return the network output for `state` (last dim must be input_dim)."""
        output = self.lin(state)
        return output

In [3]:
class ReplayBuffer():
    """Fixed-capacity ring buffer of transitions stored in float32 NumPy arrays.

    Once `buffer_size` transitions have been saved, each new transition
    overwrites the oldest one.
    """

    def __init__(self, states_size: int, buffer_size: int, batch_size: int = 32):
        """Allocate storage.

        Args:
            states_size: length of one state vector.
            buffer_size: maximum number of stored transitions.
            batch_size: number of transitions returned by `sample_batch`.
        """
        vector_shape = (buffer_size, states_size)
        scalar_shape = (buffer_size, )

        self.states_buffer = np.zeros(vector_shape, dtype=np.float32)
        self.next_states_buffer = np.zeros(vector_shape, dtype=np.float32)
        self.actions_buffer = np.zeros(scalar_shape, dtype=np.float32)
        self.rewards_buffer = np.zeros(scalar_shape, dtype=np.float32)
        self.done_buffer = np.zeros(scalar_shape, dtype=np.float32)

        self.max_buffer_size = buffer_size
        self.batch_size = batch_size
        self.current_size = 0  # number of valid rows
        self.ptr = 0           # row the next `save` writes to

    def save(
        self,
        state: np.ndarray,
        action: np.ndarray,
        next_state: np.ndarray,
        reward: float,
        done: bool
    ):
        """Write one transition at the current slot and advance the slot."""
        slot = self.ptr

        self.states_buffer[slot] = state
        self.next_states_buffer[slot] = next_state
        self.actions_buffer[slot] = action
        self.rewards_buffer[slot] = reward
        self.done_buffer[slot] = done

        # The fill level saturates at capacity; the write slot wraps around.
        if self.current_size < self.max_buffer_size:
            self.current_size += 1
        self.ptr = (slot + 1) % self.max_buffer_size

    def sample_batch(self):
        """Return `batch_size` distinct stored transitions as a dict of arrays.

        Keys: "states", "next_states", "actions", "rewards", "done".
        NumPy raises ValueError when fewer than `batch_size` transitions
        are stored, because sampling is done without replacement.
        """
        picked = np.random.choice(
            self.current_size, size=self.batch_size, replace=False)
        return {
            "states": self.states_buffer[picked],
            "next_states": self.next_states_buffer[picked],
            "actions": self.actions_buffer[picked],
            "rewards": self.rewards_buffer[picked],
            "done": self.done_buffer[picked],
        }

    def __len__(self):
        """Number of transitions currently stored."""
        return self.current_size

In [4]:
class dqnAgent:
    """DQN agent: replay buffer, epsilon-greedy exploration and a target network.

    The online network (`dqn_net`) is trained with a smooth-L1 loss against
    bootstrapped targets computed from a periodically synchronised copy
    (`dqn_target_net`). Scalars are logged to TensorBoard under `result/dqn`.
    """

    def __init__(self,
                 env: gym.Env,
                 buffer_size: int,
                 batch_size: int,
                 target_update_period: int,
                 epsilon_decay: float,
                 max_epsilon: float = 1.0,
                 min_epsilon: float = 0.01,
                 gamma: float = 0.99
                 ):
        """Build the networks, optimizer, replay buffer and summary writer.

        Args:
            env: environment exposing `observation_space.shape[0]` and
                `action_space.n`.
            buffer_size: replay capacity; training starts once the buffer
                holds this many transitions.
            batch_size: number of transitions per gradient update.
            target_update_period: number of updates between target-network
                synchronisations.
            epsilon_decay: fraction of `(max_epsilon - min_epsilon)`
                subtracted from epsilon after every update.
            max_epsilon: initial exploration rate.
            min_epsilon: lower bound on the exploration rate.
            gamma: discount factor.

        Raises:
            ValueError: if `batch_size` exceeds `buffer_size`, since a batch
                is sampled without replacement from the buffer.
        """
        # Fail early: otherwise the first update raises inside np.random.choice.
        if batch_size > buffer_size:
            raise ValueError(
                f"batch_size ({batch_size}) must not exceed "
                f"buffer_size ({buffer_size})"
            )

        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n

        self.env = env
        self.buffer = ReplayBuffer(self.state_dim, buffer_size, batch_size)
        self.buffer_size = buffer_size
        self.eps = max_epsilon
        self.eps_decay = epsilon_decay
        self.max_eps = max_epsilon
        self.min_eps = min_epsilon
        self.target_update_period = target_update_period
        self.gamma = gamma

        self.device = torch.device(
            "cuda" if torch.cuda.is_available() else "cpu"
        )
        print(self.device)

        self.dqn_net = network(self.state_dim, self.action_dim).to(self.device)
        self.dqn_target_net = network(
            self.state_dim, self.action_dim).to(self.device)
        # The target network starts as an exact copy and is never trained directly.
        self.dqn_target_net.load_state_dict(self.dqn_net.state_dict())
        self.dqn_target_net.eval()

        self.optimizer = optim.Adam(self.dqn_net.parameters())
        # Scratch list holding the transition currently being assembled.
        self.transition = []

        self.writer = SummaryWriter('result/dqn')

    def _reset_env(self):
        """Reset the environment and return only the observation.

        Accepts both `reset()` conventions: a bare observation, or an
        `(observation, info)` tuple.
        """
        result = self.env.reset()
        if isinstance(result, tuple):
            return result[0]
        return result

    def _step_env(self, action):
        """Step the environment and return `(next_state, reward, done)`.

        Accepts both `step()` conventions: the 4-tuple
        `(obs, reward, done, info)` and the 5-tuple
        `(obs, reward, terminated, truncated, info)`, where `done` is
        taken as `terminated or truncated`.
        """
        result = self.env.step(action)
        if len(result) == 5:
            next_state, reward, terminated, truncated, _ = result
            done = bool(terminated or truncated)
        else:
            next_state, reward, done, _ = result
        return next_state, reward, done

    def select_action(self, state: np.ndarray):
        """Pick an action epsilon-greedily and remember `(state, action)`.

        With probability `self.eps` a random action is sampled from the
        action space; otherwise the argmax of the online network's output
        is returned as a Python int.
        """
        if self.eps > np.random.random():
            selected_action = self.env.action_space.sample()
        else:
            # Inference only: no autograd graph is needed.
            with torch.no_grad():
                state_tensor = torch.as_tensor(
                    state, dtype=torch.float32, device=self.device)
                selected_action = self.dqn_net(state_tensor).argmax().item()

        self.transition = [state, selected_action]
        return selected_action

    def step(self, action):
        """Apply `action`, store the completed transition, return the outcome.

        Must be called after `select_action`, which seeds `self.transition`
        with the state and action.

        Returns:
            `(next_state, reward, done)`.
        """
        next_state, reward, done = self._step_env(action)
        self.transition += [next_state, reward, done]
        self.buffer.save(*self.transition)

        return next_state, reward, done

    def update(self):
        """Run one gradient step on a sampled batch and return the loss value."""
        samples = self.buffer.sample_batch()
        loss = self.compute_loss(samples)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

    def compute_loss(self, samples):
        """Smooth-L1 loss between Q(s, a) and the bootstrapped target.

        Args:
            samples: dict with keys "states", "next_states", "actions",
                "rewards" and "done", as returned by
                `ReplayBuffer.sample_batch`.

        Returns:
            Scalar loss tensor attached to the online network's graph.
        """
        device = self.device

        states = torch.as_tensor(
            samples["states"], dtype=torch.float32, device=device)
        next_states = torch.as_tensor(
            samples["next_states"], dtype=torch.float32, device=device)
        # Actions are stored as float32 in the buffer; gather needs int64 indices.
        actions = torch.as_tensor(
            samples["actions"], dtype=torch.long, device=device).reshape(-1, 1)
        rewards = torch.as_tensor(
            samples["rewards"], dtype=torch.float32, device=device).reshape(-1, 1)
        done = torch.as_tensor(
            samples["done"], dtype=torch.float32, device=device).reshape(-1, 1)

        curr_q_values = self.dqn_net(states).gather(1, actions)

        # Targets are constants with respect to the online network.
        with torch.no_grad():
            # max() returns (values, indices); only the values are needed.
            next_q_values = self.dqn_target_net(next_states).max(
                dim=1, keepdim=True)[0]
            # `1 - done` removes the bootstrap term for transitions flagged done.
            target = rewards + self.gamma * next_q_values * (1 - done)

        return F.smooth_l1_loss(curr_q_values, target)

    def train(self, num_episode):
        """Interact with the environment and train the online network.

        Note: despite its name, `num_episode` is the number of environment
        STEPS to run; each loop iteration performs exactly one step.

        Gradient updates begin once the replay buffer is full. After every
        update epsilon is decayed linearly, and every
        `target_update_period` updates the target network is synchronised.
        The summary writer is flushed and the environment closed even if
        training is interrupted.
        """
        state = self._reset_env()
        update_count = 0
        score = 0

        try:
            with trange(num_episode) as pbar:
                for i in pbar:

                    action = self.select_action(state)
                    next_state, reward, done = self.step(action)

                    score += reward
                    state = next_state

                    if done:
                        state = self._reset_env()
                        self.writer.add_scalar("Scores", score, i)
                        score = 0

                    if len(self.buffer) >= self.buffer_size:

                        loss = self.update()
                        self.writer.add_scalar("Losses", loss, i)
                        update_count += 1

                        # Linear epsilon decay, clipped at the minimum.
                        curr_eps = self.eps - (self.max_eps - self.min_eps) * self.eps_decay
                        self.eps = max(self.min_eps, curr_eps)

                        if update_count % self.target_update_period == 0:
                            self.dqn_target_net.load_state_dict(self.dqn_net.state_dict())
        finally:
            self.writer.flush()
            self.env.close()

In [5]:
# Keep the environment id in its own constant so that `env` only ever
# refers to the environment object, never to a string.
ENV_NAME = "CartPole-v0"
env = gym.make(ENV_NAME)

In [6]:
# Passed to `agent.train`, whose loop performs one environment step per
# iteration, so this is a step budget rather than an episode count.
EPISODES = 10_000
# Replay capacity; updates start only once the buffer holds this many transitions.
BUFFER_SIZE = 500
# Transitions sampled per gradient update.
BATCH_SIZE = 128
# Number of gradient updates between target-network synchronisations.
TARGET_UPDATE_PERIOD = 100
# Fraction of the (max - min) epsilon range removed after each update.
EPSILON_DECAY = 1 / 2_000

In [7]:
# Build the agent from the hyperparameters defined above; remaining
# arguments (epsilon bounds, gamma) keep their defaults.
agent = dqnAgent(
    env=env,
    buffer_size=BUFFER_SIZE,
    batch_size=BATCH_SIZE,
    target_update_period=TARGET_UPDATE_PERIOD,
    epsilon_decay=EPSILON_DECAY,
)

cuda


In [8]:
# Run the training loop for EPISODES iterations (one environment step each).
agent.train(num_episode=EPISODES)

100%|████████████████████████████████████| 10000/10000 [00:15<00:00, 633.04it/s]


In [None]:
if open_tensorboard :
    !tensorboard --logdir result/dqn

TensorFlow installation not found - running with reduced feature set.

NOTE: Using experimental fast data loading logic. To disable, pass
    "--load_fast=false" and report issues on GitHub. More details:
    https://github.com/tensorflow/tensorboard/issues/4784

Serving TensorBoard on localhost; to expose to the network, use a proxy or pass --bind_all
TensorBoard 2.6.0 at http://localhost:6006/ (Press CTRL+C to quit)
