In [None]:
import numpy as np
import copy
import random

In [None]:
# Create Gridworld object

class Gridworld:
    """Square grid world with obstacles, penalties, two teleports and a goal.

    The world is kept in two flat, row-major arrays of length ``dim * dim``:

    * ``grid_world``   -- "O" free cell, "X" obstacle, "P" player.
    * ``reward_world`` -- copy of the obstacle layout with "-" (penalty),
      "+" (goal, terminal state) and "§" (teleport) markers.

    The player always starts in cell 0 (top left).  Stepping towards a
    teleport cell relocates the player to the other teleport with
    probability ``TELEPORT_PROBABILITY``; otherwise the player simply steps
    onto the teleport cell.
    """

    GOAL_REWARD = 10             # reward for reaching the "+" cell
    PENALTY_REWARD = -1          # reward for stepping on a "-" cell
    TELEPORT_PROBABILITY = 0.6   # chance that a teleport cell fires
    NUM_PENALTIES = 5            # number of "-" cells placed by build()

    # action name -> (row delta, column delta)
    _STEPS = {
        "left": (0, -1),
        "right": (0, 1),
        "up": (-1, 0),
        "down": (1, 0),
    }

    def __init__(self, dim):
        """Create an empty world of ``dim`` x ``dim`` cells; call build() next."""
        self.dim = dim
        self.grid_world = []
        self.reward_world = []

        # Pristine copies used by reset().
        self.grid_world_original = []
        self.reward_world_original = []

        self.free_fields = []
        self.goal = []
        self.tele_idx = []

        self.finished = False
        self.reward = 0

    def build(self):
        """Randomly populate the world and return ``(grid_world, reward_world)``.

        Places ``int(dim * dim / 8)`` obstacles, ``NUM_PENALTIES`` penalty
        cells, two teleports and one goal, never on the start cell 0.

        Raises:
            ValueError: if the grid is too small to hold all special cells.
        """
        n_cells = self.dim * self.dim
        num_obs = int(n_cells / 8)

        # Cell 0 is reserved for the player.  Of the remaining free cells one
        # becomes the goal, two become teleports, one is skipped by the
        # penalty slice below, and NUM_PENALTIES receive a penalty.
        n_free = n_cells - 1 - num_obs
        if n_free < self.NUM_PENALTIES + 4:
            raise ValueError(
                "Grid of dimension {} is too small to place the goal, the "
                "teleports and {} penalty cells".format(self.dim, self.NUM_PENALTIES)
            )

        ######### Build GW #########

        self.grid_world = np.array(["O"] * n_cells)

        # Start at 1 so no obstacle lands on the player's start cell.
        obstacle_indices = np.random.choice(
            np.arange(1, n_cells), replace=False, size=num_obs
        )
        self.grid_world[obstacle_indices] = "X"

        ######### Build RW and Teleport #########

        self.reward_world = self.grid_world.copy()

        blocked = set(obstacle_indices.tolist())
        self.free_fields = [x for x in np.arange(1, n_cells) if x not in blocked]
        np.random.shuffle(self.free_fields)

        # The last three shuffled free cells become goal and teleports.
        self.goal = self.free_fields[-1]
        self.tele_idx = [self.free_fields[-2], self.free_fields[-3]]

        # Penalties are drawn from the remaining free cells (the slice leaves
        # out the first shuffled entry as well as the goal/teleport cells).
        rew_neg_indices = np.random.choice(
            self.free_fields[1:-3], replace=False, size=self.NUM_PENALTIES
        )

        self.reward_world[rew_neg_indices] = "-"
        self.reward_world[self.goal] = "+"
        self.reward_world[self.tele_idx] = "§"

        # Player starts top left.
        self.grid_world[0] = "P"

        # Keep pristine copies so reset() can restore the initial layout.
        self.reward_world_original = copy.deepcopy(self.reward_world)
        self.grid_world_original = copy.deepcopy(self.grid_world)

        self.finished = False

        return self.grid_world, self.reward_world

    def reset(self):
        """Restore the layout created by build() and clear the episode state.

        Returns ``(grid_world, reward_world)``; both are fresh copies of the
        stored originals.
        """
        self.reward_world = copy.deepcopy(self.reward_world_original)
        self.grid_world = copy.deepcopy(self.grid_world_original)

        # A new episode must not inherit the terminal flag of the last one.
        self.finished = False
        self.reward = 0

        return self.grid_world, self.reward_world

    def _neighbour(self, idx, step):
        """Return the flat index reached from ``idx`` by ``step``, or None if off-grid."""
        row, col = divmod(int(idx), self.dim)
        row += step[0]
        col += step[1]
        if 0 <= row < self.dim and 0 <= col < self.dim:
            return row * self.dim + col
        return None

    def _teleport_exit(self, target):
        """Return the paired teleport cell if ``target`` is a teleport that fires, else None."""
        first, second = self.tele_idx[0], self.tele_idx[1]
        for entry, exit_cell in ((first, second), (second, first)):
            # The random draw only happens when the target really is a teleport.
            if target == entry and random.random() < self.TELEPORT_PROBABILITY:
                return exit_cell
        return None

    def move(self, action):
        """Move the player one cell and return ``(grid_world, finished, reward)``.

        Args:
            action: one of "left", "right", "up", "down".

        The move is a no-op (reward 0) when it would leave the grid or hit an
        obstacle.  A teleport that fires yields reward 0.  Any other action
        value prints a hint and returns the unchanged state.

        The returned ``grid_world`` is the live internal array, which later
        moves modify in place; copy it if a snapshot is needed.
        """
        self.reward = 0

        step = self._STEPS.get(action)
        if step is None:
            print("Please choose an action [left,right,up,down]!")
            return self.grid_world, self.finished, self.reward

        idx = np.where(self.grid_world == "P")[0][0]
        target = self._neighbour(idx, step)

        # Out of bounds or blocked: the player stays where it is.
        if target is None or self.grid_world[target] == "X":
            return self.grid_world, self.finished, self.reward

        destination = self._teleport_exit(target)
        if destination is None:
            # Ordinary step: collect whatever the target cell holds.
            destination = target
            marker = self.reward_world[target]
            if marker == "+":
                self.reward = self.GOAL_REWARD
                self.finished = True
            elif marker == "-":
                self.reward = self.PENALTY_REWARD

        # Clear the old cell first so a move that ends on the same cell
        # still leaves the player marker in place.
        self.grid_world[idx] = "O"
        self.grid_world[destination] = "P"

        return self.grid_world, self.finished, self.reward

    def visualize(self):
        """Print the grid world and the reward world as ``dim`` x ``dim`` matrices."""
        print("Gridworld:\n")
        print(self.grid_world.reshape(((self.dim, self.dim))))
        print("\n")

        print("Rewardworld:\n")
        print(self.reward_world.reshape(((self.dim, self.dim))))
        print("\n")

In [None]:
# Ask for the grid dimension until a valid integer >= 5 is entered.

while True:
    try:
        dim = int(input("Please provide your desired grid dimension (dim X dim):\n"))
    except ValueError:
        # int() rejected the text; only this error means "not an integer".
        # Anything else (e.g. KeyboardInterrupt) must still be able to stop the loop.
        print("Please provide an integer value!\n")
        continue

    if dim >= 5:
        break

    print("Dimension needs to be larger than 4!\n")

In [None]:
# Create the Gridworld object for the dimension entered above

gw = Gridworld(dim)

# Randomly place obstacles, rewards, teleports and the player
gw.build()

# Show the grid world and the reward world.
# The cell marked "+" in the reward world is the terminal state.

gw.visualize()

In [None]:
# gw.move("right")
# gw.visualize()

In [None]:
# gw.reset()
# gw.visualize()

SARSA (the implementation below performs the one-step update, not a true n-step return). This code is adapted from https://www.geeksforgeeks.org/sarsa-reinforcement-learning/

In [None]:
import random

# --- learning hyperparameters ---
epsilon = 0.4               # probability of taking a random (exploratory) action
total_episodes = 2_000_000  # upper bound on training episodes
max_steps = 50              # step limit per episode
alpha = 0.8                 # learning rate
gamma = 0.9                 # discount factor

# --- action encoding: integer id -> move name understood by Gridworld.move ---
dict_moves = dict(enumerate(("left", "right", "up", "down")))
action_space = list(range(4))

# list to save the SAR
n_list = list()

# Q table: one row per grid cell, one column per action, all zeros at the start
Q = np.zeros((dim * dim, len(action_space)))

In [None]:
def choose_action(state):
    """Pick an action for ``state`` with an epsilon-greedy rule.

    ``state`` is the row index into the global ``Q`` table.  With probability
    ``epsilon`` a random member of ``action_space`` is returned; otherwise the
    column index of the largest Q value in that row.
    """
    explore = np.random.uniform(0, 1) < epsilon

    if not explore:
        # exploit: best known action for this state
        return np.argmax(Q[state, :])

    # explore: uniformly random action
    return random.sample(action_space, 1)[0]

In [None]:
def update(state1, state2, reward, action1, action2):
    """Apply one SARSA update to the global ``Q`` table.

    ``state1`` and ``state2`` are grid arrays holding a "P" marker; the
    position of the first "P" in each is used as the row index into ``Q``.
    ``action1`` / ``action2`` are the column indices of the action taken in
    ``state1`` and the action chosen for ``state2``.  Nothing is returned;
    ``Q`` is modified in place.
    """
    row_now = np.where(state1 == "P")[0][0]
    row_next = np.where(state2 == "P")[0][0]

    current = Q[row_now, action1]
    target = reward + gamma * Q[row_next, action2]

    # move the current estimate a fraction alpha towards the target
    Q[row_now, action1] = current + alpha * (target - current)


In [None]:
# Train the Q table with SARSA.

# initializing the reward
reward = 0

# total number of environment steps taken so far
ep_ = 0

for episode in range(total_episodes):

    # Progress report and early-stopping check every 100000 episodes.
    if episode % 100000 == 0 and episode != 0:
        print("Epoch: " + str(episode))

        # Entries that are still exactly 0 are treated as unexplored.
        # Counting directly avoids a KeyError when no zero is left.
        unexplored = np.count_nonzero(Q == 0)

        # Stop once less than a quarter of all Q values is unexplored.
        if unexplored < Q.size / 4:
            break

        print("Unexplored: ", (unexplored / Q.size) * 100, "%")

    t = 0  # step counter within the current episode
    state1, _ = gw.reset()  # reset gridworld and get initial state
    gw.finished = False  # make sure no terminal flag carries over into this episode

    # gw.move() changes the grid array in place and returns that same array,
    # so keep a snapshot of the state the action is taken in.
    state1 = state1.copy()

    idx_state1 = np.where(state1 == "P")[0][0]  # Q table is indexed by player position
    action1 = choose_action(idx_state1)

    while t < max_steps:

        # take the action and observe the next state
        state2, done, reward = gw.move(dict_moves[action1])

        # index of the player in the next state
        idx_state2 = np.where(state2 == "P")[0][0]

        # choose the next action (on-policy)
        action2 = choose_action(idx_state2)

        # update Q(state1, action1) using the snapshot taken before the move
        update(state1, state2, reward, action1, action2)

        # the next state becomes the current one; snapshot it before it is mutated
        state1 = state2.copy()
        action1 = action2

        t += 1
        ep_ += 1

        # the goal was reached: the episode is over
        if done:
            break

print("Learning finished!")

In [None]:
# Print the final Q value table (one row per grid cell, one column per action)
print(Q)

In [None]:
# Derive the greedy move for every grid cell from the Q table and print it
# next to the (reset) grid world.

dummy = np.array([0., 0., 0., 0.])

# A row that still holds only zeros has no learned preference -> "None";
# otherwise take the name of the action with the largest Q value.
moves_ls = [
    "None" if (q_row == dummy).all() else dict_moves[np.argmax(q_row)]
    for q_row in Q
]

moves_ls = np.array(moves_ls).reshape(dim, dim)

gw.reset()
gw.visualize()

print("Learned moves: \n")
print(moves_ls)