In [1]:
import numpy as np
import pandas as pd
import tkinter as tk
import time

In [7]:
class agent:
    def __init__(self, epsilon, gamma, alpha, 
                 state, actions, q_table):
        self.epsilon = epsilon # epsilon-greedy policy, default = 0.9
        self.gamma = gamma # decay-rate, default = 0.9
        self.alpha = alpha # learning-rate, default = 0.1
        
        self.state = state # coordinate tuple, example, (2,3)
        self.actions = actions # 1-D action list, ['up', 'down', 'left', 'right']
        self.q_table = q_table # 3-D array, 3rd dimension for action reward
    
    def choose_action(self):
        state_actions = self.q_table[self.state] # numpy 
        # get random rate and compare to epsilon
        randomRate = np.random.uniform()
        if randomRate > self.epsilon :
            action = self.actions.index(np.random.choice(self.actions)) # choose randomly
        elif state_actions.max() == 0 :
            p=[]
            count=0
            for idx, reward in enumerate(state_actions) :
                if reward == 0:
                    p.append(1)
                    count+=1
                else:
                    p.append(0)
            
            p = np.array(p)/count
            action = self.actions.index(np.random.choice(self.actions, p=list(p))
        else:
            action = state_actions.argmax() # derive action's index with highest reward
        
        return action
    
    def update_q_table(self, reward, action, nxt_state):
        # Q predict : choose action's reward at current state on q_table
        q_predict = self.q_table[self.state][action]
        # Q target : consider next step reward
        if nxt_state in ['win', 'fail'] :
            q_target = reward
        else:
            q_target = reward + self.gamma * self.q_table[nxt_state][action]
        # accumulate
        self.q_table[self.state][action] += self.alpha * (q_target - q_predict) 
        self.state = nxt_state # update state

SyntaxError: invalid syntax (<ipython-input-7-f98dd6936e5c>, line 30)

In [None]:
class maze_env:
    def __init__(self, actions):
        self.unit = 40
        self.actions = actions
        
    def build_map(self, size, target, t_reward, fail_list, f_reward_list):
        # check target
        if not (target[0] < size[0] and target[1] < size[1]) :
            print("target out of bound !")
            return
        if not t_reward > 0:
            print("target\'s reward should be positive !")
        # check fail block
        for idx, fail in enumerate(fail_list):
            if not (fail[0] < size[0] and fail[1] < size[1]):
                print('No '+str(idx)+'. fail block out of bound !')
                return
            if not f_reward_list[idx] < 0:
                print('No '+str(idx)+'. fail\'s reward should be negative !')
                return 
            if fail == target :
                print('No '+str(idx)+'. fail have same coordinate with target !')
                return
        self.size = size
        self.target = target
        self.fail_list = fail_list
        
        self.map = np.zeros((size))
        self.map[target] = t_reward
        for idx, fail in enumerate(fail_list):
            self.map[fail] = f_reward_list[idx]
        
        print('game map : ')
        print(self.map)
        return
    
    def env_feedback(self, state, action):
        # strategy : calculate coordinate first, and then check finish or not
        nxt_state = self.cal_coordinate(state, action)
        
        reward = self.map[nxt_state]
        if nxt_state in self.fail_list:
            nxt_state = 'fail'
        elif nxt_state == self.target:
            nxt_state = 'win'
            
        return nxt_state, reward
    
    def cal_coordinate(self, state, action):
        nxt_state = ()
        move = ()
        
        if action == 0: # up
            move = (-1,0)
            if state[0] == 0: 
                nxt_state = state # hit the top wall
            else: 
                nxt_state = tuple([sum(x) for x in zip(state,move)])
                
        elif action == 1: #down
            move = (1,0)
            if state[0] == self.size[0]-1 : 
                nxt_state = state # hit the bottom wall
            else: 
                nxt_state = tuple([sum(x) for x in zip(state,move)])
                
        elif action == 2: #left
            move = (0,-1)
            if state[1] == 0: 
                nxt_state = state # hit the left wall
            else: 
                nxt_state = tuple([sum(x) for x in zip(state,move)])
                
        elif action == 3: # right
            move = (0,1)
            if state[1] == self.size[1]-1 : 
                nxt_state = state # hit the right wall
            else: 
                nxt_state = tuple([sum(x) for x in zip(state,move)])
        
        return nxt_state
    
    def create_q_table(self):
        q_table = np.zeros(self.size + (len(self.actions),))
        print('Q_table.shape :')
        print(q_table.shape)
        return np.array(q_table)

# preprocess

In [None]:
EPISODES = 100
ACTIONS = ['up', 'down', 'left', 'right']
initSTATE = (0,0)
SIZE = (3,3) # maze size

EPSILON = 0.9
GAMMA = 0.9
ALPHA = 0.1


In [None]:
target = (2,2)
t_reward = 5
fail_list = [(0,2),(1,2)]
f_reward_list = np.random.randint(low=-7, high=-1, size=len(fail_list))

Maze = maze_env(ACTIONS)
Maze.build_map(SIZE, target, t_reward, fail_list, f_reward_list)

In [None]:
Q_table = Maze.create_q_table()

In [None]:
Agent = agent(epsilon= EPSILON, gamma= GAMMA, alpha= ALPHA,
             state = initSTATE, actions= ACTIONS,q_table= Q_table.copy())

# basic process 

In [None]:
def path(state, is_terminated):
    print(state, end='')
    if not is_terminated: print(' > ', end='')

# main process RL - Q_Learning
for episode in range(EPISODES):
    Agent.state = initSTATE
    is_terminated = False
    count = 0
    while not is_terminated :
        # choose action and get env. feedback
        action = Agent.choose_action()
#         print('action:'+str(action), end='')
        nxt_state, reward = Maze.env_feedback(state=Agent.state, action=action)
#         print('nxt_state:'+str(nxt_state), end='')
        # update Q_table
        Agent.update_q_table(reward=reward, action=action, nxt_state=nxt_state)
        
        if nxt_state in ['win','fail']:
            is_terminated = True
        # update visual info
        path(Agent.state, is_terminated)
        Agent.state = nxt_state
        count +=1
        time.sleep(0.05)
        
    print('\n Episode. '+str(episode)+' finished ... ,total step : '+str(count))
    time.sleep(2)
    

# Give punish when bumping the wall

In [None]:
def path(state, is_terminated):
    print(state, end='')
    if not is_terminated: print(' > ', end='')

# main process RL - Q_Learning
for episode in range(EPISODES):
    Agent.state = initSTATE
    is_terminated = False
    count = 0
    while not is_terminated :
        # choose action and get env. feedback
        action = Agent.choose_action()
#         print('action:'+str(action), end='')
        nxt_state, reward = Maze.env_feedback(state=Agent.state, action=action)
#         print('nxt_state:'+str(nxt_state), end='')
        # update Q_table
        ###########################################
        if Agent.state == nxt_state: # bump the wall
            reward = -1
        ###########################################
        Agent.update_q_table(reward=reward, action=action, nxt_state=nxt_state)
        
        if nxt_state in ['win','fail']:
            is_terminated = True
        # update visual info
        path(Agent.state, is_terminated)
        Agent.state = nxt_state
        count +=1
        time.sleep(0.05)
        
    print('\n Episode. '+str(episode)+' finished ... ,total step : '+str(count))
    time.sleep(2)
    

In [None]:
Agent.q_table

In [None]:
Maze = maze_env(size=(5,3), actions=['up', 'down', 'left', 'right'])

In [None]:
size=5
a = np.random.randint(low=0, high=7, size=20)
print(a.min())
a = tuple(a)


In [None]:
import time
for idx in range(5):
    a = np.random.randint(1,6,(5,4))
    time.sleep(1)
    print('\r{}'.format(a),end='')

In [None]:
UNIT = 40   # pixels
MAZE_H = 4  # grid height
MAZE_W = 4  # grid width


class Maze(tk.Tk, object):
    def __init__(self):
        super(Maze, self).__init__()
        self.action_space = ['u', 'd', 'l', 'r']
        self.n_actions = len(self.action_space)
        self.title('maze')
        self.geometry('{0}x{1}'.format(MAZE_H * UNIT, MAZE_H * UNIT))
#         self._build_maze()
        
maze = Maze()

In [None]:
a = np.zeros((1,2,4))
b = (0,1)
a[b][3]

In [None]:
a = np.array([0,1,3,2])
a.argmax()

In [None]:
a=(1,2);b=(1,2)
# c = tuple([sum(x) for x in zip(a,b)])
# c
if a==b:
    print('T')
else:
    print('F')

In [None]:
a=[1,2,3,4]
a/5
a