In [2]:
import numpy as np
from collections import namedtuple

In [35]:
# - i: row index of agent
# - j: column index of agent
# - x: 1 if agent is carrying a block
# - i_distance: distance between the agents' i values
# - j_distance: distance between the agents' j values
# - a: 1 if pickup location 1 has blocks left
# - b: 1 if pickup location 2 has blocks left
# - c: 1 if dropoff location 1 has capacity left
# - d: 1 if dropoff location 2 has capacity left
# - e: 1 if dropoff location 3 has capacity left
# - f: 1 if dropoff location 4 has capacity left
State = namedtuple('State', 
                   ['i', 'j', 'x', 'i_distance', 'j_distance', 'a', 'b', 'c', 'd', 'e', 'f'])

#### PDWorld

In [4]:
class PDWorld:
    def __init__(self):
        self.size = 5
        self.alpha = 0.3,
        self.gamma = 0.5,
        self.pickup_locations = [(3,5), (4,2)]
        self.dropoff_locations = [(1,1), (1,5), (3,3), (5,5)]
        self.board = np.zeros((self.size, self.size))

    def setup(self, size: int, alpha: float, gamma: float, pickup_locations: list, dropoff_locations: list):
        if size:
            self.size = size
            self.board = np.zeros((size, size))
        if alpha:
            self.alpha = alpha
        if gamma:
            self.gamma = gamma
        if pickup_locations:
            self.pickup_locations = pickup_locations
        if dropoff_locations:
            self.dropoff_locations = dropoff_locations


#### Driver

In [5]:
# global PDWorld object
world = PDWorld()

In [1]:
# functions to run experiments
def experiment_1a():
    world.setup(size = 5,
                alpha = 0.3,
                gamma = 0.5,
                pickup_capacity = 10,
                dropoff_capacity = 5,
                agent_start_locations = [(1,3), (5,3)],
                pickup_locations = [(3,5), (4,2)],
                dropoff_locations = [(1,1), (1,5), (3,3), (5,5)])
    world.run(steps=500, policy='PRANDOM')
    world.run(steps=7500, policy='PRANDOM')
    world.summary()

def experiment_1b():
    world.setup(size = 5,
                alpha = 0.3,
                gamma = 0.5,
                pickup_capacity = 10,
                dropoff_capacity = 5,
                agent_start_locations = [(1,3), (5,3)],
                pickup_locations = [(3,5), (4,2)],
                dropoff_locations = [(1,1), (1,5), (3,3), (5,5)])
    world.run(steps=500, policy='PRANDOM')
    world.run(steps=7500, policy='PGREEDY')
    world.summary()

def experiment_1c():
    world.setup(size = 5,
                alpha = 0.3,
                gamma = 0.5,
                pickup_capacity = 10,
                dropoff_capacity = 5,
                agent_start_locations = [(1,3), (5,3)],
                pickup_locations = [(3,5), (4,2)],
                dropoff_locations = [(1,1), (1,5), (3,3), (5,5)])
    world.run(steps=500, policy='PRANDOM')
    world.run(steps=7500, policy='PEXPLOIT')
    world.summary()
    world.display_q_table(agent='male')

def experiment_2():
    world.setup(size = 5,
                alpha = 0.3,
                gamma = 0.5,
                pickup_capacity = 10,
                dropoff_capacity = 5,
                agent_start_locations = [(1,3), (5,3)],
                pickup_locations = [(3,5), (4,2)],
                dropoff_locations = [(1,1), (1,5), (3,3), (5,5)])
    world.run(steps=500, policy='PRANDOM', method='SARSA')
    world.run(steps=7500, policy='PEXPLOIT', method='SARSA')
    world.summary()
    world.display_q_table(agent='male')

def experiment_3a():
    world.setup(size = 5,
                alpha = 0.15,
                gamma = 0.5,
                pickup_capacity = 10,
                dropoff_capacity = 5,
                agent_start_locations = [(1,3), (5,3)],
                pickup_locations = [(3,5), (4,2)],
                dropoff_locations = [(1,1), (1,5), (3,3), (5,5)])
    world.run(steps=500, policy='PRANDOM')
    world.run(steps=7500, policy='PEXPLOIT')
    world.summary()
    world.display_q_table(agent='male')

def experiment_3b():
    world.setup(size = 5,
                alpha = 0.45,
                gamma = 0.5,
                pickup_capacity = 10,
                dropoff_capacity = 5,
                agent_start_locations = [(1,3), (5,3)],
                pickup_locations = [(3,5), (4,2)],
                dropoff_locations = [(1,1), (1,5), (3,3), (5,5)])
    world.run(steps=500, policy='PRANDOM')
    world.run(steps=7500, policy='PEXPLOIT')
    world.summary()
    world.display_q_table(agent='male')

def experiment_4():
    world.setup(size = 5,
                alpha = 0.3,
                gamma = 0.5,
                pickup_capacity = 10,
                dropoff_capacity = 5,
                agent_start_locations = [(1,3), (5,3)],
                pickup_locations = [(3,5), (4,2)],
                dropoff_locations = [(1,1), (1,5), (3,3), (5,5)])
    world.run(steps=500, policy='PRANDOM')
    world.run(total_runs=3, policy='PEXPLOIT', animate=True)
    world.summary()
    world.display_q_table(agent='male')
    
    world.setup(pickup_locations = [(1,2), (4,5)])
    world.run(total_runs=3, policy='PEXPLOIT', animate=True)
    world.summary()
    world.display_q_table(agent='male')


#### Q-Table

In [36]:
class QTable:
    def __init__(self):
        self.q_table = np.zeros((world.size**4 * 128, 6))
        self.__operator_dict = {'n':0, 's':1, 'e':2, 'w':3, 'p':4, 'd':5}
    
    def next_operator(self, current_state: State, applicable_operators: list[str], policy: str ='PRANDOM') -> str:
        """Returns the next operator to be applied given the current state, method, and policy as a string.

        Parameters
        ----------
        current_state: State
            Named tuple containg state information.
            - i: row index of agent
            - j: column index of agent
            - x: 1 if agent is carrying a block
            - i_distance: distance between the agents' i values
            - j_distance: distance between the agents' j values
            - a: 1 if pickup location 1 has blocks left
            - b: 1 if pickup location 2 has blocks left
            - c: 1 if dropoff location 1 has capacity left
            - d: 1 if dropoff location 2 has capacity left
            - e: 1 if dropoff location 3 has capacity left
            - f: 1 if dropoff location 4 has capacity left

        applicable_operators: list
            List of applicable operators in the current state
        
        policy: ['PRANDOM' | 'PEXPLOIT' | 'PGREEDY']
            Policy to use when selecting next operator
        
        Returns
        -------
            String corresponding to operator to apply:
            ['n' | 's' | 'e' | 'w' | 'p' | 'd']
        """
        assert policy in ['PRANDOM', 'PEXPLOIT', 'PGREEDY'], 'Error: Invalid policy'
        next_operator = self.__next_operator(current_state, applicable_operators, policy)
        return ['n', 's', 'e', 'w', 'p', 'd'][next_operator]

    def update_q_table(self, previous_state: State, operator: str, current_state: State, applicable_operators: list[str], policy: str = 'PRANDOM', method: str = 'QL'):
        """Updates the Q-table values for the previous state and action used using the given policy and method.

        Parameters
        ----------
        previous_state: State
            Named tuple containg previous state information.
            - i: row index of agent
            - j: column index of agent
            - x: 1 if agent is carrying a block
            - i_distance: distance between the agents' i values
            - j_distance: distance between the agents' j values
            - a: 1 if pickup location 1 has blocks left
            - b: 1 if pickup location 2 has blocks left
            - c: 1 if dropoff location 1 has capacity left
            - d: 1 if dropoff location 2 has capacity left
            - e: 1 if dropoff location 3 has capacity left
            - f: 1 if dropoff location 4 has capacity left

        operator: ['n' | 's' | 'e' | 'w' | 'p' | 'd']
            Operator that was used from previous state to current state

        current_state: State
            Named tuple containg current state information.

        applicable_operators: list
            List of applicable operators in the current state

        policy: ['PRANDOM' | 'PEXPLOIT' | 'PGREEDY']
            Policy to use when selecting next operator (used only in SARSA)
        
        method: ['QL' | 'SARSA']
            Method to use when updating Q-table

        Returns
        -------
            None
        """
        assert method in ['QL', 'SARSA']
        
        action = self.__operator_dict[operator]
        if method == 'SARSA':
            self.q_table[self.__encode_state(previous_state), action] = (
                self.q_table[self.__encode_state(previous_state), action] +
                world.alpha * (world.reward((previous_state.i, previous_state.j), operator) +
                world.gamma * self.q_table[self.__encode_state(current_state), self.__next_operator(current_state, policy)] -
                self.q_table[self.__encode_state(previous_state), action])
            )
        elif method == 'QL':
            self.q_table[self.__encode_state(previous_state), action] = (
                (1 - world.alpha) * self.q_table[self.__encode_state(previous_state), action] + 
                world.alpha * (world.reward((current_state.i, current_state.j), operator) + 
                world.gamma * max(self.q_table[self.__encode_state(current_state)][[self.__operator_dict[operator] for operator in applicable_operators]]))
            )

    def __next_operator(self, current_state: State, applicable_operators: list[str], policy: str ='PRANDOM') -> int:
        """Returns the next operator to be applied given the current state and policy as an index

        Parameters
        ----------
        current_state: State
            Named tuple containg state information.

        applicable_operators: list
            List of applicable operators in the current state

        policy: ['PRANDOM' | 'PEXPLOIT' | 'PGREEDY']
            Policy to use when selecting next operator
        
        Returns
        -------
            Column index of q-table corresponding to the operator to take
            - 0: north
            - 1: south
            - 2: east
            - 3: west
            - 4: pick up
            - 5: drop off
        """
        applicable_operators_as_indices = [self.__operator_dict[operator] for operator in applicable_operators]
        if 4 in applicable_operators_as_indices:
            return 4
        if 5 in applicable_operators_as_indices:
            return 5
        else:
            max_val_operators = np.flatnonzero(self.q_table[self.__encode_state(current_state)] == np.max(self.q_table[self.__encode_state(current_state)]))
            if policy == 'PRANDOM':
                # select applicable operator randomly
                return np.random.choice(applicable_operators_as_indices)
            elif policy == 'PEXPLOIT':
                if np.random.rand() < 0.8:
                    # select applicable operator with highest q-value
                    return np.random.choice(max_val_operators)
                else:
                    # select applicable operator randomly from operators without highest q-value
                    return np.random.choice(np.setdiff1d(applicable_operators_as_indices, max_val_operators))
            elif policy == 'PGREEDY':
                # select applicable operator with highest q-value
                return np.random.choice(max_val_operators)

    def __encode_state(state: State) -> int:
        """Encodes the given state into its row index in the Q-table

        Parameters
        ----------
        state : State
            Named tuple containing state information
        
        Returns
        -------
        int
            integer index of state in Q-table
        """
        multipliers = np.array([world.size**3 * 128, world.size**2 * 128, world.size**2 * 64, world.size * 64, 64, 32, 16, 8, 4, 2, 1])
        return np.sum(np.multiply(np.array(state), multipliers))
    
    def __decode_index(index: int) -> State:
        """Decodes the given row index of the Q-table into a State named tuple

        Parameters
        ----------
        index : int
            Row index of the Q-table
        
        Returns
        -------
            Named tuple containing state information
        """
        state_values = []
        divisors = [world.size**3 * 128, world.size**2 * 128, world.size**2 * 64, world.size * 64, 64, 32, 16, 8, 4, 2, 1]
        remainder = index
        for divisor in divisors:
            state_values.append(remainder // divisor)
            remainder = remainder % divisor
        return State(*state_values)    
