#Homework 2 - Aleksa Pulai RA22/2020, Katarina Topolic RA164/2020

The following is the solution for the Level 3 assignment for the Maze problem.
The objective was to create a Maze in a form of a graph.

 For our implementation we have decided to do it the following way: you can choose how many nodes (states) the graph will have, and the number of different actions that are possible in the entire graph. The nodes are connected by randomly choosing the number of connections (up to the maximum number of connections provided as a parameter) for each node, and the action having to be taken to "cross" from one node to the other. Let's say we have chosen to have a graph with 10 nodes, and 4 actions in total, with the maximum number of connections for each node being 5. For examle, Node1 could be connected with 3 other randomly chosen nodes, with randomly chosen actions (e.g. Node3 - Action1, Node7 - Action4, Node1 - Action4; as you can see it is possible for a node to be connected with itself, of course). We could end up with some nodes being connected with only one node, and some with 5 connections, and the distribution of the actions totaly random as well. Additionaly, every connection is assigned a probability, coresponding to the probability of that particular connection being chosen (or, actually, the next state) if a specific action is chosen (using the previous example we could say that the probability of going to the Node3 from Node1 if the Action1 is chosen is 1.0, and if the chosen action is Action4 the probabilities of Node7 and Node1 being the next state is let's say 0.3 and 0.7 respecfuly) The distribution of probabilities is, also random. The type of nodes (Regular, Penalty, Teleport and Terminal) is specified, by saying how many nodes of which type we want.

The graph is stored in form of a dictionary, the keys being IDs of the state (integers from 0 to n-1, with n = number of nodes), and the values are lists, the first element being the type of the node, and the second is another dictionary, the keys of which are actions possible to choose in that state and the values a list containing next states and the corresponding probabilities of that next state being chosen with the assigned action.

## Imports


In [None]:
from abc import ABC, abstractmethod
from typing import Iterable, Callable
from copy import copy

import numpy as np
import matplotlib.pyplot as plt

import random

## Implementation of individual Nodes

In [None]:
class Node(ABC):
    """Abstract base class for all maze nodes."""

    @abstractmethod
    def get_reward(self) -> float:
        """The reward an agent receives when stepping onto this node."""
        pass

    def is_terminal(self) -> bool:
        """Checks if the node is terminal.

        When stepping onto a terminal node the agent exits
        the maze and finishes the game.
        """
        return False

    def has_value(self) -> bool:
        """Check if the node has value."""
        return True

In [None]:
class RegularNode(Node):
    """A common, non-terminal, steppable node."""

    def __init__(self, reward: float):
        self.reward = reward

    def get_reward(self) -> float:
        return self.reward

class PenaltyNode(Node):
    """A common, non-terminal, steppable node."""

    def __init__(self, reward: float):
        self.reward = reward

    def get_reward(self) -> float:
        return self.reward

class TerminalNode(Node):
    """A terminal node."""

    def __init__(self, reward: float):
        self.reward = reward

    def get_reward(self) -> float:
        return self.reward

    def is_terminal(self) -> bool:
        return True

    def has_value(self) -> bool:
        return False


class TeleportNode(Node):
    """A non-steppable node."""

    def __init__(self, reward: float):
        self.reward = reward

    def get_reward(self) -> float:
        return 0

    def has_value(self) -> bool:
        return False

#Implementation of the Maze Graph

In [None]:
class MazeGraph:

    def get_random_state(self, S_num: int) -> int:
        return random.randint(0,S_num-1)

    def get_random_action(self, A_num: int) -> int:
        return random.randint(0,A_num-1)

    def asign_probability_to_list(self, size: int) -> list:
        """
            Returns a list, whose elements are probabilities the sum of which is 1, and are rounded at two decimal points (e.g. [0.3, 0.35, 0.35]).

            Sometimes the sum turns out to be 0.99 or 1.01 for some reason. The purpose of if statements is to fix this.
        """
        probabilities = [round(random.uniform(0,1), 2) for _ in range(size)]

        total_prob = sum(probabilities)
        normalized_probabilities = [round(prob / total_prob, 2) for prob in probabilities]

        remainder = round(1.00 - sum(normalized_probabilities), 2)

        if remainder == 0:
            return normalized_probabilities
        elif remainder > 0:
            i = 0
            while normalized_probabilities[i] < remainder:
                i += 1
            round(normalized_probabilities[i] + remainder, 2)
            return normalized_probabilities
        else:
            normalized_probabilities[0] = round(normalized_probabilities[0] + remainder, 2)
            return normalized_probabilities

    def asign_prob_to_connections(self, connections: dict[int, list[int,float]]) -> dict[int, list[int,float]]:
        '''
          This method creates connections for the given state, and the states it is connected to, assigning actions and probabilities of each connection.

          connections - {action_for_that_connection: [next_state, probability of that connection]
        '''
        for action in connections:
            probabilities = self.asign_probability_to_list(len(connections[action]))
            for i in range(0, len(connections[action])):
                connections[action][i][1] = probabilities[i]
        return connections

    def connect_state(self, state_id: int, S_num: int, A_num: int, max_connections: int) -> dict[int, list[list[int,float]]]:

        '''Makes a dictionary consisting of connections between state (ID = state_id) and other states.

        state_id = the id of the state for which the connections are being made
        S_num = the total number of states in the graph
        A_num = the number of possible actions (total number of actions in the graph) to choose and get to the next state from the state (ID = state_id)
        max_connections = maximum number of connections with the state (ID = state_id)'''
        n = random.randint(1,max_connections)
        res = {}
        actions_help = {} #to ensure that we dont have two states connected by two connections with the same action
        for i in range(n):
            next_state = self.get_random_state(S_num)
            action = self.get_random_action(A_num)
            if action not in res:
                res[action] = [[next_state, -1]]
                actions_help[action] = {next_state}
            elif next_state not in actions_help[action]:
                res[action].append([next_state, -1])
                actions_help[action].add(next_state)
        res = {k: sorted(v, key=lambda x: x[0]) for k, v in sorted(res.items())} #sorts the resulting dictionary by actions
        return res

    def connect_teleport(self, state_id: int, S_num: int) -> dict[int, list[list[int,float]]]:
        """
            Defines the node to which the agent is going to be teleported to.
        """
        next_state = self.get_random_state(S_num)
        while next_state == state_id: #to ensure that it doesn't teleport to the same node (infinite teleportation :( )
            next_state = self.get_random_state(S_num)
        return {0: [[next_state, 1.0]]}

    def connect_terminal(self, state_id: int) -> dict[int, list[list[int,float]]]:
        return {-1: [[state_id, 1.0]]}

    def connect_states(self, states: dict[int, list[Node, dict[int, list[int,float]]]], S_num: int, A_num: int, max_connections: int) -> dict[int, list[Node, dict[int, list[list[int,float]]]]]:
        """
            Connects all nodes in the graph.
        """
        for state in states:
            if isinstance(states[state][0], TeleportNode):
                connections = self.connect_teleport(state, S_num)
                states[state].append(connections)
            elif isinstance(states[state][0], TerminalNode):
                connections = self.connect_terminal(state)
                states[state].append(connections)
            else:
                connections = self.connect_state(state, S_num, A_num, max_connections)
                connections = self.asign_prob_to_connections(connections)
                states[state].append(connections)
        return states

    def create_node(self, node_type: int) -> Node:
        if node_type == 0:
          return RegularNode(-1)
        elif node_type == 1:
          return PenaltyNode(-10)
        elif node_type == 2:
          return TeleportNode(0)
        else:
          return TerminalNode(0)

    def __init__(self, S_num: int, A_num: int, max_connections: int, node_specs: list[int]):

        states = {}
        node_specs_copy = node_specs
        node_type = 0
        for i in range(S_num): #creates a skeleton of the graph assigning state ids (keys) as integers, and the type of each node
            while node_specs_copy[node_type] == 0: #if?
                node_type += 1
            states[i] = [self.create_node(node_type)]
            node_specs_copy[node_type] -= 1

        states = self.connect_states(states, S_num, A_num, max_connections) #connects the nodes
        self.states = states
        self.S_num = S_num
        self.A_num = A_num
        self.max_connections = max_connections
        self.node_specs = node_specs

    def do_action(self, state_id: int, action: int) -> int:
        """
            Returns the next state by doing the action (action) from the state (state_id) respecting the probabilities assigned to connections.
        """

        possible_next_states = self.states[state_id][1][action]
        probabilities = [s[1] for s in possible_next_states]

        choices = np.arange(len(probabilities))
        chosen_value = np.random.choice(choices, p=probabilities)

        for state in self.states:
            if state == possible_next_states[chosen_value][0]:
                return state
            else:
              pass

    def __call__(self, state_id: int, action: int) -> tuple[int, float, bool]:
        """
            Applies the action (action) from the state (state_id) and gets the id of the next state,
            the reward of that action (specificaly that connection) and if the next state is terminal
        """
        next_state = self.do_action(state_id, action)
        reward = self.states[next_state][0].get_reward()
        terminal = self.states[next_state][0].is_terminal()
        return next_state, reward, terminal

    def get_actions(self, state_id) -> list[int]:
        """
            Lists the actions that are possible to take in the state (state_id)
        """
        return [k for k,_ in self.states[state_id][1].items()]

    def get_states(self) -> list[int]:
        """
            Lists the nodes (states) in the graph.
        """
        return [v for v in self.states.keys()]

    def is_terminal(self, state_id: int) -> bool:
        return self.states[state_id][0].is_terminal()

    def get_reward(self, state_id:int) -> int:
        return self.states[state_id][0].get_reward()

    def show_graph(self):
        """
          Displays the graph
        """
        for state in self.states:
          print(state, (self.states[state][0].__class__.__name__, self.states[state][1]))

## Defining the Maze Environment. Value and Action Value iteration. Policy iteration

In [None]:
class MazeEnvironment:

    def init_stochastic_policy(self) -> dict[int, dict[int, float]]: #ako zelim da politika bude stohasticka
        policy = {}
        for state in self.maze.states:
            actions = self.maze.get_actions(state)

            probabilities = [round(1.0/len(actions), 2) for i in range(len(actions))]

            policy[state] = {actions[i]: probabilities[i] for i in range(len(actions))}

        return policy

    def init_deterministic_policy(self) -> dict[int, int]:
        policy = {}
        for state in self.maze.states:
            actions = self.maze.get_actions(state)
            policy[state] = actions[random.randint(0, len(actions)-1)]

        return policy

    def init_values(self) -> dict[int, float]:
        """Randomly initialize values of states of the given environment."""
        values = {s: -10*random.random() for s in self.maze.get_states()}

        for state in values:
            if self.maze.is_terminal(state):
                values[state] = 0

        return values

    def init_action_values(self) -> dict[tuple[int, int], float]:
        """Randomly initialize action values of states of the given environment."""
        action_values = {}
        for state in self.maze.get_states():
            if self.maze.is_terminal(state):
                action_values[(state, -1)] = 0
            else:
                for action in self.maze.get_actions(state):
                    action_values[(state, action)] = -10*random.random()

        return action_values

    def __init__(self, maze: MazeGraph, policy: str):
        self.maze = maze
        if policy == 'deterministic':
            self.policy = self.init_deterministic_policy()
        elif policy == 'stochastic':
            self.policy = self.init_stochastic_policy()
        self.values = self.init_values()
        self.action_values = self.init_action_values()

     ##### VALUE ITERATION#####

    def update_state_value(self, state_id: int, values: dict[int, float], gamma: float) -> float:
        """Update value of the given state.

            state_id : id of the state for which the value is updated
            values : Values of other states.
            gamma : discount factor.
        """
        temp = []
        for action in self.maze.get_actions(state_id):
            sum = 0
            for next_state, probability in self.maze.states[state_id][1][action]:
                reward = self.maze.get_reward(next_state)
                sum += (probability*(reward + gamma*values[next_state]))
            temp.append(sum)
        return max(temp)

    def async_update_all_values(self, values: dict[int, float], gamma) -> list[float]:
        """Update values of all states
            values : Values of other states.
            gamma : discount factor.
        """
        new_values = copy(values)
        for state in self.maze.get_states():
            if not self.maze.is_terminal(state):
                new_values[state] = self.update_state_value(state, new_values, gamma)
        return new_values

    def value_iteration(self, gamma, epsilon, v0, maxiter=1000) -> tuple[list[float], int]:
        values = v0
        for k in range(maxiter):
            new_values = self.async_update_all_values(values, gamma)
            error = max([abs(new_values[state] - values[state]) for state in values])
            if error < epsilon:
               return new_values, k
            values = new_values
        return values, k

    def greedy_action_by_value(self, state, values, gamma) -> int:
        final = {}
        for action in self.maze.get_actions(state):
            temp = []
            for next_state, _ in self.maze.states[state][1][action]:
                reward = self.maze.get_reward(next_state)
                temp.append(reward + gamma*values[next_state])
            final[action] = max(temp)
        return max(final, key = lambda k: final[k])

    def optimal_policy_value_iteration(self, values, gamma):
        return {state: self.greedy_action_by_value(state, values, gamma) for state in self.maze.get_states() if not self.maze.is_terminal(state)}

    ##### ACTION VALUE (Q) ITERATION#####

    def update_action_value(self, state_id: int, action: int, action_values: dict[tuple[int, int], float], gamma: float) -> float:
        """Update action value of the given state.
            state_id : The id of the state (node).
            action : action to take from the given state.
            action_values : action values for all states
            gamma : discount factor.
        """
        sum = 0
        for next_state, probability in self.maze.states[state_id][1][action]:
            reward = self.maze.get_reward(next_state)
            temp = []
            for next_action in self.maze.get_actions(next_state):
                temp.append(action_values[(next_state, next_action)])
            sum += (probability*(reward + gamma*max(temp)))
        return sum

    def async_update_all_action_values(self, action_values: dict[tuple[int, int], float], gamma: float) -> list[float]:
        """Update action values of all states.
            action_values : action values for all states
            gamma : discount factor.
        """
        new_action_values = copy(action_values)
        for state in self.maze.get_states():
            if not self.maze.is_terminal(state):
                for action in self.maze.get_actions(state):
                    new_action_values[(state, action)] = self.update_action_value(state, action, new_action_values, gamma)
        return new_action_values

    def action_value_iteration(self, gamma, epsilon, a0, maxiter=1000) -> tuple[list[float], int]:
        action_values = a0
        for k in range(maxiter):
            new_action_values = self.async_update_all_action_values(action_values, gamma)
            error = max([abs(new_action_values[state] - action_values[state]) for state in action_values])
            if error < epsilon:
               return new_action_values, k
            action_values = new_action_values
        return action_values, k

    def greedy_action_by_action_value(self, state, action_values, gamma) -> int:
        temp = {}
        for action in self.maze.get_actions(state):
            temp[action] = action_values[(state, action)]

        return max(temp, key = lambda k: temp[k])

    def optimal_policy_action_value_iteration(self, action_values: dict[tuple[int, int], float], gamma: float) -> dict[int, int]:
        return {state: self.greedy_action_by_action_value(state, action_values, gamma) for state in self.maze.get_states() if not self.maze.is_terminal(state)}

    ##### POLICY ITERATION USING STATE VALUES (V)#####

    def derive_new_deterministic_policy_using_state_values(self, values: dict[int, float], gamma: float) -> dict[int, int]:
        new_policy = {}
        for state in self.maze.get_states():
            temp = {}
            for action in self.maze.get_actions(state):
                sum = 0
                for next_state, probability in self.maze.states[state][1][action]:
                    reward = self.maze.get_reward(next_state)
                    sum += (probability*(reward + gamma*values[next_state]))
                temp[action] = sum
            new_policy[state] = max(temp, key = lambda k: temp[k])

        return new_policy

    def update_state_values_using_given_policy(self, policy: dict[int, int], values: dict[int, float], gamma: float) -> dict[int, float]:
        new_values = {}
        for state in policy:
            sum = 0
            for next_state, probability in self.maze.states[state][1][policy[state]]:
                reward = self.maze.get_reward(next_state)
                sum += (probability*(reward + gamma*values[next_state]))
            new_values[state] = sum

        return new_values

    def derive_state_values_using_given_policy(self, policy: dict[int, int], v0: dict[int, float], gamma: float, epsilon: float, max_iterations: int) -> dict[int, float]:
        values = v0
        for k in range(max_iterations):
            new_values = self.update_state_values_using_given_policy(policy, values, gamma)
            error = max([abs(new_values[state] - values[state]) for state in values])
            if error < epsilon:
                return new_values
            values = new_values
        return values

    def policy_iteration_using_state_values(self, gamma: float, epsilon: float, max_iterations=1000) -> tuple[dict[int, int], int]:
        values = self.init_values()
        policy = self.init_deterministic_policy()

        count = 0 #counts how many times the policy hasn't changed
        iterations = 0
        while True:
            iterations += 1
            new_policy = self.derive_new_deterministic_policy_using_state_values(values, gamma)

            if new_policy == policy:
                count += 1

            if count == 3:
                return (new_policy, iterations)
            elif iterations == max_iterations:
                return (new_policy, iterations)

            policy = new_policy
            values = self.derive_state_values_using_given_policy(policy, values, gamma, epsilon, max_iterations)

      ##### POLICY ITERATION USING STATE ACTION VALUES (Q) #####

    def derive_new_deterministic_policy_using_action_values(self, action_values: dict[tuple[int, int], float], gamma: float) -> dict[int, int]:
        new_policy = {}
        for state in self.maze.get_states():
            temp = {}
            for action in self.maze.get_actions(state):
                temp[action] = action_values[(state, action)]
            new_policy[state] = max(temp, key = lambda k: temp[k])

        return new_policy

    def update_action_values_using_given_policy(self, policy: dict[int, int], action_values: dict[tuple[int, int], float], gamma: float) -> dict[tuple[int, int], float]:
        new_action_values = {}
        for state in policy:
            for action in self.maze.get_actions(state):
                sum = 0
                for next_state, probability in self.maze.states[state][1][action]:
                    reward = self.maze.get_reward(next_state)
                    sum += (probability*(reward + gamma*action_values[(next_state, policy[next_state])]))
                new_action_values[(state, action)] = sum

        return new_action_values

    def derive_action_values_using_given_policy(self, policy: dict[int, int], a0: dict[tuple[int, int], float], gamma: float, epsilon: float, max_iterations: int) -> dict[int, float]:
        action_values = a0
        for k in range(max_iterations):
            new_action_values = self.update_action_values_using_given_policy(policy, action_values, gamma)
            error = max([abs(new_action_values[(state, action)] - action_values[(state, action)]) for state, action in action_values])
            if error < epsilon:
                return new_action_values
            action_values = new_action_values
        return action_values

    def policy_iteration_using_action_values(self, gamma: float, epsilon: float, max_iterations=1000) -> tuple[dict[int, int], int]:
        action_values = self.init_action_values()
        policy = self.init_deterministic_policy()

        count = 0
        iterations = 0
        while True:
            iterations += 1
            new_policy = self.derive_new_deterministic_policy_using_action_values(action_values, gamma)

            if new_policy == policy:
                count += 1

            if count == 3:
                return (new_policy, iterations)
            elif iterations == max_iterations:
                return (new_policy, iterations)

            policy = new_policy
            action_values = self.derive_action_values_using_given_policy(policy, action_values, gamma, epsilon, max_iterations)


## Testing

Here we have an implementation of a class that will test the code written thus far. Hiperparameters can be set here.

In [None]:
class Test:
    def __init__(self):

          self.test_specs = {'S_NUM' : 5,
                            'A_NUM' : 3,
                            'MAX_CONNECTIONS' : 15,
                            'NODE_SPECS' : [2, 1, 1, 1],
                            'GAMMA' : 1,
                            'EPSILON' : 0.0000001,
                            'MAX_ITERATIONS' : 1000,
                            'POLICY' : 'deterministic'}

          try:
              self.maze = MazeGraph(self.test_specs['S_NUM'], self.test_specs['A_NUM'], self.test_specs['MAX_CONNECTIONS'], self.test_specs['NODE_SPECS'])
          except ZeroDivisionError:
              print("Oops :'(. Try again")
          self.environment = MazeEnvironment(self.maze, self.test_specs['POLICY'])

    def show_graph(self):
        self.maze.show_graph()

    def value_iteration(self):
        self.environment.values, k = self.environment.value_iteration(self.test_specs['GAMMA'], self.test_specs['EPSILON'], self.environment.values)
        print(self.environment.values, k)

    def optimal_policy_using_state_value_iteration(self):
        print(self.environment.optimal_policy_value_iteration(self.environment.values, 1))

    def action_value_iteration(self):
        self.environment.action_values, k = self.environment.action_value_iteration(self.test_specs['GAMMA'], self.test_specs['EPSILON'], self.environment.action_values)
        print(self.environment.action_values, k)

    def optimal_policy_using_action_value_iteration(self):
        print(self.environment.optimal_policy_action_value_iteration(self.environment.action_values, self.test_specs['GAMMA']))

    def policy_iteration_using_state_values(self):
        opt_pol, i = self.environment.policy_iteration_using_state_values(self.test_specs['GAMMA'], self.test_specs['EPSILON'], self.test_specs['MAX_ITERATIONS'])
        print(opt_pol, i)

    def policy_iteration_using_action_values(self):
        opt_pol, i = self.environment.policy_iteration_using_action_values(self.test_specs['GAMMA'], self.test_specs['EPSILON'], self.test_specs['MAX_ITERATIONS'])
        print(opt_pol, i)


The following code block will create a graph and print it in form of a dictionary. Due to the nature of randomness of this implementation it is possible, depending on the number of nodes, actions and connections, to get isolated nodes, graph that never reaches terminal state... In order to prevent that from happening it would've been nice for us to have implemented a method that will validate the graph, but that would take implementing DFS, Topological Sorting or some other algorithm concerning graph manipulation. We decided that that would have been an overkill so, please feel free to generate a new graph if such a scenario occurs :).
Also, sometimes (very rarely), a zero division error occurs when generating random probabilities. This is handled by an exception, so in that case try again.

Example:

0 ('RegularNode', {0: [[4, 1.0]], 1: [[1, 0.42], [2, 0.19], [4, 0.39]], 2: [[0, 0.49], [1, 0.51]]})

Here the 0 at the begining is the state ID, the keys of the dictionary are actions, and the pair for example [2, 0.19] means the ID of the next state is 2, and the probability of that actualy being the next state if the action 1 is chosen is 0.19.

In [None]:
test = Test()
test.show_graph()

0 ('RegularNode', {0: [[4, 1.0]], 1: [[1, 0.42], [2, 0.19], [4, 0.39]], 2: [[0, 0.49], [1, 0.51]]})
1 ('RegularNode', {0: [[0, 0.17], [1, 0.15], [3, 0.22], [4, 0.46]], 1: [[2, 0.88], [4, 0.12]]})
2 ('PenaltyNode', {0: [[4, 1.0]], 1: [[1, 0.26], [2, 0.45], [4, 0.29]], 2: [[0, 0.35], [1, 0.32], [2, 0.2], [3, 0.13]]})
3 ('TeleportNode', {0: [[2, 1.0]]})
4 ('TerminalNode', {-1: [[4, 1.0]]})


In [None]:
test.value_iteration()

{0: 0.0, 1: -2.964705873541097, 2: 0.0, 3: -10.0, 4: 0} 9


In [None]:
test.optimal_policy_using_state_value_iteration()

{0: 0, 1: 0, 2: 0, 3: 0}


In [None]:
test.action_value_iteration()

{(0, 0): 0.0, (0, 1): -3.5651764658025016, (0, 2): -2.511999994188752, (1, 0): -2.9647058806437507, (1, 1): -8.8, (2, 0): 0.0, (2, 1): -5.5308235289673755, (2, 2): -4.918705881806, (3, 0): -10.0, (4, -1): 0} 10


In [None]:
test.optimal_policy_using_action_value_iteration()

{0: 0, 1: 0, 2: 0, 3: 0}


In [None]:
test.policy_iteration_using_state_values()

{0: 0, 1: 0, 2: 0, 3: 0, 4: -1} 4


In [None]:
test.policy_iteration_using_action_values()

{0: 0, 1: 0, 2: 0, 3: 0, 4: -1} 5
