In [1]:
import numpy as np
import random
import matplotlib.pyplot as plt
import math
from collections import namedtuple

# First define a namedtuple to hold useful information for actions
Action = namedtuple('Action', 'name index delta_i delta_j' )
up = Action('up', 0, -1, 0)    
down = Action('down', 1, 1, 0)    
left = Action('left', 2, 0, -1)    
right = Action('right', 3, 0, 1) 

# Use a dictionary to convert indices to actions using the index as a key
# Useful for sampling actions for a given state
index_to_actions = {}
for action in [up, down, left, right]:
    index_to_actions[action.index] = action 
    
# Helpful function to convert action in string format to the action object
str_to_actions = {}
for action in [up,down,left,right]:
    str_to_actions[action.name] = action

# Base Class

The class defined below contains the basic information necessary to define a RL environment. The subsequent classes add further 
functionality used for different purposes.

The class contains:
- A step function, that takes an action and returns information from the environment. Note that it returns a boolean 'push', which is an addition to the values typically returned
- A display function, that prints an image of the current state of the environment, along with the agent. 
- A reset function, that resets the environment to its starting state.


In [2]:

class Boxworld():
    
    def __init__(self, N):
        self.boxworld = np.zeros((N,N))
        self.size = N
        
        # Set corners to obstacles
        self.boxworld[0, :] = 1 #top row
        self.boxworld[:, 0] = 1 # left column
        self.boxworld[-1, :] = 1 # bottom row
        self.boxworld[:, -1] = 1 #right column
        
        # Three of the four corners will be possible exits
        # The other will be a fake exit, which incurs a penalty upon reaching
        self.exits = [[1,1],[1,-2],[-2,1],[-2,-2] ]
        np.random.shuffle(self.exits)
        
        
        fake_exit = self.exits[0]
        self.boxworld[fake_exit[0],fake_exit[1]] = 5
        
        # Set other corners to exits
        for corner in self.exits[1:4]:
            self.boxworld[corner[0],corner[1]] = 3
        
       
        # Agent's position will be determined upon reset
        self.position_agent = None
        
        self.starting_box_pos = np.asarray(self.get_empty_cells(1))
        self.position_box = self.starting_box_pos
        
        # Assign random index for each coordinate in the grid
        index_coords = np.arange(0,N*N)
        np.random.shuffle(index_coords)
        self.coord_to_index = index_coords.reshape(N,N)
        
        # Then combine pairs of indices to get a single state for the environment 
        index_states = np.arange(0,N**4)
        np.random.shuffle(index_states)
        self.index_pairs_to_state = index_states.reshape(N*N,N*N)
        
                                        
        # run time
        self.time_elapsed = 0
        self.time_limit = self.size**4
        
        # display help
        self.dict_map_display={ 0:'.',
                                1:'X',
                                2:'B',
                                3:'E',
                                4:'A',
                                5:'!'}
        
        
        # Assign random index for each coordinate in the grid
        index_coords = np.arange(0,N*N)
        np.random.shuffle(index_coords)
        self.coord_to_index = index_coords.reshape(N,N)
        
        # Then combine pairs of indices to get a single state for the environment 
        index_states = np.arange(0,N**4)
        np.random.shuffle(index_states)
        self.index_pairs_to_state = index_states.reshape(N*N,N*N)
        

    # Function that takes an action in string form, and returns coordinates after the given action.
    def next_pos(self,str_act, old_pos):
        action = str_to_actions[str_act]
        next_position = [old_pos[0]+action.delta_i,old_pos[1]+action.delta_j]
        return np.array(next_position)
        
        
    # Given an action, recieve information from the environment
    def step(self, action: str):
        
        reward = -1
        bump = False
        push = False
        done = False
        
        # Store new agent position
        next_agent_position = self.next_pos(action, self.position_agent)

        # First check if agent bumps into wall directly
        if self.boxworld[next_agent_position[0],next_agent_position[1]] ==1:
            bump = True
            
        
        # Now see if agent pushes box
        if (next_agent_position == self.position_box).all():
            
            push= True

            
            # Find new box position
            next_box_position = self.next_pos(action, self.position_box)

            
            # Check if box get pushed into wall
            if self.boxworld[next_box_position[0], next_box_position[1]] == 1:
                reward-=10
            
            # Otherwise move agent and box
            if self.boxworld[next_box_position[0], next_box_position[1]] in [0,3,4]:
                self.position_box = next_box_position
                self.position_agent = next_agent_position
                
                
        # Case where agent moves into any other valid cell 
        elif self.boxworld[next_agent_position[0],next_agent_position[1]] !=1:
            self.position_agent = next_agent_position
                
        # Calculate rewards
        
        box_cell_type = self.boxworld[self.position_box[0],self.position_box[1]]
        if box_cell_type == 3:
            reward+=self.size**2
            done = True
            
        if box_cell_type==5:
            reward-=self.size**2
            #print("Episode Failed! (Reached dead state)")
            done = True

        # Penalise any kind of bump
        if bump:
            reward -=5
        
        # Reward any kind of push
        if push:
            reward+=1
            
        # get observations & state
        obs = self.calculate_observations()
                                                
        # Update time
        self.time_elapsed +=1
        if self.time_elapsed == self.time_limit:
            done = True
            print("Time limit expired!")

        
        # Get state of environment
        state = self.coord_to_index[ self.position_agent[0], self.position_agent[1]]
        
        # TO FIX
        # Note we return push for stochastic case
        return state, reward, done, push
    
    def display(self):
        
        envir_with_agent = self.boxworld.copy() 
        envir_with_agent[self.position_agent[0], self.position_agent[1]] = 4
        envir_with_agent[self.position_box[0], self.position_box[1]] = 2
        full_repr = ""

        for r in range(self.size):
            
            line = ""
            
            for c in range(self.size):

                string_repr = self.dict_map_display[ envir_with_agent[r,c] ] #display
                
                line += "{0:2}".format(string_repr)

            full_repr += line + "\n"

        print(full_repr)

        
    def calculate_observations(self):
        
        agent_coordinates = self.position_agent
        box_coordinates = self.position_box 
        
        # Calculate  the squares between the agent and box?
        distance = abs(agent_coordinates[0] - box_coordinates[0]) + abs(agent_coordinates[1] - box_coordinates[1]) 
        obs = {'Squares between agent and box': distance}
        return obs
        

    def get_empty_cells(self, n_cells): 
        empty_cells_coord = np.where( self.boxworld == 0 ) #find empty cells
        selected_indices = np.random.choice( np.arange(len(empty_cells_coord[0])), n_cells ) 
        selected_coordinates = empty_cells_coord[0][selected_indices], empty_cells_coord[1][selected_indices] 
        if n_cells == 1:
            return np.asarray(selected_coordinates).reshape(2,) 
        
        return selected_coordinates
        
    def reset(self):
        """
        This function resets the environment to its original state (time = 0).
        The box is placed at its original starting location determined upon initialization of the environment
        Then it places the agent and exit at new random locations.
        
        """
        self.time_elapsed = 0
        
        # position of the agent is a numpy array
        empty_cells = np.asarray(self.get_empty_cells(1))
        self.position_agent = empty_cells
        self.position_box = self.starting_box_pos

        
        # In the case where box and agent reset to same location, repeat until they no longer do
        if (self.position_agent == self.position_box).all():
                self.reset()
                
        # Same for case where box resets to an exit
        #for exit in self.exits:
            #if (self.position_box == exit).all():
                #self.reset()

        # Calculate observations
        observations = self.calculate_observations()
        
        
        state = self.index_pairs_to_state[ self.coord_to_index[self.position_agent[0], self.position_agent[1]],
                                          self.coord_to_index[self.position_box[0],self.position_box[1]] ]
        
        return state

# Example

(You can uncomment the whole cell by selecting the contents of it and pressing ctr + /)

In [6]:
# boxworld = Boxworld(7)
# boxworld.reset()
# boxworld.display()

X X X X X X X 
X ! . . . E X 
X . . . . . X 
X . . . . . X 
X . . . A . X 
X E B . . E X 
X X X X X X X 



In [7]:
# state, rew, done, push = boxworld.step('up')
# print("Reward = ",rew," : done = ",done," : push = ",push)
# boxworld.display()

Reward =  -1  : done =  False  : push =  False
X X X X X X X 
X ! . . . E X 
X . . . . . X 
X . . . A . X 
X . . . . . X 
X E B . . E X 
X X X X X X X 



# Dynamic Programming

The following class subclasses from the original base class, adding reward/transition matrices.
It also adds a function to display reward matrices.

This environment is used for dynamic programming algorithms, where the full dynamics of the system must be known

In [10]:

class Boxworld_DP(Boxworld):
    
    def __init__(self,N):
        
        super().__init__(N)
        
        #Reward/transition/Value matrices
        self.reward_matrix = -1*np.ones( (N**4,4))
        self.transition_matrix = np.zeros((N**4,4,N**4))
        
        
        #Fill the matrices
        # Iterate over all possible box & agent locations
        for k in range(1,N-1):
            for l in range(1,N-1):
                for i in range(1,N-1):
                    for j in range(1,N-1):
                        
                        # First calculate current indices, state, cell type
                        current_agent_index = self.coord_to_index[i,j]
                        current_box_index = self.coord_to_index[k,l]
                        current_state = self.index_pairs_to_state[current_box_index,current_agent_index]
                        current_cell = self.boxworld[i,j]

                    
                        for action in [up,left,down,right]:
                            # Lookahead one step for agent
                            dest_coords = [i+action.delta_i,j+action.delta_j]
                            destination_cell = self.boxworld[dest_coords[0],dest_coords[1]]
                            next_agent_index = self.coord_to_index[dest_coords[0],dest_coords[1]]                            
                            
                            #Write NAN when box and agent overlap...
                            #if current_agent_index == current_box_index:
                                #self.reward_matrix[current_state, action.index] = np.NAN
                            
                            # Check for direct bump between agent and wall
                            if destination_cell == 1:
                                self.transition_matrix[current_state, action.index, current_state] = 1 
                                self.reward_matrix[current_state, action.index] += -5   
                        
                            # Case when agent moves to other valid cell
                            if destination_cell in [0,3]:
                                next_state = self.index_pairs_to_state[current_box_index,next_agent_index]
                                self.transition_matrix[current_state,action.index,next_state] = 1
                            
                            # Case when agent pushes box
                            if next_agent_index == current_box_index:
                                
                                # Update reward matrix for a push
                                self.reward_matrix[current_state,action.index]+=1
                                
                                # Case when agent pushes box into wall
                                if self.boxworld[i+(2*action.delta_i),j+(2*action.delta_j)] ==1:
                                    self.transition_matrix[current_state, action.index, current_state] = 1
                                    self.reward_matrix[current_state,action.index]+=-10
                                
                                # Else look at cases when push is valid
                                else:
                                    # Now calculate one-step lookahead for box
                                    box_dest_coords = [i + (2 * action.delta_i), j + (2*action.delta_j)]
                                    destination_box_cell = self.boxworld[box_dest_coords[0], box_dest_coords[1]]
                                    next_box_index = self.coord_to_index[box_dest_coords[0], box_dest_coords[1]]
                                    next_state = self.index_pairs_to_state[next_box_index,next_agent_index]
                                    
                                    # Transition is valid so update the matrix
                                    self.transition_matrix[current_state,action.index,next_state] = 1
                                    
                                    # Update reward matrix for a valid push
                                    self.reward_matrix[current_state,action.index]+=1
                                    
                                    # Case when box is pushed to correct location
                                    if destination_box_cell == 3:
                                        self.reward_matrix[current_state,action.index]+=N**2
                                        
                                    # Case when box is pushed to incorrect corner
                                    if destination_box_cell == 5:
                                        self.reward_matrix[current_state,action.index] += -N**2
                                        
    # A function to print a  'snapshot' of possible rewards from current timestep
    def print_reward_matrices(self):
        
        for action in [up,down,left,right]:
            
            # Get index of box coordinates
            box_index = self.coord_to_index[self.position_box[0],self.position_box[1]]

            
            # New matrix to display rewards            
            reward_matrix =  np.zeros( (self.size,self.size) )
            
            for i in range(1, self.size-1):
                    for j in range(1, self.size-1):

                        agent_index = self.coord_to_index[i,j]
                        state = self.index_pairs_to_state[box_index,agent_index]
                        reward_matrix[i,j] = self.reward_matrix[state, action.index]
                        
            print( action.name)
            print(reward_matrix)  
            

# Example 

In [9]:
# boxworld_dp = Boxworld_DP(7)
# boxworld_dp.reset()
# boxworld_dp.display()
# boxworld_dp.print_reward_matrices()

X X X X X X X 
X ! B . . E X 
X . . A . . X 
X . . . . . X 
X . . . . . X 
X E . . . E X 
X X X X X X X 

up
[[  0.   0.   0.   0.   0.   0.   0.]
 [  0.  -6.  -6.  -6.  -6.  -6.   0.]
 [  0.  -1. -10.  -1.  -1.  -1.   0.]
 [  0.  -1.  -1.  -1.  -1.  -1.   0.]
 [  0.  -1.  -1.  -1.  -1.  -1.   0.]
 [  0.  -1.  -1.  -1.  -1.  -1.   0.]
 [  0.   0.   0.   0.   0.   0.   0.]]
down
[[ 0.  0.  0.  0.  0.  0.  0.]
 [ 0. -1. -1. -1. -1. -1.  0.]
 [ 0. -1. -1. -1. -1. -1.  0.]
 [ 0. -1. -1. -1. -1. -1.  0.]
 [ 0. -1. -1. -1. -1. -1.  0.]
 [ 0. -6. -6. -6. -6. -6.  0.]
 [ 0.  0.  0.  0.  0.  0.  0.]]
left
[[  0.   0.   0.   0.   0.   0.   0.]
 [  0.  -6.  -1. -48.  -1.  -1.   0.]
 [  0.  -6.  -1.  -1.  -1.  -1.   0.]
 [  0.  -6.  -1.  -1.  -1.  -1.   0.]
 [  0.  -6.  -1.  -1.  -1.  -1.   0.]
 [  0.  -6.  -1.  -1.  -1.  -1.   0.]
 [  0.   0.   0.   0.   0.   0.   0.]]
right
[[ 0.  0.  0.  0.  0.  0.  0.]
 [ 0.  1. -1. -1. -1. -6.  0.]
 [ 0. -1. -1. -1. -1. -6.  0.]
 [ 0. -1. -1. -1. -1. -6.  0.]

# Stochastic Environment

The following environment also derives from the base environment, but adds some new stochastic dynamics. Each time the agent attempts to push the box, there is a %10 chance their hands will slip, causing the box to be pushed an additional cell (the agent will also slip forward one cell)

**The environment needs to know whether the agent attempts to push the box in order to see if it slips, hence the 'push' return value 

In [28]:

class Boxworld_Stochastic(Boxworld):
    
    def __init__(self,N):
        
        super().__init__(N)
        
        
    def step(self, action : str):
        
        state, rew, done, push = super().step(action)
        
        # Note that a push ending the game will not cause a slip
        if done or not push:
            return state, rew, done
        
        else:

            # There is a 10% chance the box slips an extra cell
            prob_slip = 0.1
            
            slip = random.random() < prob_slip
            
            # If box doesn't slip, nothing else to do
            if not slip:
                return state, rew, done
            
            # Otherwise, take another step
            else:
                state, add_rew, done, _ = super().step(action)
                
                # Don't penalize for second step, and don't increment timesteps.
                add_rew += 1
                self.time_elapsed -= 1
                
                return state, rew + add_rew, done

# Example

In [29]:
# boxworld_stochastic = Boxworld_Stochastic(7)
# boxworld_stochastic.reset()
# boxworld_stochastic.display()

X X X X X X X 
X E . . . E X 
X . . . . . X 
X . . . . . X 
X . . B A . X 
X ! . . . E X 
X X X X X X X 



In [30]:
# state, rew, done = boxworld_stochastic.step('down')
# boxworld_stochastic.display()

X X X X X X X 
X E . . . E X 
X . . . . . X 
X . . . . . X 
X . . B . . X 
X ! . . A E X 
X X X X X X X 



# Experiments

Below are some functions for running episodes on the different variations of Boxworld, useful for training.

In [31]:
# Function to run a single episode in a fresh instance of a given environment and policy
def run_single_exp(env,policy):
    state = env.reset()
    done = False
    total_reward = 0
    while not done:
        action = policy(state)
        state, reward, done,_ = env.step(action)
        total_reward+=reward
        
    return total_reward


# Function to run a number of experiments, returning summarized information of all the runs
def run_experiments(envir, policy, number_exp):
    
    all_rewards = []
    
    for i in range(number_exp):
        
        final_reward = run_single_exp(envir, policy)
        all_rewards.append(final_reward)
    
    max_reward = max(all_rewards)
    mean_reward = np.mean(all_rewards)
    var_reward = np.std(all_rewards)
    
    return all_rewards, max_reward, mean_reward, var_reward