# Implementation of SARSA solving a gridworld environment

In [25]:
import numpy as np
import random as r
from operator import add
from tqdm import tqdm


In [26]:
class Gridworld:

    def __init__(self, size=8, reward=10):
        self.size = size
        self.reward = reward

        # Create 2d list with filled with zeros
        self.grid = [[0] * self.size for i in range(self.size)]

        # Current state of agent
        self.current_state = None
        self.blocked_fields = None
        self.goal_idx = None
        self.done = False

    def reset(self):

        # Input error correction
        if self.size < 5 or isinstance(self.size, int) == False:
            print("The size entered for the grid is too small or was no integer. Please try an integer bigger than 4.")

        else:
            # Create negative fields
            for j in range(int(self.size)):
                self.grid[r.randint(0, self.size-1)][r.randint(0, self.size-1)] = r.uniform(-1, 0)

            # Saves indeces of blocked field
            blocked_fields_idx = []
            # Create blocked fields
            for k in range(int(self.size/1.5)):
                blocked_fields = [r.randint(0, self.size-1), r.randint(0, self.size-1)]
                blocked_fields_idx.append(blocked_fields)

                self.grid[blocked_fields[0]][blocked_fields[1]] = "X"

            # Save list of blocked fields for step() function
            self.blocked_fields = blocked_fields_idx

            # initial index of terminal state
            goal_idx = [r.randint(0, self.size-1), r.randint(0, self.size-1)]

            # Check adjacent fields to index of terminal state and update until no blocked field is adjacent
            while list(map(add, goal_idx, [0, -1])) in blocked_fields_idx or list(
                    map(add, goal_idx, [0, +1])) in blocked_fields_idx or list(
                    map(add, goal_idx, [-1, 0])) in blocked_fields_idx or list(
                    map(add, goal_idx, [+1, 0])) in blocked_fields_idx:
                goal_idx = [r.randint(0, self.size-1), r.randint(0, self.size-1)]

            # Save index of terminal state for step() function
            self.goal_idx = goal_idx

            # Create terminal state with positive reward
            self.grid[goal_idx[0]][goal_idx[1]] = self.reward

            starting_point_idx = [r.randint(0, self.size-1), r.randint(0, self.size-1)]

            # Update starting point until it is a zero value (Not blocked- or terminal field)
            while self.grid[starting_point_idx[0]][starting_point_idx[1]] != 0:
                starting_point_idx = [r.randint(0, self.size-1), r.randint(0, self.size-1)]

            # Set current state to starting state
            self.current_state = starting_point_idx

            return self.state_as_int(self.current_state)

    def step(self, action: int, new_state: list) -> tuple[int, int, bool]:
        '''
        Moves agent in grid

        args:
        action(int): How agent should move (0=left, 1=right, 2=up, 3=down)
        '''
        self.current_state: list = new_state

        # Left
        if action == 0:
            # Compute result of action
            test_state = list(map(add, self.current_state, [0, -1]))

            # New list index is out of bounds (left)
            if test_state[1] == -1:
                #print("Wall")
                pass

            # New list index is on field marked with "X"
            elif test_state in self.blocked_fields:
                #print("field blocked")
                pass

            # Action lead to terminal state
            elif test_state == self.goal_idx:
                #print("Terminal state reached")
                self.current_state = test_state
                done = True

            # Update state (take action)
            else:
                self.current_state = test_state
                self.done = False
            
            # Return new state, reward & bool wether terminal state is reached
            return self.state_as_int(self.current_state), self.grid[self.current_state[0]][
                self.current_state[1]], self.done

        # Right
        if action == 1:
            # Compute result of action
            test_state = list(map(add, self.current_state, [0, +1]))

            # New list index is out of bounds (right)
            if test_state[1] == self.size:
                #print("Wall")
                pass

            # New list index is on field marked with "X"
            elif test_state in self.blocked_fields:
                #print("field blocked")
                pass

            # Action lead to terminal state
            elif test_state == self.goal_idx:
               #print("Terminal state reached")
                self.current_state = test_state
                self.done = True

            # Update state (take action)
            else:
                self.current_state = test_state

            # Return new state, reward & bool wether terminal state is reached
            return self.state_as_int(self.current_state), self.grid[self.current_state[0]][
                self.current_state[1]], self.done


        # Up
        if action == 2:
            # Compute result of action
            test_state = list(map(add, self.current_state, [-1, 0]))

            # New list index is out of bounds (right)
            if test_state[0] == -1:
                #print("Wall")
                pass

            # New list index is on field marked with "X"
            elif test_state in self.blocked_fields:
                #print("field blocked")
                pass

            # Action lead to terminal state
            elif test_state == self.goal_idx:
                #print("Terminal state reached")
                self.current_state = test_state
                self.done = True

            # Update state (take action)
            else:
                self.current_state = test_state

            # Return new state, reward & bool wether terminal state is reached
            return self.state_as_int(self.current_state), self.grid[self.current_state[0]][
                self.current_state[1]], self.done
                

        # down
        if action == 3:
            # Compute result of action
            test_state = list(map(add, self.current_state, [+1, 0]))

            # New list index is out of bounds (right)
            if test_state[0] == self.size:
                #print("Wall")
                pass

            # New list index is on field marked with "X"
            elif test_state in self.blocked_fields:
                #print("field blocked")
                pass

            # Action lead to terminal state
            elif test_state == self.goal_idx:
                #print("Terminal state reached")
                self.current_state = test_state
                self.done = True

            # Update state (take action)
            else:
                self.current_state = test_state

            # Return new state, reward & bool wether terminal state is reached
            return self.state_as_int(self.current_state), self.grid[self.current_state[0]][
                self.current_state[1]], self.done

        #print(self.current_state)

    def state_as_int(self, state: list) -> int:
        return (state[0] * self.size) + state[1]

    def int_as_state(self, index):
        return [int(index / self.size), int(index % self.size)]

    def visualise(self):
        print(self.current_state, self.goal_idx)
        # Print in matrix format - Rest of program still uses lists
        print(np.matrix(self.grid))


# SARSA implementation

In [27]:
# Hyperparameters
episodes = 60000
steps = 100
alpha = 0.8
gamma = 0.9

state_space = 6
action_space = 4
grid = Gridworld(state_space)
start_state = grid.reset()

q_values = np.zeros(((state_space*state_space), action_space))


In [28]:

# Function to learn the Q-value for state, action pair
# Represents TD Error for SARSA
def update(state, new_state, reward, action, new_action):
    prediction = q_values[state, action]
    target = reward + gamma * q_values[new_state, new_action]
    q_values[state, action] = q_values[state, action] + alpha * (target - prediction)


# Starting the SARSA learning
for episode in tqdm(range(episodes)):
    t = 0
    
    action = np.argmax(q_values[start_state, :])
    state = start_state

    while t < steps:
        state = grid.int_as_state(state)
        #print(state)
        # Getting the next state
        new_state, reward, done = grid.step(action, state)

        # Choosing random next action with small chance
        if t == int(0.2 * steps) or t == int(0.4 * steps) or t == int(0.6 * steps) or t == int(0.8 * steps):
            new_action = r.randint(0,3)
        
        # Choose next action from Q-Value matrix
        else:
            new_action = np.argmax(q_values[new_state, :])

        # Updating the Q-value matrix
        update(state, new_state, reward, action, new_action)

        state = new_state
        action = new_action

        # Updating the respective vaLues
        t += 1

        # Terminal state is reached
        if done:
            break

print(q_values)

100%|██████████| 60000/60000 [02:39<00:00, 375.74it/s]

[[ 1.64318419e-240  1.60076101e+000 -5.17399826e-001 -1.17888402e-008]
 [-3.41084246e-042 -9.77898022e-002 -5.17399826e-001 -1.48539310e-006]
 [-1.58227054e-027  1.60256000e+000  0.00000000e+000  0.00000000e+000]
 [-1.00376131e-001  0.00000000e+000  0.00000000e+000  0.00000000e+000]
 [ 0.00000000e+000  0.00000000e+000  0.00000000e+000  0.00000000e+000]
 [ 0.00000000e+000  0.00000000e+000  0.00000000e+000  0.00000000e+000]
 [ 0.00000000e+000  0.00000000e+000  0.00000000e+000  0.00000000e+000]
 [ 0.00000000e+000  0.00000000e+000  0.00000000e+000  0.00000000e+000]
 [ 0.00000000e+000  0.00000000e+000  0.00000000e+000  0.00000000e+000]
 [ 0.00000000e+000  0.00000000e+000  0.00000000e+000  0.00000000e+000]
 [ 0.00000000e+000  0.00000000e+000  0.00000000e+000  0.00000000e+000]
 [ 0.00000000e+000  0.00000000e+000  0.00000000e+000  0.00000000e+000]
 [ 0.00000000e+000  0.00000000e+000  0.00000000e+000  0.00000000e+000]
 [ 0.00000000e+000  0.00000000e+000  0.00000000e+000  0.00000000e+000]
 [ 0.0




In [29]:
# Test Q-Values

moves = 100

for i in range(moves):
    action = np.argmax(q_values[new_state, :])

    new_state, reward, done = grid.step(action)
    print(new_state)
    print(reward)

    if done:
        print("finished")
    


TypeError: step() missing 1 required positional argument: 'new_state'