# Task 3: Q-learning for mobile robot navigation

## Libraries 

In [1]:
# importing numpy library
import numpy as np

import matplotlib.pyplot as plt

# auxiliary functions

def action_decode(act_code):
    dirs = {0: "N", 1: "E", 2: "S", 3: "W"}
    return dirs[act_code]

def action_encode(act):
    dir_codes = {"N": 0, "E": 1, "S": 2, "W": 3}
    return dir_codes(act)

def display_learning(series, label):
    n_episodes = len(series)
    show_n = 20
    show_step = int(n_episodes/show_n)
    sequence = []
    for i in range(show_n):
        sequence.append(np.mean(series[show_step*i:show_step*(i+1)]))
        print((i+1) * show_step, ' episodes ', label, sequence[-1])
    print('\n')
    plt.figure()
    plt.plot(sequence)
    plt.ylabel(label)
    plt.xlabel('episodes')

## Grid Environment Class


In [2]:
class grid_env():
# definition of the maze environment

    def __init__(self, width = 5, height = 5, start = [0, 0], debug = False):
        # Contructor methods create the environment with some given options
        self.width = width
        self.height = height
        self.start = start
        self.goal = [self.width - 1, self.height - 1]
        self.debug = debug
        self.n_states = self.width * self.height
        self.reset()
        
    def reset(self):
        # Reset method puts the state at the starting position
        self.pos = self.start[:]   # columns, rows
        return self.pos, 0, False        

    def state_decode(self, obs_code):
        r = obs_code // self.width
        c = obs_code % self.width
        return([c, r])
    
    def state_encode(self, position):
        code = position[0] + position[1] * (self.width) # columns, rows
        return(code)

    def step(self, action):
        # Depending on the action, update the environment state
        if action == "S" and (self.pos[1] < self.height -1):
            self.pos[1] += 1
        elif action == "N" and self.pos[1] > 0:
            self.pos[1] -= 1
        elif action == "W" and self.pos[0] > 0:
            self.pos[0] -= 1
        elif action == "E" and (self.pos[0] < self.width -1):
            self.pos[0] += 1

        done = (self.pos == self.goal)  # check if goal was reached
        if done:
            reward = self.width + self.height  # reward at goal
        else:
            reward = -1  # negative reward at every step

        if self.debug:
            print(self.render())

        return self.pos, reward, done

    def render(self):
        res = ""
        for y in range(self.height):
            for x in range(self.width):
                if self.goal[0] == x and self.goal[1] == y:
                    if self.pos[0] == x and self.pos[1] == y:
                        res += "@"
                    else:
                        res += "o"
                    continue
                if self.pos[0] == x and self.pos[1] == y:
                    res += "x"
                else:
                    res += "_"
            res += "\n"
        return(res)

## Agent Class


In [3]:
class agent():
# definition of the agent

    def __init__(self, n_obs, discount = 1, learning_rate = 0.1, eps = {'start': 1, 'min': 0.01, 'decay': 0.001}):
        self.action_space = np.asarray([0, 1, 2, 3])  # north, east, south, west
        n_actions = np.shape(self.action_space)[0]
        self.Q_table = np.zeros((n_obs, n_actions))

        self.epsilon = eps['start']   #initialize the exploration probability to 1
        self.epsilon_decay = eps['decay']   #exploration decreasing decay for exponential decreasing
        self.epsilon_min = eps['min']   # minimum of exploration proba
        
        self.gamma = discount   #discounted factor
        self.alpha = learning_rate   #learning rate
    
    def action_selection(self, state):
        if np.random.uniform(0,1) < self.epsilon:
            action = self.action_space[np.random.randint(0, 3)]   # choose a random action with probability epsilon
        else:
            action = np.argmax(self.Q_table[state,:])  # choose the best action for that state with prob 1-epsilon
        return(action)

    def policy_update(self, action, reward, state, next_state):
        self.Q_table[state, action] = (1 - self.alpha) * self.Q_table[state, action] + self.alpha*(reward + self.gamma*max(self.Q_table[next_state,:]))

    def decrease_exploration(self, e):
        self.epsilon = max(self.epsilon_min, np.exp(-self.epsilon_decay*e))
        
    def test_agent(self, env):
        state, _, done = env.reset()
        steps = 0
        while not done and steps < 100:
            action = ag.action_selection(env.state_encode(state))
            next_state, reward, done = env.step(action_decode(action))
            steps += 1
        print(steps)

    def train(self, env, n_episodes = 1000, max_steps = 100):
        all_rewards = []
        all_steps = []
        for e in range(n_episodes):   # iterate over episodes
            state, _, done = env.reset()
            trial_reward = 0
            t = 0
            while not done and t < max_steps:
                action = ag.action_selection(env.state_encode(state))  # step 1: choose an action
                old_state = state[:]
                next_state, reward, done = env.step(action_decode(action))    # steps 2 and 3: The environment runs the chosen action and returns next state and reward
                ag.policy_update(action, reward, env.state_encode(old_state), env.state_encode(next_state))  # step 4: policy update
                trial_reward += reward
                t += 1
            ag.decrease_exploration(e)
            all_rewards.append(trial_reward)
            all_steps.append(t)
        return(all_rewards, all_steps)

Task 3

A.	Modify the maze, changing size and shape, and different start positions. How many steps does it take to reach the target? Does the performance vary as you expect?

B.	Change the agent learning parameters (e.g. learning rate, discount factor, exploration values). How does performance change in terms of learning speed and ability to reach the target? What happens if exploration is always maximum? And if it decreases very quickly?

C. Optional. Change the reward applied to different types of actions and test the learning performance. Are you able to find values for which learning is even faster? Imagine that there was a hole in the maze: how can you make the agent learn to avoid it?

# Task 3.A: Modifying the Maze

## Modifying the maze size, shape, and start position

### Maze with a size of 10x10:

In [None]:
# Define grid_env and agent classes

# Task 3.A: Modifying the Maze for 10x10 maze with start position at [0, 0]
maze_height = 10  # Change the height
maze_width = 10  # Change the width
start_pos = [0, 0]  # Change the start position
maze = grid_env(maze_width, maze_height, start_pos)

# Create an instance of the agent class
ag = agent(n_obs=maze.n_states)

# Train the agent with the modified maze
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

# Display learning performance
print("Learning Performance:")
display_learning(rewards, "Reward ")
display_learning(steps_taken, "Steps ")

# Test the agent on the modified maze
print("\nTesting the agent on the modified maze...")
maze = grid_env(maze_width, maze_height, start_pos, debug=True)
ag.test_agent(maze)


### Maze with a size of 10x15:

In [None]:
# Define grid_env and agent classes

# Task 3.A: Modifying the Maze for 10x15 maze with start position at [0, 0]
maze_height = 10  # Change the height
maze_width = 15  # Change the width
start_pos = [0, 0]  # Change the start position
maze = grid_env(maze_width, maze_height, start_pos)

# Create an instance of the agent class
ag = agent(n_obs=maze.n_states)

# Train the agent with the modified maze
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

# Display learning performance
print("Learning Performance:")
display_learning(rewards, "Reward ")
display_learning(steps_taken, "Steps ")

# Test the agent on the modified maze
print("\nTesting the agent on the modified maze...")
maze = grid_env(maze_width, maze_height, start_pos, debug=True)
ag.test_agent(maze)


### Modifying the Maze for 10x10 maze with start position at [1, 2]

In [None]:
# Task 3.A: Modifying the Maze for 10x10 maze with start position at [1, 2]
maze_height = 10  # Change the height
maze_width = 10  # Change the width
start_pos = [1, 2]  # Change the start position
maze = grid_env(maze_width, maze_height, start_pos)

# Create an instance of the agent class
ag = agent(n_obs=maze.n_states)

# Train the agent with the modified maze
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

# Display learning performance
print("Learning Performance:")
display_learning(rewards, "Reward ")
display_learning(steps_taken, "Steps ")

# Test the agent on the modified maze
print("\nTesting the agent on the modified maze...")
maze = grid_env(maze_width, maze_height, start_pos, debug=True)
ag.test_agent(maze)


### Modifying the Maze for 10x10 maze with start position at [6, 2]

In [None]:
# Task 3.A: Modifying the Maze for 10x10 maze with start position at [6, 2]
maze_height = 10  # Change the height
maze_width = 10  # Change the width
start_pos = [6, 2]  # Change the start position
maze = grid_env(maze_width, maze_height, start_pos)

# Train the agent with the modified maze
rewards, steps_taken = ag.train(maze, episodes, steps)

# Display learning performance
print("\n\nLearning Performance:")
display_learning(rewards, "Reward ")
display_learning(steps_taken, "Steps ")

# Test the agent on the modified maze
print("\nTesting the agent on the modified maze...")
maze = grid_env(maze_width, maze_height, start_pos, debug=True)
ag.test_agent(maze)

# Task 2.B: Changing Agent Learning Parameters

## Changing the agent learning parameters

In [None]:
# Higher Learning Rate

# Define modified exploration parameters
epsilon = {'start': 1, 'min': 0.001, 'decay': 0.01}
# The exploration parameters control the exploration-exploitation trade-off
# 'start' is the initial exploration rate, 'min' is the minimum exploration rate, and 'decay' is the rate at which exploration decreases

# Define modified discount factor
discount_factor = 0.9
# The discount factor determines how much importance is given to future rewards

# Define modified learning rate (higher value)
learning_rate = 0.8
# A higher learning rate means the agent will update its Q-values more aggressively based on new experiences
# This can lead to faster learning but may also cause instability if the rate is too high

# Train the agent with the modified maze
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

# Display learning performance
display_learning(rewards, "reward ")
display_learning(steps_taken, "steps ")

## Training the Agent
# Create an instance of the agent class with modified parameters
ag = agent(maze.n_states, discount=discount_factor, learning_rate=learning_rate, eps=epsilon)

# Train the agent with the modified learning parameters
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

## Testing the Agent
# Test the agent with the modified learning parameters
print("Testing the agent with modified learning parameters")
maze = grid_env(maze_width, maze_height, start_pos, debug=True)
ag.test_agent(maze)

In [None]:
# Lower Learning Rate

# Define modified exploration parameters
epsilon = {'start': 1, 'min': 0.001, 'decay': 0.01}
# The exploration parameters control the exploration-exploitation trade-off
# 'start' is the initial exploration rate, 'min' is the minimum exploration rate, and 'decay' is the rate at which exploration decreases

# Define modified discount factor
discount_factor = 0.9
# The discount factor determines how much importance is given to future rewards

# Define modified learning rate (lower value)
learning_rate = 0.2
# A lower learning rate means the agent will update its Q-values more conservatively based on new experiences
# This can lead to slower learning but may be more stable and less prone to divergence

# Train the agent with the modified maze
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

# Display learning performance
display_learning(rewards, "reward ")
display_learning(steps_taken, "steps ")

## Training the Agent
# Create an instance of the agent class with modified parameters
ag = agent(maze.n_states, discount=discount_factor, learning_rate=learning_rate, eps=epsilon)

# Train the agent with the modified learning parameters
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

## Testing the Agent
# Test the agent with the modified learning parameters
print("Testing the agent with modified learning parameters")
maze = grid_env(maze_width, maze_height, start_pos, debug=True)
ag.test_agent(maze)

## Changing Agent Learning Parameters - Exploration Parameter

In [None]:
# Higher Minimum Exploration Rate

# Define modified exploration parameters
epsilon = {'start': 1, 'min': 0.1, 'decay': 0.01}
# The exploration parameters control the exploration-exploitation trade-off
# 'start' is the initial exploration rate (set to 1 for maximum exploration at the beginning)
# 'min' is the minimum exploration rate (set to 0.1, higher than the default value of 0.001)
# A higher minimum exploration rate means the agent will continue to explore more even after many episodes
# 'decay' is the rate at which exploration decreases (set to 0.01, a typical value)

# Define modified discount factor
discount_factor = 0.9
# The discount factor determines how much importance is given to future rewards
# A value of 0.9 is a common choice, which means future rewards are slightly discounted

# Define modified learning rate
learning_rate = 0.5
# The learning rate controls how much the agent updates its Q-values based on new experiences
# A value of 0.5 is a moderate learning rate, which is a good starting point

# Train the agent with the modified maze
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

# Display learning performance
display_learning(rewards, "reward ")
display_learning(steps_taken, "steps ")

## Training the Agent
# Create an instance of the agent class with modified parameters
ag = agent(maze.n_states, discount=discount_factor, learning_rate=learning_rate, eps=epsilon)

# Train the agent with the modified learning parameters
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

## Testing the Agent
# Test the agent with the modified learning parameters
print("Testing the agent with modified learning parameters")
maze = grid_env(maze_width, maze_height, start_pos, debug=True)
ag.test_agent(maze)

In [None]:
# Lower Minimum Exploration Rate

# Define modified exploration parameters
epsilon = {'start': 1, 'min': 0.0001, 'decay': 0.01}
# The exploration parameters control the exploration-exploitation trade-off
# 'start' is the initial exploration rate (set to 1 for maximum exploration at the beginning)
# 'min' is the minimum exploration rate (set to 0.0001, lower than the default value of 0.001)
# A lower minimum exploration rate means the agent will exploit more after many episodes
# 'decay' is the rate at which exploration decreases (set to 0.01, a typical value)

# Define modified discount factor
discount_factor = 0.9
# The discount factor determines how much importance is given to future rewards
# A value of 0.9 is a common choice, which means future rewards are slightly discounted

# Define modified learning rate
learning_rate = 0.5
# The learning rate controls how much the agent updates its Q-values based on new experiences
# A value of 0.5 is a moderate learning rate, which is a good starting point

# Train the agent with the modified maze
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

# Display learning performance
display_learning(rewards, "reward ")
display_learning(steps_taken, "steps ")

## Training the Agent
# Create an instance of the agent class with modified parameters
ag = agent(maze.n_states, discount=discount_factor, learning_rate=learning_rate, eps=epsilon)

# Train the agent with the modified learning parameters
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

## Testing the Agent
# Test the agent with the modified learning parameters
print("Testing the agent with modified learning parameters")
maze = grid_env(maze_width, maze_height, start_pos, debug=True)
ag.test_agent(maze)

## Changing Agent Learning Parameters - Discount Factor

In [None]:
# Higher Discount Factor

# Define modified exploration parameters
epsilon = {'start': 1, 'min': 0.001, 'decay': 0.01}
# The exploration parameters control the exploration-exploitation trade-off
# 'start' is the initial exploration rate (set to 1 for maximum exploration at the beginning)
# 'min' is the minimum exploration rate (set to 0.001, a common value)
# 'decay' is the rate at which exploration decreases (set to 0.01, a typical value)

# Define modified discount factor (higher value)
discount_factor = 0.99
# The discount factor determines how much importance is given to future rewards
# A higher discount factor (closer to 1) means the agent will value future rewards more
# This can lead to more farsighted decision-making, but may also make the learning process slower and more sensitive to long-term rewards

# Define modified learning rate
learning_rate = 0.5
# The learning rate controls how much the agent updates its Q-values based on new experiences
# A value of 0.5 is a moderate learning rate, which is a good starting point

# Train the agent with the modified maze
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

# Display learning performance
display_learning(rewards, "reward ")
display_learning(steps_taken, "steps ")

## Training the Agent
# Create an instance of the agent class with modified parameters
ag = agent(maze.n_states, discount=discount_factor, learning_rate=learning_rate, eps=epsilon)

# Train the agent with the modified learning parameters
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

## Testing the Agent
# Test the agent with the modified learning parameters
print("Testing the agent with modified learning parameters")
maze = grid_env(maze_width, maze_height, start_pos, debug=True)
ag.test_agent(maze)

In [None]:
# Lower Discount Factor

# Define modified exploration parameters
epsilon = {'start': 1, 'min': 0.001, 'decay': 0.01}
# The exploration parameters control the exploration-exploitation trade-off
# 'start' is the initial exploration rate (set to 1 for maximum exploration at the beginning)
# 'min' is the minimum exploration rate (set to 0.001, a common value)
# 'decay' is the rate at which exploration decreases (set to 0.01, a typical value)

# Define modified discount factor (lower value)
discount_factor = 0.2
# The discount factor determines how much importance is given to future rewards
# A lower discount factor (closer to 0) means the agent will value immediate rewards more
# This can lead to more short-sighted decision-making, but may also make the learning process faster and less sensitive to long-term rewards

# Define modified learning rate
learning_rate = 0.5
# The learning rate controls how much the agent updates its Q-values based on new experiences
# A value of 0.5 is a moderate learning rate, which is a good starting point

# Train the agent with the modified maze
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

# Display learning performance
display_learning(rewards, "reward ")
display_learning(steps_taken, "steps ")

## Training the Agent
# Create an instance of the agent class with modified parameters
ag = agent(maze.n_states, discount=discount_factor, learning_rate=learning_rate, eps=epsilon)

# Train the agent with the modified learning parameters
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

## Testing the Agent
# Test the agent with the modified learning parameters
print("Testing the agent with modified learning parameters")
maze = grid_env(maze_width, maze_height, start_pos, debug=True)
ag.test_agent(maze)

# Task 3.C: Changing Reward and Testing Learning Performance

In [14]:
def action_decode(act_code):
    dirs = {0: "N", 1: "E", 2: "S", 3: "W"}
    return dirs[act_code]

def action_encode(act):
    dir_codes = {"N": 0, "E": 1, "S": 2, "W": 3}
    return dir_codes[act]

## Grid Environment Class

In [15]:
class grid_env():
    def __init__(self, width=5, height=5, start=[0, 0], debug=False):
        # Initialize the environment with given parameters
        self.width = width
        self.height = height
        self.start = start
        self.goal = [self.width - 1, self.height - 1]
        self.debug = debug
        self.n_states = self.width * self.height
        self.reset()

    def reset(self):
        # Reset the environment to the starting position
        self.pos = self.start[:]
        return self.pos, 0, False

    def state_decode(self, obs_code):
        # Decode the state code into coordinates
        r = obs_code // self.width
        c = obs_code % self.width
        return [c, r]

    def state_encode(self, position):
        # Encode the coordinates into a state code
        code = position[0] + position[1] * (self.width)
        return code

    def step(self, action):
        # Take a step in the environment based on the action
        new_pos = self.pos[:]
        if action == "S" and (self.pos[1] < self.height - 1):
            new_pos[1] += 1
        elif action == "N" and self.pos[1] > 0:
            new_pos[1] -= 1
        elif action == "W" and self.pos[0] > 0:
            new_pos[0] -= 1
        elif action == "E" and (self.pos[0] < self.width - 1):
            new_pos[0] += 1

        # Check if the new position is the "hole"
        if new_pos == [2, 3]:  # Example hole position
            reward = -1000  # Large negative reward for falling in the hole
            done = True
        else:
            self.pos = new_pos
            done = (self.pos == self.goal)
            if done:
                reward = self.width + self.height  # Reward at goal
            else:
                reward = -1  # Negative reward for each step

        if self.debug:
            print(self.render())

        return self.pos, reward, done

    def render(self):
        # Render the environment
        res = ""
        for y in range(self.height):
            for x in range(self.width):
                if self.goal[0] == x and self.goal[1] == y:
                    if self.pos[0] == x and self.pos[1] == y:
                        res += "@"
                    else:
                        res += "o"
                    continue
                if self.pos[0] == x and self.pos[1] == y:
                    res += "x"
                else:
                    res += "_"
            res += "\n"
        return res


## Agent Class

In [16]:
class agent():
    def __init__(self, n_obs, discount=1, learning_rate=0.1, eps={'start': 1, 'min': 0.01, 'decay': 0.001}):
        # Initialize the agent with given parameters
        self.action_space = np.asarray([0, 1, 2, 3])
        n_actions = np.shape(self.action_space)[0]
        self.Q_table = np.zeros((n_obs, n_actions))
        self.epsilon = eps['start']
        self.epsilon_decay = eps['decay']
        self.epsilon_min = eps['min']
        self.gamma = discount
        self.alpha = learning_rate

    def action_selection(self, state):
        # Select an action based on epsilon-greedy policy
        if np.random.uniform(0, 1) < self.epsilon:
            action = self.action_space[np.random.randint(0, 3)]
        else:
            action = np.argmax(self.Q_table[state, :])
        return action

    def policy_update(self, action, reward, state, next_state):
        # Update the Q-table based on the observed transition
        self.Q_table[state, action] = (1 - self.alpha) * self.Q_table[state, action] + self.alpha * (
                    reward + self.gamma * max(self.Q_table[next_state, :]))

    def decrease_exploration(self, e):
        # Decrease exploration rate
        self.epsilon = max(self.epsilon_min, np.exp(-self.epsilon_decay * e))

    def test_agent(self, env):
        # Test the agent's performance in the environment
        state, _, done = env.reset()
        steps = 0
        while not done and steps < 1000:
            action = ag.action_selection(env.state_encode(state))
            next_state, reward, done = env.step(action_decode(action))
            steps += 1
        print(f"Steps taken: {steps}")

    def train(self, env, n_episodes=10000, max_steps=1000):
        # Train the agent in the environment
        all_rewards = []
        all_steps = []
        for e in range(n_episodes):
            state, _, done = env.reset()
            trial_reward = 0
            t = 0
            while not done and t < max_steps:
                action = ag.action_selection(env.state_encode(state))
                old_state = state[:]
                next_state, reward, done = env.step(action_decode(action))
                ag.policy_update(action, reward, env.state_encode(old_state), env.state_encode(next_state))
                trial_reward += reward
                t += 1
            ag.decrease_exploration(e)
            all_rewards.append(trial_reward)
            all_steps.append(t)
        return all_rewards, all_steps


## Changing Reward and Simulating a Hole in the Maze

In [17]:
# Create grid environment and agent
maze_width = 5
maze_height = 5
start_pos = [0, 0]
maze = grid_env(maze_width, maze_height, start_pos, debug=True)
ag = agent(n_obs=maze.n_states)


## Training the model


In [None]:
# Train the agent
episodes = 5000
steps = 100
rewards, steps_taken = ag.train(maze, episodes, steps)

## Testing the model

In [None]:
# Test the agent with modified reward and hole
print("Testing the agent with modified reward and hole...")
maze = grid_env(maze_width, maze_height, start_pos, debug=True)
ag.test_agent(maze)