In [1]:
import numpy as np
import pandas as pd

In [2]:
class RlAgent:
    """Tabular epsilon-greedy Q-learning agent for the Roomba.

    The Q-table ``self.q`` has one axis per state component followed by a
    final axis of length ``number_of_actions``; every entry starts at 1.0.
    """

    def __init__(self, state_ranges, seed=None):
        """
        Build the Q-table and set the learning hyper-parameters.

        Args:
        state_ranges: Iterable of (low, high) pairs, one per state component.
            Each component gets ``high + 1`` slots in the Q-table.
        seed: Optional seed for the agent's random generator, which makes the
            exploration choices repeatable. None draws fresh entropy.
        """
        self.number_of_actions = 8 # 4 cardinal directions for power on and off
        dims = [x[1]+1 for x in state_ranges]
        dims.append(self.number_of_actions)
        self.q : np.ndarray = np.ones(dims, dtype="float64")
        self.state = 0
        self.next_state = 0
        self.reward = 0
        self.action = 0
        self.turn = 0
        self.epsilon = 0.2
        self.annealing_rate = 1
        self.alpha = 0.1 # not read by this class; eta is the step size used in update_q
        self.gamma = 0.9
        self.eta = 0.1
        # The last axis of q is the action axis, so divide it out to count states only.
        self.number_of_states = self.q.size // self.number_of_actions
        self.verbose = False
        # One generator for the agent's lifetime instead of a new one per action.
        self.rng = np.random.default_rng(seed)

    def get_number_of_states(self):
        """Return the number of distinct states covered by the Q-table."""
        return self.number_of_states

    def get_number_of_actions(self):
        """Return the number of actions available in every state."""
        return self.number_of_actions

    def e_greedy(self, actions: np.ndarray):
        """
        Pick an action index from a row of Q-values with epsilon-greedy selection.

        Epsilon is multiplied by ``annealing_rate`` on every call. With
        probability ``1 - epsilon`` the arg-max index is returned, otherwise a
        uniformly random index.

        Args:
        actions: 1-D array of Q-values, one per action.

        Returns:
        The chosen action index as a plain int.
        """
        self.epsilon *= self.annealing_rate
        if self.epsilon <= self.rng.random():
            a_star_idx = int(np.argmax(actions))
            if self.verbose: print('Exploit: {}'.format(a_star_idx))
            return a_star_idx
        idx = int(self.rng.integers(low=0, high=actions.size))
        if self.verbose: print('Explore: {}'.format(idx))
        return idx

    def select_action(self, state: tuple[int, ...]) -> int:
        """
        Choose the action for ``state`` and remember both on the agent.

        Args:
        state: Tuple of integer indices, one per state axis of the Q-table.

        Returns:
        The chosen action index.
        """
        self.turn += 1
        self.state = state
        # Tuple indexing works for any number of state components.
        actions = self.q[tuple(state)]
        action = self.e_greedy(actions)
        self.action = action
        return action

    def update_q(self, old_state, action, reward, new_state, done):
        """
        Apply one Q-learning update for the transition that was just observed.

        Args:
        old_state: State the action was taken in.
        action: Index of the action taken.
        reward: Reward received for the transition.
        new_state: State reached after the action.
        done: True when new_state is terminal, so no future value is bootstrapped.
        """
        key = tuple(old_state) + (action,)
        q = self.q[key]
        if done:
            target = reward
        else:
            target = reward + self.gamma * np.max(self.q[tuple(new_state)])
        # Always update the entry for the transition passed in, never the
        # agent's remembered self.state / self.action.
        self.q[key] = q + self.eta * (target - q)

In [3]:
import random

def generate_roomba_map(size_x: int, size_y: int, dirt_probability:float = 0.2, seed: int | None = None) ->tuple[list[list[str]], tuple[int, int]]:
    """
    Generates a 16x16 tile map for a Roomba simulation.
    
    Args:
    dirt_probability: The probability of a tile being dirty (float between 0 and 1).
    seed: The seed for the random number generator.  Makes generation repeatable.
    
    Returns:
    A 2D list representing the map.
    """
    
    if seed is not None:
        random.seed(seed)

    # Create a 16x16 grid with walls around the perimeter
    map = [['#' for _ in range(size_x)] for _ in range(size_y)]

    # Choose a random side for the charging station
    side = random.randint(0, 3)

    rnd_x = random.randint(1, size_x-2) # left-right placement
    rnd_y = random.randint(1, size_y-2) # top-down placement
    if side == 0:  # Top
        map[1][rnd_x] = '@'
        start_pos = (1, rnd_x) # first-row, random width placement
    elif side == 1:  # Bottom
        map[size_y-2][rnd_x] = '@'
        start_pos = (size_y-2, rnd_x)
    elif side == 2: # Left
        map[rnd_y][1] = '@'
        start_pos = (rnd_y, 1)
    elif side == 3: # Right
        map[rnd_y][size_x-2] = '@'
        start_pos = (rnd_y, size_x-2)
    else:
        raise RuntimeError("Unexpected side value")
      
    for row in range(1, size_y-1):
        for col in range(1, size_x-1):
            if map[row][col] == '@':
                continue
            # Add a clear tile with a chance of dirt
            if random.random() < dirt_probability:
                map[row][col] = '*'
            else:
                map[row][col] = '.'

    return map, start_pos

# Example usage: build a small seeded map (8 wide, 16 tall) with 30% dirt.
room_map, charging_base = generate_roomba_map(8, 16, dirt_probability=0.3, seed=42)

# Show the map one row per line, tiles separated by spaces,
# followed by the (row, col) of the charging station.
for row in room_map:
    print(*row)
print(charging_base)

# # # # # # # #
# @ * * . . . #
# * . * * . * #
# * . . * . . #
# * . . . * . #
# . * * . . . #
# . . . . . . #
# . . . . * * #
# * * * * * . #
# . . * * . . #
# . * . * . . #
# . . . . . * #
# * . * * . . #
# . . . . . * #
# * . * . . . #
# # # # # # # #
(1, 1)


In [4]:
import copy
import random
import math

class RoombaEnv():

    NUMBER_OF_ACTIONS = 8 #  N_on, N_off, S_on, S_off, E_on, E_off, W_on, W_off
    NUMBER_OF_BATTLVL = 4 # High, Medium, Low, Dead
    NUM_CLEANLINESS_LEVELS = 3 # Clean, Dirty, Impassable
    BATTERY_SIZE = 4
    
    LOOK_UP: dict[str, int] = {'#': 0, "*": 1, "@": 2, ".": 2}
    
    def _roomba_reset(self, es_flag: bool, charge: float | None):
        """
        Reset roomba
        :param es_flag: exploring starts 
        :param charge: initial charge percentage (float between 0 and 1)
        """
        if es_flag:
            while True:
                pos = (random.randint(0, len(self.map[0])), random.randint(0, len(self.map)))
                if self.map[pos[0]][pos[1]] != '#':
                    break
        else:
            pos = self.charge_loc
        
        self.pos: tuple[int, int] = pos
        
        self.battery_lvl: float = math.ceil(len(self.map) * len(self.map[0]) * self.BATTERY_SIZE * charge)
        self.battery_cap: float = len(self.map) * len(self.map[0]) * self.BATTERY_SIZE
        self.clean_success: bool = False        
        self.prev_action = None
        self.curr_action = None
                

    def __init__(self, map: list[list[str]], charging_location: (int, int), 
                 es_flag: bool = False, charge: float | None = 1, verbose: bool = False):
        """
        Set class properties with environment constants
        
        """
        
        self.verbose = verbose
        self.starting_map = copy.deepcopy(map)
        self.map = map
        self.charge_loc = charging_location
        self.wall_hit = False
        # assumes rectangular map
        self.total_states = (len(map) * len(map[0]) * 
                             self.NUMBER_OF_BATTLVL * pow(self.NUM_CLEANLINESS_LEVELS, 4))        
        self._roomba_reset(es_flag, charge)
        

    def get_number_of_states(self) -> int:
        # Environment constant pass thru
        return self.total_states
    
    def get_state_dims(self) -> tuple[tuple[int, int], tuple[int, int], 
                                      tuple[int, int], tuple[int, int],
                                      tuple[int, int], tuple[int, int], tuple[int, int]]:
        """
        Get the dimensions of each parameter of the state.  This helps initialize a model for many algorithms
        :return: Tuple of tuples representing the range of each parameter of the state.
        """
        return ((0, len(self.map[0])-1), (0, len(self.map)-1), (0, self.NUMBER_OF_BATTLVL-1), 
                (0, 2), (0, 2), (0, 2), (0, 2))

    def get_number_of_actions(self) -> int:
        # Environment constant pass thru
        return self.NUMBER_OF_ACTIONS

    def reset(self, es_flag: bool = False, charge: float | None = 1) \
            -> tuple[int, int, int, int, int, int, int]:
        """
        Reset the state of the game to a determined start_state if es_flag is False
        Otherwise if es_flag is True then reset game to a random start state
        Return the resulting state for agent to act on.
        """
        if charge is None:
            charge = random.random()
        self.map = copy.deepcopy(self.starting_map)
        self._roomba_reset(es_flag, charge)
        
        return self.get_state()

    def get_state(self) -> tuple[int, int, int, int, int, int, int]:
        """
        Return current environment state.
        Return a tuple of the current state of the roomba. (x, y, charge, 4 x [peeks])
        """
        return (self.pos[0], self.pos[1], 
                int(math.ceil(self.battery_lvl/(self.battery_cap/3))), 
                self.LOOK_UP[self.map[self.pos[0]][self.pos[1]+1]], # Look East
                self.LOOK_UP[self.map[self.pos[0]+1][self.pos[1]]], # Look South
                self.LOOK_UP[self.map[self.pos[0]][self.pos[1]-1]], # Look West
                self.LOOK_UP[self.map[self.pos[0]-1][self.pos[1]]]) # Look North


    def step(self, action: int) -> (tuple[int, int, int, int, int, int, int], float, bool):
        """
        Given an action, determine the resulting next_state. 
        Based on next_state determine the resulting reward for getting there.
        Update current environment state and find if it is an end state.
        """
        next_state = self._perform_action(action)
        done = self._get_terminal_flag()
        reward = self._get_reward(action, done)
        return next_state, reward, done

    def _perform_action(self, action: int) ->  tuple[int, int, int, int, int, int, int]:
        """
        action: 'N_off'=0, 'S_off'=1, 'E_off'=2, 'W_off'=3, 'N_on'=4, 'S_on'=5, 'E_on'=6, 'W_on'=7
        """
        self.prev_action = self.curr_action
        self.curr_action = action
        
        ## Find battery level
        power_on = action > 3
        self.battery_lvl -= 0.09 * 10 # default power reduction
        if power_on:
            self.battery_lvl -= 0.1 * 10 # power is on, reduce power more
        self.battery_lvl = max(self.battery_lvl, 0)
        if self.verbose:
            print("Battery level: ", self.battery_lvl)

        ## Check map movement
        new_loc = [self.pos[0], self.pos[1]]
        old_loc = new_loc.copy() # For verbose tracking purposes
        action_mod = action % 4
        if action_mod == 0:
            new_loc[0] -= 1
            if self.verbose: print('North')
        elif action_mod == 1:
            new_loc[0] += 1
            if self.verbose: print('South')
        elif action_mod ==2:
            new_loc[1] += 1
            if self.verbose: print('East')
        else:
            new_loc[1] -= 1
            if self.verbose: print('West')
        new_loc_tile = self.map[new_loc[0]][new_loc[1]]
        
        self.clean_success = False
        self.wall_hit = False
        if new_loc_tile != '#':
            self.pos = (new_loc[0], new_loc[1]) # not a wall so update
            if self.verbose: print(self.pos)
            if new_loc_tile == '*' and power_on:
                self.map[self.pos[0]][self.pos[1]] = '.'
                self.clean_success = True            
        else:
            self.wall_hit = True
            if self.verbose:
                print('Hit a wall, revert')
        
        if self.verbose:
            print("Current spot: {} ({},{})".format(self.map[old_loc[0]][old_loc[1]], 
                                                    old_loc[0], old_loc[1]))
            print("Next spot: {} ({},{})".format(new_loc_tile, new_loc[0], new_loc[1]))
            xmap = copy.deepcopy(self.map)
            xmap[self.pos[0]][self.pos[1]] = 'O'
            for row in xmap:
                print(' '.join(row))
        
        return self.get_state()

    def _get_reward(self, action: int, term: bool) -> float:
        """
        Determines reward for agent
        """
        reward = -1.0
        if self.wall_hit:
            reward -= 10
        
        if action > 3:
            if self.clean_success:
                reward += 10
            else:
                reward -= 10
                
        if term:
            dirty_count = sum([x.count('*') for x in self.map])
            if self.battery_lvl == 0 or self.battery_lvl == 3:
                reward -= dirty_count * 50
            else:
                reward -= dirty_count * 10

        if self.prev_action is not None and self.curr_action % 4 == self.prev_action % 4:
            reward += 0
        else:
            reward -= 15
        return reward

    def _get_terminal_flag(self) -> bool:
        """
        Return if current state is in a list of set terminal states
        """
        return self.map[self.pos[0]][self.pos[1]] == '@' or self.battery_lvl == 0
    
    def get_clean_level(self) -> float:
        return 1- sum([x.count('*') for x in self.map]) / (len(self.map) * len(self.map[0]))
    
    def get_dirty_count(self) -> float:
        return sum([x.count('*') for x in self.map])


In [None]:
# Need to train on different maps, full and sparse, and across the 3 different power levels.

# Run configuration
SEED, SIZE = 42, 16  # seed passed to the map generator; side length of the square map
episodes = 8000      # number of training episodes
agent = None         # set by main() so later cells can reuse the trained agent

def main():
    """
    Train the agent on a generated map, dump its Q-table, then replay the greedy policy.

    The trained agent is stored in the module-level `agent` so later cells can use it.
    The Q-table is printed to 'Project1.txt'.
    """
    global agent # for debugging, keeping agent to call in later cells

    # Show the training map
    disp_map, charging_base = generate_roomba_map(SIZE, SIZE, dirt_probability=0.8, seed=SEED)
    for map_row in disp_map:
        print(' '.join(map_row))

    environment = RoombaEnv(disp_map, charging_base)
    agent = RlAgent(environment.get_state_dims())

    for episode in range(episodes):
        # Episodic decay of epsilon: linear from 0.8 towards 0 over the run
        agent.epsilon = 0.8 - (episode/episodes)*0.8
        state = environment.reset()
        total_reward = 0
        finished = False
        # Play one game to its end, learning from every transition
        while not finished:
            chosen = agent.select_action(state)
            next_state, reward, finished = environment.step(chosen)
            agent.update_q(state, chosen, reward, next_state, finished)
            state = next_state
            total_reward += reward
        if episode % 1000 == 0:
            print("Game Over, clean level: {}, dirty count: {}, reward: {}".
                  format(environment.get_clean_level(),
                  environment.get_dirty_count(),
                  total_reward))

    with open('Project1.txt', 'wt') as f:
        print(agent.q, file=f)

    print("Test the policy: ")
    agent.verbose = True
    agent.epsilon = 0
    state = environment.reset()
    environment.verbose = True
    finished = False
    steps = 0
    # Replay until the game ends
    while not finished:
        chosen = agent.select_action(state)
        state, reward, finished = environment.step(chosen)
        steps += 1
        # Agent output is switched on only for every 100th step
        agent.verbose = (steps % 100 == 0)
    print("\nProgram completed successfully.")
    for map_row in environment.map:
        print(' '.join(map_row))

# Run the training and the greedy replay when this cell executes.
main()

In [None]:
# Independently test the trained agent: regenerate a map and replay the policy
# with exploration switched off. Relies on `agent` having been set by main().
print("Test the policy: ")
disp_map, charging_base = generate_roomba_map(SIZE, SIZE, dirt_probability=0.8, seed=SEED)
environment = RoombaEnv(disp_map, charging_base)
agent.verbose = True
agent.epsilon = 0 # Full exploit?
current_state = environment.reset()
environment.verbose = True
count = 0
game_end = False
# Do until the game ends:
while True:
    if game_end:
        break
    action = agent.select_action(current_state)
    new_state, reward, game_end = environment.step(action)
    current_state = new_state
    count += 1
    # Agent output is switched on only for every 100th step
    agent.verbose = (count % 100 == 0)
print("\nProgram completed successfully.")
for row in environment.map:
    print(*row)