# Drive mount (optional, not mandatory)

In [None]:
# Optional step: mount Google Drive inside the Colab runtime at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## If you want to unmount the drive, run the cell below

In [None]:
# Unmount Google Drive again via flush_and_unmount(); only needed if it was mounted.
from google.colab import drive
drive.flush_and_unmount()

# Define Environment and utils

In [None]:
import numpy as np

class TwoDimArrayMap:
    """Grid-world maze on an ``x_dim`` x ``y_dim`` array.

    The agent position is ``state = [row, col]``. Every episode starts at
    ``[0, 0]`` and the goal is the bottom-left cell ``[row - 1, 0]``.
    In ``maze``, 0 marks a free cell and 1 marks a wall.

    Actions accepted by :meth:`step`:
        0 -> right (col + 1), 1 -> left (col - 1),
        2 -> down  (row + 1), 3 -> up   (row - 1).
    Any other action value leaves the agent where it is.

    Rewards returned by :meth:`step` are ``success_reward`` (0) when the agent
    stands on the goal after the move and ``failed_reward`` (-1) otherwise.
    ``reward_states`` is a per-cell table kept for logging; ``step`` does not
    read it.
    """

    def __init__(self, x_dim, y_dim, action_space_dim=4):
        self.maze = np.zeros([x_dim, y_dim])
        self.success_reward = 0
        self.failed_reward = -1

        # x_dim * y_dim table filled with failed_reward; the goal cell is patched below.
        self.reward_states = np.full((x_dim, y_dim), self.failed_reward)
        self.state = np.array([0, 0])

        self.observation_space_dim = x_dim * y_dim
        self.action_space_dim = action_space_dim

        self.row = len(self.maze)
        self.col = len(self.maze[0])

        self.goal = np.array([self.row - 1, 0])
        self.reward_states[self.goal[0]][self.goal[1]] = self.success_reward

    def SimpleMazation(self):
        """Carve one large wall into the maze.

        Rows ``row//3 <= i < 2*row//3`` and columns ``j < col * 2/3`` become
        walls: ``maze`` is set to 1 and ``reward_states`` to -9 there.
        """
        first_wall_row = self.row // 3
        end_wall_row = (2 * self.row) // 3
        # Boolean column mask reproduces the original float comparison j < col * (2/3).
        wall_cols = np.arange(self.col) < self.col * (2 / 3)
        self.maze[first_wall_row:end_wall_row, wall_cols] = 1
        self.reward_states[first_wall_row:end_wall_row, wall_cols] = -9

    def reset(self):
        """Put the agent back on ``[0, 0]`` and return a copy of that state."""
        self.state = np.array([0, 0])
        # Return a copy so later in-place moves in step() cannot alter the caller's array.
        return self.state.copy()

    def step(self, action):
        """Apply ``action`` and return ``(state, reward, done)``.

        A move is carried out only if the target cell is inside the grid and
        is not a wall; otherwise the agent stays in place. The returned state
        is a fresh array, independent of the environment's internal state.
        """
        if action == 0 and self.state[1] < self.col-1:
            if self.maze[self.state[0]][self.state[1]+1] == 0:  # move to right
                self.state[1] += 1
        elif action == 1 and self.state[1] > 0:
            if self.maze[self.state[0]][self.state[1]-1] == 0:  # move to left
                self.state[1] -= 1
        elif action == 2 and self.state[0] < self.row-1:
            if self.maze[self.state[0]+1][self.state[1]] == 0:  # move to down
                self.state[0] += 1
        elif action == 3 and self.state[0] > 0:
            if self.maze[self.state[0]-1][self.state[1]] == 0:  # move to up
                self.state[0] -= 1

        if (self.state[0] == self.goal[0]) and (self.state[1] == self.goal[1]):
            reward = self.success_reward
            done = True
        else:
            reward = self.failed_reward
            done = False

        # self.state is mutated in place above, so hand out a snapshot instead of the live array.
        return self.state.copy(), reward, done

## Define ReplayMemory

In [None]:
from collections import deque, namedtuple
import random

class ReplayMemory(object):
    """Fixed-capacity FIFO store of transitions used for experience replay."""

    def __init__(self, capacity):
        # Record type of one stored step; the field order is the argument order of push().
        self.Transition = namedtuple(
            'Transition',
            ['state', 'action', 'next_state', 'reward', 'mask'],
        )
        # A deque with maxlen discards its oldest entry once `capacity` is reached.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Saves a transition."""
        record = self.Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return `batch_size` distinct stored transitions chosen at random."""
        picked = random.sample(self.memory, batch_size)
        return picked

    def __len__(self):
        """Number of transitions currently stored."""
        stored = self.memory
        return len(stored)

## Define utils

In [None]:
import numpy as np
import torch

class OneLineToCell:
    """Helper that unpacks a flat, row-major sequence into an ``x_dim`` x ``y_dim`` grid."""

    def __init__(self, x_dim, y_dim):
        self.maze = np.zeros([x_dim, y_dim])
        self.state = 0
        self.observation_space_dim = self.maze.size
        self.row = len(self.maze)
        self.col = len(self.maze[0])

    def FillGridByOneLineArray(self, array):
        """Copy ``array[i * col + j]`` into ``maze[i][j]`` and return ``maze``.

        ``array`` must be indexable with integers and hold at least
        ``row * col`` entries. The grid is filled in place; the returned object
        is ``self.maze`` itself.
        """
        for i in range(self.row):
            # Row-major layout: each row occupies `col` consecutive entries.
            # (The stride must be the column count, not the row count, or
            # non-square grids read the wrong cells.)
            offset = i * self.col
            for j in range(self.col):
                self.maze[i][j] = array[offset + j]
        return self.maze

## Define saving train and test history as txt

In [None]:
import os
import datetime
import numpy as np

def save_history_txt(SAVE, device, path, Q_net, QnetToCell, env, i_episode, t):
    """Dump the current value and greedy-action tables as text every ``SAVE`` episodes.

    Does nothing unless ``i_episode`` is a multiple of ``SAVE``. Otherwise it
    asks ``QnetToCell.FillGridByQnet(Q_net, env, device)`` for the two tables
    and writes them to ``{path}/V_table_train`` and
    ``{path}/Action_table_train``, creating those folders when needed.

    Args:
        SAVE: saving interval in episodes.
        device: forwarded to ``FillGridByQnet``.
        path: root output directory of the run.
        Q_net: network whose values are tabulated.
        QnetToCell: object providing ``FillGridByQnet``.
        env: forwarded to ``FillGridByQnet``.
        i_episode: current episode number.
        t: step count of the episode, used only in the progress message.
    """
    if i_episode % SAVE != 0:
        return

    V_table, Action_table = QnetToCell.FillGridByQnet(Q_net, env, device)
    print(f"--------{i_episode} is saved with {t} steps. V_table and Action table")

    # exist_ok=True creates each folder independently; the previous combined
    # check raised FileExistsError when only one of the two already existed.
    os.makedirs(path+'/V_table_train', exist_ok=True)
    os.makedirs(path+'/Action_table_train', exist_ok=True)

    # NOTE(review): the timestamp contains ':' which some filesystems reject in file names.
    now = datetime.datetime.now().strftime("%m-%d_%H:%M:%S")
    np.savetxt(f'{path}/V_table_train/V_table_{i_episode}_{now}.txt', V_table, fmt='%.3f')
    np.savetxt(f'{path}/Action_table_train/Action_table_{i_episode}_{now}.txt', Action_table, fmt='%d')

# Define Model


In [None]:
import torch.nn as nn

class QNET(nn.Module):
    """Three-layer fully connected Q-network (input -> 128 -> 128 -> output).

    Every layer, including the output layer, is followed by a LeakyReLU with
    negative slope 0.01. Weights are Xavier-uniform initialised with the
    'leaky_relu' gain; biases keep the ``nn.Linear`` default initialisation.

    NOTE(review): because the output also passes through LeakyReLU, negative
    pre-activations are scaled by 0.01 before being returned as Q-values.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        hidden_units = 128
        self.LReLU = nn.LeakyReLU(0.01)
        self.fc1 = nn.Linear(input_size, hidden_units)
        self.fc2 = nn.Linear(hidden_units, hidden_units)
        self.fc3 = nn.Linear(hidden_units, output_size)
        self.reset_parameters()

    def reset_parameters(self):
        """Re-draw the weights of fc1, fc2 and fc3 (in that order) with Xavier-uniform init."""
        gain = nn.init.calculate_gain('leaky_relu')
        for layer in (self.fc1, self.fc2, self.fc3):
            nn.init.xavier_uniform_(layer.weight, gain=gain)

    def forward(self, x):
        """Return one value per action for input ``x`` (last dimension = input_size)."""
        for layer in (self.fc1, self.fc2, self.fc3):
            x = self.LReLU(layer(x))
        return x

DQN optimizing code

In [None]:
import torch
import torch.nn as nn

def optimize_model_DQN(buffer, BATCH_SIZE, Q_net, target_Q_net, optimizer, GAMMA):
    """Perform one DQN gradient step on a minibatch sampled from ``buffer``.

    Target: ``r + GAMMA * mask * max_a' Q_target(s', a')``.

    Args:
        buffer: replay memory providing ``__len__``, ``sample(n)`` and a
            ``Transition`` namedtuple with fields state, action, next_state,
            reward and mask. Each field is expected to be a tensor so it can
            be stacked (actions as integer index tensors).
        BATCH_SIZE: number of transitions per update.
        Q_net: online network being trained.
        target_Q_net: network used to evaluate the bootstrap target.
        optimizer: optimizer over ``Q_net`` parameters.
        GAMMA: discount factor.

    Returns:
        The MSE loss tensor of this update, or ``None`` when the buffer holds
        fewer than ``BATCH_SIZE`` transitions (no update is done).
    """
    if len(buffer) < BATCH_SIZE:
        return None
    transitions = buffer.sample(BATCH_SIZE)

    # Turn a list of Transition records into one Transition of per-field tuples.
    batch = buffer.Transition(*zip(*transitions))

    state_batch = torch.stack(batch.state)
    action_batch = torch.stack(batch.action).unsqueeze(-1)
    reward_batch = torch.stack(batch.reward)
    next_state_batch = torch.stack(batch.next_state)
    mask_batch = torch.stack(batch.mask)

    Q_values = Q_net(state_batch).gather(1, action_batch)   # q(s,a)

    with torch.no_grad():
        next_state_Q_values = target_Q_net(next_state_batch).max(1)[0]                    # max_a(q_target(s',a))
        # The mask multiplies the bootstrap term, so entries with mask 0 keep only the reward.
        expected_Q_values = (next_state_Q_values.mul(mask_batch) * GAMMA) + reward_batch    # r + gamma * max_a(q_target(s',a)) * mask

    criterion = nn.MSELoss()
    loss = criterion(Q_values, expected_Q_values.unsqueeze(-1))

    optimizer.zero_grad()
    loss.backward()
    # Clamp every gradient element to [-1, 1]. clip_grad_value_ skips parameters
    # whose grad is None (frozen or unused), which the manual loop crashed on.
    nn.utils.clip_grad_value_(Q_net.parameters(), 1.0)
    optimizer.step()

    return loss

DDQN optimizing code

In [None]:
import torch
import torch.nn as nn

def optimize_model_DDQN(buffer, BATCH_SIZE, Q_net, target_Q_net, optimizer, GAMMA):
    """Perform one Double-DQN gradient step on a minibatch sampled from ``buffer``.

    Target: ``r + GAMMA * mask * Q_target(s', argmax_a' Q(s', a'))``. The
    online network picks the next action and the target network evaluates it.

    Args:
        buffer: replay memory providing ``__len__``, ``sample(n)`` and a
            ``Transition`` namedtuple with fields state, action, next_state,
            reward and mask. Each field is expected to be a tensor so it can
            be stacked (actions as integer index tensors).
        BATCH_SIZE: number of transitions per update.
        Q_net: online network being trained.
        target_Q_net: network used to evaluate the selected next action.
        optimizer: optimizer over ``Q_net`` parameters.
        GAMMA: discount factor.

    Returns:
        The MSE loss tensor of this update, or ``None`` when the buffer holds
        fewer than ``BATCH_SIZE`` transitions (no update is done).
    """
    if len(buffer) < BATCH_SIZE:
        return None
    transitions = buffer.sample(BATCH_SIZE)

    # Turn a list of Transition records into one Transition of per-field tuples.
    batch = buffer.Transition(*zip(*transitions))

    state_batch = torch.stack(batch.state)
    action_batch = torch.stack(batch.action).unsqueeze(-1)
    reward_batch = torch.stack(batch.reward)
    next_state_batch = torch.stack(batch.next_state)
    mask_batch = torch.stack(batch.mask)

    Q_values = Q_net(state_batch).gather(1, action_batch)   # q(s,a)

    with torch.no_grad():
        # Action selection carries no gradient, so it is done without building a graph.
        argmax_Q_values = Q_net(next_state_batch).max(1)[1].unsqueeze(-1)                  # argmax_a' q(s',a')
        # squeeze(-1) follows the actual batch length instead of assuming BATCH_SIZE.
        next_state_Q_values = target_Q_net(next_state_batch).gather(1, argmax_Q_values).squeeze(-1)   # q_target(s',argmax_a' q(s',a'))
        # The mask multiplies the bootstrap term, so entries with mask 0 keep only the reward.
        expected_Q_values = (next_state_Q_values.mul(mask_batch) * GAMMA) + reward_batch

    criterion = nn.MSELoss()
    loss = criterion(Q_values, expected_Q_values.unsqueeze(-1))

    optimizer.zero_grad()
    loss.backward()
    # Clamp every gradient element to [-1, 1]. clip_grad_value_ skips parameters
    # whose grad is None (frozen or unused), which the manual loop crashed on.
    nn.utils.clip_grad_value_(Q_net.parameters(), 1.0)
    optimizer.step()

    return loss

# Define train.py and test.py

In [None]:
import torch
import numpy as np

def train(TIME_LIMIT, TARGET_UPDATE, steps_done, device, path, Q_net, target_Q_net, buffer, select_action, optimize_model, env):
    """Run one training episode and return ``(t, done, loss)``.

    Each step picks an action with ``select_action(state, test=False)``,
    advances ``env``, stores the transition in ``buffer`` (mask = 1 - done) and
    calls ``optimize_model()``. Whenever the global step count is a multiple of
    ``TARGET_UPDATE`` the target network is moved halfway towards ``Q_net``.

    Args:
        TIME_LIMIT: maximum number of steps in the episode.
        TARGET_UPDATE: target-network update interval, in steps.
        steps_done: global step count at the START of this episode (passed by
            value, so it does not change while the episode runs).
        device: torch device for the created tensors.
        path: existing directory that receives the reward-table dump.
        Q_net, target_Q_net: online and target networks.
        buffer: replay memory with a ``push`` method.
        select_action: callable ``(state, test=...) -> action tensor``.
        optimize_model: zero-argument callable returning a loss or ``None``.
        env: environment with ``reset``, ``step`` and ``reward_states``.

    Returns:
        ``t`` (steps taken), ``done`` (goal reached) and ``loss`` (value of the
        last ``optimize_model()`` call). With ``TIME_LIMIT < 1`` no step is
        taken and ``(0, False, None)`` is returned.
    """
    state = env.reset()
    ### reward_states_reset check
    np.savetxt(f'{path}/SimpleMaze_Reward_table_reset.txt', env.reward_states, fmt='%d')

    state = torch.tensor(state, device=device, dtype=torch.float32)

    tau = 0.5   # interpolation weight of the soft target update
    # Defined up front so the return statement is valid even if the loop never runs.
    t, done, loss = 0, False, None

    for t in range(1, TIME_LIMIT+1): # t = 1 ~ TIME_LIMIT
        action = select_action(state, test=False)
        next_state, reward, done = env.step(action.item())  # action.item() are the pure values from the tensor
        reward = torch.tensor(reward, dtype=torch.float32, device=device)
        next_state = torch.tensor(next_state, device=device, dtype=torch.float32)

        buffer.push(state, action, next_state, reward, torch.tensor(1-int(done), device=device, dtype=torch.float32))
        state = next_state

        loss = optimize_model()

        # `steps_done` is a snapshot taken before the episode; add the steps of
        # this episode to get the running global count. (Assumes select_action
        # is the only thing advancing the global counter, once per call.)
        # Testing the bare snapshot made the update fire on every step of an
        # episode or on none of them.
        if (steps_done + t) % TARGET_UPDATE == 0:
            q_target_state_dict = target_Q_net.state_dict()
            q_state_dict = Q_net.state_dict()
            for key in q_state_dict:
                q_target_state_dict[key] = q_state_dict[key]*tau + q_target_state_dict[key]*(1-tau) # soft update
            target_Q_net.load_state_dict(q_target_state_dict)
        if done:
            break
    return t, done, loss

In [None]:
import os
import torch
import datetime
import numpy as np

def test(X_SIZE, Y_SIZE, TIME_LIMIT, TEST_EPISODES, device, path, writer, Q_net, QnetToCell, select_action, env, i_episode, t):
    if i_episode % 100 == 0:
        done_stack = 0
        steps_stack = 0

        for test_episode in range(1, TEST_EPISODES+1):
            state = env.reset()
            state = torch.tensor(state, device=device, dtype=torch.float32)

            for test_t in range(1, TIME_LIMIT+1):
                action = select_action(state, test=True)
                next_state, _, done = env.step(action.item())
                next_state = torch.tensor(next_state, device=device, dtype=torch.float32)
                state = next_state
                if done:
                    break
            done_stack += int(done)
            steps_stack += test_t

            writer.add_scalar('success_rate/test', done_stack/TEST_EPISODES, i_episode)
            writer.add_scalar('steps_per_episode/test', steps_stack/TEST_EPISODES, i_episode)
            V_table, Action_table = QnetToCell.FillGridByQnet(Q_net, env, device)

            if not os.path.isdir(path+'/V_table_test') or not os.path.isdir(path+'/Action_table_test'):
                    os.makedirs(path+'/V_table_test')
                    os.makedirs(path+'/Action_table_test')

            now = datetime.datetime.now().strftime("%m-%d_%H:%M:%S")
            np.savetxt(f'{path}/V_table_test/V_table_test_{now}.txt', V_table, fmt='%.3f')
            np.savetxt(f'{path}/Action_table_test/Action_table_test_{now}.txt', Action_table, fmt='%d')

# Define main.py

In [None]:
# code source from https://tutorials.pytorch.kr/intermediate/reinforcement_q_learning.html

import os
import math
import random
import argparse
import datetime
import numpy as np

import torch
import torch.optim as optim

from torch.utils.tensorboard import SummaryWriter

# Grid dimensions handed to TwoDimArrayMap and QnetToCell.
X_SIZE          = 6
Y_SIZE          = 6

# Network input/output sizes: the state is a [row, col] pair, one output per move.
STATE_DIM       = 2
ACTION_DIM      = 4

NUM_EPISODES    = 1000 # number of training episodes run by the main loop
TIME_LIMIT      = 2*X_SIZE*Y_SIZE   # maximum steps per episode
TEST_EPISODES   = 10                # evaluation episodes per test() round

# Interval used by train() for the soft target-network update.
# NOTE(review): train() compares this against a step counter although it is derived from the episode count.
TARGET_UPDATE   = NUM_EPISODES//10

RM_SIZE             = 1000000   # replay memory capacity (transitions)
BATCH_SIZE          = 2048      # no optimisation step happens until the buffer holds this many transitions
GAMMA               = 0.9       # discount factor
# Epsilon-greedy schedule used by select_action: EPS_END + (EPS_START - EPS_END) * exp(-steps_done / EPS_DECAY)
EPS_START           = 0.7
EPS_END             = 0.15
EPS_DECAY           = NUM_EPISODES

SAVE            = NUM_EPISODES//10  # save_history_txt writes tables every SAVE episodes

#### STEPS DONE
# Global counter of exploration-mode action selections; incremented inside select_action.
steps_done = 0

#### Argument parser
parser = argparse.ArgumentParser()

#### CHOOSE DEVICE AND MODEL
parser.add_argument('--gpu', type=str, default='0', help='GPU ID')
parser.add_argument('--model', type=str, default='DQN', help='DQN or DDQN')
# An explicit argument list is passed, so sys.argv is not consulted; edit this list to switch to DDQN.
args = parser.parse_args(args=['--model', 'DQN'])

device = torch.device(f'cuda:{args.gpu}' if torch.cuda.is_available() else 'cpu')

#### logging
now_day = datetime.datetime.now().strftime("%m-%d")
now = datetime.datetime.now().strftime("%m-%d_%H:%M:%S")

# Run directory; `{X_SIZE, Y_SIZE}` is formatted as a tuple, e.g. "(6, 6)".
# NOTE(review): the directory name contains ':' from the timestamp, which some filesystems reject.
path = f'./results_{args.model}/{now_day}_{X_SIZE, Y_SIZE}_GAM_{GAMMA}_NE_{NUM_EPISODES}_TU_{TARGET_UPDATE}_END_{EPS_END}_{RM_SIZE}_BS_{BATCH_SIZE}/result_{now}'
writer = SummaryWriter(f'{path}/tensorboard_{now}')

#### Networks
# The target network starts as an exact copy of the online network.
Q_net = QNET(STATE_DIM, ACTION_DIM).to(device)
target_Q_net = QNET(STATE_DIM, ACTION_DIM).to(device)
target_Q_net.load_state_dict(Q_net.state_dict())
target_Q_net.eval()

#### optimizer
optimizer = optim.Adam(Q_net.parameters())   # only Q_net is optimised; target_Q_net is updated by copying in train()
buffer = ReplayMemory(RM_SIZE)

#### QnetToCell bug fix
class QnetToCell:
    """Tabulates a Q-network over every cell of an ``x_dim`` x ``y_dim`` grid.

    ``V_states[i][j]`` holds the largest Q-value the network outputs for state
    ``[i, j]`` and ``Action[i][j]`` the index of that action.
    """

    def __init__(self, x_dim, y_dim):
        self.V_states = np.zeros([x_dim, y_dim])
        self.Action = np.zeros([x_dim, y_dim])
        self.row = len(self.V_states)
        self.col = len(self.V_states[0])

    def FillGridByQnet(self, Qnet, env, device):
        """Evaluate ``Qnet`` on every grid cell and return ``(V_states, Action)``.

        Args:
            Qnet: callable mapping a float32 tensor ``[i, j]`` to a 1-D tensor
                of action values.
            env: accepted for call compatibility; not used here.
            device: torch device on which the state tensors are created.

        Returns:
            The instance's own ``V_states`` and ``Action`` arrays, updated in place.
        """
        # Pure evaluation: no autograd graph is needed.
        with torch.no_grad():
            for i in range(self.row):
                for j in range(self.col):
                    torch_state = torch.tensor([i, j], dtype=torch.float32, device=device)
                    # One forward pass yields both the best value and its index.
                    best_value, best_action = Qnet(torch_state).max(0)
                    self.V_states[i][j] = best_value.item()
                    self.Action[i][j] = best_action.item()
        return self.V_states, self.Action

# NOTE(review): this rebinds the name `QnetToCell` from the class to an instance,
# so the class is no longer reachable by that name after this line.
QnetToCell = QnetToCell(X_SIZE, Y_SIZE)

def select_action(state, test):
    """Choose an action index for ``state``.

    With ``test`` true the greedy action of ``Q_net`` is returned and the step
    counter is left untouched. Otherwise an epsilon-greedy choice is made with
    ``eps = EPS_END + (EPS_START - EPS_END) * exp(-steps_done / EPS_DECAY)``
    and the global ``steps_done`` is incremented by one.

    Args:
        state: 1-D state tensor fed to ``Q_net``.
        test: True for greedy evaluation, False for exploration.

    Returns:
        A 0-dim integer tensor holding the chosen action index.
    """
    global steps_done
    if test:
        # Evaluation only needs the argmax, so skip building an autograd graph.
        with torch.no_grad():
            return Q_net(state).max(0)[1]

    sample = random.random()    # random value b/w 0 ~ 1
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            return Q_net(state).max(0)[1]
    return torch.tensor(random.randrange(ACTION_DIM), device=device, dtype=torch.int64)

def optimize_model():
    """Run one optimisation step with the update rule chosen by ``args.model``.

    'DQN' selects ``optimize_model_DQN``; any other value selects
    ``optimize_model_DDQN``. The chosen function's return value is passed through.
    """
    step_fn = optimize_model_DQN if args.model == 'DQN' else optimize_model_DDQN
    return step_fn(buffer, BATCH_SIZE, Q_net, target_Q_net, optimizer, GAMMA)

if __name__ == '__main__':

    # Build the maze environment with its single large wall.
    env = TwoDimArrayMap(X_SIZE, Y_SIZE)
    env.SimpleMazation()

    #### Maze logging
    # Dump the wall layout and the reward table next to the run's other outputs.
    os.makedirs(path, exist_ok=True)
    np.savetxt(f'{path}/SimpleMaze_table.txt', env.maze, fmt='%d')
    np.savetxt(f'{path}/SimpleMaze_Reward_table.txt', env.reward_states, fmt='%d')
    print(f'{path} is running...')

    #### Training
    done_stack = 0
    steps_stack = 0

    for i_episode in range(1, NUM_EPISODES+1):

        # One training episode; `loss` stays None until the replay buffer holds a full batch.
        t, done, loss = train(
            TIME_LIMIT, TARGET_UPDATE, steps_done, device, path,
            Q_net, target_Q_net, buffer, select_action, optimize_model, env,
        )

        ### tensorboard
        # Training curves are only written once optimisation has actually started.
        if loss is not None:
            writer.add_scalar('success_rate/train', int(done), i_episode)
            writer.add_scalar('steps_per_episode/train', t, i_episode)
            writer.add_scalar('Loss/train', loss, i_episode)

        # Periodic greedy evaluation (the interval check is inside test()).
        test(
            X_SIZE, Y_SIZE, TIME_LIMIT, TEST_EPISODES, device, path, writer,
            Q_net, QnetToCell, select_action, env, i_episode, t,
        )

        # Periodic dump of the value/action tables (the interval check is inside save_history_txt()).
        save_history_txt(SAVE, device, path, Q_net, QnetToCell, env, i_episode, t)

    writer.close()
    print(f'{path} is done')
    print('Complete')

./results_DQN/01-08_(6, 6)_GAM_0.9_NE_1000_TU_100_END_0.15_1000000_BS_2048/result_01-08_06:33:01 is running...
--------100 is saved with 72 steps. V_table and Action table
--------200 is saved with 16 steps. V_table and Action table
--------300 is saved with 13 steps. V_table and Action table
--------400 is saved with 14 steps. V_table and Action table
--------500 is saved with 24 steps. V_table and Action table
--------600 is saved with 18 steps. V_table and Action table
--------700 is saved with 13 steps. V_table and Action table
--------800 is saved with 13 steps. V_table and Action table
--------900 is saved with 17 steps. V_table and Action table
--------1000 is saved with 13 steps. V_table and Action table
./results_DQN/01-08_(6, 6)_GAM_0.9_NE_1000_TU_100_END_0.15_1000000_BS_2048/result_01-08_06:33:01 is done
Complete


In [None]:
%load_ext tensorboard

In [None]:
%tensorboard --logdir=./