In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import copy
import os, sys
import random
import pickle
import argparse
import logging
import tensorflow as tf
tf.enable_eager_execution()

np.random.seed(15)
tf.random.set_random_seed(15)
random.seed(15)

np.set_printoptions(precision=2, threshold=np.inf)


In [None]:
class Maze(object):
    WALL = 2
    EMPTY = 8
    LEFT = 0
    RIGHT = 1 # right or forward
    BONUS = 1000
    
    def __init__(self, width, length): 
        self.length = length
        self.width = width
        self.maze = np.ones((self.width, self.length)) * Maze.WALL

        self.generate_maze()
        
        #set self.maze_mask
        #self.shortest_solutions
        self.get_shortest_solutions()
        
        #self.longest_shortest, used to calculate objective value
        self.get_longest_shortest_solutions()
        
        # used to normalize objective value
        self.best_score = self.get_attainable_score()

        #initialize the agent position in the maze
        self.reset()
        
        
        
    
    def generate_maze(self):
        # generate walls, doors
        
        spaces = np.random.randint(low=1, high=4, size=self.length)
        cum_spaces = np.cumsum(spaces) # leave the first col empty
 
        for ind, val in enumerate(cum_spaces):
            if val >= self.length-1:
                self.wall_position = cum_spaces[:ind]
                break
        if self.wall_position[0] > 1:
            self.wall_position[0] = 1
        if self.wall_position[-1] < self.length-1:
            self.wall_position = np.append(self.wall_position, self.length-1)
                
        self.road_position = np.array([]).astype(np.int)
        for ind in np.arange(self.length-1):
            if ind not in self.wall_position:
                self.road_position = np.append(self.road_position, ind)
        
        for i in self.road_position:
            self.maze[1:-1,i]=Maze.EMPTY
        
        self.door_position = np.random.randint(low=1, high=self.width-1, size=len(self.wall_position))
        #print(self.door_position)
    
        # get door position
        self.door_position = np.zeros(len(self.wall_position), dtype = np.int)
        self.door_position[-1] = np.random.randint(low=1, high=self.width-1) #1~self.width-2 available door position
        for ind in np.arange(len(self.wall_position)-2, -1, -1):
            if self.wall_position[ind] == self.wall_position[ind+1] -1: # two walls together
                self.door_position[ind] = self.door_position[ind+1]
                
            else:
                self.door_position[ind] = np.random.randint(low=1, high=self.width-1)
        
        # Fill door cue
        self.maze[ self.door_position[-1], self.wall_position[-1] ] = Maze.RIGHT # default last door due
        for i in np.arange(len(self.wall_position)-2, -1, -1):
            if self.door_position[i+1] < self.door_position[i]:
                self.maze[self.door_position[i], self.wall_position[i]] = Maze.LEFT
            else: 
                self.maze[self.door_position[i], self.wall_position[i]] = Maze.RIGHT
                
                
                
       
                
    def print_maze(self, x=-1, y=-1):
        if x>=0 and y>=0:
            tmp = self.maze[x,y]
            self.maze[x,y] = -1 # position of the agent
            
        print("  ", end="")    
        #for i in np.arange(self.length):
        #    print('%d ' % i, end='')
        print("\n")
        
        for j in np.arange(self.width):
            print('%d ' % j, end='')
            for i in np.arange(self.length):
            
                if self.maze[j,i]==Maze.WALL: # wall position
                    print('H ',end='')
                elif self.maze[j,i]==Maze.EMPTY:
                    print('  ',end='')# road
                elif self.maze[j,i]==-1:
                    print('T ',end='')
                    self.maze[x,y]= tmp
                else:
                    print('%d ' % self.maze[j,i], end='')
            print('\n')

        
    def get_shortest_solutions(self):
        # get the shortest length to the end of maze from each layer
        
        self.maze_mask = np.zeros(self.length, dtype=np.int)
        for ind, val in enumerate(self.wall_position):
            self.maze_mask[val] = self.door_position[ind]
       
        self.shortest_solutions = np.zeros(self.length, dtype=np.int)
        step = 0
        next_wall = self.length-1
        for ind in np.arange(self.length-2, -1, -1):
            if self.maze_mask[ind] == 0: # road
                step += 1
                self.shortest_solutions[ind] = step
            else: # wall
                step += np.abs(self.maze_mask[next_wall] - self.maze_mask[ind])+1 #1 out the door, +diff for vert.
                self.shortest_solutions[ind] = step
                next_wall = ind
        

    
    def get_distance_escape(self, x, y):
        # get the shortest distance to escape from the current position
        vertical_distance = 0
        if y in self.road_position:
            for next_wall_ind in np.arange(y+1, y+4, 1):
                if next_wall_ind in self.wall_position: break
            vertical_distance = np.abs(self.maze_mask[next_wall_ind] - x)
        return self.shortest_solutions[y]+vertical_distance
                

        
    def get_longest_shortest_solutions(self):
        # get the shortest length from corner of starting to the end out maze
        left = self.get_distance_escape(1,0)
        right = self.get_distance_escape(self.width-2,0)
        
        self.longest_shortest = np.maximum(left, right)+5 # higher than true value
    
    
    def get_attainable_score(self):
        position = []
        x = self.door_position[0] # in front of the first door
        y = 0
        position.append([x,y])
        
        score = np.float32(0)
        door_signal=self.maze[self.door_position[0], 1]
        r=[]
        self.steps = 0
        
        while True:
            self.steps += 1
            if self.maze[x, y+1]!=Maze.WALL: # road
                y += 1
                if y in self.wall_position:
                    door_signal = self.maze[x,y]
            else: # wall
                if door_signal == 0 and self.maze[x-1,y]==Maze.WALL: # init location make door signal no more signal
                    door_signal = 1
                if door_signal == 1 and self.maze[x+1,y]==Maze.WALL:
                    door_signal = 0
                x += int(door_signal*2-1)
                
            position.append([x,y])
            r.append((self.longest_shortest - self.get_distance_escape(x,y) )/self.longest_shortest-1)
            score += (self.longest_shortest - self.get_distance_escape(x,y) )/self.longest_shortest-1
            if y == self.length-1:
                r[-1] += Maze.BONUS
                score += Maze.BONUS
                break

        self.average_reward = np.mean(r)
     
        return score
        
    """
    def get_attainable_score(self):
        position = []
        x = self.door_position[0] # in front of the first door
        y = 0
        score = np.float32(0)
        pass_maze = 0
        door_signal=self.maze[self.door_position[0], 1]
        r=[]
        for _ in np.arange(300, -1, -1):
            position.append([x,y])
            if y != self.length-1:
                r.append((self.longest_shortest - self.get_distance_escape(x,y) )/self.longest_shortest + pass_maze)
                score += (self.longest_shortest - self.get_distance_escape(x,y) )/self.longest_shortest + pass_maze
            if self.maze[x, y+1]!=Maze.WALL: # road
                y += 1
                if y in self.wall_position:
                    door_signal = self.maze[x,y]
                if y == self.length-1:
                    pass_maze += 1
                    y=0
            else: # wall
                if door_signal == 0 and self.maze[x-1,y]==Maze.WALL: # init location make door signal no more signal
                    door_signal = 1
                if door_signal == 1 and self.maze[x+1,y]==Maze.WALL:
                    door_signal = 0
                x += int(door_signal*2-1)
        
        #print(position)
        self.average_reward = np.mean(r)
     
        return score
    """
    
    def reset(self):
        self.score = 0 
        
        self.position = np.array([self.door_position[-1], 0]) # in front of the last door
        self.trajectory = []
        self.trajectory.append(self.position)
        
        
        x, y = self.position
        observation = self.perception()
        
        return observation
        
    def perception(self):
        x, y = self.position
        observation = np.zeros(6)
        
        if self.maze[x,y+1] == Maze.WALL:
            observation[0]=1
        else: observation[0]=0
        
        if self.maze[x-1,y+1] == Maze.WALL:
            observation[1]=1
        else: observation[1]=0
        
        if self.maze[x+1,y+1] == Maze.WALL:
            observation[2] = 1
        else: observation[2]=0
        
        if self.maze[x-1,y] == Maze.WALL:
            observation[4]=1
        else: observation[4]=0
        
        if self.maze[x+1,y] == Maze.WALL:
            observation[5]=1
        else: observation[5]=0
        
        if y in self.wall_position:
            observation[3] = self.maze[x, y]
            
        return observation
    
    def step(self, action):
        
        x, y = self.position
        
        up = int(action[0])
        down = int(action[1])
        
        crash_wall = False
        pass_wall = False
        if down == 1 and up == 0:
            if self.maze[x+1,y]==Maze.WALL:
                crash_wall = True
 
            if  self.maze[x+1,y] != Maze.WALL:
                self.position = x+1, y
                self.trajectory.append(self.position)

        elif down == 0 and up == 1:
            if self.maze[x-1,y] == Maze.WALL:
                crash_wall = True
            
            if  self.maze[x-1,y] != Maze.WALL:
                self.position = x-1, y
                self.trajectory.append(self.position)


        elif down == 1 and up == 1:
        
            if self.maze[x,y+1] != Maze.WALL:
                pass_wall = True
                self.position = x,y+1
                self.trajectory.append(self.position)
            else:
                crash_wall = True
                
        elif down == 0 and up == 0:
            self.position = x, y
            self.trajectory.append(self.position)
            
        
        x,y = self.position
        reward = (self.longest_shortest - self.get_distance_escape(x,y))/self.longest_shortest -1 
        reward = int(pass_wall) - int(crash_wall)
        self.score +=  reward    
        fitness = self.get_fitness()
        
  
        
        if y == self.length-1:# at the end of the maze 
            done = True
            observation_ = 'Terminal'
            reward += Maze.BONUS # the final reward should be larger than sum of small reward on the way
            self.score += Maze.BONUS
            fitness = self.get_fitness()
        else:
            done = False
            observation_ = self.perception()


        return observation_, fitness, reward, done
            
    """def step(self, action):
        
        x, y = self.position
        
        up = int(action[0])
        down = int(action[1])
        
        if down == 1 and up == 0:
            #if self.maze.maze[x+1,y]==Maze.WALL:
            #    r-= 1
 
            if  self.maze[x+1,y] != Maze.WALL:
                self.position = x+1, y
                self.trajectory.append(self.position)

        elif down == 0 and up == 1:
            #if self.maze.maze[x-1,y] == Maze.WALL:
            #    r-= 1
            
            if  self.maze[x-1,y] != Maze.WALL:
                self.position = x-1, y
                self.trajectory.append(self.position)


        elif down == 1 and up == 1 or down == 0 and up == 0:
        
            if self.maze[x,y+1] != Maze.WALL:
                #r+=20
                self.position = x,y+1
                self.trajectory.append(self.position)
            #else:
            #    r-= 1
        
        x,y = self.position
        reward = (self.longest_shortest - self.get_distance_escape(x,y))/self.longest_shortest -1
        self.score +=  reward    
        fitness = self.get_fitness()
        

        if y == self.length-1:# at the end of the maze 
            done = True
            observation_ = np.zeros(6)
            reward += Maze.BONUS
            self.score+= Maze.BONUS
            fitness = self.get_fitness()
            
        else:
            done = False
            observation_ = self.perception()
            
        

        return observation_, fitness, reward, done
    """
    
    
    def get_fitness(self):
        
        return self.score#/self.best_score 


In [None]:
def Draw(fitness, xlabel="Episodes", ylabel="Fitness", label = 'fitness trend', constant=0):
    plt.plot(np.arange(len(fitness)), fitness, color='blue', label=label,linestyle = '-')
    plt.plot(np.arange(len(fitness)), [constant]*len(fitness), color='red', label="Best",linestyle = '-')
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.legend() # 显示图例
    plt.show()

def num2action(num):
    # set the output nodes according to the action
    # used when have an action and want to set the output nodes
        numbers = {
            0 : '011',
            1 : '100',
            2 : '110',
            3 : '111'
        }
        return numbers.get(num, None)
    
def action2num(action):
    # get the action according to the output nodes
    # used when have the action and want to get the action index and update q_table
    numbers = {
            '011':0,
            '100':1,
            '110':2,
            '111':3
        }
    return numbers.get(action, None)
    
    
def observation2index(observation): 
    # observation: array
    # get the state index in the q-table
    input_val, marker = 0, 1
        
    for val in observation: # 03456
        if val == 1:
            input_val += marker
        marker *= 2
    return int(input_val)



class QAgent:

    best_input_ids = [0,3,4,5,6]
    best_output_ids = [6,7,8]
    best_gates = np.array([[0,0,0,0,0,0,1,0],
                        [0,0,0,0,1,0,0,0],
                        [0,0,0,0,0,0,1,0],
                        [0,0,0,0,1,0,0,0],
                        [0,0,0,0,0,0,1,0],
                        [0,0,0,1,0,0,0,0],
                        [0,0,0,0,0,0,1,0],
                        [0,0,0,1,0,0,0,0],
                        [0,0,0,0,0,0,1,0],
                        [0,0,0,0,1,0,0,0],
                        [0,0,0,0,0,0,1,0],
                        [0,0,0,0,1,0,0,0],
                        [0,0,0,0,0,0,1,0],
                        [1,0,0,0,0,0,0,0],
                        [0,0,0,0,0,0,0,1],
                        [1,0,0,0,0,0,0,0],
                        [0,0,0,0,0,0,0,1],
                        [0,0,0,1,0,0,0,0],
                        [0,0,0,0,0,0,0,1],
                        [0,0,0,1,0,0,0,0],
                        [0,0,0,0,0,0,0,1],
                        [0,0,0,1,0,0,0,0],
                        [0,0,0,0,0,0,0,1],
                        [0,0,0,1,0,0,0,0],
                        [0,0,0,0,0,0,0,1],
                        [0,0,0,0,1,0,0,0],
                        [0,0,0,0,0,0,0,1],
                        [0,0,0,0,1,0,0,0],
                        [0,0,0,0,0,0,1,0],
                        [1,0,0,0,0,0,0,0],
                        [0,0,0,0,0,0,0,1],
                        [1,0,0,0,0,0,0,0]])



    def __init__(self, input_ids, output_ids, learning_rate=0.01, reward_decay=0.98, e_greedy=0.9):
        
        self.input_ids=input_ids
        self.output_ids=output_ids
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon = e_greedy
        self.q_table = np.zeros((2**len(input_ids), 2**2))
        
        self.memory = 0


    def choose_action(self, observation):
        # observation 03456
        # action 876
        # action selection
        if np.random.uniform() < self.epsilon: # choose best action
            
            input_val = observation2index(observation)
            
            state_action = self.q_table[input_val, :]
            # some actions may have the same value, randomly choose on in these actions
            max_index = np.argwhere(state_action == np.max(state_action)).flatten().tolist()
            output_val = np.random.choice(max_index)
            #print(observation, input_val, state_action, max_index, output_val, num2action(output_val))
        else:
            # choose random action
            output_val = np.random.choice(2**2)
        
        action = num2action(output_val)
        self.memory = int(action[-1])
        
        return action
        
    def learn(self, s, a, r, s_):
        input_val = observation2index(s)
        action = action2num(str(a))
        q_predict = self.q_table[input_val, action]
        
        next_input_val = observation2index(s_)
        if next_input_val != 31:#s_ != 'Terminal':
            q_target = r + self.gamma * self.q_table[next_input_val, :].max()  # next state is not terminal
        else:
            q_target = r  # next state is terminal
        self.q_table[input_val, action] += self.lr * (q_target - q_predict)  # update


def update(input_ids, output_ids):

    agent = QAgent(input_ids, output_ids)
    
    experience_buffer = []
    
    Num_episode = 300
    max_step = 300
    fitness = np.zeros(Num_episode)
    average_reward = np.zeros(Num_episode)
    
    for episode in range(Num_episode):
        if episode > 0 and episode%10 == 0:
            print('.',end='')
        if episode > 0 and episode%500 == 0: print(" ")
            
        maze = Maze(10, 50)
        observation = maze.reset()
        observation = np.append(observation, agent.memory)
        observation = observation[agent.input_ids]
        
        rewards = []
        
        steps = 0
        
        while True:
        
            action = agent.choose_action(observation)        

            observation_, fitness[episode], reward, done = maze.step(action)
            observation_ = np.append(observation_, agent.memory)
            observation_ = observation_[agent.input_ids]
            
            rewards.append(reward)

            #if (episode == Num_episode-1):
            #    print("step :", maze.position, observation, action, reward)
            # RL learn from this transition
            agent.learn(observation, action, reward, observation_)

            # swap observation
            observation = observation_
            
            steps += 1
            if done or steps>max_step: break
            
        average_reward[episode] = np.mean(rewards)

    print("\n agent's average reward: %f, best average reward: %f. Rate:%f" % (np.mean(rewards), maze.average_reward, np.mean(rewards)/maze.average_reward))
    print(agent.q_table)
    Draw(fitness, constant = maze.best_score)
    Draw(average_reward, ylabel = "Average reward", label = 'Reward trend', constant =maze.average_reward)
    
    # end of game
    print('game over')
    with open("./Qsave_model/agent.pickle","wb") as f:
        pickle.dump(agent, f)

np.random.seed(0)
update([0,1,2,3,4,5,6], [6,7,8])#QAgent.best_input_ids, QAgent.best_output_ids

#np.random.seed(9)
#import cProfile
#cProfile.run('test()')

In [None]:


def Rollout():

    agent = QAgent(QAgent.best_input_ids, QAgent.best_output_ids)
    agent.q_table = np.array([[25.09, 25.09, 25.1,  25.09],
     [23.16, 24.88, 23.18, 23.19],
     [ 0,    0,    0,    0  ],
     [ 0,    0,    0,    0  ],
     [24.12, 24.28, 25.18, 23.88],
     [24.52, 22.8,  22.74, 22.67],
     [ 0,    0,    0,    0  ],
     [ 0,    0,    0,    0  ],
     [25.83, 24.29, 24.32, 24.26],
     [22.83, 23.62, 22.84, 22.83],
     [ 0,    0,    0,    0  ],
     [ 0,    0,    0,    0  ],
     [25.42, 25.59, 25.31, 25.33],
     [ 0,    0,    0,    0  ],
     [25.05, 25.04, 25.16, 25.02],
     [ 0,    0,    0,    0  ],
     [24.99, 25.07, 25.17, 25.04],
     [24.92, 23.07, 23.33, 23.12],
     [ 0,    0,    0,    0  ],
     [ 0,    0,    0,    0  ],
     [24.62, 24.42, 24.4,  24.38],
     [22.85, 22.64, 22.58, 22.54],
     [ 0,    0,    0,    0  ],
     [ 0,    0,    0,    0  ],
     [24.73, 24.37, 25.15, 24.47],
     [22.79, 24.11, 22.75, 22.74],
     [ 0,    0,    0,    0  ],
     [ 0,    0,    0,    0  ],
     [24.02, 24.35, 25.08, 24.21],
     [ 0,    0,    0,    0  ],
     [24.76, 25.06, 24.76, 24.72],
     [ 0,    0,    0,    0  ]])
    with open('./Qsave_model/agent.pickle','rb') as f:
        agent = pickle.load(f)
    
    experience_buffer = []

    max_step = 300


    maze = Maze(10, 50)
    maze.print_maze()
    observation = maze.reset()
    observation = np.append(observation, agent.memory)
    observation = observation[agent.input_ids]

    rewards = []

    steps = 0

    while True:

        action = agent.choose_action(observation)        

        observation_, fitness, reward, done = maze.step(action)
        observation_ = np.append(observation_, agent.memory)
        observation_ = observation_[agent.input_ids]

        rewards.append(reward)

        #if (episode == Num_episode-1):
        #    print("step :", maze.position, observation, action, reward)
        # RL learn from this transition
        agent.learn(observation, action, reward, observation_)

        # swap observation
        observation = observation_

        steps += 1
        if done or steps>max_step: break
            
    print(fitness)
    print("\n agent's average reward: %f, best average reward: %f. Rate: %f" % (np.mean(rewards), maze.average_reward, np.mean(rewards)/maze.average_reward))
    print("\n agent's steps: %f, best steps: %f. Extra: %f "% (steps, maze.steps, steps-maze.steps))
    print(np.stack(maze.trajectory))
    #print(np.stack(rewards))
    # end of game
    print('game over')

Rollout()


