In [None]:
import numpy as np
import matplotlib.pyplot as plt
import random

In [None]:
# Move set shared by the agents and the plotting code. Each row is a
# (row_delta, col_delta) offset that gets added to a (row, col) position;
# an action is referred to by its row index in this array.
actions = np.array([(1, 0), (-1, 0), (0, 1), (0, -1)])

In [None]:
class Map:
    """Randomly generated grid world plus matplotlib helpers to draw it.

    ``game_map`` is a ``(height, width)`` float array of terrain codes:

    * ``1``    -- start cell, always at ``[0][0]``
    * ``10``   -- goal cell, always at ``[height-1][width-1]``
    * ``0``    -- plain ground
    * ``-1``   -- negative terrain
    * ``-100`` -- cliff
    """

    # terrain code -> matplotlib colour used to paint the cell
    color_list = {10: 'blue', 1: 'orange', 0: 'g', -1: 'y', -100: 'r'}

    def __init__(self, height, width, negative_rate, cliff_rate):
        """Build a ``height`` x ``width`` map with random obstacles.

        Args:
            height: number of rows.
            width: number of columns.
            negative_rate: fraction of all cells to mark as negative terrain.
            cliff_rate: fraction of all cells to mark as cliff.

        The start and goal cells are written last, so an obstacle that was
        drawn on either of them is overwritten.
        """
        # All empty
        self.game_map = np.zeros((height, width))

        # Add random negative terrain AND cliff
        total = height*width
        negative_num = int(negative_rate*total)
        cliff_num = int(cliff_rate*total)
        # Sample DISTINCT cells. np.random.choice samples with replacement by
        # default, which let duplicate indices shrink the obstacle counts and
        # let cliffs overwrite negative cells. The request is capped at the
        # grid size so that rates summing to more than 1 still work.
        special_num = min(negative_num+cliff_num, total)
        li = np.random.choice(total, special_num, replace=False)
        for i in li[0:negative_num]:
            self.game_map[i//width][i % width] = -1
        for i in li[negative_num:]:
            self.game_map[i//width][i % width] = -100

        # Start (code 1) and goal (code 10) always override whatever was drawn
        self.game_map[0][0] = 1
        self.game_map[height-1][width-1] = 10

    # function to add grid
    def fill_grid(self, x, y, xw, yw, color):
        """Paint a rectangle centred on (x, y) with half-extents (xw, yw)."""
        plt.fill_between(np.linspace(x-xw, x+xw, 10),
                         y+yw, y-yw,
                         facecolor=color)
        return

    # function to draw an arrow
    def draw_arrow(self, x, y, d_x, d_y, size, color):
        """Draw an arrow centred on (x, y) along (d_x, d_y), 2*size long."""
        plt.quiver(x-d_x*size, y-d_y*size,
                   d_x*size*2, d_y*size*2,
                   units='xy', scale=1, color=color)
        return

    def _draw_board(self, guide=None):
        """Open a new figure and paint every cell, optionally with arrows.

        Cell ``[i][j]`` is drawn centred on ``(x=j+1, y=i+1)``. When ``guide``
        is given, ``guide[i][j]`` is an index into the module-level
        ``actions`` array and an arrow for that move is drawn on the cell.
        """
        plt.figure(figsize=(9, 3))

        rows, cols = self.game_map.shape
        for i in range(rows):
            for j in range(cols):
                # int -> color
                self.fill_grid(j+1, i+1,
                               0.48, 0.48,
                               self.color_list[self.game_map[i][j]])
                if guide is not None:
                    move = actions[guide[i][j]]
                    # move is (row_delta, col_delta): column maps to x, row to y
                    self.draw_arrow(j+1, i+1, move[1], move[0], 0.4, 'black')

        plt.yticks(np.arange(1, rows+1, 1))
        plt.xticks(np.arange(1, cols+1, 1))

    def display_gamemap(self):
        """Show the coloured terrain grid."""
        self._draw_board()
        plt.show()
        return

    def display_gamemap_and_arrows(self, guide):
        """Show the terrain grid with one policy arrow per cell.

        Args:
            guide: integer array shaped like ``game_map`` whose entries index
                the module-level ``actions`` array.
        """
        self._draw_board(guide)
        # Shown explicitly so this behaves like display_gamemap outside of an
        # inline notebook backend as well.
        plt.show()
        return


# Build the 4 x 12 world used by every later cell (30% negative terrain,
# 10% cliffs) and draw it.
# NOTE(review): no RNG seed is set in the visible cells, so the layout
# differs on every run.
game_map = Map(height=4, width=12, negative_rate=0.3, cliff_rate=0.1)
game_map.display_gamemap()

In [None]:
class SARSA:
    """Tabular one-step SARSA agent for the grid world.

    Hyper-parameters are class attributes; assign to an instance attribute of
    the same name to override one for a single agent.
    """
    epsilon = 0.4           # exploration probability, decayed during training
    alpha = 0.0002          # learning rate
    gamma = 0.98            # discount factor
    TotalSteps = 400000     # number of training EPISODES (despite the name)
    MaxEpisodeLen = 100     # an episode longer than this is cut off as a failure
    LogInterval = 1000      # episodes between progress lines / epsilon decays
    EpsilonDecay = 0.99     # factor applied to epsilon every LogInterval episodes
    # terrain code -> reward received for stepping onto a cell of that terrain
    TtoC = {1:-1, 10:-1, 0:-1, -1:-5, -100:-100}

    def __init__(self, game_map, actions) -> None:
        """Store the environment and create an all-zero Q table.

        Args:
            game_map: 2-D array of terrain codes (10 = goal, -100 = cliff).
            actions: array of (row_delta, col_delta) moves, one per row.
        """
        self.game_map = game_map
        self.BeginPoint = np.array([0, 0])
        self.EndPoint = np.array([game_map.shape[0]-1, game_map.shape[1]-1])
        self.actions = actions
        # Q[row, col, action_index]
        self.Q = np.zeros((game_map.shape[0], game_map.shape[1], actions.shape[0]))
        return

    # return the index of selected action
    def epsilon_greedy(self, pos) -> int:
        """Pick an action index for ``pos``: random with probability epsilon, else greedy."""
        Q_actions = self.Q[pos[0]][pos[1]]
        if np.random.rand() < self.epsilon:
            return random.randrange(len(Q_actions))
        return int(np.argmax(Q_actions))

    def nexts(self, s1, act_index) -> np.ndarray:
        """Return the position reached from ``s1`` by action ``act_index``.

        Moves that would leave the grid are clipped, so the agent stays on the
        border cell. A new array is returned; ``s1`` is not modified.
        """
        s2 = s1+self.actions[act_index]
        s2[0] = np.clip(s2[0], 0, self.game_map.shape[0]-1)
        s2[1] = np.clip(s2[1], 0, self.game_map.shape[1]-1)
        return s2

    def Reward(self, dst) -> int:
        """Reward for entering cell ``dst``, looked up from its terrain code."""
        return self.TtoC[self.game_map[dst[0]][dst[1]]]

    def update(self) -> list:
        """Train for ``TotalSteps`` episodes.

        Returns:
            A list with one entry per ``LogInterval`` episodes: the mean
            episode return over the most recent ``LogInterval`` episodes.
        """
        List = []
        success = []
        Record = []
        for episode in range(1, self.TotalSteps+1):

            s1 = self.BeginPoint.copy()
            # SARSA is on-policy: the action chosen for the next state is the
            # action that is executed next. It is therefore selected once and
            # carried over instead of being resampled at the top of the loop.
            s1_act_index = self.epsilon_greedy(s1)
            SUM = 0
            lenth = 0
            while True:
                s2 = self.nexts(s1, s1_act_index)
                reward = self.Reward(s2)
                s2_act_index = self.epsilon_greedy(s2)

                SUM += reward
                lenth += 1

                # Q of cliff/goal cells is never updated (episodes end on
                # entering them), so it stays 0 and the target for a terminal
                # transition reduces to the reward alone.
                TD_target = reward+self.gamma*self.Q[s2[0], s2[1], s2_act_index]
                q1 = self.Q[s1[0], s1[1], s1_act_index]
                self.Q[s1[0], s1[1], s1_act_index] = q1+self.alpha*(TD_target-q1)

                s1, s1_act_index = s2, s2_act_index

                terrain = self.game_map[s1[0]][s1[1]]
                if terrain == -100 or lenth > self.MaxEpisodeLen:
                    success.append(0)
                    break
                elif terrain == 10:
                    success.append(1)
                    break

            List.append(SUM)
            if episode % self.LogInterval == 0:
                self.epsilon *= self.EpsilonDecay
                Record.append(np.average(List[-self.LogInterval:]))
                print('Train ---- episode={}, ave_successes={:.3f} ave_reward={:.3f} '.format(
                    episode,
                    np.average(success[-self.LogInterval:]),
                    Record[-1])
                )
        return Record

    # output
    def get_guide(self) -> np.ndarray:
        """Return the greedy action index of every cell as an int32 array."""
        return np.argmax(self.Q, axis=2).astype(np.int32)


# Train the one-step SARSA agent on the generated map, keeping the
# learning curve (SList) and the greedy action per cell (Sguide).
sarsa = SARSA(game_map=game_map.game_map, actions=actions)
SList = sarsa.update()
Sguide = sarsa.get_guide()

In [None]:
# Learning curve of the one-step agent: each point is the value returned by
# update() for one block of 1000 episodes.
ax = plt.gca()
ax.plot(SList)
ax.set_xlabel('episodes(x1000)')
ax.set_ylabel('Reward')

In [None]:
# Overlay the greedy action learned by the one-step agent on the map.
game_map.display_gamemap_and_arrows(guide=Sguide)

In [None]:
class Muti_SARSA:
    """Tabular n-step SARSA agent (3-step by default) for the grid world.

    Hyper-parameters are class attributes; assign to an instance attribute of
    the same name to override one for a single agent.
    """
    epsilon = 0.4           # exploration probability, decayed during training
    alpha = 0.0002          # learning rate
    gamma = 0.98            # discount factor
    TotalSteps = 400000     # number of training EPISODES (despite the name)
    NSteps = 3              # rewards summed before bootstrapping (must be >= 1)
    MaxEpisodeLen = 100     # an episode longer than this is cut off as a failure
    LogInterval = 1000      # episodes between progress lines / epsilon decays
    EpsilonDecay = 0.99     # factor applied to epsilon every LogInterval episodes
    # terrain code -> reward received for stepping onto a cell of that terrain
    TtoC = {1:-1, 10:-1, 0:-1, -1:-5, -100:-100}

    def __init__(self, game_map, actions) -> None:
        """Store the environment and create an all-zero Q table.

        Args:
            game_map: 2-D array of terrain codes (10 = goal, -100 = cliff).
            actions: array of (row_delta, col_delta) moves, one per row.
        """
        self.game_map = game_map
        self.BeginPoint = np.array([0, 0])
        self.EndPoint = np.array([game_map.shape[0]-1, game_map.shape[1]-1])
        self.actions = actions
        # Q[row, col, action_index]
        self.Q = np.zeros((game_map.shape[0], game_map.shape[1], actions.shape[0]))
        return

    # return the index of selected action
    def epsilon_greedy(self, pos) -> int:
        """Pick an action index for ``pos``: random with probability epsilon, else greedy."""
        Q_actions = self.Q[pos[0]][pos[1]]
        if np.random.rand() < self.epsilon:
            return random.randrange(len(Q_actions))
        return int(np.argmax(Q_actions))

    def nexts(self, s1, act_index) -> np.ndarray:
        """Return the position reached from ``s1`` by action ``act_index``.

        Moves that would leave the grid are clipped, so the agent stays on the
        border cell. A new array is returned; ``s1`` is not modified.
        """
        s2 = s1+self.actions[act_index]
        s2[0] = np.clip(s2[0], 0, self.game_map.shape[0]-1)
        s2[1] = np.clip(s2[1], 0, self.game_map.shape[1]-1)
        return s2

    def Reward(self, dst) -> int:
        """Reward for entering cell ``dst``, looked up from its terrain code."""
        return self.TtoC[self.game_map[dst[0]][dst[1]]]

    def _n_step_return(self, states, acts, rewards, tau, horizon, bootstrap) -> float:
        """Discounted return from step ``tau`` up to ``horizon``.

        ``rewards[k]`` is the reward for entering ``states[k]``. When
        ``bootstrap`` is true, the discounted Q value of the state/action pair
        at ``horizon`` is added for the remainder of the episode.
        """
        G = 0.0
        for k in range(tau+1, horizon+1):
            G += pow(self.gamma, k-tau-1)*rewards[k]
        if bootstrap:
            s_h = states[horizon]
            G += pow(self.gamma, horizon-tau)*self.Q[s_h[0], s_h[1], acts[horizon]]
        return G

    def update(self) -> list:
        """Train for ``TotalSteps`` episodes with n-step SARSA.

        The return of every visited state/action pair is built from the
        trajectory that was actually followed and is truncated at a cliff or
        goal cell; nothing is simulated past the end of the episode.

        Returns:
            A list with one entry per ``LogInterval`` episodes: the mean
            episode return over the most recent ``LogInterval`` episodes.
        """
        n = self.NSteps
        List = []
        success = []
        Record = []
        for episode in range(1, self.TotalSteps+1):

            states = [self.BeginPoint.copy()]
            acts = [self.epsilon_greedy(states[0])]
            rewards = [0]            # placeholder so rewards[k] pairs with states[k]
            SUM = 0
            T = None                 # index of the last state once the episode ended
            ended_on_terminal = False
            t = 0
            while True:
                if T is None:
                    # Act in the environment with the action already chosen.
                    s_next = self.nexts(states[t], acts[t])
                    reward = self.Reward(s_next)
                    states.append(s_next)
                    rewards.append(reward)
                    SUM += reward
                    lenth = t+1

                    terrain = self.game_map[s_next[0]][s_next[1]]
                    ended_on_terminal = terrain == -100 or terrain == 10
                    timed_out = lenth > self.MaxEpisodeLen
                    if ended_on_terminal or timed_out:
                        T = lenth
                        success.append(1 if terrain == 10 and not timed_out else 0)
                    if not ended_on_terminal:
                        # Needed either to keep moving or, on a time-out, to
                        # bootstrap from the state where the episode was cut.
                        acts.append(self.epsilon_greedy(s_next))

                tau = t-n+1          # step whose Q value can be updated now
                if tau >= 0:
                    horizon = tau+n if T is None else min(tau+n, T)
                    # No bootstrap when the return already runs into a
                    # cliff/goal cell: nothing follows a terminal state.
                    bootstrap = not (ended_on_terminal and horizon == T)
                    G = self._n_step_return(states, acts, rewards, tau, horizon, bootstrap)

                    s_tau = states[tau]
                    q_tau = self.Q[s_tau[0], s_tau[1], acts[tau]]
                    self.Q[s_tau[0], s_tau[1], acts[tau]] = q_tau+self.alpha*(G-q_tau)

                    if T is not None and tau == T-1:
                        break
                t += 1

            List.append(SUM)
            if episode % self.LogInterval == 0:
                self.epsilon *= self.EpsilonDecay
                Record.append(np.average(List[-self.LogInterval:]))
                print('Train ---- episode={}, ave_successes={:.3f} ave_reward={:.3f} '.format(
                    episode,
                    np.average(success[-self.LogInterval:]),
                    Record[-1])
                )
        return Record

    def get_guide(self) -> np.ndarray:
        """Return the greedy action index of every cell as an int32 array."""
        return np.argmax(self.Q, axis=2).astype(np.int32)

        

# Train the multi-step agent on the same map, keeping its learning curve
# (muti_List) and its greedy action per cell (muti_guide).
muti_sarsa = Muti_SARSA(game_map=game_map.game_map, actions=actions)
muti_List = muti_sarsa.update()
muti_guide = muti_sarsa.get_guide()

In [None]:
# Learning curve of the multi-step agent: each point is the value returned
# by update() for one block of 1000 episodes.
ax = plt.gca()
ax.plot(muti_List)
ax.set_xlabel('episodes(x1000)')
ax.set_ylabel('Reward')

In [None]:
# Overlay the greedy action learned by the multi-step agent on the map.
game_map.display_gamemap_and_arrows(guide=muti_guide)