# **Imports for building the maze and training the DQN agent**

In [0]:
import sys
import math
import copy
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
from collections import namedtuple
from itertools import count
from IPython.display import HTML
from IPython import display as ipythondisplay
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
from torch.autograd import Variable

is_ipython = 'inline' in matplotlib.get_backend()

## **Maze**

In [0]:
class Maze(object):
    """Square grid world with a wall border, one start, one goal and random blocks.

    ``maze_list`` is a list of rows indexed ``[y][x]``. Cell encoding:

    * ``"#"``  - border wall (entering it ends the episode)
    * ``"$"``  - block (entering it ends the episode)
    * ``"S"``  - start cell, placed on the top border row
    * a float  - reward of an open cell: ``reward`` for a plain cell,
      ``reward_nb`` for a cell next to a block, ``reward_g`` for the goal
      cell, which is placed on the bottom border row
    """

    def __init__(self, size=10, blocks_rate=0.1, reward=-1.0):
        """Create and immediately generate a random maze.

        Args:
            size: Side length of the grid; values of 3 or less fall back to 10.
            blocks_rate: Fraction of ``size ** 2`` used as the number of block
                placement attempts.
            reward: Reward of a plain open cell.
        """
        self.size = size if size > 3 else 10
        # Use the effective size so the fallback above also drives the block count.
        self.blocks = int((self.size ** 2) * blocks_rate)
        self.s_list = []      # top border row (holds the start cell)
        self.maze_list = []   # full grid, one list per row
        self.e_list = []      # bottom border row (holds the goal cell)
        self.start_point = None
        self.goal_point = None
        self.reward = reward
        self.reward_nb = reward - 5.0   # penalty reward for cells around a block
        self.reward_g = reward + 49.0   # reward for reaching the goal
        self.generate_maze()

    def create_mid_lines(self, k):
        """Append row ``k`` to ``maze_list``.

        Row 0 is the prepared top border, row ``size - 1`` the prepared bottom
        border; every other row is wall, open cells, wall.
        """
        if k == 0:
            self.maze_list.append(self.s_list)
        elif k == self.size - 1:
            self.maze_list.append(self.e_list)
        else:
            row = ["#"] + [self.reward] * (self.size - 2) + ["#"]
            self.maze_list.append(row)

    def insert_blocks(self, s_r, e_r):
        """Try to drop one block on a random interior cell.

        The cell directly below the start (column ``s_r``) and the cell directly
        above the goal (column ``e_r``) are never blocked. The eight neighbours
        of a placed block receive ``reward_nb``, except walls, other blocks, the
        start marker and the goal cell, which keep their value.

        Args:
            s_r: Column of the start cell.
            e_r: Column of the goal cell.
        """
        b_y = random.randint(1, self.size - 2)
        b_x = random.randint(1, self.size - 2)
        if [b_y, b_x] == [1, s_r] or [b_y, b_x] == [self.size - 2, e_r]:
            return

        self.maze_list[b_y][b_x] = "$"
        for d_y in (-1, 0, 1):
            for d_x in (-1, 0, 1):
                if d_y == 0 and d_x == 0:
                    continue
                n_y, n_x = b_y + d_y, b_x + d_x
                # Never overwrite the goal reward with the penalty.
                if [n_y, n_x] == self.goal_point:
                    continue
                # Walls, earlier blocks and the start marker keep their symbol.
                if self.maze_list[n_y][n_x] in ("#", "$", "S"):
                    continue
                self.maze_list[n_y][n_x] = self.reward_nb

    def generate_maze(self):
        """Build the border rows, the interior, the start/goal cells and the blocks."""
        s_r = random.randint(1, self.size - 2)
        self.s_list.extend("#" * self.size)
        self.start_point = [0, s_r]

        e_r = random.randint(1, self.size - 2)
        self.e_list.extend("#" * self.size)
        self.goal_point = [self.size - 1, e_r]

        for k in range(self.size):
            self.create_mid_lines(k)

        self.maze_list[self.start_point[0]][self.start_point[1]] = "S"
        self.maze_list[self.goal_point[0]][self.goal_point[1]] = self.reward_g

        for _ in range(self.blocks):
            self.insert_blocks(s_r, e_r)

    def display(self, point=None):
        """Print the maze; when ``point`` (``[y, x]``) is given, show it as ``@@``.

        The marked cell is restored after printing, even if printing fails.
        """
        if point is not None:
            y, x = point
            old_v = self.maze_list[y][x]
            self.maze_list[y][x] = "@@"
        try:
            for line in self.maze_list:
                print("\t\t\t" + "%4s " * len(line) % tuple(line))
        finally:
            if point is not None:
                self.maze_list[y][x] = old_v

    def get_val(self, state):
        """Return ``(reward, done)`` for entering the cell ``state``.

        Args:
            state: ``[y, x]`` position; any two-element sequence of integers
                (list, tuple or numpy array) is accepted.

        Returns:
            A tuple ``(reward, done)``. Positions outside the grid, walls and
            blocks yield ``(self.reward, True)``; the start cell yields
            ``(self.reward, False)``; any other cell yields its stored reward,
            with ``done`` set only for the goal cell.
        """
        y, x = int(state[0]), int(state[1])
        # Check bounds before indexing so negative indices cannot wrap around
        # and too-large indices cannot raise IndexError.
        if not (0 <= y < self.size and 0 <= x < self.size):
            return self.reward, True
        if [y, x] == self.start_point:
            return self.reward, False
        cell = self.maze_list[y][x]
        if cell == "$" or cell == "#":
            return self.reward, True
        return float(cell), [y, x] == self.goal_point

# **Generate Maze**

In [0]:
# Side length of the square maze grid (border walls included).
size = 15
# Fraction of size**2 used as the number of block placement attempts.
barriar_rate = 0.01

# Single maze instance shared by the training and evaluation cells below.
maze_1 = Maze(size, barriar_rate)

# Print the generated layout.
maze_1.display()

			   #    #    #    #    #    #    #    #    S    #    #    #    #    #    # 
			   # -1.0 -1.0 -6.0    $ -6.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -6.0 -6.0 -6.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -6.0 -6.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -6.0    $    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -6.0 -6.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0

In [0]:
class DQN(nn.Module):
  """Fully connected Q-network: state vector in, one Q-value per action out.

  Three ReLU hidden layers of equal width followed by a linear output layer.
  Layer attribute names (``fc1``, ``fc2``, ``fc3``, ``out``) are kept stable so
  saved ``state_dict`` keys stay compatible.
  """

  def __init__(self, num_actions, state_dim=2, hidden_dim=128):
    """
    Args:
      num_actions: Number of output Q-values.
      state_dim: Length of the input state vector (default 2).
      hidden_dim: Width of each hidden layer (default 128).
    """
    super().__init__()

    self.fc1 = nn.Linear(in_features=state_dim, out_features=hidden_dim)
    self.fc2 = nn.Linear(in_features=hidden_dim, out_features=hidden_dim)
    self.fc3 = nn.Linear(in_features=hidden_dim, out_features=hidden_dim)
    self.out = nn.Linear(in_features=hidden_dim, out_features=num_actions)

  def forward(self, t):
    """Map a float tensor whose last dimension is ``state_dim`` to Q-values."""
    for hidden in (self.fc1, self.fc2, self.fc3):
      t = F.relu(hidden(t))
    return self.out(t)

In [0]:
# One stored transition; fields are accessed by name when batches are built.
Experience = namedtuple('Experience', ['state', 'action', 'next_state', 'reward'])

In [0]:
class ReplayMemory():
  """Fixed-capacity ring buffer of experiences with uniform random sampling."""

  def __init__(self,capacity):
    """Start empty; ``push_count`` is the index the next push writes to."""
    self.capacity = capacity
    self.memory = []
    self.push_count = 0

  def push(self,experience):
    """Store ``experience``, overwriting the oldest entry once the buffer is full."""
    slot = self.push_count
    if len(self.memory) < self.capacity:
      # Still filling up: the write index equals the current length.
      self.memory.append(experience)
    else:
      self.memory[slot] = experience
    self.push_count = (slot + 1) % self.capacity

  def sample(self,batch_size):
    """Return ``batch_size`` distinct stored experiences chosen at random."""
    return random.sample(self.memory, batch_size)

  def can_provide_sample(self,batch_size):
    """Tell whether at least ``batch_size`` experiences are stored."""
    return not len(self.memory) < batch_size

In [0]:
class Agent():
  """Epsilon-greedy action selector with an exponentially decaying exploration rate."""

  def __init__(self,start,end,decay,num_actions,device):
    """
    Args:
      start: Exploration rate at step 0.
      end: Exploration rate approached as the step count grows.
      decay: Exponential decay factor applied per call to ``select_action``.
      num_actions: Number of discrete actions to choose from.
      device: Torch device on which returned tensors and network inputs live.
    """
    self.current_step = 0
    self.num_actions = num_actions
    self.device = device
    self.start = start
    self.end = end
    self.decay = decay

  def select_action(self,state,policy_net,val=False):
    """Choose an action for ``state``.

    Args:
      state: Numpy array holding the current state.
      policy_net: Callable mapping a 1-D float state tensor to per-action values.
      val: When True, always act greedily (no exploration).

    Returns:
      A tensor holding the action index: shape ``(1,)`` when exploring,
      0-dimensional when greedy. Both support ``.item()``.
    """
    rate = self.end+(self.start-self.end)*math.exp(-1*self.current_step*self.decay)
    self.current_step+=1

    # random.random() is drawn before checking ``val`` so the random stream is
    # consumed the same way in training and validation calls.
    if rate > random.random() and not val:
        action = random.randrange(self.num_actions) # 0 means down; 1 means up; 2 means right; 3 means left
        # Use the agent's own device rather than relying on a module-level global.
        return torch.tensor([action]).to(self.device)

    with torch.no_grad():
        state = torch.from_numpy(state).to(self.device)
        state = state.float()
        return policy_net(state).argmax(dim=0)

In [0]:
batch_size = 225        # number of experiences sampled from replay memory per optimisation step
gamma = 0.9             # discount factor applied to the next-state Q-value
eps_start = 0.95        # initial exploration rate passed to Agent
eps_end = 0.01          # asymptotic exploration rate passed to Agent
eps_decay = 0.009       # exponential decay factor of the exploration rate per action selection
target_update = 10      # copy policy weights into the target network every this many episodes
memory_size = 10000     # capacity of the replay memory
lr = 0.001              # learning rate of the Adam optimizer
num_episodes = 100000   # number of training episodes
num_actions = 4         # number of discrete moves (size of the network output)

In [0]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
agent = Agent(eps_start,eps_end,eps_decay,num_actions,device)
memory = ReplayMemory(memory_size)

# policy_net is the network being optimised.
policy_net = DQN(num_actions).to(device)
# print(policy_net)
# target_net supplies the next-state values used in the training targets.
target_net = DQN(num_actions).to(device)
# print(target_net)
# Start the target network as an exact copy of the policy network and keep it
# in evaluation mode; the optimizer below only receives policy_net parameters.
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()
optimizer = optim.Adam(params=policy_net.parameters(),lr=lr)

In [0]:
def extract_tensors(experiences):
    """Turn a list of Experience tuples into four stacked batch tensors.

    Returns:
        A tuple ``(states, actions, rewards, next_states)``. Note that this
        order differs from the field order of ``Experience``.
    """
    # Transpose: a sequence of Experiences becomes one Experience of sequences.
    columns = Experience(*zip(*experiences))
    return tuple(
        torch.stack(getattr(columns, field))
        for field in ('state', 'action', 'reward', 'next_state')
    )

In [0]:
def plot(values,moving_avg_period):
  """Redraw the training curve together with its moving average.

  Args:
    values: Sequence of per-episode values; plotted against the episode index
      (the y-axis is labelled 'Duration').
    moving_avg_period: Window length handed to ``get_moving_average``.

  Prints the number of episodes and the latest moving-average value, then
  clears the cell output when running with an inline matplotlib backend.
  """
  # Always reuse figure number 2 and wipe it so the curve is redrawn in place.
  plt.figure(2)
  plt.clf()
  plt.title('Training..')
  plt.xlabel('Episode')
  plt.ylabel('Duration')
  plt.plot(values)
  moving_avg = get_moving_average(moving_avg_period,values)
  plt.plot(moving_avg)
  # Short pause to let the figure refresh.
  plt.pause(0.001)
  # NOTE(review): moving_avg[-1] raises IndexError when ``values`` is empty.
  print("Episode",len(values),"\n",moving_avg_period,"episode moving avg:",moving_avg[-1])
  # wait=True defers clearing until new output arrives.
  if is_ipython: ipythondisplay.clear_output(wait=True)

def get_moving_average(period,values):
  """Return the trailing moving average of ``values`` as a numpy array.

  The result has one entry per input value. The first ``period - 1`` entries
  are zero; when fewer than ``period`` values exist, every entry is zero.
  """
  series = torch.tensor(values,dtype=torch.float)
  if len(series) < period:
    # Not enough data for a single full window yet.
    return torch.zeros(len(series)).numpy()

  # Every length-``period`` sliding window, one per row.
  windows = series.unfold(dimension=0, size=period, step=1)
  window_means = windows.mean(dim=1).flatten(start_dim=0)
  # Left-pad with zeros so the output lines up with the input positions.
  padded = torch.cat((torch.zeros(period-1), window_means))
  return padded.numpy()

In [0]:
episode_duration = []  # timestep index at which each finished episode ended
movable_vec = [[1,0],[-1,0],[0,1],[0,-1]] # 0 means down; 1 means up; 2 means right; 3 means left
goal_count = 0         # number of episodes that ended on the goal cell

for episode in range(num_episodes):
    # Draw a random non-terminal interior cell as the episode start.
    # randint is inclusive on both ends, so every interior index
    # 1 .. size-2 can be drawn (randrange(1, size-2) skipped the last one).
    while True:
        x = random.randint(1, maze_1.size - 2)
        y = random.randint(1, maze_1.size - 2)
        _, final = maze_1.get_val([y, x])
        if not final:
            start_point = [y, x]
            state = np.asarray(start_point)
            break
    loss = None

    # NOTE(review): there is no per-episode step limit; an episode only ends
    # when the agent enters a terminal cell (wall, block or goal).
    for timestep in count():
        # --- act and record the transition ---------------------------------
        action = agent.select_action(state, policy_net).item()
        next_state = [pos + delta for pos, delta in zip(state, movable_vec[action])]
        reward, done = maze_1.get_val(next_state)
        memory.push(Experience(
            torch.from_numpy(state),
            torch.from_numpy(np.asarray(action)),
            torch.from_numpy(np.asarray(next_state)),
            torch.from_numpy(np.asarray(reward)),
        ))
        state = np.asarray(next_state)

        # --- one optimisation step once enough experience is stored ---------
        if memory.can_provide_sample(batch_size):
            experiences = memory.sample(batch_size)
            states, actions, rewards, next_states = extract_tensors(experiences)
            states = states.float()
            actions = actions.to(device)

            # Terminal next states contribute no bootstrapped value, so find
            # the non-terminal ones by asking the maze about each of them.
            non_final_state_locations = [
                not maze_1.get_val(sampled_next.tolist())[1]
                for sampled_next in next_states.numpy()
            ]
            non_final_next_states = next_states[non_final_state_locations].float()

            ## Compute the current Q values
            current_q_values = policy_net(states.to(device)).gather(dim=1, index=actions.unsqueeze(-1))

            ## Compute V(S_{t+1}) for all next states (zero for terminal ones).
            next_q_values = torch.zeros(batch_size).to(device)
            next_q_values[non_final_state_locations] = target_net(non_final_next_states.to(device)).max(dim=1)[0].detach()

            ## Compute the expected Q values
            target_q_values = ((next_q_values * gamma) + rewards.to(device)).double()

            ## Compute Huber loss
            loss = F.smooth_l1_loss(current_q_values.double(), target_q_values.unsqueeze(1))

            ## Optimize the model, clamping each gradient element to [-1, 1]
            optimizer.zero_grad()
            loss.backward()
            for param in policy_net.parameters():
                param.grad.data.clamp_(-1, 1)
            optimizer.step()

        if done:
            print("episode:", episode)
            print("Start point:", start_point)
            if state.tolist() == maze_1.goal_point:
                print('Goal Reached')
                goal_count += 1
            else:
                print('Episode ended at:', state)
            episode_duration.append(timestep)
            print("Timesteps:", timestep)
            if loss is not None:
                print("Loss:", loss.item())
            break

    # Periodically sync the target network with the policy network.
    if episode % target_update == 0:
        target_net.load_state_dict(policy_net.state_dict())

print("number of times it reached the goal:",goal_count)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Start point: [6, 5]
Episode ended at: [6 0]
Timesteps: 4
Loss: 0.0035234222420617205
episode: 6242
Start point: [6, 3]
Episode ended at: [6 0]
Timesteps: 2
Loss: 0.0035925125838279152
episode: 6243
Start point: [7, 1]
Episode ended at: [7 0]
Timesteps: 0
Loss: 0.00555787642331844
episode: 6244
Start point: [8, 7]
Episode ended at: [14 10]
Timesteps: 8
Loss: 0.003792114558860628
episode: 6245
Start point: [11, 5]
Episode ended at: [14  5]
Timesteps: 2
Loss: 0.0038422645852784695
episode: 6246
Start point: [1, 7]
Episode ended at: [0 7]
Timesteps: 0
Loss: 0.002294426291246353
episode: 6247
Start point: [4, 12]
Episode ended at: [ 0 12]
Timesteps: 3
Loss: 0.0032285594199462295
episode: 6248
Start point: [9, 4]
Episode ended at: [14  4]
Timesteps: 4
Loss: 0.002704460725770905
episode: 6249
Start point: [7, 1]
Episode ended at: [7 0]
Timesteps: 0
Loss: 0.004432456009814962
episode: 6250
Start point: [11, 6]
Episode ended at: [

KeyboardInterrupt: ignored

# Try the model

In [0]:
# Greedy rollout from the maze's start cell using the target network.
start_point = maze_1.start_point
state = [int(v) for v in start_point]
score = 0
steps = 0
# A greedy policy can cycle between cells forever. Needing more steps than the
# grid has cells means the rollout is stuck, so stop there instead of hanging.
max_eval_steps = maze_1.size ** 2
while True:
    steps += 1
    action = agent.select_action(np.array(state),target_net,True)
    print("current state: {0} -> action: {1} ".format(np.asarray(state), action.item()))
    # Keep the position as plain ints so list comparisons and printing are stable.
    next_state = [int(pos + delta) for pos, delta in zip(state, movable_vec[int(action.item())])]
    reward, done = maze_1.get_val(next_state)
    maze_1.display(state)   # shows the position before the move is applied
    score = score + reward
    state = next_state
    print("current step: {0} \t score: {1}\n".format(steps, score))
    if done and state == maze_1.goal_point:
        maze_1.display(state)
        print("goal!")
        break
    elif done:
        print("T.T")
        break
    elif steps >= max_eval_steps:
        print("T.T (step limit of {0} reached without terminating)".format(max_eval_steps))
        break

current state: [0 8] -> action: 0 
			   #    #    #    #    #    #    #    #   @@    #    #    #    #    #    # 
			   # -1.0 -1.0 -6.0    $ -6.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -6.0 -6.0 -6.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -6.0 -6.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -6.0    $    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -6.0 -6.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0 -1.0    # 
			   # -1.0 -1.0