# In [24]:
import numpy as np
# Grid-world layout, one string per row: '#' marks a forbidden cell and
# 'x' marks the target cell; every other character is an ordinary cell.
env = np.array([list(row) for row in (
    "*****",
    "*##**",
    "**#**",
    "*#x#*",
    "*#***",
)])

class q_learning():
    """Tabular off-policy Q-learning on a small grid world.

    The environment is a 2-D grid of single-character cells: ``'#'`` is a
    forbidden cell, ``'x'`` is the target cell, and any other character is
    an ordinary cell.  Experience is collected with a uniformly random
    behavior policy; the greedy target policy is read off the learned
    action values.

    Actions are indexed 0..4 and mean up, right, down, left, stay.
    """

    # Row / column offsets per action index (up, right, down, left, stay).
    _ROW_STEP = (-1, 0, 1, 0, 0)
    _COL_STEP = (0, 1, 0, -1, 0)

    def __init__(self, env, _lambda, k, alpha=0.1, step=100, threshold=1e-3):
        """Store the hyper-parameters and allocate the value tables.

        Args:
            env: 2-D grid of cell characters (array or nested sequence).
            _lambda: discount rate applied to the next state's best value.
            k: number of episodes sampled by each ``policy_update`` call.
            alpha: learning rate of the Q-value update.
            step: number of transitions in each episode.
            threshold: convergence threshold; stored but not read by any
                method of this class.
        """
        # Accept nested lists as well as arrays; .shape and [x, y]
        # indexing below need an ndarray.
        self.env = np.asarray(env)
        self._lambda = _lambda  # discount rate
        self.k = k  # number of episodes per policy_update() call
        self.alpha = alpha  # learning rate
        self.step = step  # number of steps in each episode
        self.threshold = threshold  # threshold for convergence (unused here)

        self.m, self.n = self.env.shape
        self.action_num = 5
        # NOTE: v, returns and num are allocated but never read or written
        # by the methods of this class.
        self.v = np.zeros((self.m, self.n))  # state value
        self.q = np.zeros((self.m, self.n, self.action_num))  # action value
        self.policy = np.zeros((self.m, self.n), dtype=int)  # target policy
        self.returns = np.zeros((self.m, self.n, self.action_num))  # accumulated returns
        self.num = np.zeros((self.m, self.n, self.action_num))  # number of visits

    def next_state(self, x, y, a):
        """Apply action ``a`` in cell ``(x, y)``.

        Returns:
            ``(x_next, y_next, reward)``.  A move that would leave the grid
            keeps the agent in place and yields -1.  Landing on (or staying
            on) the target yields 1.  Entering a forbidden cell is allowed
            but yields -1.  Any other move yields 0.
        """
        x_next = x + self._ROW_STEP[a]
        y_next = y + self._COL_STEP[a]

        # Off-grid moves bounce back to the current cell and are penalised
        # regardless of what that cell contains.
        if not (0 <= x_next < self.m and 0 <= y_next < self.n):
            return x, y, -1

        cell = self.env[x_next, y_next]
        if cell == 'x':
            reward = 1
        elif cell == '#':
            reward = -1
        else:
            reward = 0
        return x_next, y_next, reward

    def _sample_transitions(self):
        """Sample one episode as ``(x, y, a, reward, x_next, y_next)`` tuples.

        The start cell is drawn uniformly among cells that are neither
        forbidden nor the target, and every action is drawn uniformly.

        Raises:
            ValueError: if the grid has no valid start cell (the rejection
                sampling below would otherwise never terminate).
        """
        startable = (self.env != '#') & (self.env != 'x')
        if not startable.any():
            raise ValueError(
                "environment has no cell that is neither forbidden nor target")

        # Rejection-sample the initial state.
        while True:
            x = np.random.randint(0, self.m)
            y = np.random.randint(0, self.n)
            if startable[x, y]:
                break

        transitions = []
        for _ in range(self.step):
            a = np.random.randint(0, self.action_num)
            x_next, y_next, reward = self.next_state(x, y, a)
            transitions.append((x, y, a, reward, x_next, y_next))
            x, y = x_next, y_next
        return transitions

    def generate_episode(self):
        """Generate an episode under the uniform random behavior policy.

        Returns:
            A list of ``step`` tuples ``(x, y, a, reward)``, where ``reward``
            is the reward received for taking ``a`` in ``(x, y)``.

        Raises:
            ValueError: if the grid has no valid start cell.
        """
        return [(x, y, a, reward)
                for x, y, a, reward, _, _ in self._sample_transitions()]

    def policy_update(self):
        """Run ``k`` episodes of Q-learning and refresh the greedy policy.

        ``self.k`` is left untouched, so calling this method again trains
        for another ``k`` episodes on top of the current Q-table.
        """
        # Count down on a local copy so the configured k survives the call.
        remaining = self.k
        while remaining > 0:
            remaining -= 1
            # Each transition carries its own successor state, so the last
            # step of the episode is learned from as well.
            for x, y, a, reward, x_next, y_next in self._sample_transitions():
                td_target = reward + self._lambda * np.max(self.q[x_next, y_next])
                td_error = td_target - self.q[x, y, a]
                self.q[x, y, a] = self.q[x, y, a] + self.alpha * td_error

                # Keep the target policy greedy w.r.t. the updated values.
                self.policy[x, y] = np.argmax(self.q[x, y])

    def show_policy(self):
        """Print the current policy, one grid row per line.

        Cells never updated keep their initial action 0 and print as "↑".
        """
        s = "↑→↓←O"  # display symbol for each action index
        for x in range(self.m):
            for y in range(self.n):
                print(s[self.policy[x, y]], end=" ")
            print(" ")
    
if __name__ == "__main__":
    # Train on the grid above (discount 0.9, 20 episodes of 100 steps,
    # learning rate 0.1), then print the resulting greedy policy.
    ql = q_learning(env, _lambda=0.9, k=20, alpha=0.1, step=100)
    ql.policy_update()
    ql.show_policy()
    

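# Sample output of show_policy() (varies between runs, since episodes are
# sampled randomly):
# → → → → ↓
# ↑ ↑ → → ↓
# ↑ ← ↓ → ↓
# ↑ → O ← ↓
# ↑ → ↑ ← ←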
