In [2]:
!pip install agentpy pathfinding owlready2

Collecting agentpy
  Downloading agentpy-0.1.5-py3-none-any.whl (53 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.9/53.9 kB[0m [31m923.3 kB/s[0m eta [36m0:00:00[0m
[?25hCollecting pathfinding
  Downloading pathfinding-1.0.9-py3-none-any.whl (22 kB)
Collecting owlready2
  Downloading owlready2-0.45.tar.gz (27.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m27.3/27.3 MB[0m [31m23.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting SALib>=1.3.7 (from agentpy)
  Downloading salib-1.4.7-py3-none-any.whl (757 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m758.0/758.0 kB[0m [31m37.4 MB/s[0m eta [36m0:00:00[0m
Collecting multiprocess (from SALib>=1.3.7->agentpy)
  Downloading multiprocess-0.70.15-py310-none-any.whl (134 kB)
[2K     [90m━━━━━━━━━━━

In [3]:
import agentpy as ap
import pathfinding as pf        #In case you want to use pathfinding algorithms for the agent's plan
import matplotlib.pyplot as plt
from owlready2 import *
import itertools
import random
import IPython
import math

In [45]:
#DO NOT EXECUTE MORE THAN ONCE, THE ONTOLOGY WILL ACCUMULATE INFORMATION AND LEAD TO ERRORS

#Simple general ontology using Owlready2, of course you can use an ontology from an OWL file

#If you need to execute this cell again you can do it deleting the ontology as follows:
#onto.destroy(update_relation = True, update_is_a = True)

# Ontology name and path
onto = get_ontology("file:///content/coin_onto.owl")

# La ontología
with onto:

    # SuperClass
    class Entity(Thing):
        pass

    class Montacargas(Entity):
        pass

    class Caja(Entity):
        pass

    class Place(Thing):
        pass

    # Propiedad para describir el lugar de una entidad en el almacén.
    class is_in_place(ObjectProperty):
        domain = [Entity]
        range = [Place]
        pass

    # Propiedad que especifica la posición de un Place.
    class at_position(DataProperty,FunctionalProperty):
        domain = [Place]
        range = [str]
        pass

    # Propiedad que describe cuántas cajas puede ver un agente.
    class cajas_within_reach(ObjectProperty):
        domain = [Montacargas]
        range = [int]


In [100]:
# CLASE MONTACARGAS

class MontacargasAgent(ap.Agent):

    def __init__(self, *args, **kwargs):
      super().__init__(*args, **kwargs)
      self.collision_occurred = False

    # FUNCIONALIDAD BDI:

    def see(self,e):
      """
      Función que percibe y encuentra todos los agentes Caja (agentType 1) que
      están dentro del rango de visión del agente Montacargas.

      Parámetros:
      - self: Instancia de la clase.
      - e (Environment): El ambiente que contiene a todos los agentes.

      Retorna:
      List[Agent]: Una lista de todos los agentes Caja colindantes que caen
      dentro del rango observable especificado.
        """

      seeRange = self.model.p.worldSize[0]

      p = [a for a in e.neighbors(self, distance=seeRange) if a.agentType==1]

      return p

    # Belief Revision Function:
    def brf(self,p):
      """
      Función que actualiza el sistema de creencias del agente montacargas
      basad0 en el estado observado del ambiente.

      Parámetros:
      - self: Instancia de la clase.
      - p (List[Agent]): Lista de agentes Caja dentro del alcance del montacargas.

      Retorna:
      No hay valor de retorno.
      """

      # Se destruyen las creencias previas.
      for caja in self.this_montacargas.cajas_within_reach:
          destroy_entity(caja.is_in_place[0])
          destroy_entity(caja)
      destroy_entity(self.this_montacargas.is_in_place[0])

      # Se instancian ontológicamente los montacargas.
      currentPos = self.model.almacen.positions[self]
      self.this_montacargas.is_in_place = [Place(at_position = str(currentPos))]

      # Se instancian ontológicamente las cajas al alcance.
      for c in p:
          theBox = Caja(is_in_place = [Place()])
          theBox.is_in_place[0].at_position = str(self.model.almacen.positions[c])
          self.this_montacargas.cajas_within_reach.append(theBox)

    def options(self):
      """
      Función que genera las metas disponibles a perseguir basándose en la
      distancia de cada Caja con respecto al Montacargas; es decir, sus deseos.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      dict: Un diccionario donde las claves son instancias de Caja dentro del
      alcance del Montacargas, y los valores son sus distancias euclidianas.
      Además, incluye una entrada adicional para la distancia al centro del almacén.
      """

      distances = {}

      for onto_caja in self.this_montacargas.cajas_within_reach:
          caja_pos = eval(onto_caja.is_in_place[0].at_position)
          montacargas_pos = eval(self.this_montacargas.is_in_place[0].at_position)
          # Distancia euclidiana:
          d = math.sqrt((caja_pos[0] - montacargas_pos[0])**2 + (caja_pos[1] - montacargas_pos[1])**2)
          distances[onto_caja] = d

      # Distancia al centro:
      center_pos = (self.model.p.worldSize[0] // 2, self.model.p.worldSize[1] // 2)

      montacargas_pos = eval(self.this_montacargas.is_in_place[0].at_position)

      center_distance = math.sqrt((center_pos[0] - montacargas_pos[0])**2 + (center_pos[1] - montacargas_pos[1])**2)

      distances["Center"] = center_distance

      return distances

    #The filter function (where it gets the Intention)
    def filter(self):
      """
      Función que determina la intención del Montacargas al seleccionar la caja
      más cercana o el centro como objetivo de acuerdo con sus deseos.

      Parámetros:
      - self: La instancia de la clase

      Retorna:
      Instancia or None: Retorna la instancia de la caja más cercana como objetivo,
      el centro del almacén o None si no hay objetivos.
      """

      # Si una colisión ocurrió, se regresa el centro del almacén.
      if self.collision_occurred:
          return self.D.get("Center", None)

      # Si no, se busca la caja con menor distancia.
      self.D.pop("Center", None)
      desires = {x: y for x, y in sorted(self.D.items(), key=lambda item: item[1])}

      if desires:
          return list(desires.items())[0][0]
      else:
          return None

    def plan(self):
      """
      Función que genera un plan para alcanzar la intención actual del Montacargas.

      Parámetros:
      - self: La instancia de la clase.

      Retorna:
      lista de tuplas: Retorna un plan en forma de lista de tuplas (x, y).
      """

      if self.I is None:
          if random.randint(0, 1) == 0:
              return [(random.choice([-1, 1]), 0)]
          elif random.randint(0, 1) == 1:
              return [(0, random.choice([-1, 1]))]
          else:
              return [(0, 0)]

      thePlanX = []
      thePlanY = []

      # Se evalúa si la instancia se trata del centro del almacén o una caja.
      if isinstance(self.I, float):
          center_pos = (self.model.p.worldSize[0] // 2, self.model.p.worldSize[1] // 2)
          montacargas_pos = eval(self.this_montacargas.is_in_place[0].at_position)
          distance2D = (center_pos[0] - montacargas_pos[0], center_pos[1] - montacargas_pos[1])
      else:
          cajaPos = eval(self.I.is_in_place[0].at_position)
          montacargas_pos = eval(self.this_montacargas.is_in_place[0].at_position)
          distance2D = (cajaPos[0] - montacargas_pos[0], cajaPos[1] - montacargas_pos[1])

      for i in range(abs(distance2D[0])):
          val = 1 if distance2D[0] >= 0 else -1
          thePlanX.append(val)

      for j in range(abs(distance2D[1])):
          val = 1 if distance2D[1] >= 0 else -1
          thePlanY.append(val)

      thePlanX = list(zip(thePlanX, [0 for _ in range(len(thePlanX))]))
      thePlanY = list(zip([0 for _ in range(len(thePlanY))], thePlanY))

      thePlan = thePlanX + thePlanY
      random.shuffle(thePlan)

      return thePlan

    #The main BDI algorithm
    def BDI(self, p):
      """
      This function calls all functions from the BDI architecture.
      Función que ejecuta todas las funciones de la arquitectura BDI
      (Belief-Desire-Intention).

      Parámetros:
      - self: Instancia de la clase.
      - p: Lista de objetos que el Montacargas puede percibir en su entorno.

      Retorna:
      No hay valor de retorno.
      """

      self.brf(p)

      # Se crea un nuevo plan si la intención se cumplió.
      if self.intentionSucceded:
          self.intentionSucceded = False
          self.D = self.options()
          self.I = self.filter()
          self.currentPlan = self.plan()
          self.collision_occurred = False

          # Se evalúa si el montacargas llegó al centro.
          montacargas_pos = eval(self.this_montacargas.is_in_place[0].at_position)
          center_pos = (self.model.p.worldSize[0] // 2, self.model.p.worldSize[1] // 2)
          if montacargas_pos == center_pos:
              self.agentType = 0

    #The function to execute actions
    def execute(self):
      """
      Función que ejecuta el plan de acciones paso a paso. Cada acción es una
      tupla que contiene un 1 o -1, indicando si el agente debe moverse en una
      dirección u otra.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """

      currentAction = (0, 0)

      # Si el plan aún no termina:
      if len(self.currentPlan) > 0:
          currentAction = self.currentPlan.pop()
      else:  # Si el plan terminó
          self.intentionSucceded = True

          # Se evalúa si el agente colisionó con alguna caja.
          for montacarga in self.model.montacargas:
              for caja in self.model.cajas:
                  if caja in self.model.almacen.positions and self.model.almacen.positions[montacarga] == self.model.almacen.positions[caja]:
                      self.collision_occurred = True
                      self.agentType = 2

                      self.model.almacen.remove_agents(caja)
                      self.model.cajas.remove(caja)

          # Se actualizan deseos e intenciones y se crea un nuevo plan.
          self.D = self.options()
          self.I = self.filter()
          self.currentPlan = self.plan()

      self.model.almacen.move_by(self, currentAction)

    def initBeliefs(self,initPos):
      """
      Esta función llena el sistema de creencias del agente instanciando los
      primeros conceptos de la ontología.

      Parámetros:
      - self: Instancia de la clase.
      initPos (tuple): Posición inicial del Montacargas en el formato (x, y).

      Retorna:
      No hay valor de retorno.
      """

      place = Place(at_position=str(initPos))
      self.this_montacargas = Montacargas(is_in_place = [place])

    # Initial intentions funtion
    def initIntentions(self):
      """
      Función que provee la primera intención del agente.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """

      self.intentionSucceded = True
      self.I = None

    #=================== Funciones principales del agente =====================

    def setup(self):
      """
      Función que configura la inicialización del agente Montacargas.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """

      #HIdentifier of the Montacargas Agent
      self.agentType = 0
      self.firstStep = True
      self.currentPlan = []

    def step(self):
      """
      Función que ejecuta un paso de la simulación para el agente Montacargas.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """

      # Para el primer paso:
      if self.firstStep:
          initPos = self.model.almacen.positions[self]
          self.initBeliefs(initPos)
          self.initIntentions()
          self.firstStep = False

      self.BDI(self.see(self.model.almacen))

      self.execute()

    def update(self):
      """
      Función que actualiza al agente Montacargas.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """
      pass

    def end(self):
      """
      Función que finaliza la simulación para el agente Montacargas.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      - No hay valor de retorno.
      """
      pass

In [47]:
# AGENTE CAJA

# Este agente no hace nada, solo existe.
class CajaAgent(ap.Agent):

    def setup(self):
      """
      Función que configura la inicialización del agente Caja.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """
      self.agentType = 1

    def step(self):
      """
      Función que ejecuta un paso de la simulación para el agente Caja.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """
      pass

    def update(self):
      """
      Función que actualiza al agente Caja.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """
      pass

    def end(self):
      """
      Función que finaliza la simulación para el agente Montacargas.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      - No hay valor de retorno.
      """
      pass

In [59]:
class DepositoAgent(ap.Agent):

     def setup(self, almacen):
      """
      Función que configura la inicialización del agente Deposito.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """
      self.agentType = 3

     def step(self):
      """
      Función que ejecuta un paso de la simulación para el agente Deposito.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """
      pass

     def update(self):
      """
      Función que actualiza al agente Deposito.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """
      pass

     def end(self):
      """
      Función que finaliza la simulación para el agente Deposito.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      - No hay valor de retorno.
      """
      pass

In [95]:
# LA SIMULACIÓN

class AlmacenModel(ap.Model):

    def get_cajas(self):
      """
      Función que obtiene la cantidad de cajas restantes en el almacén.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      int: Cantidad de cajas restantes en el almacén.
      """
      return len(self.cajas)

    def setup(self):
      """
      Función que configura el modelo al inicio de la simulación.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """

      # Lista para guardar las posiciones de todos los agentes, dado un momento `time-step`
      self.posiciones_global = []

      # Crear agentes
      self.montacargas = ap.AgentList(self,self.p.montacargasAgents,MontacargasAgent)
      self.cajas = ap.AgentList(self,self.p.cajasAgents,CajaAgent)
      self.almacen = ap.Grid(self,self.p.worldSize,track_empty=True)
      self.deposito = ap.AgentList(self, 1, DepositoAgent, almacen=self.almacen)

      # Agregar agentes al almacén
      self.almacen.add_agents(self.montacargas,random=True,empty=True)
      self.almacen.add_agents(self.cajas,random=True,empty=True)
      self.almacen.add_agents(self.deposito, positions=[(self.model.p.worldSize[0] // 2, self.model.p.worldSize[1] // 2)], random=False, empty=True)

    def step(self):
      """
      Función que ejecuta un paso de la simulación.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """
      self.montacargas.step()
      self.cajas.step()

      # Si ya no hay cajas, se detiene la simulación.
      if len(self.cajas) <= 0:
          self.stop()

    def update(self):
      """
      Función que actualiza el modelo registrando la posición de agentes
      y cajas en el tiempo actual.

      Parámetros:
      - self: Instancia de la clase.

      Retorna:
      No hay valor de retorno.
      """
      copia_de_todas_las_posiciones = self.almacen.positions.copy()
      self.posiciones_global.append(copia_de_todas_las_posiciones)

    def end(self):
      """
      Función que finaliza la simulación.

      Parámetros:
      - self: Instancia de la clase.

      Retorno:
      No hay valor de retorno.
      """

      print("Finished!")
      self.report('Posicion de montacargas', self.posiciones_global)

In [78]:
# ANIMACIÓN

def animation_plot(model, ax):
  """
  Función para generar un gráfico animado del modelo de simulación.

  Parámetros:
  - self: Instancia de la clase.
  - model (AlmacenModel): Instancia del modelo de simulación.
  - ax (matplotlib.axes._axes.Axes): Eje sobre el cual se realizará el gráfico.

  Retorna:
    No hay valor de retorno.
  """
    agent_type_grid = model.almacen.attr_grid('agentType')

    # Mapa de colores para los estados
    color_dict = { 0: "#c42c02", # rojo: Montacargas en ruta
                    1: '#4B241C', # café: Caja
                    2: '#9666ff', # morado: Montacargas con caja
                    3: "#2986cc", # azul: Depósito
                    None:'#e0e0e0'}

    ap.gridplot(grid = agent_type_grid, color_dict = color_dict, ax=ax, convert = True)
    ax.set_title(f"Simulación de montacargas \n Time-step: {model.t}, "
                 f"Cajas por ordenarzz: {model.get_cajas()}")

In [101]:
# PARÁMETROS DE SIMULACIÓN

r = random.random()

parameters = {
    "montacargasAgents" : 2,     # Cantidad de montacargas
    "cajasAgents" : 40,          # Cantidad de cajas
"worldSize" : (18,18),           # Dimensiones del almacén.
    "steps" : 150,               # Pasos máximos.
    "seed" : 13*r                # Semilla para variables random.
}

#============================================================================0

# SIMULACIÓN:

fig, ax = plt.subplots()

model = AlmacenModel(parameters)

# model.run()
animation = ap.animate(model, fig, ax, animation_plot)

IPython.display.HTML(animation.to_jshtml())