In [446]:
import numpy as np
import operator
import matplotlib.pyplot as plt
%matplotlib inline

In [511]:
class PD_WORLD:
    ## Initialise starting data
    def __init__(self):
        # Set information about the gridworld
        self.height = 5
        self.width = 5
        self.rewards = np.zeros(( self.height, self.width)) - 1
        self.num_blocks = np.zeros(( self.height, self.width))

        # Set start location for each agent
        self.agent_locations = [(0,2), (2,2), (4,2)]

        # Dictionary to keep track of whether each agent is carrying a block
        self.agent_carrying_block = {agent_index: False for agent_index in range(3)}

        # Start here
        self.pickup_locations = [(0,4), (1,3), (4,1)]
        self.dropoff_locations = [(0,0), (2,0), (3,4)]
        self.terminal_states = self.dropoff_locations
        
        # Set grid rewards for special cells
        for i in self.pickup_locations + self.dropoff_locations:
            self.rewards[i] = 13

        for i in self.pickup_locations:
            self.num_blocks[i] = 5

    # Set available actions
        self.actions = ['UP', 'DOWN', 'LEFT', 'RIGHT']

    # Method to set whether an agent is carrying a block
    def set_carrying_block(self, agent_index, carrying):
        self.agent_carrying_block[agent_index] = carrying

    # Method to check if an agent is carrying a block
    def is_carrying_block(self, agent_index):
        return self.agent_carrying_block[agent_index]
    
    # Method to return the num_blocks value for a given location
    def get_num_blocks_at_location(self, agent_index):
        location = self.agent_locations[agent_index]
        return self.num_blocks[location[0], location[1]]
    
    ## Returns possible actions
    def get_available_actions(self, agent_index):
        """Returns the available actions for the agent based on its current location and grid boundaries."""

        current_location = self.agent_locations[agent_index]
        available_actions = []

        # Check if the agent can move UP
        if current_location[0] > 0:
            available_actions.append('UP')

        # Check if the agent can move DOWN
        if current_location[0] < self.height - 1:
            available_actions.append('DOWN')

        # Check if the agent can move LEFT
        if current_location[1] > 0:
            available_actions.append('LEFT')

        # Check if the agent can move RIGHT
        if current_location[1] < self.width - 1:
            available_actions.append('RIGHT')
        return self.actions
    
    # Shows agent locations on grid (for debugging)
    def agents_on_map(self):
        
        grid = np.zeros(( self.height, self.width))
        for loc in self.agent_locations:
            grid[loc[0], loc[1]] = 1
        return grid
    
    def get_reward(self, new_location):
        """Returns the reward for an input position"""
        return self.rewards[ new_location[0], new_location[1]]
    
    def step(self, action, agent_index):
        """Moves the agent in the specified direction. If agent is at a border, agent stays still
        but takes negative reward. Function returns the reward for the move."""
        # Store previous location
        last_location = self.agent_locations[agent_index]
        
        # UP
        if action == 'UP':
            # If agent is at the top, stay still, collect reward
            if last_location[0] == 0:
                reward = self.get_reward(last_location)
            else:
                self.agent_locations[agent_index] = ( last_location[0] - 1, last_location[1])
                reward = self.get_reward(self.agent_locations[agent_index])
        
        # DOWN
        elif action == 'DOWN':
            # If agent is at bottom, stay still, collect reward
            if last_location[0] == self.height - 1:
                reward = self.get_reward(last_location)
            else:
                self.agent_locations[agent_index] = (last_location[0] + 1, last_location[1])
                reward = self.get_reward(self.agent_locations[agent_index])
            
        # LEFT
        elif action == 'LEFT':
            # If agent is at the left, stay still, collect reward
            if last_location[1] == 0:
                reward = self.get_reward(last_location)
            else:
                self.agent_locations[agent_index] = (last_location[0], last_location[1] - 1)
                reward = self.get_reward(self.agent_locations[agent_index])

        # RIGHT
        elif action == 'RIGHT':
            # If agent is at the right, stay still, collect reward
            if last_location[1] == self.width - 1:
                reward = self.get_reward(last_location)
            else:
                self.agent_locations[agent_index] = (last_location[0], last_location[1] + 1)
                reward = self.get_reward(self.agent_locations[agent_index])
                
        # PICKUP
        elif action == 'PICKUP':
            # If agent is at a pickup spot with block, carry 1 block, collect reward
 
            if self.num_blocks[last_location] > 0:
                self.num_blocks[last_location] -= 1  # Decrease the number of blocks at the pickup location

                self.set_carrying_block(agent_index, True)  # Set the carrying block flag to True for the agent
                
            reward = self.get_reward(self.agent_locations[agent_index])

        # PICKUP
        elif action == 'DROPOFF':
            # If agent is at a dropoff spot with block, drop 1 block, collect reward

            if self.num_blocks[last_location] < 5:
                self.num_blocks[last_location] += 1  # Decrease the number of blocks at the pickup location
                self.set_carrying_block(agent_index, False)  # Set the carrying block flag to True for the agent
                
            reward = self.get_reward(self.agent_locations[agent_index])

        return reward

    # for experiment 4 only
    def change_pickup(self):
        # Move the blocks from current pickup locations to new pickup locations

        new_pickup_locations = [(3, 1), (2, 2), (1, 4)]

        for loc in self.pickup_locations:
            num_blocks = int(self.num_blocks[loc])  # Get the number of blocks at this location
            self.num_blocks[loc] = 0  # Set the number of blocks at this location to zero
            self.rewards[loc] = -1
            # If the new location is a valid grid cell, place the blocks there

        for new_loc in new_pickup_locations:
            if new_loc[0] < self.height and new_loc[1] < self.width:
                self.num_blocks[new_loc] += num_blocks
                self.rewards[new_loc] = 13

    # Method to return the num_blocks value for a given location
    def get_num_blocks(self, location):
        return self.num_blocks[location[0], location[1]]
    
    def check_termination(self):
        for dropoff_location in self.dropoff_locations:
            if self.get_num_blocks(dropoff_location) != 5:
                return False
        return True

In [513]:
# def init_q_tables
class Q_table:

    def __init__(self, environment, agent_index, epsilon=0.2, alpha=0.3, gamma=0.5):
        self.environment = environment
        self.agent_index = agent_index
        self.q_table = dict() # Store all Q-values in dictionary of dictionaries 
        for x in range(environment.height): # Loop through all possible grid spaces, create sub-dictionary for each
            for y in range(environment.width):
                self.q_table[(x,y)] = {'UP':0, 'DOWN':0, 'LEFT':0, 'RIGHT':0} # Populate sub-dictionary with zero values for possible moves

        self.epsilon = epsilon
        self.alpha = alpha
        self.gamma = gamma

    def optimal_action_exploit(self, available_actions):
        """Returns the optimal action from Q-Value table. If multiple optimal actions, chooses random choice.
        Will make an exploratory random action dependent on epsilon."""

        if np.random.uniform(0,1) < self.epsilon:
            action = available_actions[np.random.randint(0, len(available_actions))]
        else:
            q_values_of_state = self.q_table[self.environment.agent_locations[self.agent_index]]
            
            # Filter out unavailable actions (like LEFT when the agent is at the most left side)
            available_q_values = {action: q_value for action, q_value in q_values_of_state.items() if action in available_actions}

            maxValue = max(q_values_of_state.values())
            action = np.random.choice([k for k, v in q_values_of_state.items() if v == maxValue])
        return action
            
        
    def learn(self, old_state, reward, new_state, action):
        """Updates the Q-value table using Q-learning"""
        q_values_of_state = self.q_table[new_state]
        max_q_value_in_new_state = max(q_values_of_state.values())
        current_q_value = self.q_table[old_state][action]
        
        self.q_table[old_state][action] = (1 - self.alpha) * current_q_value + self.alpha * (reward + self.gamma * max_q_value_in_new_state)

        
        return action
    
    

In [514]:
# Choose a random action
def choose_random_action( available_actions):
    """Returns a random choice of the available actions"""
    return np.random.choice(available_actions)

In [515]:
pd_world = PD_WORLD()
print(pd_world.num_blocks)
print(pd_world.pickup_locations)
print(pd_world.rewards)

[[0. 0. 0. 0. 5.]
 [0. 0. 0. 5. 0.]
 [0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0.]
 [0. 5. 0. 0. 0.]]
[(0, 4), (1, 3), (4, 1)]
[[13. -1. -1. -1. 13.]
 [-1. -1. -1. 13. -1.]
 [13. -1. -1. -1. -1.]
 [-1. -1. -1. -1. 13.]
 [-1. 13. -1. -1. -1.]]


In [517]:
index = 0
q_tables = []
for loc in pd_world.agent_locations: 
    q_tables.append(Q_table(pd_world, index, alpha=0.1))
    index += 1


In [518]:
# 1A) P Random

step = 0
cumulative_reward = 0
max_steps = 9000
learn = True
# Set a fixed seed for reproducibility
np.random.seed(42)
while step < max_steps: # Run until max steps or until game is finished
        
        if pd_world.check_termination():
                pd_world = PD_WORLD()

        for agent_index in range(3):
                agent_current_location = pd_world.agent_locations[agent_index]

                in_pickup_spot = (agent_current_location in pd_world.pickup_locations)
                #print("in pickup spot ", in_pickup_spot)
                in_dropoff_spot = (agent_current_location in pd_world.dropoff_locations)
                is_carrying_block = pd_world.is_carrying_block(agent_index)
                blocks_on_cell = pd_world.get_num_blocks_at_location(agent_index)
                
                # If pickup is applicable
                if(in_pickup_spot and is_carrying_block == False and blocks_on_cell > 0):
                        action = "PICKUP"
                        print("Pickup: ", pd_world.agent_locations[agent_index], " agent ", agent_index, " Carrying block ", is_carrying_block )
                        reward = pd_world.step(action, agent_index)

                # If dropoff is applicable
                elif(in_dropoff_spot and is_carrying_block == True and blocks_on_cell < 5):
                        action = "DROPOFF"
                        print("Dropoff: ", pd_world.agent_locations[agent_index], " agent ", agent_index, " Carrying block ", is_carrying_block )
                        reward = pd_world.step(action, agent_index)
                else:
                        action = choose_random_action(pd_world.get_available_actions(agent_index)) 
                        reward = pd_world.step(action, agent_index)
                cumulative_reward += reward
        step += 1
print("Reward: ", cumulative_reward) 
#print(pd_world.num_blocks)

Pickup:  (1, 3)  agent  0  Carrying block  False
Pickup:  (4, 1)  agent  1  Carrying block  False
Dropoff:  (0, 0)  agent  0  Carrying block  True
Dropoff:  (3, 4)  agent  1  Carrying block  True
Pickup:  (4, 1)  agent  0  Carrying block  False
Pickup:  (1, 3)  agent  1  Carrying block  False
Dropoff:  (0, 0)  agent  0  Carrying block  True
Pickup:  (1, 3)  agent  2  Carrying block  False
Dropoff:  (2, 0)  agent  1  Carrying block  True
Dropoff:  (0, 0)  agent  2  Carrying block  True
Pickup:  (0, 4)  agent  0  Carrying block  False
Pickup:  (4, 1)  agent  1  Carrying block  False
Pickup:  (0, 4)  agent  2  Carrying block  False
Dropoff:  (3, 4)  agent  2  Carrying block  True
Dropoff:  (3, 4)  agent  1  Carrying block  True
Pickup:  (1, 3)  agent  2  Carrying block  False
Dropoff:  (2, 0)  agent  0  Carrying block  True
Dropoff:  (3, 4)  agent  2  Carrying block  True
Pickup:  (4, 1)  agent  0  Carrying block  False
Pickup:  (0, 4)  agent  2  Carrying block  False
Pickup:  (1, 3)  age

In [529]:
# 1C) P Exploit

# initialize grid
pd_world = PD_WORLD()

# Initialize q tables
index = 0
q_tables = []
for loc in pd_world.agent_locations: 
    q_tables.append(Q_table(pd_world, index, alpha=0.3))
    index += 1

# First 500 steps in P random Policy
step = 0
cumulative_reward = 0
max_steps = 500

# Set a fixed seed for reproducibility
np.random.seed(42)
while step < max_steps: # Run until max steps o
        
        if pd_world.check_termination():
                pd_world = PD_WORLD()

        for agent_index in range(3):
                agent_current_location = pd_world.agent_locations[agent_index]

                in_pickup_spot = (agent_current_location in pd_world.pickup_locations)
                #print("in pickup spot ", in_pickup_spot)
                in_dropoff_spot = (agent_current_location in pd_world.dropoff_locations)
                is_carrying_block = pd_world.is_carrying_block(agent_index)
                blocks_on_cell = pd_world.get_num_blocks_at_location(agent_index)
                
                # If pickup is applicable
                if(in_pickup_spot and is_carrying_block == False and blocks_on_cell > 0):
                        action = "PICKUP"
                        #print("Pickup: ", pd_world.agent_locations[agent_index], " agent ", agent_index, " Carrying block ", is_carrying_block )
                        reward = pd_world.step(action, agent_index)

                # If dropoff is applicable
                elif(in_dropoff_spot and is_carrying_block == True and blocks_on_cell < 5):
                        action = "DROPOFF"
                        #print("Dropoff: ", pd_world.agent_locations[agent_index], " agent ", agent_index, " Carrying block ", is_carrying_block )
                        reward = pd_world.step(action, agent_index)
                else:
                        action = choose_random_action(pd_world.get_available_actions(agent_index)) 
                        reward = pd_world.step(action, agent_index)
                cumulative_reward += reward
        step += 1
#print("Reward: ", cumulative_reward) 
#print(pd_world.num_blocks)

# next 8500 steps of P exploit policy
step = 0

max_steps = 8500
learn = True

while step < max_steps: # Run until max steps
    if pd_world.check_termination():
                pd_world = PD_WORLD()

    for agent_index in range(3):

        agent_current_location = pd_world.agent_locations[agent_index]

        in_pickup_spot = (agent_current_location in pd_world.pickup_locations)
        #print("in pickup spot ", in_pickup_spot)
        in_dropoff_spot = (agent_current_location in pd_world.dropoff_locations)
        is_carrying_block = pd_world.is_carrying_block(agent_index)
        blocks_on_cell = pd_world.get_num_blocks_at_location(agent_index)
        
        # If pickup is applicable
        if(in_pickup_spot and is_carrying_block == False and blocks_on_cell > 0):
                action = "PICKUP"
                #print("Pickup: ", pd_world.agent_locations[agent_index], " agent ", agent_index, " Carrying block ", is_carrying_block )
                reward = pd_world.step(action, agent_index)

        # If dropoff is applicable
        elif(in_dropoff_spot and is_carrying_block == True and blocks_on_cell < 5):
                action = "DROPOFF"
                #print("Dropoff: ", pd_world.agent_locations[agent_index], " agent ", agent_index, " Carrying block ", is_carrying_block )
                reward = pd_world.step(action, agent_index)

        else:
            # IF PICKUP AND DROPOFF NOT APPLICABLE
            action = q_tables[agent_index].optimal_action_exploit(pd_world.get_available_actions(agent_index)) 
            reward = pd_world.step(action, agent_index)
            new_state = pd_world.agent_locations[agent_index]
            #print(action, " agent ", agent_index, " new state ", new_state)
            

            q_tables[agent_index].learn(agent_current_location, reward, new_state, action)
                
        cumulative_reward += reward
    step += 1
print("Reward: ", cumulative_reward) 
#print(pd_world.num_blocks)

                #if environment.check_state() == 'TERMINAL': # If game is in terminal state, game over and start next trial
                    #environment.__init__()
                    #game_over = True 


Reward:  107638.0
