# Import needed modules

In [1]:
from agents import *
from itertools import combinations
from random import choice
from typing import List
import collections
import numpy as np
import sys

# Only if needed for new python versions
if sys.version_info >= (3, 10):
    # Python 3.10 dropped the ABC aliases from the top-level `collections`
    # namespace; re-create the two that older code still looks up there.
    # The submodule is imported explicitly so this does not depend on some
    # other module having loaded `collections.abc` as a side effect.
    import collections
    import collections.abc

    collections.Iterable = collections.abc.Iterable
    collections.Sequence = collections.abc.Sequence

# Easy environment 

In [2]:
class SimpleRoomba(Agent):
    """Vacuum agent that keeps track of its own cell and heading."""

    # Class-level defaults. `location` is a list that `moveforward` mutates
    # in place, so it is shared unless an instance is given its own location.
    location = [0,1]
    direction = Direction("down")

    def moveforward(self, success=True):
        '''Advance one cell along the current heading; a falsy `success` cancels the move.'''
        if not success:
            return
        # (axis, step) per heading: axis 0 is x, axis 1 is y.
        axis_and_step = {
            Direction.R: (0, 1),
            Direction.L: (0, -1),
            Direction.D: (1, 1),
            Direction.U: (1, -1),
        }.get(self.direction.direction)
        if axis_and_step is not None:
            axis, step = axis_and_step
            self.location[axis] += step

    def turn(self, d):
        '''Replace the heading with the result of adding `d` to it.'''
        self.direction = self.direction + d

    def suck(self, thing):
        '''Return True when `thing` is Dirt, False otherwise.'''
        return isinstance(thing, Dirt)
        
def program(percepts):
    '''Pick the next action for the simple roomba from its percepts.

    Args:
        percepts: iterable of the things perceived this step (may be empty).

    Returns:
        'suck' if any percept is Dirt; otherwise 'turnright' or 'turnleft'
        when a Bump is perceived, and one of 'turnright', 'turnleft',
        'moveforward' (forward twice as likely as each turn) when it is not.
    '''
    # Dirt always wins, wherever it appears in the percept list.
    if any(isinstance(p, Dirt) for p in percepts):
        return 'suck'

    # A Bump anywhere in the list rules out moving forward; the decision no
    # longer depends on which percept happens to come last, and an empty
    # percept list is handled as "nothing in the way".
    if any(isinstance(p, Bump) for p in percepts):
        options = ('turnright', 'turnleft')
    else:
        options = ('turnright', 'turnleft', 'moveforward', 'moveforward')
    return random.choice(options)

In [3]:
class VacumEnv(GraphicEnvironment):
    """Grid environment for a vacuum agent that turns, moves forward and sucks dirt."""

    def _cell_ahead(self, agent):
        '''Return, as a new list, the cell directly in front of `agent`.'''
        x, y = agent.location
        # An unrecognised heading leaves the target equal to the current cell.
        dx, dy = {
            Direction.R: (1, 0),
            Direction.L: (-1, 0),
            Direction.D: (0, 1),
            Direction.U: (0, -1),
        }.get(agent.direction.direction, (0, 0))
        return [x + dx, y + dy]

    def percept(self, agent):
        '''Return the things at the agent's location, plus a Bump when the
        cell it is facing is out of bounds.'''
        things = self.list_things_at(agent.location)
        if not self.is_inbounds(self._cell_ahead(agent)):
            things.append(Bump())
        return things

    def execute_action(self, agent, action):
        '''Apply `action` ('turnright', 'turnleft', 'moveforward' or 'suck')
        to the environment; any other action is ignored.'''
        if action == 'turnright':
            print('{} decided to {} at location: {}'.format(str(agent)[1:-1], action, agent.location))
            agent.turn(Direction.R)
        elif action == 'turnleft':
            print('{} decided to {} at location: {}'.format(str(agent)[1:-1], action, agent.location))
            agent.turn(Direction.L)
        elif action == 'moveforward':
            print('{} decided to move {}wards at location: {}'.format(str(agent)[1:-1], agent.direction.direction, agent.location))
            # Only let the agent move when the destination passes the same
            # bounds check that `percept` uses to report a Bump.
            agent.moveforward(self.is_inbounds(self._cell_ahead(agent)))
        elif action == "suck":
            items = self.list_things_at(agent.location, tclass=Dirt)
            # Remove the first piece of dirt only if the agent reports success.
            if items and agent.suck(items[0]):
                print('{} suck {} at location: {}'
                      .format(str(agent)[1:-1], str(items[0])[1:-1], agent.location))
                self.delete_thing(items[0])

    def is_done(self):
        '''Return True once no Dirt is left among the environment's things.'''
        return not any(isinstance(thing, Dirt) for thing in self.things)

## Generate all possible dirt configurations on a 4 * 4 grid

In [4]:
# Every non-empty subset of the 16 cells, encoded as a 16-entry 0/1 list
# (1 = dirty). Ordered by number of dirty cells, then lexicographically.
dirt_configs = []
for dirty_count in range(1, 17):
    dirt_configs.extend(
        [int(cell in dirty_cells) for cell in range(16)]
        for dirty_cells in combinations(range(16), dirty_count)
    )
        
# Demo run: a 2x2 room with the roomba at [0,0] and one piece of dirt at [1,1].
room = VacumEnv(2,2, color={'SimpleRoomba': (200,0,0), 'Dirt': (125, 100, 80)})
roomba = SimpleRoomba(program)
dirt = Dirt()
room.add_thing(roomba, [0,0])
room.add_thing(dirt, [1,1])
print("Roomba started at [0,0], facing down")
# The step budget is passed as `steps`, the keyword used by every other `run`
# call and by the `run` override defined later in this file.
# NOTE(review): assumes GraphicEnvironment.run names this parameter `steps` - confirm against agents.py.
room.run(steps=100, delay=0)

Roomba started at [0,0], facing down


SimpleRoomba decided to turnleft at location: [0, 0]


SimpleRoomba decided to move rightwards at location: [0, 0]


SimpleRoomba decided to turnright at location: [1, 0]


SimpleRoomba decided to move downwards at location: [1, 0]


SimpleRoomba ate Dirt at location: [1, 1]


It is completed


# Complex environment 

### Define our Agent Class
> The agent is a vacuum cleaner

Has the following capabilities:

- Whathever

In [5]:
class Roomba(Agent):
    """Vacuum agent that moves one cell at a time in four grid directions.

    Every move first reads `self.collided`, a 4-slot sequence indexed
    0 = right, 1 = down, 2 = left, 3 = up; a truthy slot cancels that move.
    """

    def _move_by(self, slot, dx, dy) -> None:
        '''Shift the location by (dx, dy) unless `collided[slot]` is set.'''
        if self.collided[slot]:
            return
        self.location = (self.location[0] + dx, self.location[1] + dy)

    def MoveRight(self) -> None:
        '''Move one cell in +x unless a collision to the right is flagged.'''
        self._move_by(0, 1, 0)

    def MoveDown(self) -> None:
        '''Move one cell in +y unless a collision below is flagged.'''
        self._move_by(1, 0, 1)

    def MoveLeft(self) -> None:
        '''Move one cell in -x unless a collision to the left is flagged.'''
        self._move_by(2, -1, 0)

    def MoveUp(self) -> None:
        '''Move one cell in -y unless a collision above is flagged.'''
        self._move_by(3, 0, -1)

    def Suck(self, thing):
        '''Return True when `thing` is Dirt, False otherwise.'''
        return isinstance(thing, Dirt)

### Environment
> Defining a Vacuum Cleaner environment

Our vacuum cleaner environment has blah blah blah

- Good 
- Awesome
- and everything else

---

TODO:

- [X] Add Obstacle, almost correct placement still need fixes though
- [X] Make obstacle work
- [X] Collision needs to be fixed 
- [ ] Add walls in the corners if there are walls near it in both direction

### Define the agent program
> Silly reflex agent movements

It has the following moves: 

- Attack 
- Just kidding

---

Then use the agent in the Roomba environment for a run

In [6]:
class Roomba_Env(XYEnvironment):
    """Walled grid room with random obstacles and dirt, drawn on a BlockGrid.

    Percepts are a list of five lists (right, down, left, up, current cell);
    walls and obstacles are reported as `Bump` instances. Moving costs one
    performance point, sucking dirt earns 100.
    """

    # Class members
    # ______________________________________

    # Probability that a free interior cell receives dirt. Drawn once, when
    # the class body runs, so every instance shares the same value.
    dirt_probability = random.uniform(0.1, 0.3)
    # Probability that an interior cell is considered for an obstacle
    # (same one-time draw as above).
    obstacle_probability = random.uniform(0.1, 0.9)

    # Fill colour per thing class name, looked up by draw_world
    colors = {
        'Obstacle': (44, 53, 57),
        'Wall': (44, 53, 57),
        'Roomba': (200, 0, 0),
        'Dirt': (125, 100, 80)}
    #________________________________________

    def __init__(self, verbose = False, random = True, width=9, height=9):
        """Create the room and populate it.

        Args:
            verbose: when truthy, actions and completion are printed.
            random: when truthy, `width` and `height` are replaced by random
                sizes. (The name shadows the `random` module, but only inside
                this method, which does not use the module.)
            width, height: grid size used when `random` is falsy.
        """
        if random:
            # np.random.randint excludes the upper bound: each side is 9..14
            width = np.random.randint(9, 15)
            height = np.random.randint(9, 15)

        self.verbose = verbose

        # Initialise the parent environment with the chosen size
        super().__init__(width, height)

        # Grid used for drawing
        self.grid = BlockGrid(width, height, fill=(200, 200, 200))
        self.bounded = True

        self.init_world()

    def add_walls(self) -> None:
        """Wall the whole perimeter and randomly wall part of the next ring.

        Layer 0 (the perimeter) is always filled. On layer 1 each candidate
        cell gets a wall with probability (num_layers - layer) / num_layers.
        """
        num_layers = 2
        for layer in range(num_layers):
            # Top and bottom rows of this layer
            for x in range(layer, self.width - layer):
                if layer == 0 or random.random() < (num_layers - layer) / num_layers:
                    self.add_thing(Wall(), (x, layer))
                if layer == 0 or random.random() < (num_layers - layer) / num_layers:
                    self.add_thing(Wall(), (x, self.height - 1 - layer))

            # Left and right columns of this layer (corners already handled)
            for y in range(layer + 1, self.height - 1 - layer):
                if layer == 0 or random.random() < (num_layers - layer) / num_layers:
                    self.add_thing(Wall(), (layer, y))
                if layer == 0 or random.random() < (num_layers - layer) / num_layers:
                    self.add_thing(Wall(), (self.width - 1 - layer, y))

        # Corners of the second layer
        corners = [(1, 1), (1, self.height - 2), (self.width - 2, 1), (self.width - 2, self.height - 2)]

        for x, y in corners:
            # Count the things found in the four orthogonal neighbours and
            # wall the corner when that count is exactly 4.
            if sum(1 for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1)]
                        for _ in self.list_things_at((x + dx, y + dy))) == 4:

                self.add_thing(Wall(), (x, y), True)

    def add_obstacles(self, x_bounds: List[int], y_bounds: List[int]) -> None:
        """Randomly place obstacles inside the given coordinate ranges.

        Args:
            x_bounds, y_bounds: arguments unpacked into `range` for each axis.
        """
        # The four diagonals avoid chess-board-like patterns; the two
        # orthogonal offsets avoid full lines of walls + obstacles that
        # would block the agent.
        surroundings = [(1 , 1), (-1, -1), (-1, 1), (1, -1), (1, 0), (0, 1)]

        for x in range(*x_bounds):
            for y in range(*y_bounds):
                # The random draw comes first, exactly one per cell.
                if random.random() >= self.obstacle_probability:
                    continue
                # Every listed neighbour must be in bounds and hold neither a
                # wall nor an obstacle.
                neighbours_free = all(
                    self.is_inbounds((x + dx, y + dy))
                    and not any(isinstance(thing, (Wall, Obstacle))
                                for thing in self.list_things_at((x + dx, y + dy)))
                    for dx, dy in surroundings)
                if neighbours_free:
                    self.add_thing(Obstacle(), (x, y), True)

    def add_dirt(self, x_bounds: List[int], y_bounds: List[int]) -> None:
        """Randomly place dirt on cells that hold no obstacle or wall.

        Args:
            x_bounds, y_bounds: arguments unpacked into `range` for each axis.
        """
        for x in range(*x_bounds):
            for y in range(*y_bounds):
                # The random draw comes first, exactly one per cell.
                if random.random() >= self.dirt_probability:
                    continue
                if any(isinstance(thing, (Obstacle, Wall)) for thing in self.list_things_at((x, y))):
                    continue
                self.add_thing(Dirt(), (x, y), True)

    def init_world(self,) -> None:
        """Add walls, then obstacles, then dirt, using the class-level probabilities."""
        # Ranges for obstacles and dirt that skip the outermost ring of cells
        x_bounds = [1, self.x_end - 1]
        y_bounds = [1, self.y_end - 1]

        self.add_walls()
        self.add_obstacles(x_bounds, y_bounds)
        self.add_dirt(x_bounds, y_bounds)

    def get_world(self) -> List[List[object]]:
        """Return the things at every cell as a nested list indexed [x][y]."""
        return [[self.list_things_at((x, y)) for y in range(self.height)]
                for x in range(self.width)]

    def percepts_from(self, agent, location, tclass=Thing):
        """Return the percepts for one location.

        Things whose class is exactly Obstacle or Wall are reported as a Bump;
        anything else is reported as itself. An empty cell yields [None].
        `agent` and `tclass` are accepted but not used.
        """
        thing_percepts = {
            Obstacle: Bump(),
            Wall: Bump(),
        }
        result = [thing_percepts.get(thing.__class__, thing) for thing in self.list_things_at(location)]

        return result if len(result) else [None]

    def percept(self, agent):
        """Return percepts for the four orthogonal neighbours and the agent's cell.

        Result format: [Right, Down, Left, Up, Current location], each entry
        being the list returned by `percepts_from`. Down is y + 1, up is y - 1.
        """
        x, y = agent.location
        cells = [
            (x + 1, y),  # Right
            (x, y + 1),  # Down
            (x - 1, y),  # Left
            (x, y - 1),  # Up
            (x, y),      # Current location
        ]
        return [self.percepts_from(agent, cell) for cell in cells]

    def execute_action(self, agent, action) -> None:
        '''Apply a move or "Suck" action; any other action is ignored.

        A move always costs one performance point, even when it is blocked.
        Successfully sucking dirt removes it and adds 100 points.
        '''
        agent.collided = 4 * [False]

        # Same clockwise order as the first four entries of `percept`
        moves = ('MoveRight', 'MoveDown', 'MoveLeft', 'MoveUp')

        if action in moves:
            if self.verbose:
                print(f'{str(agent)[1:-1]} decided to {action} at location: {agent.location}')

            index = moves.index(action)

            # Flag a collision when ANY thing in the target cell is a Bump,
            # not just the first one listed.
            neighbours = self.percept(agent)[:-1]
            if any(isinstance(thing, Bump) for thing in neighbours[index]):
                agent.collided[index] = True

            # The agent's own method consults `collided` and stays put if blocked
            getattr(agent, action)()
            agent.performance -= 1

        elif action == "Suck":
            items = self.list_things_at(agent.location, tclass=Dirt)
            if items and agent.Suck(items[0]):
                if self.verbose:
                    print(f'{str(agent)[1:-1]} suck {str(items[0])[1:-1]} at location: {agent.location}, :)')
                self.delete_thing(items[0])
                agent.performance += 100

    def is_done(self):
        """Return True once no Dirt is left among the environment's things."""
        if any(isinstance(thing, Dirt) for thing in self.things):
            return False
        if self.verbose:
            print("There is not dirt left")
        return True

    # Some small overrides to pre existing functions
    # ________________________________________________

    def run(self, steps=100, delay=0, draw=False):
        """Step the environment up to `steps` times, refreshing before each
        step and once more at the end; stop early when `is_done` is true."""
        for _ in range(steps):
            self.update(delay, draw)
            if self.is_done():
                break
            self.step()
        self.update(delay,draw)

    def update(self, delay=0, draw=False):
        """Sleep for `delay`, then refresh the grid."""
        sleep(delay)
        self.reveal(draw)

    def reveal(self, draw=False):
        """Repaint the existing grid and, when `draw` is truthy, display it."""
        self.draw_world()
        if draw:
            clear_output(1)
            self.grid.show()

    def draw_world(self):
        """Reset the grid to the background colour and paint every occupied
        cell with the colour of the last thing listed there."""
        self.grid[:] = (200, 200, 200)
        world = self.get_world()
        for x, column in enumerate(world):
            for y, things in enumerate(column):
                if things:
                    # the grid is indexed [y, x]
                    self.grid[y, x] = self.colors[things[-1].__class__.__name__]

#### Define the program for the simple reflex agent
> The simple reflex agent will happily bump against a wall as many times as it wants

In [7]:
def simple_program(percepts):
    '''Return 'Suck' when standing on dirt, otherwise a random move.

    Args:
        percepts: list of per-cell lists; the last entry describes the cell
            the agent is standing on.

    Returns:
        'Suck', or one of 'MoveRight', 'MoveDown', 'MoveLeft', 'MoveUp'
        chosen uniformly at random.
    '''
    # Look at everything in the current cell instead of only its first item,
    # so the result does not depend on the order the things are listed in
    # (and an empty list is simply "no dirt").
    if any(isinstance(thing, Dirt) for thing in percepts[-1]):
        return 'Suck'

    return random.choice(['MoveRight', 'MoveDown', 'MoveLeft', 'MoveUp'])

##### Function to place the agent in a random location

In [8]:
# Define a function to get a random position that is not on top of walls, dirt, or obstacles
def get_random_position(room):
    """Return a random interior (x, y) of `room` holding no Wall, Dirt or Obstacle.

    Coordinates are drawn from 1..width-2 and 1..height-2 and redrawn until a
    suitable cell is found.
    """
    blocking = (Wall, Dirt, Obstacle)
    while True:
        # x is drawn before y
        candidate = (random.randint(1, room.width - 2), random.randint(1, room.height - 2))
        occupied = any(isinstance(thing, blocking) for thing in room.list_things_at(candidate))
        if not occupied:
            return candidate

### Finally run the environment for a fixed number of steps

In [9]:
# Build a verbose environment
room = Roomba_Env(verbose=True)

# Put a Roomba driven by the simple reflex program on a random free cell
simple_roomba = Roomba(simple_program)
room.add_thing(simple_roomba, get_random_position(room), True)

# Run 10 steps with no delay, drawing the grid on every update
room.run(10, 0, True)

Roomba decided to MoveRight at location: (3, 9)


Roomba decided to MoveUp at location: (4, 9)


Roomba decided to MoveLeft at location: (4, 8)


Roomba decided to MoveRight at location: (4, 8)


Roomba decided to MoveRight at location: (5, 8)


Roomba decided to MoveRight at location: (6, 8)


Roomba decided to MoveUp at location: (7, 8)


Roomba decided to MoveRight at location: (7, 8)


Roomba decided to MoveLeft at location: (8, 8)


Roomba decided to MoveDown at location: (7, 8)


### Define the program for an agent which avoids the walls
> This reflex agent looks at the adjacent cells and never chooses a move that would bump into a wall or an obstacle

In [10]:
def wall_avoid_program(percepts):
    '''Return 'Suck' on dirt, otherwise a random move that does not bump.

    Args:
        percepts: list of per-cell lists; all entries but the last describe
            the neighbouring cells in the order right, down, left, up, and
            the last entry describes the cell the agent is standing on.

    Returns:
        'Suck', one of the unblocked moves among 'MoveRight', 'MoveDown',
        'MoveLeft', 'MoveUp', or 'NoOp' when every direction is blocked.
    '''
    # Check everything in the current cell, not only its first item
    if any(isinstance(thing, Dirt) for thing in percepts[-1]):
        return 'Suck'

    directions = ('MoveRight', 'MoveDown', 'MoveLeft', 'MoveUp')

    # Indices of the neighbouring cells that contain a Bump
    blocked = {
        index
        for index, cell in enumerate(percepts[:-1])
        if any(isinstance(thing, Bump) for thing in cell)
    }
    open_directions = [move for index, move in enumerate(directions) if index not in blocked]

    # Boxed in on all sides: random.choice would raise IndexError on an empty
    # list, so return an action that Roomba_Env.execute_action does not act on.
    if not open_directions:
        return 'NoOp'

    return random.choice(open_directions)

In [11]:
# Build a verbose environment
room = Roomba_Env(verbose=True)

# Put a Roomba driven by the wall-avoiding program on a random free cell
wall_avoiding_roomba = Roomba(wall_avoid_program)
room.add_thing(wall_avoiding_roomba, get_random_position(room), True)

# Run 10 steps with no delay, drawing the grid on every update
room.run(10, 0, True)

Roomba decided to MoveLeft at location: (7, 9)


Roomba suck Dirt at location: (6, 9), :)


Roomba decided to MoveUp at location: (6, 9)


Roomba decided to MoveUp at location: (6, 8)


Roomba decided to MoveDown at location: (6, 7)


Roomba decided to MoveUp at location: (6, 8)


Roomba decided to MoveUp at location: (6, 7)


Roomba decided to MoveDown at location: (6, 6)


Roomba decided to MoveUp at location: (6, 7)


Roomba decided to MoveUp at location: (6, 6)


### Defining the test function 
> The definition can be found inside agents.py

In [12]:
# Import for progress bar
from tqdm import tqdm

# Custom implementation to allows the agent to be placed in a random position
def test_agent(AgentFactory, steps, envs):
    """Run a fresh agent in each environment and return the mean performance.

    Args:
        AgentFactory: zero-argument callable returning a new agent.
        steps: step budget passed to each environment's `run`.
        envs: iterable of environments; each one is mutated by its run.

    Returns:
        The mean of the agents' `performance` values.
    """
    scores = []
    for env in tqdm(envs):
        agent = AgentFactory()
        # The start cell must come from the environment under test; taking
        # it from the global `room` could put the agent on a wall or outside
        # an environment of a different size or layout.
        env.add_thing(agent, get_random_position(env), True)
        env.run(steps)
        scores.append(agent.performance)

    return mean(scores)

#### Testing in parallel: 

- ##### Simple Reflex agent
- ##### Wall avoiding simple agent

In [None]:
import threading

def run_test_agent(program, steps, envs):
    """Evaluate `program` with `test_agent` over `envs` and print the mean score."""
    score = test_agent(lambda: Roomba(program), steps, envs)
    print(f'The {program} achived: {score}')

if __name__ == '__main__':
    # 100 environments, 500 steps each
    n_envs, steps = 100, 500
    envs = [Roomba_Env() for _ in range(n_envs)]

    # Evaluate the simple reflex agent on a worker thread and wait for it.
    t1 = threading.Thread(target=run_test_agent, args=(simple_program, steps, envs))
    t1.start()
    t1.join()

    # The wall-avoiding run is disabled.
    # NOTE(review): it would receive the same `envs` list as t1, so both
    # threads would act on the same environment objects - confirm before
    # re-enabling.
    # t2 = threading.Thread(target=run_test_agent, args=(wall_avoid_program, steps, envs))
    # t2.start()
    # t2.join()

 27%|████████████████████████████████████████████                                                                                                                       | 27/100 [01:01<02:09,  1.78s/it]