In [None]:
import math
import agentpy as ap
import numpy as np
# Visualization
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt
import IPython
from IPython.display import display, HTML

In [None]:
class RandomAgent(ap.Agent):
    """
    Warehouse robot that wanders at random and stacks the boxes it finds.

    On every step (see ``move``) the agent:
    - picks up a box when it is empty-handed and a neighbouring cell holds
      exactly one box;
    - drops its box onto a neighbouring stack of 1 to 4 boxes when it is
      carrying one;
    - otherwise tries to step onto a random neighbouring cell.
    """

    def setup(self):
        """Initialise the agent's state (agentpy ``setup`` hook)."""
        self.intention = None  # Current intention: "pick_up", "stack", "move" or "wait"
        self.intention_position = None  # Target cell (x, y) of the current intention
        self.position = None  # Assigned later through setup_position()

        self.nearby_single_boxes = []  # Neighbouring cells holding exactly one box
        self.nearby_box_stack = []  # Neighbouring cells holding 1 to 4 boxes (can still grow)
        self.has_box = False  # Always a bool: True while the robot carries a box

        # Kept to make a later visual simulation easier; lowercase like every
        # value assigned in updateLookingDirection().
        self.looking_direction = "up"

    def setup_position(self, start_position):
        """
        Place the agent on the grid.

        ``start_position`` is any two-element (x, y) sequence; it is stored as a
        tuple of plain ints so the position has one consistent type whether it
        came from the floor plan or from a later move.
        """
        x, y = start_position
        self.position = (int(x), int(y))

    def see(self):
        """Perceive the four cardinal neighbours and record where boxes are."""
        x, y = self.position
        self.nearby_single_boxes = []
        self.nearby_box_stack = []

        for neighbor_pos in self.model.get_neighbors(x, y):
            # get_neighbors() does not clip to the grid, so filter here.
            if not self.model.is_valid_position(neighbor_pos):
                continue

            i, j = neighbor_pos
            boxes_here = self.model.box_count[i][j]

            if boxes_here == 1:
                self.nearby_single_boxes.append(neighbor_pos)

            # A lone box counts as a stack that can be started; stacks already
            # holding 5 boxes are excluded because they cannot grow further.
            if 1 <= boxes_here <= 4:
                self.nearby_box_stack.append(neighbor_pos)

    def next(self):
        """
        Decide the next intention from what ``see`` perceived.

        The intention falls back to "wait" when the chosen target lies outside
        the grid (only possible for a random move next to a border).
        """
        self.intention = "wait"  # Default until a valid target is confirmed

        if not self.has_box and self.nearby_single_boxes:
            candidate_intention = "pick_up"
            target = self.model.nprandom.choice(self.nearby_single_boxes)
        elif self.has_box and self.nearby_box_stack:
            candidate_intention = "stack"
            target = self.model.nprandom.choice(self.nearby_box_stack)
        else:
            candidate_intention = "move"
            target = self.model.random.choice(
                self.model.get_neighbors(self.position[0], self.position[1])
            )

        # nprandom.choice yields a numpy row while random.choice yields a tuple;
        # normalise both to a tuple of plain ints.
        self.intention_position = (int(target[0]), int(target[1]))

        if self.model.is_valid_position(self.intention_position):
            self.intention = candidate_intention

    def action(self):
        """Carry out the current intention; "wait" (or no target yet) does nothing."""
        if self.intention_position is None or self.intention == "wait":
            return

        x, y = self.intention_position
        if self.intention == "stack":
            self.stack_boxes(x, y)
        elif self.intention == "pick_up":
            self.pick_box(x, y)
        elif self.intention == "move" and self.model.is_valid_move_position(self.intention_position):
            self.position = self.intention_position

    def move(self):
        """
        Run one full perceive / decide / act cycle.

        Every cycle executed while the floor is not yet organized is added to
        ``model.total_moves``, whatever the resulting intention is.
        """
        if not self.model.is_floor_organized:
            self.model.total_moves += 1
        self.see()
        self.next()
        self.action()

    # --- Aux functions ---

    def pick_box(self, x, y):
        """Take one box from cell (x, y); assumes the position is inside the grid."""
        self.updateLookingDirection(x, y)
        if self.model.update_box_count(x, y, -1):
            self.has_box = True

    def stack_boxes(self, x, y):
        """Drop the carried box on cell (x, y); assumes the position is inside the grid."""
        self.updateLookingDirection(x, y)
        if self.model.update_box_count(x, y, 1):
            self.has_box = False

    def updateLookingDirection(self, to_x, to_y):
        """
        Face the cell (to_x, to_y) relative to the current position.

        The direction is left unchanged when the target equals the current cell.
        """
        from_x, from_y = self.position

        # TODO: Verify which side is up in the simulation. Right now positive x
        # is down and positive y is right.
        if to_x > from_x:
            self.looking_direction = "down"
        elif to_x < from_x:
            self.looking_direction = "up"
        elif to_y > from_y:
            self.looking_direction = "right"
        elif to_y < from_y:
            self.looking_direction = "left"


In [None]:
class BoxWarehouseModel(ap.Model):
    """
    Warehouse floor where robots gather loose boxes into stacks.

    Reads the parameters ``n`` and ``m`` (grid size), ``total_boxes`` and
    ``steps`` from ``self.p``. The floor is considered organized when no cell
    holds a single box and no robot is still carrying one.
    """

    NUM_AGENTS = 5  # Robots placed on the floor
    MAX_STACK_HEIGHT = 5  # A cell can never hold more boxes than this
    # Cardinal neighbours, in the same order the original nested loops produced
    # them, so seeded random choices over the neighbour list are unchanged.
    _NEIGHBOR_OFFSETS = ((-1, 0), (0, -1), (0, 1), (1, 0))

    def setup(self):
        """Create the floor plan, the agents and the statistics counters."""
        self.box_count, self.agent_positions = self.generateFloorPlan(self.p.total_boxes)

        self.random_agents = ap.AgentList(self, self.NUM_AGENTS, RandomAgent)

        # TODO: self.box_agents = ap.AgentList(self, 5, BoxAgent) # I think its only one agent, so it can have this name

        self.all_agents = self.random_agents  # Careful: only NUM_AGENTS start positions exist
        for idx, agent in enumerate(self.all_agents):
            # `>=` (not `>`): idx == len(...) is already out of range.
            if idx >= len(self.agent_positions):
                raise RuntimeError(f"There cannot be more than {self.NUM_AGENTS} agents in total!")
            agent.setup_position(self.agent_positions[idx])

        # Statistics
        self.total_moves = 0
        self.is_floor_organized = self.p.total_boxes <= 0  # Nothing to do without boxes
        self.finish_time = self.p.steps + 1  # Sentinel meaning "did not finish in time"

    def step(self):
        """Let every agent run one perceive / decide / act cycle."""
        self.all_agents.move()

    def update(self):
        """Placeholder for per-step recording."""
        # TODO: All recording of agent positions and actions can be done here
        pass

    def end(self):
        """Report the run statistics."""
        self.report('total_moves', self.total_moves)
        self.report('time_taken', self.finish_time)
        self.report('percentage_time_taken', self.finish_time / self.p.steps)

    # --- Aux functions ---

    def generateFloorPlan(self, k):
        """
        Build an n x m grid with ``k`` single boxes and pick the robots' start cells.

        Boxes and robots never share a cell, so ``k`` is limited to
        ``n * m - NUM_AGENTS``. A negative ``k`` is treated as 0.

        Returns ``[box_count, agent_positions]`` where ``box_count`` is a list of
        rows of ints and ``agent_positions`` is a list of (x, y) int tuples.

        Raises RuntimeError when the grid cannot hold the boxes plus the robots.
        """
        k = max(int(k), 0)

        box_count = [[0] * self.p.m for _ in range(self.p.n)]
        all_positions = [(i, j) for i in range(self.p.n) for j in range(self.p.m)]

        needed = k + self.NUM_AGENTS
        if needed > len(all_positions):
            raise RuntimeError("Too many boxes for the floor size")

        # Draw distinct cells; convert the numpy rows to tuples of plain ints.
        drawn = self.nprandom.choice(all_positions, needed, replace=False)
        unique_positions = [(int(i), int(j)) for i, j in drawn]

        # The first k cells get a box, the remaining NUM_AGENTS cells get a robot.
        # Slicing by k (instead of [:-5] / [-5:]) stays correct for any agent count.
        box_positions = unique_positions[:k]
        agent_positions = unique_positions[k:]

        for i, j in box_positions:
            box_count[i][j] = 1

        return [box_count, agent_positions]

    def is_valid_position(self, position):
        """Return True when ``position`` (x, y) lies inside the grid."""
        x, y = position
        return 0 <= x < self.p.n and 0 <= y < self.p.m

    def is_valid_move_position(self, position):
        """Return True when ``position`` is inside the grid and free of agents and boxes."""
        if not self.is_valid_position(position):
            return False

        x, y = position
        for agent in self.all_agents:
            if agent.position[0] == x and agent.position[1] == y:
                return False

        return self.box_count[x][y] <= 0

    def get_neighbors(self, x, y):
        """
        Return the four cardinal neighbours of (x, y).

        Diagonals and the cell itself are excluded because robots only see in
        cardinal directions. Results are NOT clipped to the grid.
        """
        return [(x + dx, y + dy) for dx, dy in self._NEIGHBOR_OFFSETS]

    # TODO: Define concretely what an organized floor means
    def update_box_count(self, x, y, dif):
        """
        Add ``dif`` to ``box_count[x][y]`` and re-evaluate whether the floor is organized.

        FOR NOW, organized means every box sits in a stack of at least 2 boxes,
        which also requires that no robot is still carrying a box. When that
        happens the finish time is recorded and the simulation is stopped.

        Returns True when the change was applied, False when it would leave the
        cell with a negative count or with more than MAX_STACK_HEIGHT boxes.
        """
        new_count = self.box_count[x][y] + dif
        if new_count < 0 or new_count > self.MAX_STACK_HEIGHT:
            return False  # Not a valid move

        self.box_count[x][y] = new_count

        lone_boxes = sum(1 for row in self.box_count for count in row if count == 1)

        # RandomAgent flips its own has_box only after this method returns, so
        # correct the current carrier count by dif: a pick (dif < 0) adds a
        # carried box, a drop (dif > 0) removes one.
        carried_now = sum(1 for agent in self.all_agents if getattr(agent, 'has_box', False))
        carried_after = carried_now - dif

        self.is_floor_organized = lone_boxes == 0 and carried_after <= 0

        # Stop the simulation once every box is organized, and record when it
        # happened so `time_taken` reflects the real finish step.
        if self.is_floor_organized:
            self.finish_time = self.t
            self.stop()

        return True


In [None]:
def multirun_experiment():
    """
    Run one experiment per agent type and grade how quickly its runs finish.

    For each agent type a Saltelli sample is drawn in which only that type has
    a non-zero agent count. The returned dict maps the agent type to the
    proportion of runs whose ``percentage_time_taken`` is at most 0.25 (A),
    0.50 (B), 0.75 (C) and 1.00 (D), plus ``total`` (number of runs) and
    ``not_finished_probability`` (proportion above 1.00). All proportions are
    rounded to 3 decimals.

    NOTE(review): ``CleaningModel`` and the ``*_agents`` / ``percentage_dirty``
    parameters are not defined in the visible part of this notebook, which only
    defines ``BoxWarehouseModel`` — confirm they exist before enabling this.
    """
    # Careful! 2 ** simulations_log_2 is used as the sample size; keep it small.
    simulations_log_2 = 2
    iterations_per_sample = 2
    parameters = {
        'random_agents': 0,
        'sergio_agents': 0,
        'rodrigo_agents': 0,
        'oscar_agents': 0,
        'pepe_agents': 0,
        'hector_agents': 10,
        'steps': ap.IntRange(10, 20*20),  # From a very low to a very high value
        'n': ap.IntRange(1, 20),
        'm': ap.IntRange(1, 20),
        'percentage_dirty': ap.IntRange(1, 100),
        'initial_position': (0, 0),
        'seed': 22,
    }

    agent_types = [
        'sergio_agents',
        'rodrigo_agents',
        'oscar_agents',
        'pepe_agents',
        'hector_agents',
    ]

    # Upper bound of percentage_time_taken for each grade.
    grade_limits = (('A', 0.25), ('B', 0.50), ('C', 0.75), ('D', 1.00))

    runs = {}
    for agent in agent_types:
        # if agent != 'sergio_agents':  # Uncomment to test only your agent
        #     continue

        # Only the agent type under test gets a sampled count; the rest get 0.
        for other_agent in agent_types:
            parameters[other_agent] = ap.IntRange(1, 50) if other_agent == agent else 0

        sample = ap.Sample(
            parameters,
            n=2 ** simulations_log_2,
            method='saltelli',
            calc_second_order=False,
        )
        exp = ap.Experiment(CleaningModel, sample, iterations=iterations_per_sample, record=True)
        reporters_df = exp.run().reporters

        total_runs = len(reporters_df)
        time_fraction = reporters_df['percentage_time_taken']

        curr_runs = {
            grade: round((time_fraction <= limit).sum() / total_runs, 3)
            for grade, limit in grade_limits
        }
        curr_runs['not_finished'] = 0
        curr_runs['total'] = total_runs
        curr_runs['not_finished_probability'] = round(
            (time_fraction > 1.00).sum() / total_runs, 3
        )

        runs[agent] = curr_runs

    return runs
        

In [None]:
def test_time_taken():
    """
    Scatter-plot the time taken against the number of agents, per agent type.

    For each agent type a Saltelli sample is drawn in which only that type has
    a non-zero agent count; only runs with ``percentage_time_taken <= 1`` are
    plotted.

    NOTE(review): ``CleaningModel`` and the ``*_agents`` / ``percentage_dirty``
    parameters are not defined in the visible part of this notebook, which only
    defines ``BoxWarehouseModel`` — confirm they exist before enabling this.
    """
    # Careful! 2 ** simulations_log_2 is used as the sample size; keep it small.
    simulations_log_2 = 2
    iterations_per_sample = 2
    parameters = {
        'random_agents': 0,
        'sergio_agents': 0,
        'rodrigo_agents': 0,
        'oscar_agents': 0,
        'pepe_agents': 0,
        'hector_agents': 0,
        'steps': 400,
        'n': 13,
        'm': 13,
        'percentage_dirty': 70,
        'initial_position': (0, 0),
        'seed': 22,
    }

    agent_types = [
        'sergio_agents',
        'rodrigo_agents',
        'oscar_agents',
        'pepe_agents',
        'hector_agents',
    ]

    result_columns = ['time_taken', 'agent_amount', 'agent_type']
    per_agent_frames = []
    for agent in agent_types:
        # if agent != 'sergio_agents':  # Uncomment to test only your agent
        #     continue

        # Only the agent type under test gets a sampled count; the rest get 0.
        for other_agent in agent_types:
            parameters[other_agent] = ap.IntRange(1, 50) if other_agent == agent else 0

        sample = ap.Sample(
            parameters,
            n=2 ** simulations_log_2,
            method='saltelli',
            calc_second_order=False,
        )
        exp = ap.Experiment(CleaningModel, sample, iterations=iterations_per_sample, record=True)
        results = exp.run()

        # Agent count per sample, joined to the finished runs of that sample.
        agent_df = results.parameters.sample[[agent]]
        reporters_df = results.reporters[['time_taken', 'percentage_time_taken']]
        finished_df = reporters_df[reporters_df['percentage_time_taken'] <= 1]

        agent_time_df = finished_df[['time_taken']].merge(agent_df, on='sample_id', how='left')
        agent_time_df = agent_time_df.reset_index()[['time_taken', agent]]
        agent_time_df.columns = ['time_taken', 'agent_amount']
        agent_time_df['agent_type'] = agent
        per_agent_frames.append(agent_time_df)

    # Concatenate once at the end. Growing a frame inside the loop from an empty
    # object-dtype seed relies on deprecated pandas behaviour, can turn the
    # numeric columns into object dtype and leaves duplicate index labels.
    non_empty_frames = [frame for frame in per_agent_frames if not frame.empty]
    if non_empty_frames:
        agents_time_df = pd.concat(non_empty_frames, ignore_index=True)
    else:
        agents_time_df = pd.DataFrame(columns=result_columns)

    sns.scatterplot(data=agents_time_df, x='agent_amount', y='time_taken', hue='agent_type', s=40)
    plt.title('Agent amount vs time taken to clean all')
    plt.tight_layout()
    plt.show()
        

In [None]:
def test_mov_amount():
    """
    Scatter-plot the total movements against the number of agents, per agent type.

    For each agent type a Saltelli sample is drawn in which only that type has
    a non-zero agent count; only runs with ``percentage_time_taken <= 1`` are
    plotted.

    NOTE(review): ``CleaningModel`` and the ``*_agents`` / ``percentage_dirty``
    parameters are not defined in the visible part of this notebook, which only
    defines ``BoxWarehouseModel`` — confirm they exist before enabling this.
    """
    # Careful! 2 ** simulations_log_2 is used as the sample size; keep it small.
    simulations_log_2 = 2
    iterations_per_sample = 2
    parameters = {
        'random_agents': 0,
        'sergio_agents': 0,
        'rodrigo_agents': 0,
        'oscar_agents': 0,
        'pepe_agents': 0,
        'hector_agents': 0,
        'steps': 400,
        'n': 13,
        'm': 13,
        'percentage_dirty': 70,
        'initial_position': (0, 0),
        'seed': 22,
    }

    agent_types = [
        'sergio_agents',
        'rodrigo_agents',
        'oscar_agents',
        'pepe_agents',
        'hector_agents',
    ]

    result_columns = ['total_moves', 'agent_amount', 'agent_type']
    per_agent_frames = []
    for agent in agent_types:
        # if agent != 'sergio_agents':  # Uncomment to test only your agent
        #     continue

        # Only the agent type under test gets a sampled count; the rest get 0.
        for other_agent in agent_types:
            parameters[other_agent] = ap.IntRange(1, 50) if other_agent == agent else 0

        sample = ap.Sample(
            parameters,
            n=2 ** simulations_log_2,
            method='saltelli',
            calc_second_order=False,
        )
        exp = ap.Experiment(CleaningModel, sample, iterations=iterations_per_sample, record=True)
        results = exp.run()

        # Agent count per sample, joined to the finished runs of that sample.
        agent_df = results.parameters.sample[[agent]]
        reporters_df = results.reporters[['percentage_time_taken', 'total_moves']]
        finished_df = reporters_df[reporters_df['percentage_time_taken'] <= 1]

        agent_mov_df = finished_df[['total_moves']].merge(agent_df, on='sample_id', how='left')
        agent_mov_df = agent_mov_df.reset_index()[['total_moves', agent]]
        agent_mov_df.columns = ['total_moves', 'agent_amount']
        agent_mov_df['agent_type'] = agent
        per_agent_frames.append(agent_mov_df)

    # Concatenate once at the end. Growing a frame inside the loop from an empty
    # object-dtype seed relies on deprecated pandas behaviour, can turn the
    # numeric columns into object dtype and leaves duplicate index labels.
    non_empty_frames = [frame for frame in per_agent_frames if not frame.empty]
    if non_empty_frames:
        agents_mov_df = pd.concat(non_empty_frames, ignore_index=True)
    else:
        agents_mov_df = pd.DataFrame(columns=result_columns)

    sns.scatterplot(data=agents_mov_df, x='agent_amount', y='total_moves', hue='agent_type', s=40)
    plt.title('Agent amount vs total movements done')
    # Finish the figure the same way test_time_taken() does.
    plt.tight_layout()
    plt.show()

        

In [None]:
def show_animation():
    """
    Build an animation of one ``BoxWarehouseModel`` run.

    Each frame colours empty cells black, cells with boxes dark green and
    agent cells red, and writes the number of boxes on every occupied cell
    (unless the grid is so large that the labels would be unreadable).

    Returns the animation object created by ``ap.animate``.
    """
    parameters = {
        'steps': 100,
        'n': 20,
        'm': 20,
        'total_boxes': 250,
        # 'seed': 22,
    }

    # Label size shrinks linearly with the larger grid side and is capped at
    # 10 pt. It depends only on the parameters, so compute it once instead of
    # on every frame.
    max_dim = max(parameters['n'], parameters['m'])
    font_size = min(-1/6 * max_dim + 13.33, 10)
    # Below 4 pt the labels are skipped entirely. Passing fontsize=0 would not
    # hide them: matplotlib bumps sizes under 1 pt back up to 1 pt.
    show_labels = font_size >= 4

    def animation_plot(model, ax):
        """Draw the current state of ``model`` on ``ax``."""
        color_dict = {
            0: '#000000',  # Black for empty cells
            1: '#006600',  # Dark green for cells with boxes
            2: '#FF0000'   # Red for agents
        }

        box_grid = np.asarray(model.box_count)

        # 0 = empty, 1 = at least one box; agents are marked afterwards as 2.
        cell_state = (box_grid > 0).astype(float)
        for agent in model.all_agents:
            x, y = agent.position
            cell_state[x][y] = 2

        ap.gridplot(cell_state, ax=ax, color_dict=color_dict, convert=True)

        if show_labels:
            for i, j in np.argwhere(box_grid > 0):
                # Text coordinates are (column, row); white reads well on dark green.
                ax.text(j, i, str(int(box_grid[i][j])),
                        ha='center', va='center', color='white',
                        fontweight='bold', fontsize=font_size)

        ax.set_title(f"Box Warehouse Model\nTime-step: {model.t}")

    fig, ax = plt.subplots()
    model = BoxWarehouseModel(parameters)
    animation = ap.animate(model, fig, ax, animation_plot)
    return animation
    

In [None]:
# Grafica A, B, C, D
def plot_results(runs):
    """
    Draw a grouped bar chart of the A/B/C/D proportions for every agent type.

    ``runs`` maps an agent-type name to a dict holding at least the keys
    'A', 'B', 'C' and 'D'. Tick labels use the capitalised part of the name
    that precedes the first underscore.
    """
    grades = ['A', 'B', 'C', 'D']
    agent_names = list(runs.keys())
    bar_width = 0.2

    # Gather all bar heights before touching matplotlib.
    heights_per_grade = [
        (grade, [runs[name][grade] for name in agent_names])
        for grade in grades
    ]

    group_start = np.arange(len(agent_names))

    fig, ax = plt.subplots(figsize=(10, 6))
    for offset, (grade, heights) in enumerate(heights_per_grade):
        ax.bar(group_start + offset * bar_width, heights, bar_width, label=grade)

    ax.set_xlabel('Agent Types')
    ax.set_ylabel('Proportion')
    ax.set_title('Comparison of Results by Agent Type')

    # Centre each tick under its group of bars.
    ax.set_xticks(group_start + bar_width * (len(grades) - 1) / 2)
    tick_labels = [name.split('_')[0].capitalize() for name in agent_names]
    ax.set_xticklabels(tick_labels, rotation=45, ha='right')
    ax.legend()

    plt.tight_layout()
    plt.show()


In [None]:
# Switches selecting which of the sections below are executed.
options = dict(
    show_animation=True,
    multirun_experiment=False,  # Not tested
    test_time_taken=False,  # Not tested
    test_mov_amount=False,  # Not tested
)

In [None]:
# Render the simulation inline as an interactive JS/HTML player.
if options['show_animation']:
    animation = show_animation()
    display(HTML(animation.to_jshtml()))

In [None]:
# Grade every agent type, chart the proportions and print the raw numbers.
if options['multirun_experiment']:
    results = multirun_experiment()
    plot_results(results)
    print(results)

In [None]:
# Plot agent amount against time taken when enabled in `options`.
if options['test_time_taken']:
    test_time_taken()

In [None]:
# Plot agent amount against total movements when enabled in `options`.
if options['test_mov_amount']:
    test_mov_amount()