# Pre-Experiment

In this part, a basic DQN architecture is implemented and tested on self-created mazes that are small enough to train quickly but follow the same design specification as the required problem. The details are as follows.

#### Library Illustration

In [None]:
import keras
from __future__ import print_function
import os, sys, time, datetime, json, random
import numpy as np
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from tensorflow.keras.optimizers import SGD,Adam,RMSprop
from keras.layers.advanced_activations import PReLU
import matplotlib.pyplot as plt
%matplotlib inline

#### Self-created Maze Definition

In [None]:
# Hand-made 10x10 training maze: 0 marks a wall, 1 marks a walkable cell.
# The grid is stored as float so that fractional cell encodings written by the
# environment (e.g. 0.01) are not truncated to 0 by an integer dtype.
maze = np.array([
    [ 0,  0,  0,  0,  0,  0,  0,  0,  0,  0],
    [ 0,  1,  1,  1,  1,  0,  0,  0,  1,  0],
    [ 0,  1,  0,  0,  1,  1,  1,  1,  1,  0],
    [ 0,  1,  0,  0,  0,  1,  0,  0,  1,  0],
    [ 0,  1,  0,  1,  0,  1,  0,  0,  1,  0],
    [ 0,  1,  0,  1,  0,  1,  0,  0,  0,  0],
    [ 0,  1,  1,  1,  0,  1,  1,  1,  1,  0],
    [ 0,  1,  0,  0,  0,  1,  0,  0,  0,  0],
    [ 0,  1,  0,  0,  0,  1,  1,  1,  1,  0],
    [ 0,  0,  0,  0,  0,  0,  0,  0,  0,  0]
], dtype=float)
print(maze)

Within this static (10,10) maze, the agent needs to travel from (1,1) to (8,8), avoiding all the walls (represented by '0') on its way.

#### Action Definition
Includes the travel trace mark, the action encoding, and an exploration factor.

In [None]:
# Cell values used for marking; both marks share the value 2
visited_mark = agent_mark = 2

# Integer codes of the four moves
LEFT, UP, RIGHT, DOWN = range(4)

# Human-readable name of every action code
actions_list = dict(zip(
    (LEFT, UP, RIGHT, DOWN),
    ('left', 'up', 'right', 'down'),
))

# Number of distinct actions
num_actions = len(actions_list)

# Probability of taking a random (exploratory) action
epsilon = 0.1

#### Basic Rules Setting
Return +10 for reaching the destination, so that arrival is the only strongly rewarded event;

Return -10 for trying to enter a wall, in order to stop illegal actions with a severe punishment; 

Return -0.01 for every single action, in order to encourage the agent to seek the shortest route;

Return -0.1 for revisiting an already visited block, in order to prevent the agent from wandering around;

Eliminate actions hitting the walls and boundaries;

Reset the game when the sum of rewards falls below (10 * 10) * -0.01;

In [None]:
class DQNmaze(object):
    """Grid-maze environment for the DQN agent.

    A cell holding 0 is a wall; any non-zero cell is walkable. The outermost
    ring of the grid is treated as a boundary the agent may never enter.

    Args:
        maze: 2-D array-like describing the grid.
        agent: (row, col) start cell of the agent.
        target: (row, col) goal cell; defaults to (8, 8).
    """

    # Maze Class Initialization
    def __init__(self, maze, agent=(1,1), target=(8,8)):
        # Pristine layout, restored on every reset so marks never accumulate
        self._maze = np.array(maze)
        self.maze = np.copy(self._maze)
        nrows, ncols = self.maze.shape
        self.target = tuple(target)
        self.path = [(r,c) for r in range(nrows) for c in range(ncols) if self._maze[r,c] == 1]
        self.reset(agent)

    # Reset Agent State
    def reset(self, agent):
        """Place the agent on ``agent`` and clear all per-game bookkeeping."""
        self.agent = agent
        self.maze = np.copy(self._maze)
        row, col = agent
        self.maze[row, col] = agent_mark
        self.state = (row, col, 'start')
        # The game is lost once the accumulated reward drops below this value
        self.threshold = -0.01 * self.maze.shape[0]**2
        self.reward_sum = 0
        self.visited = set()

    @staticmethod
    def _moves():
        """Return the (row, col) offset applied by every action code."""
        return {LEFT: (0, -1), UP: (-1, 0), RIGHT: (0, 1), DOWN: (1, 0)}

    # Update State Corresponding to Action Encoding and Validation
    def update_state(self, action):
        """Apply ``action`` and store the resulting (row, col, mode) state.

        ``mode`` becomes 'blocked' when no move is possible, 'valid' when the
        agent moved, and 'invalid' when the move was rejected (the agent then
        stays where it is).
        """
        row, col, _ = self.state
        if self.maze[row, col] > 0:
            self.visited.add((row, col))  # mark visited cell
        valid_actions = self.valid_actions()
        if not valid_actions:
            mode = 'blocked'
        elif action in valid_actions:
            d_row, d_col = self._moves()[int(action)]
            row, col, mode = row + d_row, col + d_col, 'valid'
        else:
            mode = 'invalid'
        self.state = (row, col, mode)

    # Update Reward based on Pre-defined Rules
    def get_reward(self):
        """Return the reward earned by the most recent state update."""
        agent_row, agent_col, mode = self.state
        if (agent_row, agent_col) == self.target:
            return 10.0
        if mode == 'blocked':
            return self.threshold - 1
        # A rejected move leaves the agent on an already visited cell, so it
        # has to be tested before the revisit penalty or it would be masked.
        if mode == 'invalid':
            return -10
        if (agent_row, agent_col) in self.visited:
            return -0.1
        if mode == 'valid':
            return -0.01

    # Record Reflections to Action
    def act(self, action):
        """Perform ``action``; return (observation, reward, status)."""
        self.update_state(action)
        reward = self.get_reward()
        self.reward_sum += reward
        status = self.travel_status()
        envstate = self.observe()
        return envstate, reward, status

    # Input Environment Information as Vector
    def observe(self):
        """Return the drawn environment flattened to shape (1, n_cells)."""
        canvas = self.draw_env()
        envstate = canvas.reshape((1, -1))
        return envstate

    # Visualization Encodes
    def draw_env(self):
        """Encode the grid: walls 0, walkable cells 0.01, agent ``agent_mark``.

        The canvas is built as float; writing 0.01 into an integer copy of the
        maze would silently truncate every walkable cell to 0.
        """
        canvas = np.where(self.maze > 0, 0.01, 0.0)
        row, col, _ = self.state
        canvas[row, col] = agent_mark
        return canvas

    # Judging Travel Status
    def travel_status(self):
        """Return 'lose', 'win' or 'not_over' for the current game."""
        if self.reward_sum < self.threshold:
            return 'lose'
        agent_row, agent_col, _ = self.state
        if (agent_row, agent_col) == self.target:
            return 'win'
        return 'not_over'

    # Validation for Actions: Agent is not allowed to go into the walls
    def valid_actions(self, cell=None):
        """Return the action codes that lead to a walkable interior cell.

        Args:
            cell: optional (row, col) to test; defaults to the agent's cell.
        """
        if cell is None:
            row, col, _ = self.state
        else:
            row, col = cell
        nrows, ncols = self.maze.shape

        actions = []
        for action, (d_row, d_col) in self._moves().items():
            r, c = row + d_row, col + d_col
            # The destination must lie inside the border ring and not be a wall
            if 1 <= r <= nrows - 2 and 1 <= c <= ncols - 2 and self.maze[r, c] != 0:
                actions.append(action)
        return actions

#### Action Function Test 

In [None]:
# Test Sample Visualization in Gray-scale 
def show(DQNmaze):
    """Draw a maze environment in gray-scale and return the image artist.

    Visited cells, the agent's cell and the target cell are all painted with
    the value 2 on a copy of the grid; the environment itself is not modified.

    Args:
        DQNmaze: environment instance exposing ``maze``, ``visited`` and
            ``state`` (and optionally ``target``).
    """
    plt.grid(True)
    nrows, ncols = DQNmaze.maze.shape
    ax = plt.gca()
    # The x axis runs over columns and the y axis over rows
    ax.set_xticks(np.arange(0.5, ncols, 1))
    ax.set_yticks(np.arange(0.5, nrows, 1))
    ax.set_xticklabels([])
    ax.set_yticklabels([])
    canvas = np.copy(DQNmaze.maze)
    for row, col in DQNmaze.visited:
        canvas[row, col] = 2
    agent_row, agent_col, _ = DQNmaze.state
    canvas[agent_row, agent_col] = 2
    # Fall back to the bottom-right interior cell when no target is stored
    target_row, target_col = getattr(DQNmaze, 'target', (nrows-2, ncols-2))
    canvas[target_row, target_col] = 2
    img = plt.imshow(canvas, interpolation='none', cmap='gray')
    return img

In [None]:
# Keep the class name intact: binding the instance to `DQNmaze` would make
# every later `DQNmaze(maze)` call fail with "object is not callable".
demo_maze = DQNmaze(maze)
show(demo_maze)

In [None]:
# Walk six cells down from the start; the instance gets its own name so the
# DQNmaze class stays callable for the cells that follow.
demo_maze = DQNmaze(maze)
for _ in range(6):
    demo_maze.act(DOWN)
show(demo_maze)

In [None]:
# Follow a short mixed route; the instance gets its own name so the DQNmaze
# class stays callable for the cells that follow.
demo_maze = DQNmaze(maze)
for move in (RIGHT, RIGHT, RIGHT, DOWN, RIGHT, DOWN):
    demo_maze.act(move)
show(demo_maze)

### Start Travelling

In [None]:
# Define Travel Function as the Core of Iterations
# Inputs are Outputs of the Previous Step
def travel(model, DQNmaze, agent_start):
    """Play one greedy game with ``model`` and report whether it was won.

    Args:
        model: object whose ``predict(envstate)`` returns one row of Q-values.
        DQNmaze: environment instance providing ``reset``, ``observe``, ``act``.
        agent_start: (row, col) cell the agent starts from.

    Returns:
        bool: True when the environment reports 'win', False on 'lose'.
    """
    DQNmaze.reset(agent_start)
    envstate = DQNmaze.observe()
    while True:
        # Pick the action with the highest predicted Q-value
        q = model.predict(envstate)
        action = np.argmax(q[0])
        # Feed the new observation into the next prediction; otherwise the
        # model would keep being queried with the initial state.
        envstate, reward, travel_status = DQNmaze.act(action)
        if travel_status == 'win':
            return True
        elif travel_status == 'lose':
            return False

In [None]:
# Define Details in Each Iteration
class Experience(object):
    """Bounded replay memory that turns stored steps into training data.

    Args:
        model: network exposing ``predict`` and ``output_shape``.
        limit: maximum number of steps kept in memory.
        gamma: discount factor applied to the next state's best Q-value.
    """

    def __init__(self, model, limit=100, gamma=0.95):
        self.model = model
        self.limit = limit
        self.gamma = gamma
        self.memory = list()
        self.num_actions = model.output_shape[-1]

    # Store Previous Experiences as training data
    # Parameter Limit limits the Memory Upper Bound
    def remember(self, episode):
        """Append ``episode`` and drop the oldest entry once over the limit.

        episode = [envstate, action, reward, envstate_next, travel_over]
        """
        self.memory.append(episode)
        if len(self.memory) > self.limit:
            del self.memory[0]

    def predict(self, envstate):
        """Return the model's Q-value row for a single environment state."""
        return self.model.predict(envstate)[0]

    # Get Input Data and Targets for the Next Iteration
    def get_data(self, data_size=10):
        """Sample stored steps and build (inputs, targets) for one fit call.

        Args:
            data_size: maximum number of steps to sample without replacement.

        Returns:
            tuple: ``inputs`` of shape (n, env_size) and ``targets`` of shape
            (n, num_actions), where n = min(len(memory), data_size).
        """
        env_size = self.memory[0][0].shape[1]
        mem_size = len(self.memory)
        data_size = min(mem_size, data_size)
        inputs = np.zeros((data_size, env_size))
        targets = np.zeros((data_size, self.num_actions))
        for i, j in enumerate(np.random.choice(range(mem_size), data_size, replace=False)):
            envstate, action, reward, envstate_next, travel_over = self.memory[j]
            inputs[i] = envstate
            # Actions that were not taken keep the current prediction, so they
            # contribute no error to the fit.
            targets[i] = self.predict(envstate)
            # Q_sa = max_a' Q(s', a')
            Q_sa = np.max(self.predict(envstate_next))
            if travel_over:
                # Terminal step: nothing follows, the target is the reward
                targets[i, action] = reward
            else:
                # reward + gamma * max_a' Q(s', a')
                targets[i, action] = reward + self.gamma * Q_sa
        return inputs, targets

In [None]:
# Define Neural Network
def build_model(maze, lr=0.001):
    """Build the Q-network for a fully observed maze.

    The network takes the flattened maze (``maze.size`` values) as input, has
    two hidden Dense layers of the same width with PReLU activations, and
    outputs one Q-value per action.

    Args:
        maze: array whose ``size`` fixes the input and hidden layer width.
        lr: learning rate handed to the Adam optimizer.

    Returns:
        The compiled model (Adam optimizer, mean-squared-error loss).
    """
    model = Sequential()
    model.add(Dense(maze.size, input_shape=(maze.size,)))
    model.add(PReLU())
    model.add(Dense(maze.size))
    model.add(PReLU())
    model.add(Dense(num_actions))
    # Pass lr explicitly; the string 'adam' would silently ignore it
    model.compile(optimizer=Adam(learning_rate=lr), loss='mse')
    return model

In [None]:
# Build Training Architecture
def qtrain(model, maze, **opt):
    """Train ``model`` with epsilon-greedy Q-learning and experience replay.

    Every epoch plays one game from a random walkable cell. After each step
    the transition is stored and the model is fitted on a sample of the
    replay memory. Training stops early once the last ``hsize`` games were
    all won. Weights and architecture are saved to ``<name>.h5`` and
    ``<name>.json``.

    Keyword options (all optional):
        n_epoch: maximum number of games; ``epochs`` is accepted as an alias.
            Defaults to 15000.
        limit: replay-memory capacity. Defaults to 1000.
        data_size: number of stored steps sampled per fit. Defaults to 50.
        weights_file: weights file loaded before training. Defaults to "".
        name: base name of the saved files. Defaults to 'model'.

    Returns:
        float: elapsed training time in seconds.
    """
    global epsilon
    # The notebook calls qtrain(..., epochs=...), so honour that spelling too
    n_epoch = opt.get('n_epoch', opt.get('epochs', 15000))
    limit = opt.get('limit', 1000)
    data_size = opt.get('data_size', 50)
    weights_file = opt.get('weights_file', "")
    name = opt.get('name', 'model')
    start_time = datetime.datetime.now()

    # Load Previous Weight File
    if weights_file:
        print("loading weights from file: %s" % (weights_file,))
        model.load_weights(weights_file)

    # Environment Construction
    dqnmaze = DQNmaze(maze)

    # Initialize Experience Object and Related Records
    experience = Experience(model, limit=limit)
    win_history = []   # history of win/lose game
    hsize = dqnmaze.maze.size//2   # history window size
    win_rate = 0.0
    last_epoch = 0   # stays defined even when n_epoch is 0

    # Iteration Functioning
    for epoch in range(n_epoch):
        last_epoch = epoch
        loss = 0.0
        agent = random.choice(dqnmaze.path)
        dqnmaze.reset(agent)
        travel_over = False

        envstate = dqnmaze.observe()

        n_episodes = 0
        while not travel_over:
            valid_actions = dqnmaze.valid_actions()
            if not valid_actions:
                break

            prev_envstate = envstate

            # Prediction of Next Action
            if np.random.rand() < epsilon:
                action = random.choice(valid_actions)
            else:
                action = np.argmax(experience.predict(prev_envstate))

            # Apply action, get reward and new envstate
            envstate, reward, travel_status = dqnmaze.act(action)
            if travel_status == 'win':
                win_history.append(1)
                travel_over = True
            elif travel_status == 'lose':
                win_history.append(0)
                travel_over = True
            else:
                travel_over = False

            # Store Experience
            episode = [prev_envstate, action, reward, envstate, travel_over]
            experience.remember(episode)
            n_episodes += 1

            # Train Neural Network
            inputs, targets = experience.get_data(data_size=data_size)
            model.fit(
                inputs,
                targets,
                epochs=8,
                batch_size=16,
                verbose=0,
            )
            loss = model.evaluate(inputs, targets, verbose=0)

        if len(win_history) > hsize:
            win_rate = sum(win_history[-hsize:]) / hsize

        dt = datetime.datetime.now() - start_time
        t = format_time(dt.total_seconds())
        template = "Epoch: {:03d}/{:d} | Loss: {:.4f} | Episodes: {:d} | Win count: {:d} | Win rate: {:.3f} | time: {}"
        print(template.format(epoch, n_epoch-1, loss, n_episodes, sum(win_history), win_rate, t))

        # Explore less once the agent wins most games
        if win_rate > 0.9:
            epsilon = 0.05
        if sum(win_history[-hsize:]) == hsize:
            print("Reached 100%% win rate at epoch: %d" % (epoch,))
            break

    # Save Trained Model Weights and Architecture
    h5file = name + ".h5"
    json_file = name + ".json"
    model.save_weights(h5file, overwrite=True)
    with open(json_file, "w") as outfile:
        json.dump(model.to_json(), outfile)
    dt = datetime.datetime.now() - start_time
    seconds = dt.total_seconds()
    t = format_time(seconds)
    print('files: %s, %s' % (h5file, json_file))
    # Report the replay-memory capacity actually used (`limit`)
    print("n_epoch: %d, max_mem: %d, data: %d, time: %s" % (last_epoch, limit, data_size, t))
    return seconds

# Reset Time Units
def format_time(seconds):
    """Render a duration given in seconds as seconds, minutes or hours.

    Durations under 400 s are shown in seconds, under 4000 s in minutes,
    and everything else in hours.
    """
    if seconds < 400:
        return "%.1f seconds" % (float(seconds),)
    if seconds < 4000:
        return "%.2f minutes" % (seconds / 60.0,)
    return "%.2f hours" % (seconds / 3600.0,)

In [None]:
# Use a separate name for the instance: rebinding `DQNmaze` would hide the
# class that the training function instantiates.
demo_maze = DQNmaze(maze)
show(demo_maze)

In [None]:
model = build_model(maze)
# qtrain reads the epoch budget from the `n_epoch` option
qtrain(model, maze, n_epoch=1000, limit=8*maze.size, data_size=32)

In [None]:
# Replay Game based on Trained Model
experience = Experience(model)
Output = experience.getdata()

# Formal Experiment

In this part, the DQN architecture is extended and tested on two more challenging problems: a large-scale static maze and a dynamic maze.

# Static Maze

#### Environment Setting Illustration
Load the basic information of the target maze and plot a visualization for observation use only.

load_maze function constructs the whole maze environment and the get_information function returns the surrounding information of a state.

In [None]:
import os
import numpy as np
import random
import read_maze
from read_maze import load_maze
from read_maze import get_local_maze_information as get_information
import matplotlib.pyplot as plt
# Build the maze environment, then save a picture of the stored maze array
load_maze()
depthmap = np.load('COMP6247Maze20212022.npy')
# plt.subplots returns (figure, axes); unpack both instead of binding the
# whole tuple to `ax`.
fig, ax = plt.subplots(figsize = (100,100))
ax.imshow(depthmap)
fig.savefig('maze.png')

#### Action Definition and DQN Maze Class Construction
Apply the same action definitions as before.

Alter the model architecture because the whole maze is unknown to the agent.

Numerically alter the reward rules to fit the new maze of large scale.

Change the draw_env and the observe function as illustrating get_information to get surrounding information.

In [None]:
# Integer codes of the four moves
LEFT, UP, RIGHT, DOWN = range(4)

# Human-readable name of every action code
actions_list = dict(zip(
    (LEFT, UP, RIGHT, DOWN),
    ('left', 'up', 'right', 'down'),
))

# Number of distinct actions
num_actions = len(actions_list)

# Probability of taking a random (exploratory) action
epsilon = 0.1
class DQNmaze(object):
    """Maze environment in which the agent only sees its 3x3 neighbourhood.

    ``self.maze`` is the agent's own map: it is filled in from the values
    returned by ``get_information`` each time the environment is drawn. A
    cell holding 0 counts as a wall (or as not yet observed).

    Args:
        maze: 2-D array-like used as the initial map.
        agent: (row, col) start cell of the agent.
        target: (row, col) goal cell; defaults to (199, 199).
    """

    # Maze Class Initialization
    def __init__(self, maze, agent=(1,1), target=(199,199)):
        self.maze = np.array(maze)
        self.target = tuple(target)
        self.reset(agent)

    # Reset Agent State
    def reset(self, agent):
        """Place the agent on ``agent`` and clear all per-game bookkeeping.

        The map collected so far is kept.
        """
        self.agent = agent
        self.maze = np.copy(self.maze)
        row, col = agent
        self.state = (row, col, 'start')
        # The game is lost once the accumulated reward drops below this value
        self.threshold = -0.01 * self.maze.shape[0]**2
        self.reward_sum = 0
        self.visited = set()

    @staticmethod
    def _moves():
        """Return the (row, col) offset applied by every action code."""
        return {LEFT: (0, -1), UP: (-1, 0), RIGHT: (0, 1), DOWN: (1, 0)}

    # Update State Corresponding to Action Encoding and Validation
    def update_state(self, action):
        """Apply ``action`` and store the resulting (row, col, mode) state.

        ``mode`` becomes 'blocked' when no move is possible, 'valid' when the
        agent moved, and 'invalid' when the move was rejected (the agent then
        stays where it is).
        """
        row, col, _ = self.state
        if self.maze[row, col] > 0:
            self.visited.add((row, col))  # mark visited cell
        valid_actions = self.valid_actions()
        if not valid_actions:
            mode = 'blocked'
        elif action in valid_actions:
            d_row, d_col = self._moves()[int(action)]
            row, col, mode = row + d_row, col + d_col, 'valid'
        else:
            mode = 'invalid'
        self.state = (row, col, mode)

    # Update Reward based on Pre-defined Rules
    def get_reward(self):
        """Return the reward earned by the most recent state update."""
        agent_row, agent_col, mode = self.state
        if (agent_row, agent_col) == self.target:
            return 50
        if mode == 'blocked':
            return self.threshold - 1
        # A rejected move leaves the agent on an already visited cell, so it
        # has to be tested before the revisit penalty or it would be masked.
        if mode == 'invalid':
            return -50
        if (agent_row, agent_col) in self.visited:
            return -0.1
        if mode == 'valid':
            return -0.01

    # Record Reflections to Action
    def act(self, action):
        """Perform ``action``; return (observation, reward, status)."""
        self.update_state(action)
        reward = self.get_reward()
        self.reward_sum += reward
        status = self.travel_status()
        envstate = self.observe()
        return envstate, reward, status

    # Input Environment Information as Vector
    def observe(self):
        """Return the 3x3 neighbourhood flattened to shape (1, 9)."""
        canvas = self.draw_env()
        envstate = canvas.reshape((1, -1))
        return envstate

    # Visualization Encodes
    def draw_env(self):
        """Query the surroundings of the agent and record them on the map.

        Returns:
            A (3, 3) array holding element 0 of every entry returned by
            ``get_information`` for the agent's cell.
        """
        row, col, _ = self.state
        environment = get_information(row, col)
        canvas = np.zeros((3,3))
        for i in range(3):
            for j in range(3):
                canvas[i][j] = environment[i][j][0]
                # Neighbourhood index (i, j) maps to map cell (row+i-1, col+j-1)
                self.maze[row+i-1][col+j-1] = canvas[i][j]
        return canvas

    # Judging Travel Status
    def travel_status(self):
        """Return 'lose', 'win' or 'not_over' for the current game."""
        if self.reward_sum < self.threshold:
            return 'lose'
        agent_row, agent_col, _ = self.state
        if (agent_row, agent_col) == self.target:
            return 'win'
        return 'not_over'

    # Validation for Actions: Agent is not allowed to go into the walls
    def valid_actions(self, cell=None):
        """Return the action codes that lead to a walkable interior cell.

        Args:
            cell: optional (row, col) to test; defaults to the agent's cell.
        """
        if cell is None:
            row, col, _ = self.state
        else:
            row, col = cell
        nrows, ncols = self.maze.shape

        actions = []
        for action, (d_row, d_col) in self._moves().items():
            r, c = row + d_row, col + d_col
            # The destination must lie inside the border ring and not be a wall
            if 1 <= r <= nrows - 2 and 1 <= c <= ncols - 2 and self.maze[r, c] != 0:
                actions.append(action)
        return actions

#### Travel and Training Definition
The Input has to be changed to the Surrounding Environment of the State.

This does not significantly change the architecture of the algorithm, but it does change the core idea.

We now aim to learn the mapping from the environment of (3,3) to the action.

In [None]:
# Define Travel Function as the Core of Iterations
# Inputs are Outputs of the Previous Step
def travel(model, DQNmaze, agent_start):
    """Play one greedy game with ``model`` and report whether it was won.

    Args:
        model: object whose ``predict(envstate)`` returns one row of Q-values.
        DQNmaze: environment instance providing ``reset``, ``observe``, ``act``.
        agent_start: (row, col) cell the agent starts from.

    Returns:
        bool: True when the environment reports 'win', False on 'lose'.
    """
    DQNmaze.reset(agent_start)
    envstate = DQNmaze.observe()
    while True:
        # Pick the action with the highest predicted Q-value
        q = model.predict(envstate)
        action = np.argmax(q[0])
        # Feed the new observation into the next prediction; otherwise the
        # model would keep being queried with the initial state.
        envstate, reward, travel_status = DQNmaze.act(action)
        if travel_status == 'win':
            return True
        elif travel_status == 'lose':
            return False
        
# Define Details in Each Iteration
class Experience(object):
    """Bounded replay memory that turns stored steps into training data."""

    def __init__(self, model, limit=3000, gamma=0.95):
        self.model, self.limit, self.gamma = model, limit, gamma
        self.memory = []
        # One output unit per action
        self.num_actions = model.output_shape[-1]

    def remember(self, episode):
        """Store one step; the oldest step is dropped once over the limit.

        episode = [envstate, action, reward, envstate_next, travel_over]
        """
        self.memory.append(episode)
        if len(self.memory) > self.limit:
            self.memory.pop(0)

    def predict(self, envstate):
        """Return the model's Q-value row for a single environment state."""
        q_rows = self.model.predict(envstate)
        return q_rows[0]

    def get_data(self, data_size=10):
        """Sample stored steps and build (inputs, targets) for one fit call."""
        width = self.memory[0][0].shape[1]
        stored = len(self.memory)
        batch = min(stored, data_size)
        inputs = np.zeros((batch, width))
        targets = np.zeros((batch, self.num_actions))
        picks = np.random.choice(range(stored), batch, replace=False)
        for slot, pick in enumerate(picks):
            state, action, reward, next_state, done = self.memory[pick]
            inputs[slot] = state
            # Untaken actions keep the current prediction as their target
            targets[slot] = self.predict(state)
            # Best Q-value reachable from the following state
            best_next = np.max(self.predict(next_state))
            # Terminal steps are worth their reward only; others add the
            # discounted best follow-up value
            targets[slot, action] = reward if done else reward + self.gamma * best_next
        return inputs, targets
    
# Define Neural Network
def build_model(lr=0.001):
    """Build the Q-network that maps a 3x3 neighbourhood to action values.

    The network takes 9 inputs, has two hidden Dense layers of 9 units with
    PReLU activations, and outputs one Q-value per action.

    Args:
        lr: learning rate for the Adam optimizer. The training cell in this
            notebook calls ``build_model(environ)`` with an array, so a
            non-scalar value falls back to Adam's default settings.

    Returns:
        The compiled model (Adam optimizer, mean-squared-error loss).
    """
    model = Sequential()
    model.add(Dense(9, input_shape=(9,)))
    model.add(PReLU())
    model.add(Dense(9))
    model.add(PReLU())
    model.add(Dense(num_actions))
    # Apply lr when it is a real scalar; the string 'adam' ignores it
    optimizer = Adam(learning_rate=lr) if np.isscalar(lr) else 'adam'
    model.compile(optimizer=optimizer, loss='mse')
    return model

# Build Training Architecture
def qtrain(model, maze, **opt):
    """Train ``model`` with epsilon-greedy Q-learning and experience replay.

    Every epoch plays one game starting from cell (1, 1). After each step the
    transition is stored and the model is fitted on a sample of the replay
    memory. Training stops early once the last ``hsize`` games were all won.
    Weights and architecture are saved to ``<name>.h5`` and ``<name>.json``.

    Keyword options (all optional):
        n_epoch: maximum number of games; ``epochs`` is accepted as an alias.
            Defaults to 15000.
        limit: replay-memory capacity. Defaults to 3000.
        data_size: number of stored steps sampled per fit. Defaults to 50.
        weights_file: weights file loaded before training. Defaults to "".
        name: base name of the saved files. Defaults to 'model'.

    Returns:
        float: elapsed training time in seconds.
    """
    global epsilon
    # The notebook calls qtrain(..., epochs=...), so honour that spelling too
    n_epoch = opt.get('n_epoch', opt.get('epochs', 15000))
    limit = opt.get('limit', 3000)
    data_size = opt.get('data_size', 50)
    weights_file = opt.get('weights_file', "")
    name = opt.get('name', 'model')
    start_time = datetime.datetime.now()

    # Load Previous Weight File
    if weights_file:
        print("loading weights from file: %s" % (weights_file,))
        model.load_weights(weights_file)

    # Environment Construction
    dqnmaze = DQNmaze(maze)

    # Initialize Experience Object and Related Records
    experience = Experience(model, limit=limit)
    win_history = []   # history of win/lose game
    hsize = dqnmaze.maze.size//2   # history window size
    win_rate = 0.0
    last_epoch = 0   # stays defined even when n_epoch is 0

    # Iteration Functioning
    for epoch in range(n_epoch):
        last_epoch = epoch
        loss = 0.0
        agent = (1,1)
        dqnmaze.reset(agent)
        travel_over = False

        envstate = dqnmaze.observe()

        n_episodes = 0
        while not travel_over:
            valid_actions = dqnmaze.valid_actions()
            if not valid_actions:
                break

            prev_envstate = envstate

            # Prediction of Next Action
            if np.random.rand() < epsilon:
                action = random.choice(valid_actions)
            else:
                action = np.argmax(experience.predict(prev_envstate))

            # Apply action, get reward and new envstate
            envstate, reward, travel_status = dqnmaze.act(action)
            if travel_status == 'win':
                win_history.append(1)
                travel_over = True
            elif travel_status == 'lose':
                win_history.append(0)
                travel_over = True
            else:
                travel_over = False

            # Store Experience
            episode = [prev_envstate, action, reward, envstate, travel_over]
            experience.remember(episode)
            n_episodes += 1

            # Train Neural Network
            inputs, targets = experience.get_data(data_size=data_size)
            model.fit(
                inputs,
                targets,
                epochs=8,
                batch_size=16,
                verbose=0,
            )
            loss = model.evaluate(inputs, targets, verbose=0)

        if len(win_history) > hsize:
            win_rate = sum(win_history[-hsize:]) / hsize

        dt = datetime.datetime.now() - start_time
        t = format_time(dt.total_seconds())
        template = "Epoch: {:03d}/{:d} | Loss: {:.4f} | Episodes: {:d} | Win count: {:d} | Win rate: {:.3f} | time: {}"
        print(template.format(epoch, n_epoch-1, loss, n_episodes, sum(win_history), win_rate, t))

        # Explore less once the agent wins most games
        if win_rate > 0.9:
            epsilon = 0.05
        if sum(win_history[-hsize:]) == hsize:
            print("Reached 100%% win rate at epoch: %d" % (epoch,))
            break

    # Save Trained Model Weights and Architecture
    h5file = name + ".h5"
    json_file = name + ".json"
    model.save_weights(h5file, overwrite=True)
    with open(json_file, "w") as outfile:
        json.dump(model.to_json(), outfile)
    dt = datetime.datetime.now() - start_time
    seconds = dt.total_seconds()
    t = format_time(seconds)
    print('files: %s, %s' % (h5file, json_file))
    # Report the replay-memory capacity actually used (`limit`)
    print("n_epoch: %d, max_mem: %d, data: %d, time: %s" % (last_epoch, limit, data_size, t))
    return seconds

# Reset Time Units
def format_time(seconds):
    """Format a duration in seconds using the most readable time unit.

    Values below 400 are reported in seconds, values below 4000 in minutes,
    and all remaining values in hours.
    """
    # (exclusive upper bound, divisor, output template) per unit
    units = (
        (400, 1.0, "%.1f seconds"),
        (4000, 60.0, "%.2f minutes"),
    )
    for bound, divisor, template in units:
        if seconds < bound:
            return template % (seconds / divisor,)
    return "%.2f hours" % (seconds / 3600.0,)

#### Training and Testing
Without the whole maze information, a blank maze has to be defined for the agent to travel in, and the model needs to be initialized with a blank environment.

The Results are stored in the output file.

In [None]:
# Blank 201x201 map: the agent fills it in while exploring
maze = np.zeros((201,201))
environ = np.zeros((3,3))
# build_model's only parameter is the learning rate, so the 3x3 placeholder
# must not be passed to it.
model = build_model()
# qtrain reads the epoch budget from `n_epoch`; the memory limit is a count
qtrain(model, maze, n_epoch=10000, limit=int(0.1*maze.size), data_size=32)

In [None]:
experience = Experience(model)
# The sampling method is named `get_data`.
# NOTE(review): a freshly created Experience has an empty memory, and
# get_data indexes memory[0]; steps must be stored with remember() first.
Environment,Action = experience.get_data()
# Open the report once in append mode; the file is closed even on errors
with open('Solution.txt',mode='a') as File:
    for i in range(Environment.shape[0]-1):
        MazeInfo = []
        # NOTE(review): `location` is not defined anywhere in the visible
        # notebook - confirm where the visited coordinates are recorded.
        m,n = location[i]
        for j in range(len(Environment[i])):
            if j==4:
                # Index 4 is the centre of the flattened 3x3 neighbourhood
                MazeInfo.append("Location")
            elif Environment[i][j]:
                MazeInfo.append("   Path   ")
            else:
                MazeInfo.append("   Wall   ")
        File.writelines(['Time : ',str(i),'\n'])
        File.writelines(['Location : ( ',str(int(m)),', ',str(int(n)),' )\n'])
        File.writelines(['Environment : ',MazeInfo[0],MazeInfo[1],MazeInfo[2],'\n'])
        File.writelines(['                      ',MazeInfo[3],MazeInfo[4],MazeInfo[5],'\n'])
        File.writelines(['                      ',MazeInfo[6],MazeInfo[7],MazeInfo[8],'\n'])
        # writelines only accepts strings
        File.writelines(['Action : ',str(Action[i])])
        File.writelines(['\n','\n'])

# Dynamic Maze

#### Action Definition and DQN Maze Class Construction
Alter the action definitions, adding wait option.

Numerically alter the reward rules to fit the new actions.

Change the draw_env and the observe functions encoding the Fire state.

In [None]:
# Integer codes of the four moves plus the waiting action
LEFT, UP, RIGHT, DOWN, WAIT = range(5)

# Human-readable name of every action code
actions_list = dict(zip(
    (LEFT, UP, RIGHT, DOWN, WAIT),
    ('left', 'up', 'right', 'down', 'wait'),
))

# Number of distinct actions
num_actions = len(actions_list)

# Probability of taking a random (exploratory) action
epsilon = 0.2
class DQNmaze(object):
    """Dynamic-maze environment with a 3x3 view and a waiting action.

    ``self.maze`` is the agent's own map: it is filled in from the values
    returned by ``get_information`` each time the environment is drawn. A
    cell holding 0 counts as not enterable (wall, fire, or not yet observed).

    Args:
        maze: 2-D array-like used as the initial map.
        agent: (row, col) start cell of the agent.
        target: (row, col) goal cell; defaults to (199, 199).
    """

    # Maze Class Initialization
    def __init__(self, maze, agent=(1,1), target=(199,199)):
        self.maze = np.array(maze)
        self.target = tuple(target)
        self.reset(agent)

    # Reset Agent State
    def reset(self, agent):
        """Place the agent on ``agent`` and clear all per-game bookkeeping.

        The map collected so far is kept.
        """
        self.agent = agent
        self.maze = np.copy(self.maze)
        row, col = agent
        self.state = (row, col, 'start')
        # The game is lost once the accumulated reward drops below this value
        self.threshold = -0.05 * self.maze.shape[0]**2
        self.reward_sum = 0
        self.visited = set()

    @staticmethod
    def _moves():
        """Return the (row, col) offset applied by every moving action."""
        return {LEFT: (0, -1), UP: (-1, 0), RIGHT: (0, 1), DOWN: (1, 0)}

    # Update State Corresponding to Action Encoding and Validation
    def update_state(self, action):
        """Apply ``action`` and store the resulting (row, col, mode) state.

        ``mode`` becomes 'blocked' when no action is possible, 'stop' when the
        agent waited, 'valid' when it moved, and 'invalid' when the move was
        rejected (the agent then stays where it is).
        """
        row, col, _ = self.state
        if self.maze[row, col] > 0:
            self.visited.add((row, col))  # mark visited cell
        valid_actions = self.valid_actions()
        if not valid_actions:
            mode = 'blocked'
        elif action in valid_actions:
            if action == WAIT:
                mode = 'stop'
            else:
                d_row, d_col = self._moves()[int(action)]
                row, col, mode = row + d_row, col + d_col, 'valid'
        else:
            mode = 'invalid'
        self.state = (row, col, mode)

    # Update Reward based on Pre-defined Rules
    def get_reward(self):
        """Return the reward earned by the most recent state update."""
        agent_row, agent_col, mode = self.state
        if (agent_row, agent_col) == self.target:
            return 100
        if mode == 'blocked':
            return self.threshold - 1
        # Rejected moves and waiting both leave the agent on an already
        # visited cell, so they are tested before the revisit penalty.
        if mode == 'invalid':
            return -100
        if mode == 'stop':
            return -0.1
        if (agent_row, agent_col) in self.visited:
            return -1
        if mode == 'valid':
            return -0.01

    # Record Reflections to Action
    def act(self, action):
        """Perform ``action``; return (observation, reward, status)."""
        self.update_state(action)
        reward = self.get_reward()
        self.reward_sum += reward
        status = self.travel_status()
        envstate = self.observe()
        return envstate, reward, status

    # Input Environment Information as Vector
    def observe(self):
        """Return the 3x3 neighbourhood flattened to shape (1, 9)."""
        canvas = self.draw_env()
        envstate = canvas.reshape((1, -1))
        return envstate

    # Visualization Encodes
    def draw_env(self):
        """Query the surroundings of the agent and record them on the map.

        Element 0 of every returned entry is copied to the canvas and the
        map. Where both element 0 and element 1 are non-zero, the canvas
        cell is set to 2 and the map cell to 0, which makes it unenterable.

        Returns:
            The (3, 3) canvas described above.
        """
        row, col, _ = self.state
        environment = get_information(row, col)
        canvas = np.zeros((3,3))
        for i in range(3):
            for j in range(3):
                canvas[i][j] = environment[i][j][0]
                # Neighbourhood index (i, j) maps to map cell (row+i-1, col+j-1)
                self.maze[row+i-1][col+j-1] = canvas[i][j]
                if canvas[i][j] and environment[i][j][1]:
                    canvas[i][j] = 2
                    self.maze[row+i-1][col+j-1] = 0
        return canvas

    # Judging Travel Status
    def travel_status(self):
        """Return 'lose', 'win' or 'not_over' for the current game."""
        if self.reward_sum < self.threshold:
            return 'lose'
        agent_row, agent_col, _ = self.state
        if (agent_row, agent_col) == self.target:
            return 'win'
        return 'not_over'

    # Validation for Actions: Agent is not allowed to go into the walls and fires
    def valid_actions(self, cell=None):
        """Return the allowed action codes; waiting is always allowed.

        Args:
            cell: optional (row, col) to test; defaults to the agent's cell.
        """
        if cell is None:
            row, col, _ = self.state
        else:
            row, col = cell
        nrows, ncols = self.maze.shape

        actions = []
        for action, (d_row, d_col) in self._moves().items():
            r, c = row + d_row, col + d_col
            # The destination must lie inside the border ring and be enterable
            if 1 <= r <= nrows - 2 and 1 <= c <= ncols - 2 and self.maze[r, c] != 0:
                actions.append(action)
        actions.append(WAIT)
        return actions

#### Travel and Training

In [None]:
# Define Travel Function as the Core of Iterations
# Inputs are Outputs of the Previous Step
def travel(model, DQNmaze, agent_start):
    """Run one greedy episode with the model and report how it ended.

    Args:
        model: Network whose ``predict`` maps an observation to Q-values.
        DQNmaze: Environment instance providing ``reset``, ``observe`` and
            ``act``.
        agent_start: Start cell handed to ``DQNmaze.reset``.

    Returns:
        True when the episode ends with status 'win', False when it ends
        with status 'lose'.
    """
    DQNmaze.reset(agent_start)
    envstate = DQNmaze.observe()
    while True:
        # Choose the action with the highest predicted Q-value for the
        # current observation.
        q = model.predict(envstate)
        action = np.argmax(q[0])
        # Apply the action.  The returned observation must replace the
        # current one; otherwise every step would keep predicting from the
        # start state.
        envstate, _, travel_status = DQNmaze.act(action)
        if travel_status == 'win':
            return True
        if travel_status == 'lose':
            return False
        
# Define Details in Each Iteration
class Experience(object):
    """Bounded replay memory that builds Q-learning training batches.

    Each stored episode is a list
    ``[envstate, action, reward, envstate_next, travel_over]``.

    Args:
        model: Network used for Q-value prediction; its last output dimension
            gives the number of actions.
        limit: Maximum number of episodes kept; the oldest is dropped first.
        gamma: Discount factor applied to the best next-state Q-value.
    """

    def __init__(self, model, limit=5000, gamma=0.95):
        self.model = model
        self.limit = limit
        self.gamma = gamma
        self.memory = list()
        self.num_actions = model.output_shape[-1]

    def remember(self, episode):
        """Append an episode and evict the oldest one once over the limit."""
        self.memory.append(episode)
        if len(self.memory) > self.limit:
            del self.memory[0]

    def predict(self, envstate):
        """Return the Q-value vector predicted for a single observation."""
        return self.model.predict(envstate)[0]

    def get_data(self, data_size=10):
        """Sample stored episodes and build the matching inputs and targets.

        Args:
            data_size: Requested number of samples; capped at the number of
                stored episodes.

        Returns:
            A tuple ``(inputs, targets)`` of shapes ``(k, env_size)`` and
            ``(k, num_actions)``.  Each target row equals the model's current
            prediction, except for the action actually taken, which is set to
            ``reward`` for terminal transitions and to
            ``reward + gamma * max_a' Q(s', a')`` otherwise.
        """
        env_size = self.memory[0][0].shape[1]
        mem_size = len(self.memory)
        data_size = min(mem_size, data_size)
        inputs = np.zeros((data_size, env_size))
        targets = np.zeros((data_size, self.num_actions))
        if data_size == 0:
            # Nothing to sample; avoid calling the model on an empty batch.
            return inputs, targets

        chosen = np.random.choice(range(mem_size), data_size, replace=False)
        batch = [self.memory[j] for j in chosen]
        next_states = np.zeros((data_size, env_size))
        for i, (envstate, _, _, envstate_next, _) in enumerate(batch):
            inputs[i] = envstate
            next_states[i] = envstate_next

        # Two batched predictions replace two model calls per sample.
        # Untaken actions keep the predicted value, so they contribute no
        # error during training.
        targets[:] = self.model.predict(inputs)
        best_next_q = np.max(self.model.predict(next_states), axis=1)
        for i, (_, action, reward, _, travel_over) in enumerate(batch):
            if travel_over:
                targets[i, action] = reward
            else:
                # reward + gamma * max_a' Q(s', a')
                targets[i, action] = reward + self.gamma * best_next_q[i]
        return inputs, targets
    
# Define Neural Network
def build_model(lr=0.001):
    """Assemble and compile the Q-network.

    The network takes a flattened 9-value observation, passes it through two
    9-unit dense layers (each followed by PReLU) and ends in a dense layer
    with ``num_actions`` outputs.  It is compiled with the 'adam' optimizer
    and mean-squared-error loss.

    Args:
        lr: Accepted for interface compatibility but not used; the optimizer
            is created from the string 'adam' with its default settings.

    Returns:
        The compiled Sequential model.
    """
    network = Sequential()
    layer_stack = (
        Dense(9, input_shape=(9,)),
        PReLU(),
        Dense(9),
        PReLU(),
        Dense(num_actions),
    )
    for layer in layer_stack:
        network.add(layer)
    network.compile(loss='mse', optimizer='adam')
    return network

# Build Training Architecture
def qtrain(model, maze, **opt):
    """Train the Q-network on the maze with epsilon-greedy experience replay.

    Every epoch restarts the agent at (1, 1) and plays until the environment
    reports 'win' or 'lose' (or no action is valid).  After each step the
    transition is stored and the model is fitted on a batch sampled from the
    replay memory.  Training stops early once the last ``hsize`` games were
    all wins.  The weights and the model architecture are saved afterwards.

    Args:
        model: Compiled Keras model to train.
        maze: Array used to construct the ``DQNmaze`` environment.
        **opt: Optional settings:
            n_epoch: maximum number of epochs (default 15000).
            limit: replay-memory capacity (default 5000).
            data_size: samples per training batch (default 50).
            weights_file: weights file to load before training (default "").
            name: base name of the saved ``.h5``/``.json`` files
                (default 'model').

    Returns:
        Total training time in seconds.
    """
    global epsilon
    n_epoch = opt.get('n_epoch', 15000)
    limit = opt.get('limit', 5000)
    data_size = opt.get('data_size', 50)
    weights_file = opt.get('weights_file', "")
    name = opt.get('name', 'model')
    start_time = datetime.datetime.now()

    # Load Previous Weight File
    if weights_file:
        print("loading weights from file: %s" % (weights_file,))
        model.load_weights(weights_file)

    # Environment Construction
    dqnmaze = DQNmaze(maze)

    # Initialize Experience Object and Related Records
    experience = Experience(model, limit=limit)
    win_history = []   # history of win/lose game
    hsize = dqnmaze.maze.size//2   # history window size
    win_rate = 0.0

    # Iteration Functioning
    for epoch in range(n_epoch):
        loss = 0.0
        agent = (1, 1)
        dqnmaze.reset(agent)
        travel_over = False

        envstate = dqnmaze.observe()

        n_episodes = 0
        while not travel_over:
            valid_actions = dqnmaze.valid_actions()
            if not valid_actions:
                break

            prev_envstate = envstate

            # Prediction of Next Action: explore with probability epsilon,
            # otherwise follow the model greedily.
            if np.random.rand() < epsilon:
                action = random.choice(valid_actions)
            else:
                action = np.argmax(experience.predict(prev_envstate))

            # Apply action, get reward and new envstate
            envstate, reward, travel_status = dqnmaze.act(action)
            if travel_status == 'win':
                win_history.append(1)
                travel_over = True
            elif travel_status == 'lose':
                win_history.append(0)
                travel_over = True
            else:
                travel_over = False

            # Store Experience
            episode = [prev_envstate, action, reward, envstate, travel_over]
            experience.remember(episode)
            n_episodes += 1

            # Train Neural Network
            inputs, targets = experience.get_data(data_size=data_size)
            model.fit(
                inputs,
                targets,
                epochs=8,
                batch_size=16,
                verbose=0,
            )
            loss = model.evaluate(inputs, targets, verbose=0)

        if len(win_history) > hsize:
            win_rate = sum(win_history[-hsize:]) / hsize

        dt = datetime.datetime.now() - start_time
        t = format_time(dt.total_seconds())
        template = "Epoch: {:03d}/{:d} | Loss: {:.4f} | Episodes: {:d} | Win count: {:d} | Win rate: {:.3f} | time: {}"
        print(template.format(epoch, n_epoch-1, loss, n_episodes, sum(win_history), win_rate, t))

        # Explore less once the agent wins most of its recent games.
        if win_rate > 0.9:
            epsilon = 0.05
        if sum(win_history[-hsize:]) == hsize:
            print("Reached 100%% win rate at epoch: %d" % (epoch,))
            break

    # Save Trained Model Weights and Architecture
    h5file = name + ".h5"
    json_file = name + ".json"
    model.save_weights(h5file, overwrite=True)
    with open(json_file, "w") as outfile:
        json.dump(model.to_json(), outfile)
    dt = datetime.datetime.now() - start_time
    seconds = dt.total_seconds()
    t = format_time(seconds)
    print('files: %s, %s' % (h5file, json_file))
    # The memory capacity lives in `limit`; the former `max_memory` name was
    # never defined and raised NameError after training had finished.
    print("n_epoch: %d, max_mem: %d, data: %d, time: %s" % (epoch, limit, data_size, t))
    return seconds

# Reset Time Units
def format_time(seconds):
    """Render a duration in the most readable unit.

    Args:
        seconds: Duration in seconds.

    Returns:
        A string in seconds (one decimal) below 400 s, in minutes (two
        decimals) below 4000 s, and in hours (two decimals) otherwise.
    """
    if seconds < 400:
        return "%.1f seconds" % (float(seconds),)
    if seconds < 4000:
        return "%.2f minutes" % (seconds / 60.0,)
    return "%.2f hours" % (seconds / 3600.0,)

In [None]:
# Train the network on an all-zero 201x201 maze array.
maze = np.zeros((201, 201))
environ = np.zeros((3, 3))
# build_model() only takes an optional learning rate, so the 3x3 array must
# not be passed to it.
model = build_model()
# qtrain() reads the epoch budget from 'n_epoch'; the former 'epochs' keyword
# was silently ignored and the default of 15000 was used instead.
qtrain(model, maze, n_epoch=10000, limit=maze.size // 2, data_size=32)

In [None]:
# Append a readable log of sampled experiences to 'Solution2.txt'.
# NOTE(review): a freshly built Experience has an empty memory, so get_data()
# has nothing to sample until transitions are stored with remember(); the
# replay memory filled inside qtrain() is local to that function.  Confirm
# where the recorded trajectory is meant to come from.
# NOTE(review): `location` is not defined in this cell; it is assumed to be a
# sequence of (row, col) positions defined elsewhere in the notebook.
experience = Experience(model)
Environment, Action = experience.get_data()
for i in range(Environment.shape[0] - 1):
    MazeInfo = []
    m, n = location[i]
    # Each row is a flattened 3x3 observation; index 4 is the centre cell,
    # i.e. the agent's own position.
    for j in range(len(Environment[i])):
        if j == 4:
            MazeInfo.append("Location")
        elif Environment[i][j]:
            if Environment[i][j] == 1:
                MazeInfo.append("   Path   ")
            else:
                MazeInfo.append("   Fire   ")
        else:
            MazeInfo.append("   Wall   ")
    with open('Solution2.txt', mode='a') as File:
        File.writelines(['Time : ', str(i), '\n'])
        File.writelines(['Location : ( ', str(int(m)), ', ', str(int(n)), ' )\n'])
        File.writelines(['Environment : ', MazeInfo[0], MazeInfo[1], MazeInfo[2], '\n'])
        File.writelines(['                      ', MazeInfo[3], MazeInfo[4], MazeInfo[5], '\n'])
        File.writelines(['                      ', MazeInfo[6], MazeInfo[7], MazeInfo[8], '\n'])
        # Action[i] is the row of target Q-values returned by get_data();
        # writelines() only accepts strings, so convert it explicitly.
        File.writelines(['Action : ', str(Action[i])])
        File.writelines(['\n', '\n'])