In [None]:
# Author: Gergő Gömöri
# Email: gomorigergo91@gmail.com

# Short description: In a grid-world, a rodent moves around in a room looking for some delicious juice.

import numpy as np
import matplotlib.pyplot as plt
import time
import random
from IPython import display
import pandas as pd
import seaborn as sns
import networkx as nx

# Colours
WHITE  = '#FFFFFF'   # empty cell
GRAY   = '#5F5F60'   # mouse
ORANGE = '#F2AA4C'   # juice
DARK   = '#2A3132'   # obstacle
GREEN  = '#ADEFD1FF' # start
BLUE   = '#ADD8E6'   # decision point


In [None]:
def draw_maze(maze):

    col_map = {0: WHITE, 1: ORANGE, 2: DARK, 3: GREEN, 4: BLUE}

    rows,cols    = maze.shape
    colored_maze = [[col_map[maze[j,i]] for i in range(cols)] for j in range(rows)]

    fig = plt.figure(1, figsize=(cols,rows))

    ax = plt.gca()
    ax.set_xticks([])
    ax.set_yticks([])

    rows,cols    = maze.shape
    colored_maze = [[col_map[maze[j,i]] for i in range(cols)] for j in range(rows)]

    fig = plt.figure(1, figsize=(cols,rows))

    grid = plt.table(cellText=None,
                            cellColours=colored_maze,
                            cellLoc='center',
                            loc=(0,0),
                            edges='closed')
    tc = grid.properties()['children']
    for cell in tc:
        cell.set_height(1.0/rows)
        cell.set_width(1.0/cols)


# Convention:
# 0 = empty
# 1 = orange juice
# 2 = obstacle
# 3 = start
# 4 = decision point

maze_small = np.array([
    [3, 0, 0, 0],
    [0, 0, 0, 0],
    [0, 0, 0, 0],
    [0, 0, 0, 1]
])
draw_maze(maze_small)
plt.savefig("maze_small", dpi = 300)
plt.show()


maze_default = np.array([
    [3, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 1]
])
draw_maze(maze_default)
plt.savefig("maze_default", dpi = 300)
plt.show()

maze_new_rew = np.array([
    [3, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0],
    [1, 0, 0, 0, 0, 0, 0, 0]
])
draw_maze(maze_new_rew)
plt.savefig("maze_new_rew", dpi = 300)
plt.show()

maze_labyrinth = np.array([
    [0, 4, 0, 2, 2, 2, 0, 4, 0],
    [2, 0, 2, 2, 2, 2, 2, 0, 2],
    [2, 4, 0, 0, 4, 0, 0, 4, 2],
    [2, 0, 2, 2, 0, 2, 2, 0, 2],
    [0, 4, 0, 2, 0, 2, 0, 4, 0],
    [2, 2, 2, 2, 0, 2, 2, 2, 2],
    [3, 0, 0, 0, 4, 2, 2, 2, 2],
    [2, 2, 2, 2, 0, 2, 2, 2, 2],
    [0, 4, 0, 2, 0, 2, 1, 4, 0],
    [2, 0, 2, 2, 0, 2, 2, 0, 2],
    [2, 4, 0, 0, 4, 0, 0, 4, 2],
    [2, 0, 2, 2, 2, 2, 2, 0, 2],
    [0, 4, 0, 2, 2, 2, 0, 4, 0]
])
draw_maze(maze_labyrinth)
plt.savefig("maze_labyrinth", dpi = 300)
plt.show()

In [None]:
class Maze:
    def __init__(self, maze):
        self.maze                     = maze
        self.actions                  = self.actions()
        self.states, self.map         = self.states()
        self.start_state              = self.get_start()
        self.terminal_state           = self.get_terminal()
        self.n_actions                = len(self.actions)
        self.n_states                 = len(self.states)
        self.rewards                  = self.rewards()

    def actions(self):
        actions = dict()
        actions[0] = ( 0,-1) # LEFT
        actions[1] = ( 0, 1) # RIGHT
        actions[2] = (-1, 0) # UP
        actions[3] = ( 1, 0) # DOWN
        return actions

    # States are (x_mouse, y_mouse)
    def states(self):
        states = dict()
        map = dict()
        s = 0
        for i in range(self.maze.shape[0]):
            for j in range(self.maze.shape[1]):
                states[s] = (i, j)
                map[(i, j)] = s
                s += 1
        return states, map

    def get_terminal(self):
        for i in range(self.maze.shape[0]):
            for j in range(self.maze.shape[1]):
                if self.maze[i, j] == 1:
                    terminal_state = self.map[(i, j)]
        return terminal_state

    def get_start(self):
        for i in range(self.maze.shape[0]):
            for j in range(self.maze.shape[1]):
                if self.maze[i, j] == 3:
                    start = self.map[(i, j)]
        return start

    def possible_actions(self, state):
        # In total 8 special cases plus check for obstacles
        (x, y) = self.states[state]
        poss_actions = np.array([0, 1, 2, 3])

        # Top left corner
        if x == 0 and y == 0:
            poss_actions = poss_actions[np.all([poss_actions != 0, poss_actions != 2], axis = 0)]
        
        # Bottom left corner
        if x == (self.maze.shape[0] - 1) and y == 0:
            poss_actions = poss_actions[np.all([poss_actions != 0, poss_actions != 3], axis = 0)]
        
        # Top right corner
        if x == 0 and y == (self.maze.shape[1] - 1):
            poss_actions = poss_actions[np.all([poss_actions != 1, poss_actions != 2], axis = 0)]

        # Bottom right corner
        if x == (self.maze.shape[0] - 1) and y == (self.maze.shape[1] - 1):
            poss_actions = poss_actions[np.all([poss_actions != 1, poss_actions != 3], axis = 0)]

        # Left edge
        if y == 0:
            poss_actions = poss_actions[poss_actions != 0]

        # Right edge
        if y == (self.maze.shape[1] - 1):
            poss_actions = poss_actions[poss_actions != 1]

        # Upper edge
        if x == 0:
            poss_actions = poss_actions[poss_actions != 2]

        # Bottom edge
        if x == (self.maze.shape[0] - 1):
            poss_actions = poss_actions[poss_actions != 3]

        # Check obstacle in each direction
        # To the left
        if y > 0:
            if self.maze[x, y - 1] == 2:
                poss_actions = poss_actions[poss_actions != 0]

        # To the right
        if y != (self.maze.shape[1] - 1): 
            if self.maze[x, y + 1] == 2:
                poss_actions = poss_actions[poss_actions != 1]

        # Upwards
        if x > 0:
            if self.maze[x - 1, y] == 2:
                poss_actions = poss_actions[poss_actions != 2]

        # Downwards
        if x != (self.maze.shape[0] - 1):
            if self.maze[x + 1, y] == 2:
                poss_actions = poss_actions[poss_actions != 3]
        
        # No actions possible from walls
        if self.maze[x, y] == 2:
            poss_actions = []

        return poss_actions

    def move(self, state, action):
        # Compute the future position
        row = self.states[state][0] + self.actions[action][0]
        col = self.states[state][1] + self.actions[action][1]
        return self.map[(row, col)]

    def rewards(self):
        rewards = np.full((self.n_states, self.n_actions), np.NINF)

        # Reward is only obtained from terminal state
        for s in range(self.n_states):
            for a in self.possible_actions(s):
                if self.move(s, a) == self.terminal_state:
                    rewards[s, a] = 1
                else:
                    rewards[s, a] = 0
        return rewards

    def simulate(self, start, policy):
        path = []
        counter = 0
        path.append(self.states[start])
        s = start
        
        while s != self.terminal_state:
            if counter == 25 or (policy[s] not in self.possible_actions(s)):
                path = []
                break

            next_s = self.move(s, policy[s])
            path.append(self.states[next_s])
            s = next_s
            counter += 1

        if path == []:
            total_steps = np.inf
        else:
            total_steps = len(path) - 1
        return path, total_steps

    def show(self):
        print('The states are :')
        print(self.states)
        print('The mapping of the states:')
        print(self.map)
        print('The start state is:')
        print(self.start_state)
        print('The terminal state is:')
        print(self.terminal_state)
        print(self.states[self.terminal_state])
        print('The actions are:')
        print(self.actions)
        print('The rewards are:')
        print(self.rewards)


# Description of the environment
env = Maze(maze_default)
env.show()

In [None]:
def create_map(successor_representation, target, env):

    # Initialization
    map = []
    buffer = []
    nodes = []

    # Add first node
    buffer.append([target])
    nodes.append(target)

    for i in range(1000000):
        batch = []
        next_nodes = []
        for j in range(len(buffer[0])):

            selected = find_best_neighbours(successor_representation, buffer[0][j], env)

            if selected.shape == (0,):
                continue

            candidate_nodes = selected[:, 2].astype(int)

            for m in range(len(candidate_nodes)):
                if candidate_nodes[m] not in nodes:
                    batch.append(selected[m])
                    next_nodes.append(candidate_nodes[m])
                    nodes.append(candidate_nodes[m])

        if next_nodes == []:
            break
        else:
            map.append(batch)
            buffer.append(next_nodes)
            del buffer[0]

    return map


In [None]:
def probably_node_leads_to(successor_representation, node, env):

    M_row = successor_representation[node, :]
    results = []

    for i in range(0, len(M_row), env.n_actions):
        segment = M_row[i : i + env.n_actions]
        sum = np.sum(segment)
        divisor = len(segment)
        for ind, val in enumerate(segment):
            if val == 0:
                divisor -= 1
        if divisor == 0:
            sum = 0
        else:
            sum /= divisor
        results.append(sum)

    return np.argsort(results)[-2]


In [None]:
def find_best_neighbours(successor_representation, target, env):

    # 1. A list is returned with elements [target, value_in_M, ID]
    neighbours = np.array([[target, successor_representation[i, target], i] for i in range(len(successor_representation[:, target])) if ((successor_representation[i, target] != 0) and (i != target))])

    if neighbours.shape == (0,):
        return neighbours

    # 2. Find out which state does the target most possibly lead to
    leads_to_state = probably_node_leads_to(successor_representation, target, env)

    # 3. Remove the nodes which are initiated at the state where this node leads to (these nodes just lead back)

    neighbour_candidate_states = np.floor(neighbours[:, 2] / env.n_actions)

    ind_leading_back = []

    for ind, state in enumerate(neighbour_candidate_states):
        if state == leads_to_state:
            ind_leading_back.append(ind)

    neighbours = np.delete(neighbours, ind_leading_back, axis = 0)

    # 4. Remove the neighbours which do not lead to the state of the target

    target_state = np.floor(target / env.n_actions)
    leads_elsewhere = []

    for ind, cand_neigh in enumerate(neighbours):
        current_node = int(cand_neigh[2])
        current_node_leads_to = probably_node_leads_to(successor_representation, current_node, env)
        if current_node_leads_to != target_state:
            leads_elsewhere.append(ind)

    neighbours = np.delete(neighbours, leads_elsewhere, axis = 0)

    return neighbours


In [None]:
def update_value_weights_grad(env, successor_representation, weights, target, reward, map, learning_rate_wave, discount_factor_wave):
    # Parameters
    # - learning_rate_wave
    # - discount_factor_wave

    # Initialization
    params = weights
    dopamine = {}

    # Extract nodes from the map
    nodes = []
    nodes.append([target])
    for i in range(len(map)):
        batch = []
        for j in range(len(map[i])):
            batch.append(int(map[i][j][2]))
        nodes.append(batch)


    # Looping through rings
    for k, subset in enumerate(nodes):
        # Looping through elements of the ring
        for node in subset:
            time_from_terminal = k

            # 1. Compute the feature vector representing (s, a)
            x_sa = np.array([1 if i == node else 0 for i in range(env.n_states * env.n_actions)]).reshape(-1, 1)

            # 2. Compute the q_hat_sa
            q_hat_sa = np.dot(np.transpose(params), np.dot(successor_representation, x_sa))

            # 3. Compute grad_q_hat_sa
            grad_q_hat_sa = np.dot(successor_representation, x_sa)
            
            # 4. Compute the discounted return
            discounted_return = np.power(discount_factor_wave, time_from_terminal) * reward

            # 6. Compute update_vec
            update_vec = learning_rate_wave * (discounted_return - q_hat_sa) * grad_q_hat_sa
            dopamine[node] = (discounted_return - q_hat_sa).tolist()[0][0]

            # 7. Parameter update
            params += update_vec

    return params, dopamine


In [None]:
def compute_eps(episode_id, num_episodes, eps_min, expl_ratio):
    expl_phase = expl_ratio * num_episodes
    if episode_id < expl_phase:
        eps = 1.0
    else:
        eps = np.power(eps_min, (episode_id - expl_phase) / (num_episodes - expl_phase))
    
    return eps

In [None]:
# Dopamine waves visualization

def visualize_waves(map, dopamine):
    nodes_on_map = [j for i in map for j in i]

    final_nodes = [int(nodes_on_map[0][0])]
    final_edges = []

    for element in nodes_on_map:
        final_nodes.append(int(element[2]))
        final_edges.append((int(element[0]), int(element[2])))

    G = nx.Graph()
    G.add_nodes_from(final_nodes)
    G.add_edges_from(final_edges)

    colors = []
    for node in G.nodes():
        color = dopamine[node]
        colors.append(color)

    cmap = plt.cm.get_cmap('rainbow')
    cog_map = nx.draw_spring(G, node_color = colors, cmap = cmap, with_labels = True)
    cbar = plt.cm.ScalarMappable(cmap=cmap)
    plt.colorbar(cbar)
    plt.axis("equal")
    plt.savefig("dopamine_waves", dpi = 300, bbox_inches = "tight")
    plt.show()

In [None]:
def sarsa(num_episodes, learning_rate, discount_factor, eps_min, expl_ratio, env_update, rand, in_labyrinth):
    # Initialization
    random.setstate(rand)
    results = []

    if in_labyrinth:
        env = Maze(maze_labyrinth)
    else:
        env = Maze(maze_default)

    start_state = env.start_state

    Q = np.zeros((env.n_states, env.n_actions))

    # SARSA algorithm
    for i in range(num_episodes):

        if i == int(np.around(env_update * num_episodes)):
            env = Maze(maze_new_rew)

        state = env.start_state

        epsilon = compute_eps(i, num_episodes, eps_min, expl_ratio)

        # Select an epsilon-greedy action
        action = np.argmax(Q[state, :])
        if (random.random() < epsilon) or (action not in env.possible_actions(state)):
            action = random.choice(env.possible_actions(state))

        while state != env.terminal_state:
            reward = env.rewards[state, action]
            next_state = env.move(state, action)

            next_action = np.argmax(Q[next_state, :])
            if (random.random() < epsilon) or (next_action not in env.possible_actions(next_state)):
                next_action = random.choice(env.possible_actions(next_state))

            Q[state, action] += learning_rate * (reward + discount_factor * Q[next_state, next_action] - Q[state, action])

            if next_state == env.terminal_state:
                # Obtain policy
                policy = np.full(env.n_states, -1)
                for s in range(env.n_states):
                    policy[s] = np.argmax(Q[s, :])

                # Extract the current performance between the starting state and the terminal state
                _, res = env.simulate(start_state, policy)
                results.append(res)

            state = next_state
            action = next_action

    return Q, results


In [None]:
def successor_representation_sarsa(num_episodes, learning_rate_base, discount_factor_base, eps_min, expl_ratio, env_update, rand, in_labyrinth):
    # Initialization
    random.setstate(rand)
    results = []

    if in_labyrinth:
        env = Maze(maze_labyrinth)
    else:
        env = Maze(maze_default)

    start_state = env.start_state

    successor_representation = np.zeros((env.n_states * env.n_actions, env.n_states * env.n_actions))
    reward_estimation = np.zeros(env.n_states * env.n_actions).reshape(-1, 1)

    # "Successor Representation - SARSA" algorithm
    for i in range(num_episodes):

        if i == int(np.around(env_update * num_episodes)):
            env = Maze(maze_new_rew)
            reward_estimation = np.zeros(env.n_states * env.n_actions).reshape(-1, 1)

        state = env.start_state

        epsilon = compute_eps(i, num_episodes, eps_min, expl_ratio)

        # Select an epsilon-greedy action
        action = np.argmax(np.dot(successor_representation[state * env.n_actions : state * env.n_actions + env.n_actions, :], reward_estimation))
        if (random.random() < epsilon) or (action not in env.possible_actions(state)):
            action = random.choice(env.possible_actions(state))

        while state != env.terminal_state:
            reward = env.rewards[state, action]
            next_state = env.move(state, action)

            # Select next action with epsilon-greedy
            next_action = np.argmax(np.dot(successor_representation[next_state * env.n_actions : next_state * env.n_actions + env.n_actions, :], reward_estimation))
            if (random.random() < epsilon) or (next_action not in env.possible_actions(next_state)):
                next_action = random.choice(env.possible_actions(next_state))

            # Update Successor Representation
            I_sa = np.array([1 if (i == state * env.n_actions + action) else 0 for i in range(env.n_states * env.n_actions)])
            td_error = I_sa + discount_factor_base * successor_representation[next_state * env.n_actions + next_action, :] - successor_representation[state * env.n_actions + action, :]
            successor_representation[state * env.n_actions + action, :] += learning_rate_base * td_error

            # Update reward estimation
            if next_state == env.terminal_state:
                reward_estimation[state * env.n_actions + action] += learning_rate_base * (reward - reward_estimation[state * env.n_actions + action])

                # Obtain policy
                policy = np.full(env.n_states, -1)
                for s in range(env.n_states):
                    policy[s] = np.argmax(np.dot(successor_representation[s * env.n_actions : s * env.n_actions + env.n_actions, :], reward_estimation))
            
                # Extract the current performance between (0, 0) and the terminal state
                _, res = env.simulate(start_state, policy)
                results.append(res)

            state = next_state
            action = next_action

    return successor_representation, reward_estimation, results


In [None]:
def successor_representation_wave(num_episodes, learning_rate_base, discount_factor_base, eps_min, expl_ratio, learning_rate_wave, discount_factor_wave, env_update, rand, in_labyrinth):
    # Initialization
    random.setstate(rand)
    results = []

    if in_labyrinth:
        env = Maze(maze_labyrinth)
    else:
        env = Maze(maze_default)

    start_state = env.start_state

    successor_representation = np.zeros((env.n_states * env.n_actions, env.n_states * env.n_actions))
    weights = np.zeros(env.n_states * env.n_actions).reshape(-1, 1)

    # "Successor Representation - SARSA - wave-like" algorithm
    for i in range(num_episodes):
        if i == int(np.around(env_update * num_episodes)):
            env = Maze(maze_new_rew)
            weights = np.zeros(env.n_states * env.n_actions).reshape(-1, 1)

        state = env.start_state

        epsilon = compute_eps(i, num_episodes, eps_min, expl_ratio)

        # Select an epsilon-greedy action
        action = np.argmax(np.dot(np.transpose(weights), successor_representation[:, state * env.n_actions : state * env.n_actions + env.n_actions]))
        if (random.random() < epsilon) or (action not in env.possible_actions(state)):
            action = random.choice(env.possible_actions(state))

        while state != env.terminal_state:
            reward = env.rewards[state, action]
            next_state = env.move(state, action)

            # Select next action with epsilon-greedy
            next_action = np.argmax(np.dot(np.transpose(weights), successor_representation[:, next_state * env.n_actions : next_state * env.n_actions + env.n_actions]))
            if (random.random() < epsilon) or (next_action not in env.possible_actions(next_state)):
                next_action = random.choice(env.possible_actions(next_state))

            # Update Successor Representation
            I_sa = np.array([1 if (i == state * env.n_actions + action) else 0 for i in range(env.n_states * env.n_actions)])
            td_error = I_sa + discount_factor_base * successor_representation[next_state * env.n_actions + next_action, :] - successor_representation[state * env.n_actions + action, :]
            successor_representation[state * env.n_actions + action, :] += learning_rate_base * td_error

            if next_state == env.terminal_state:
                # Update reward estimation
                target = state * env.n_actions + action
                map = create_map(successor_representation, target, env)
                weights, dop = update_value_weights_grad(env, successor_representation, weights, target, reward, map, learning_rate_wave, discount_factor_wave)

                # if (i == int(np.around(0.4 * num_episodes))):
                #     visualize_waves(map, dop)

                # Obtain policy
                policy = np.full(env.n_states, -1)
                for s in range(env.n_states):
                    policy[s] = np.argmax(np.dot(np.transpose(weights), successor_representation[:, s * env.n_actions : s * env.n_actions + env.n_actions]))
            
                # Extract the current performance between (0, 0) and the terminal state
                _, res = env.simulate(start_state, policy)
                results.append(res)

            state = next_state
            action = next_action

    return successor_representation, weights, map, dop, results


In [None]:
def sarsa_lambda(num_episodes, learning_rate, discount_factor, eps_min, expl_ratio, trace_decay, env_update, rand, in_labyrinth):
    # Initialization
    random.setstate(rand)
    results = []

    if in_labyrinth:
        env = Maze(maze_labyrinth)
    else:
        env = Maze(maze_default)

    start_state = env.start_state
    weights     = np.zeros(env.n_states * env.n_actions).reshape(-1, 1)

    # SARSA(lambda) algorithm
    for i in range(num_episodes):

        if i == int(np.around(env_update * num_episodes)):
            env = Maze(maze_new_rew)

        state = env.start_state

        epsilon = compute_eps(i, num_episodes, eps_min, expl_ratio)

        # Select an epsilon-greedy action
        action = np.argmax(weights[state * env.n_actions : state * env.n_actions + env.n_actions])
        if (random.random() < epsilon) or (action not in env.possible_actions(state)):
            action = random.choice(env.possible_actions(state))

        # Generate one-hot feature vector
        feature = np.zeros(env.n_states * env.n_actions).reshape(-1, 1)
        feature[state * env.n_actions + action] = 1

        eligibility_trace = np.zeros(env.n_states * env.n_actions).reshape(-1, 1)

        while state != env.terminal_state:
            reward = env.rewards[state, action]
            next_state = env.move(state, action)

            eligibility_trace = discount_factor * trace_decay * eligibility_trace + feature

            next_action = np.argmax(weights[next_state * env.n_actions : next_state * env.n_actions + env.n_actions])
            if (random.random() < epsilon) or (next_action not in env.possible_actions(next_state)):
                next_action = random.choice(env.possible_actions(next_state))

            next_feature = np.zeros(env.n_states * env.n_actions).reshape(-1, 1)
            next_feature[next_state * env.n_actions + next_action] = 1

            reward_prediction_error = reward + discount_factor * np.dot(np.transpose(weights), next_feature) - np.dot(np.transpose(weights), feature)

            weights += learning_rate * reward_prediction_error * eligibility_trace

            if next_state == env.terminal_state:
                # Obtain policy
                policy = [np.argmax(a) for a in [weights[s : s + env.n_actions] for s in range(0, len(weights), env.n_actions)]]

                # Extract the current performance between (0, 0) and the terminal state
                _, res = env.simulate(start_state, policy)
                results.append(res)

            feature = next_feature
            state = next_state
            action = next_action

    return weights, results


In [None]:
# ------------------------------------------------------------------------------

In [None]:
# Benchmarking simple environment
# a) Speed
# b) Reliability
# c) Flexibility

env = Maze(maze_default)

num_episodes = 40
num_test_cases = 10
num_perturbations = 100

env_update = 1.1
run_in_labyrinth = False

# Hyperparameters

# 1) Base
lr_base = 0.05
df_base = 0.99
eps_min = 0.05
expl_ratio = 1.0

# 2) Wave-like
lr_wave = 0.5
df_wave = 0.5

# 3) SARSA(lambda)
trace_decay = 0.9

# Initialization
# 0: SARSA Q-learning, 1: SR, 2: Wave, 3: Eligibility trace
# 4 algorithms being compared, first row: when was the earliest step an optimal trajectory was found, second row: which algorithm
speed_test = np.zeros((num_test_cases * 4, 2))

# 4 algorithms being compared, first row: how many times did the agent found the reward, second row: which algorithm
reliability_test = np.zeros((num_test_cases * 4, 2))

flexibility_test = []

for i in range(num_test_cases):
    rand_state = random.getstate()

    Q_sarsa, fin_res_sarsa = sarsa(num_episodes, lr_base, df_base, eps_min, expl_ratio, env_update, rand_state, run_in_labyrinth)
    speed_test[i * 4, 0] = np.argmin(fin_res_sarsa)
    speed_test[i * 4, 1] = 0
    fin_res_sarsa = np.array([0 if (i == np.inf) else i for i in fin_res_sarsa])
    reliability_test[i * 4, 0] = np.count_nonzero(fin_res_sarsa) / num_episodes
    reliability_test[i * 4, 1] = 0

    succ_rep_base, rew_est_base, fin_res_base = successor_representation_sarsa(num_episodes, lr_base, df_base, eps_min, expl_ratio, env_update, rand_state, run_in_labyrinth)
    speed_test[(i * 4) + 1, 0] = np.argmin(fin_res_base)
    speed_test[(i * 4) + 1, 1] = 1
    fin_res_base = np.array([0 if (i == np.inf) else i for i in fin_res_base])
    reliability_test[(i * 4) + 1, 0] = np.count_nonzero(fin_res_base) / num_episodes
    reliability_test[(i * 4) + 1, 1] = 1

    succ_rep_wave, weights_wave, map_final, dop, fin_res_wave = successor_representation_wave(num_episodes, lr_base, df_base, eps_min, expl_ratio, lr_wave, df_wave, env_update, rand_state, run_in_labyrinth)
    speed_test[(i * 4) + 2, 0] = np.argmin(fin_res_wave)
    speed_test[(i * 4) + 2, 1] = 2
    fin_res_wave = np.array([0 if (i == np.inf) else i for i in fin_res_wave])
    reliability_test[(i * 4) + 2, 0] = np.count_nonzero(fin_res_wave) / num_episodes
    reliability_test[(i * 4) + 2, 1] = 2

    w, fin_res_lambda = sarsa_lambda(num_episodes, lr_base, df_base, eps_min, expl_ratio, trace_decay, env_update, rand_state, run_in_labyrinth)
    speed_test[(i * 4) + 3, 0] = np.argmin(fin_res_lambda)
    speed_test[(i * 4) + 3, 1] = 3
    fin_res_lambda = np.array([0 if (i == np.inf) else i for i in fin_res_lambda])
    reliability_test[(i * 4) + 3, 0] = np.count_nonzero(fin_res_lambda) / num_episodes
    reliability_test[(i * 4) + 3, 1] = 3

    # Flexibility test looping through rows of the maze
    for r in range(env.maze.shape[0]):
        distracted_states = []
        for c in range(env.maze.shape[1]):
            distracted_states.append(env.map[(r, c)])
        distraction_level = 2

        # SARSA
        policy_sarsa = np.full(env.n_states, -1)
        for state in range(env.n_states):
            if state in distracted_states:
                policy_sarsa[state] = np.argsort(Q_sarsa[state, :])[-distraction_level]
            else:
                policy_sarsa[state] = np.argsort(Q_sarsa[state, :])[-1]

        _, pert_sarsa = env.simulate(env.start_state, policy_sarsa)
        if pert_sarsa == np.inf:
            success = 0
        else:
            success = 1

        # Successor Representation
        policy_sr = np.full(env.n_states, -1)
        for state in range(env.n_states):
            if state in distracted_states:
                policy_sr[state] = np.argsort(np.dot(succ_rep_base[state * env.n_actions : state * env.n_actions + env.n_actions, :], rew_est_base).flatten())[-distraction_level]
            else:
                policy_sr[state] = np.argsort(np.dot(succ_rep_base[state * env.n_actions : state * env.n_actions + env.n_actions, :], rew_est_base).flatten())[-1]

        _, pert_sr = env.simulate(env.start_state, policy_sr)
        if pert_sr == np.inf:
            success = 0
        else:
            success = 1

        # Wave-like
        policy_wave = np.full(env.n_states, -1)
        for state in range(env.n_states):
            if state in distracted_states:
                policy_wave[state] = np.argsort(np.dot(np.transpose(weights_wave), succ_rep_wave[:, state * env.n_actions : state * env.n_actions + env.n_actions]).flatten())[-distraction_level]
            else:
                policy_wave[state] = np.argsort(np.dot(np.transpose(weights_wave), succ_rep_wave[:, state * env.n_actions : state * env.n_actions + env.n_actions]).flatten())[-1]

        _, pert_wave = env.simulate(env.start_state, policy_wave)
        if pert_wave == np.inf:
            success = 0
        else:
            success = 1

        # Eligibility traces
        policy_lambda = np.full(env.n_states, -1)
        w = w.flatten()
        for state in range(env.n_states):
            if state in distracted_states:
                policy_lambda[state] = np.argsort(w[state * env.n_actions : state * env.n_actions + env.n_actions])[-distraction_level]
            else:
                policy_lambda[state] = np.argsort(w[state * env.n_actions : state * env.n_actions + env.n_actions])[-1]

        _, pert_lambda = env.simulate(env.start_state, policy_lambda)
        if pert_lambda == np.inf:
            success = 0
        else:
            success = 1

    # Flexibility test looping through columns of the maze
    for c in range(env.maze.shape[1]):
        distracted_states = []
        for r in range(env.maze.shape[0]):
            distracted_states.append(env.map[(r, c)])
        distraction_level = 2

        # SARSA
        policy_sarsa = np.full(env.n_states, -1)
        for state in range(env.n_states):
            if state in distracted_states:
                policy_sarsa[state] = np.argsort(Q_sarsa[state, :])[-distraction_level]
            else:
                policy_sarsa[state] = np.argsort(Q_sarsa[state, :])[-1]

        _, pert_sarsa = env.simulate(env.start_state, policy_sarsa)
        if pert_sarsa == np.inf:
            success = 0
        else:
            success = 1

        # Successor Representation
        policy_sr = np.full(env.n_states, -1)
        for state in range(env.n_states):
            if state in distracted_states:
                policy_sr[state] = np.argsort(np.dot(succ_rep_base[state * env.n_actions : state * env.n_actions + env.n_actions, :], rew_est_base).flatten())[-distraction_level]
            else:
                policy_sr[state] = np.argsort(np.dot(succ_rep_base[state * env.n_actions : state * env.n_actions + env.n_actions, :], rew_est_base).flatten())[-1]

        _, pert_sr = env.simulate(env.start_state, policy_sr)
        if pert_sr == np.inf:
            success = 0
        else:
            success = 1

        # Wave-like
        policy_wave = np.full(env.n_states, -1)
        for state in range(env.n_states):
            if state in distracted_states:
                policy_wave[state] = np.argsort(np.dot(np.transpose(weights_wave), succ_rep_wave[:, state * env.n_actions : state * env.n_actions + env.n_actions]).flatten())[-distraction_level]
            else:
                policy_wave[state] = np.argsort(np.dot(np.transpose(weights_wave), succ_rep_wave[:, state * env.n_actions : state * env.n_actions + env.n_actions]).flatten())[-1]

        _, pert_wave = env.simulate(env.start_state, policy_wave)
        if pert_wave == np.inf:
            success = 0
        else:
            success = 1

        # Eligibility traces
        policy_lambda = np.full(env.n_states, -1)
        w = w.flatten()
        for state in range(env.n_states):
            if state in distracted_states:
                policy_lambda[state] = np.argsort(w[state * env.n_actions : state * env.n_actions + env.n_actions])[-distraction_level]
            else:
                policy_lambda[state] = np.argsort(w[state * env.n_actions : state * env.n_actions + env.n_actions])[-1]

        _, pert_lambda = env.simulate(env.start_state, policy_lambda)
        if pert_lambda == np.inf:
            success = 0
        else:
            success = 1


    # Flexibility test randomly selected state perturbations
    for j in range(num_perturbations):
        for ind, k in enumerate([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]):
            distracted_states = random.sample([s for s in range(env.n_states) if s != env.terminal_state], k = int(np.floor(env.n_states * k)))
            distraction_level = 2

            # SARSA
            policy_sarsa = np.full(env.n_states, -1)
            for state in range(env.n_states):
                if state in distracted_states:
                    policy_sarsa[state] = np.argsort(Q_sarsa[state, :])[-distraction_level]
                else:
                    policy_sarsa[state] = np.argsort(Q_sarsa[state, :])[-1]

            _, pert_sarsa = env.simulate(env.start_state, policy_sarsa)
            if pert_sarsa == np.inf:
                success = 0
            else:
                success = 1
            flexibility_test.append([success, 0, k])

            # Successor Representation
            policy_sr = np.full(env.n_states, -1)
            for state in range(env.n_states):
                if state in distracted_states:
                    policy_sr[state] = np.argsort(np.dot(succ_rep_base[state * env.n_actions : state * env.n_actions + env.n_actions, :], rew_est_base).flatten())[-distraction_level]
                else:
                    policy_sr[state] = np.argsort(np.dot(succ_rep_base[state * env.n_actions : state * env.n_actions + env.n_actions, :], rew_est_base).flatten())[-1]

            _, pert_sr = env.simulate(env.start_state, policy_sr)
            if pert_sr == np.inf:
                success = 0
            else:
                success = 1
            flexibility_test.append([success, 1, k])


            # Wave-like
            policy_wave = np.full(env.n_states, -1)
            for state in range(env.n_states):
                if state in distracted_states:
                    policy_wave[state] = np.argsort(np.dot(np.transpose(weights_wave), succ_rep_wave[:, state * env.n_actions : state * env.n_actions + env.n_actions]).flatten())[-distraction_level]
                else:
                    policy_wave[state] = np.argsort(np.dot(np.transpose(weights_wave), succ_rep_wave[:, state * env.n_actions : state * env.n_actions + env.n_actions]).flatten())[-1]

            _, pert_wave = env.simulate(env.start_state, policy_wave)
            if pert_wave == np.inf:
                success = 0
            else:
                success = 1
            flexibility_test.append([success, 2, k])


            # Eligibility traces
            policy_lambda = np.full(env.n_states, -1)
            w = w.flatten()
            for state in range(env.n_states):
                if state in distracted_states:
                    policy_lambda[state] = np.argsort(w[state * env.n_actions : state * env.n_actions + env.n_actions])[-distraction_level]
                else:
                    policy_lambda[state] = np.argsort(w[state * env.n_actions : state * env.n_actions + env.n_actions])[-1]

            _, pert_lambda = env.simulate(env.start_state, policy_lambda)
            if pert_lambda == np.inf:
                success = 0
            else:
                success = 1
            flexibility_test.append([success, 3, k])

    print(i)

flexibility_test = np.array(flexibility_test)

sns.color_palette("colorblind")
# Plotting the results
# 1. Speed
df_speed_test = pd.DataFrame(speed_test, columns = ["first_reached", "algorithms"])
df_speed_test = df_speed_test.astype('int32')
give_names = {0: "SARSA", 1: "Original SR", 2: "Wave-like SR", 3: "SARSA($\lambda$)"}
df_speed_test = df_speed_test.replace({"algorithms": give_names})
sns.set_theme(style = "white")
sns_speed_test = sns.catplot(x = "algorithms", y = "first_reached", kind = "violin", data = df_speed_test)
sns_speed_test.set(ylim = (0, None))
sns_speed_test.set(xlabel="Algorithms", ylabel = "First attempt leading to the reward")
sns_speed_test.savefig("speed_test_simple", dpi = 300)

# 2. Reliability
df_reliability_test = pd.DataFrame(reliability_test, columns = ["ratio_reward_obtained", "algorithms"])
give_names = {0: "SARSA", 1: "Original SR", 2: "Wave-like SR", 3: "SARSA($\lambda$)"}
df_reliability_test = df_reliability_test.replace({"algorithms": give_names})
sns.set_theme(style = "white")
sns_reliability_test = sns.catplot(x = "algorithms", y = "ratio_reward_obtained", kind = "violin", data = df_reliability_test)
sns_reliability_test.set(xlabel="Algorithms", ylabel = "Ratio of the attempts leading to the reward")
sns_reliability_test.set(ylim = (0.0, 1.0))
sns_reliability_test.savefig("reliability_test_simple", dpi = 300)

# 3. Flexibility
df_flexibility_test = pd.DataFrame(flexibility_test, columns = ["ratio_reward_obtained", "algorithms", "pert_level"])
give_names = {0: "SARSA", 1: "Original SR", 2: "Wave-like SR", 3: "SARSA($\lambda$)"}
pert_names = {0.1: "10 %", 0.2: "20 %", 0.3: "30 %", 0.4: "40 %", 0.5: "50 %", 0.6: "60 %", 0.7: "70 %", 0.8: "80 %"}
df_flexibility_test = df_flexibility_test.replace({"algorithms": give_names})
df_flexibility_test = df_flexibility_test.replace({"pert_level": pert_names})
sns.set_theme(style = "darkgrid")
sns_flexibility_test = sns.relplot(x = "pert_level", y = "ratio_reward_obtained", hue = "algorithms", kind = "line", data = df_flexibility_test)
sns_flexibility_test.set(xlabel="Ratio of perturbed states", ylabel = "Ratio of the attempts leading to the reward")
sns_flexibility_test.savefig("flexibility_test_simple", dpi = 300)

In [None]:
# Benchmarking for change in reward location
# a) Flexibility

env = Maze(maze_new_rew)

num_episodes = 40
num_test_cases = 10
num_perturbations = 100

env_update = 0.875
run_in_labyrinth = False

# Hyperparameters

# 1) Base
lr_base = 0.05
df_base = 0.99
eps_min = 0.05
expl_ratio = 1.0

# 2) Wave-like
lr_wave = 0.5
df_wave = 0.5

# 3) SARSA(lambda)
trace_decay = 0.9

flexibility_test = []

for i in range(num_test_cases):
    rand_state = random.getstate()

    Q_sarsa, fin_res_sarsa = sarsa(num_episodes, lr_base, df_base, eps_min, expl_ratio, env_update, rand_state, run_in_labyrinth)

    succ_rep_base, rew_est_base, fin_res_base = successor_representation_sarsa(num_episodes, lr_base, df_base, eps_min, expl_ratio, env_update, rand_state, run_in_labyrinth)

    succ_rep_wave, weights_wave, map_final, dop, fin_res_wave = successor_representation_wave(num_episodes, lr_base, df_base, eps_min, expl_ratio, lr_wave, df_wave, env_update, rand_state, run_in_labyrinth)

    w, fin_res_lambda = sarsa_lambda(num_episodes, lr_base, df_base, eps_min, expl_ratio, trace_decay, env_update, rand_state, run_in_labyrinth)

    # Flexibility test, randomly selected states being distracted
    for j in range(num_perturbations):
        for ind, k in enumerate([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]):
            distracted_states = random.sample([s for s in range(env.n_states) if s != env.terminal_state], k = int(np.floor(env.n_states * k)))
            distraction_level = 2

            # SARSA
            policy_sarsa = np.full(env.n_states, -1)
            for state in range(env.n_states):
                if state in distracted_states:
                    policy_sarsa[state] = np.argsort(Q_sarsa[state, :])[-distraction_level]
                else:
                    policy_sarsa[state] = np.argsort(Q_sarsa[state, :])[-1]

            _, pert_sarsa = env.simulate(env.start_state, policy_sarsa)
            if pert_sarsa == np.inf:
                success = 0
            else:
                success = 1
            flexibility_test.append([success, 0, k])

            # Successor Representation
            policy_sr = np.full(env.n_states, -1)
            for state in range(env.n_states):
                if state in distracted_states:
                    policy_sr[state] = np.argsort(np.dot(succ_rep_base[state * env.n_actions : state * env.n_actions + env.n_actions, :], rew_est_base).flatten())[-distraction_level]
                else:
                    policy_sr[state] = np.argsort(np.dot(succ_rep_base[state * env.n_actions : state * env.n_actions + env.n_actions, :], rew_est_base).flatten())[-1]

            _, pert_sr = env.simulate(env.start_state, policy_sr)
            if pert_sr == np.inf:
                success = 0
            else:
                success = 1
            flexibility_test.append([success, 1, k])


            # Wave-like
            policy_wave = np.full(env.n_states, -1)
            for state in range(env.n_states):
                if state in distracted_states:
                    policy_wave[state] = np.argsort(np.dot(np.transpose(weights_wave), succ_rep_wave[:, state * env.n_actions : state * env.n_actions + env.n_actions]).flatten())[-distraction_level]
                else:
                    policy_wave[state] = np.argsort(np.dot(np.transpose(weights_wave), succ_rep_wave[:, state * env.n_actions : state * env.n_actions + env.n_actions]).flatten())[-1]

            _, pert_wave = env.simulate(env.start_state, policy_wave)
            if pert_wave == np.inf:
                success = 0
            else:
                success = 1
            flexibility_test.append([success, 2, k])


            # Eligibility traces
            policy_lambda = np.full(env.n_states, -1)
            w = w.flatten()
            for state in range(env.n_states):
                if state in distracted_states:
                    policy_lambda[state] = np.argsort(w[state * env.n_actions : state * env.n_actions + env.n_actions])[-distraction_level]
                else:
                    policy_lambda[state] = np.argsort(w[state * env.n_actions : state * env.n_actions + env.n_actions])[-1]

            _, pert_lambda = env.simulate(env.start_state, policy_lambda)
            if pert_lambda == np.inf:
                success = 0
            else:
                success = 1
            flexibility_test.append([success, 3, k])

    print(i)

flexibility_test = np.array(flexibility_test)

sns.color_palette("colorblind")
# Plotting the results
# 1. Flexibility
df_flexibility_test = pd.DataFrame(flexibility_test, columns = ["ratio_reward_obtained", "algorithms", "pert_level"])
give_names = {0: "SARSA", 1: "Original SR", 2: "Wave-like SR", 3: "SARSA($\lambda$)"}
pert_names = {0.1: "10 %", 0.2: "20 %", 0.3: "30 %", 0.4: "40 %", 0.5: "50 %", 0.6: "60 %", 0.7: "70 %", 0.8: "80 %"}
df_flexibility_test = df_flexibility_test.replace({"algorithms": give_names})
df_flexibility_test = df_flexibility_test.replace({"pert_level": pert_names})
sns.set_theme(style = "darkgrid")
sns_flexibility_test = sns.relplot(x = "pert_level", y = "ratio_reward_obtained", hue = "algorithms", kind = "line", data = df_flexibility_test)
sns_flexibility_test.set(xlabel="Ratio of perturbed states", ylabel = "Ratio of the attempts leading to the reward")
sns_flexibility_test.savefig("flexibility_test_change_rew", dpi = 300)


In [None]:
# Benchmarking in the labyrinth

# a) Speed
# b) Reliability

num_episodes = 20
num_test_cases = 10

env_update = 1.1
run_in_labyrinth = True

# Hyperparameters

# 1) Base
lr_base = 0.05
df_base = 0.99
eps_min = 0.05
expl_ratio = 1.0

# 2) Wave-like
lr_wave = 0.5
df_wave = 0.6

# 3) SARSA(lambda)
trace_decay = 0.9

# Initialization
# 0: SARSA Q-learning, 1: SR, 2: Wave, 3: Eligibility trace
# 4 algorithms being compared, first row: when was the earliest step an optimal trajectory was found, second row: which algorithm
speed_test = np.zeros((num_test_cases * 4, 2))

# 4 algorithms being compared, first row: how many times did the agent found the reward, second row: which algorithm
reliability_test = np.zeros((num_test_cases * 4, 2))

for i in range(num_test_cases):
    rand_state = random.getstate()

    Q_sarsa, fin_res_sarsa = sarsa(num_episodes, lr_base, df_base, eps_min, expl_ratio, env_update, rand_state, run_in_labyrinth)
    speed_test[i * 4, 0] = np.argmin(fin_res_sarsa)
    speed_test[i * 4, 1] = 0
    fin_res_sarsa = np.array([0 if (i == np.inf) else i for i in fin_res_sarsa])
    reliability_test[i * 4, 0] = np.count_nonzero(fin_res_sarsa) / num_episodes
    reliability_test[i * 4, 1] = 0

    succ_rep_base, rew_est_base, fin_res_base = successor_representation_sarsa(num_episodes, lr_base, df_base, eps_min, expl_ratio, env_update, rand_state, run_in_labyrinth)
    speed_test[(i * 4) + 1, 0] = np.argmin(fin_res_base)
    speed_test[(i * 4) + 1, 1] = 1
    fin_res_base = np.array([0 if (i == np.inf) else i for i in fin_res_base])
    reliability_test[(i * 4) + 1, 0] = np.count_nonzero(fin_res_base) / num_episodes
    reliability_test[(i * 4) + 1, 1] = 1

    succ_rep_wave, weights_wave, map_final, dop, fin_res_wave = successor_representation_wave(num_episodes, lr_base, df_base, eps_min, expl_ratio, lr_wave, df_wave, env_update, rand_state, run_in_labyrinth)
    speed_test[(i * 4) + 2, 0] = np.argmin(fin_res_wave)
    speed_test[(i * 4) + 2, 1] = 2
    fin_res_wave = np.array([0 if (i == np.inf) else i for i in fin_res_wave])
    reliability_test[(i * 4) + 2, 0] = np.count_nonzero(fin_res_wave) / num_episodes
    reliability_test[(i * 4) + 2, 1] = 2

    w, fin_res_lambda = sarsa_lambda(num_episodes, lr_base, df_base, eps_min, expl_ratio, trace_decay, env_update, rand_state, run_in_labyrinth)
    speed_test[(i * 4) + 3, 0] = np.argmin(fin_res_lambda)
    speed_test[(i * 4) + 3, 1] = 3
    fin_res_lambda = np.array([0 if (i == np.inf) else i for i in fin_res_lambda])
    reliability_test[(i * 4) + 3, 0] = np.count_nonzero(fin_res_lambda) / num_episodes
    reliability_test[(i * 4) + 3, 1] = 3

    print(i)

sns.color_palette("colorblind")
# Plotting the results
# 1. Speed
df_speed_test = pd.DataFrame(speed_test, columns = ["first_reached", "algorithms"])
df_speed_test = df_speed_test.astype('int32')
give_names = {0: "SARSA", 1: "Original SR", 2: "Wave-like SR", 3: "SARSA($\lambda$)"}
df_speed_test = df_speed_test.replace({"algorithms": give_names})
sns.set_theme(style = "white")
sns_speed_test = sns.catplot(x = "algorithms", y = "first_reached", kind = "violin", data = df_speed_test)
sns_speed_test.set(ylim = (0, None))
sns_speed_test.set(xlabel="Algorithms", ylabel = "First attempt leading to the reward")
sns_speed_test.savefig("speed_test_labyrinth", dpi = 300)

# 2. Reliability
df_reliability_test = pd.DataFrame(reliability_test, columns = ["ratio_reward_obtained", "algorithms"])
give_names = {0: "SARSA", 1: "Original SR", 2: "Wave-like SR", 3: "SARSA($\lambda$)"}
df_reliability_test = df_reliability_test.replace({"algorithms": give_names})
sns.set_theme(style = "white")
sns_reliability_test = sns.catplot(x = "algorithms", y = "ratio_reward_obtained", kind = "violin", data = df_reliability_test)
sns_reliability_test.set(xlabel="Algorithms", ylabel = "Ratio of the attempts leading to the reward")
sns_reliability_test.set(ylim = (0.0, 1.0))
sns_reliability_test.savefig("reliability_test_labyrinth", dpi = 300)
