# Evidence 2: Challenge, Agent Simulation
### Modeling of Multi-Agent Systems with Computer Graphics

In this final version of the project, we created and simulated trucks on construction sites using Belief-Desire-Intention (BDI) agents. Our approach integrates BDI agents into the truck models so that they can navigate realistically in a simulation environment.

#### Required libraries to run on Colab

In [None]:
#!pip install agentpy pathfinding owlready2

### Libraries

In [None]:
import agentpy as ap
import pathfinding as pf
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from owlready2 import *
import itertools
import random
import IPython
import math

### Ontology

In [None]:
# Ontology that holds the entity classes and properties declared in the next
# cell (everything inside ``with onto:``).
onto = get_ontology("http://test.org/TruckingOnto.owl")


In [None]:
# Declares the vocabulary the agents use as their belief store: one class per
# kind of entity, plus the properties attached to each of them.
# Property declarations read ``Domain >> Range``; the ones that also inherit
# from FunctionalProperty are declared as functional, the others are not.
with onto:

  # Entities

  class Entity(Thing): pass
  class Truck(Entity): pass
  class Camera(Entity): pass
  class Obstacle(Entity): pass
  class Deposit(Entity): pass
  class Security(Entity): pass
  class Exit(Entity): pass

  # ------------------------- Relations -------------------------

  ## _________ Domain Entity _________
  # Grid position shared by every entity (z is always set to 0 by the agents).
  class x_coordinate(Entity >> int, FunctionalProperty): pass
  class y_coordinate(Entity >> int, FunctionalProperty): pass
  class z_coordinate(Entity >> int, FunctionalProperty): pass

  ## _________ Domain Truck _________
  # NOTE(review): the agent code in this file reads and writes ``isFull``,
  # not ``is_full`` -- confirm which spelling is intended; as written this
  # property is never used by the agents.
  class is_full(Truck >> bool, FunctionalProperty): pass
  class at_destination(Truck >> bool, FunctionalProperty): pass
  class loading_step_count(Truck >> int, FunctionalProperty): pass
  class currency(Truck >> int, FunctionalProperty): pass
  class targetDeposit(Truck >> Deposit, FunctionalProperty): pass
  class targetExit(Truck >> Exit, FunctionalProperty): pass
  class canProceed(Truck >> bool, FunctionalProperty): pass

  ## _________ Domain Deposit _________
  class occupied(Deposit >> bool, FunctionalProperty): pass

  ## _________ Domain Camera _________
  class viewRadius(Camera >> int, FunctionalProperty): pass
  class fullTruckCount(Camera >> int, FunctionalProperty): pass
  class trucksInView(Camera >> int, FunctionalProperty):pass
  # Not functional: the camera appends every full truck it has seen.
  class observedTrucks(Camera >> Truck): pass

  ## _________ Domain Obstacle _________
  class isBlocking(Obstacle >> bool, FunctionalProperty): pass
  class radius(Obstacle >> int, FunctionalProperty): pass

  ## _________ Domain Security _________
  class numberTrucksInCenter(Security >> int, FunctionalProperty): pass
  class totalTrucksAllowed(Security >> Truck): pass
  class totalTrucksAllowedCount(Security >> int, FunctionalProperty): pass
  class totalMoneyReceived(Security >> int, FunctionalProperty): pass
  class cameraTruckCount(Security >> int, FunctionalProperty): pass
  class currentTruckPayment(Security >> int, FunctionalProperty): pass
  class truckAtCheckpoint(Security >> Truck, FunctionalProperty): pass

### Messages (KQML)

In [None]:
#MESSAGE CLASS
#Class to handle and build a message, based on KQML
class Message():
    """KQML-style message that can be built from parts or parsed from a string.

    The string form produced by ``__str__`` is::

        (<performative>
        :sender <sender>:content <content>:in-reply-to <query>)

    with ``:reply-with`` instead of ``:in-reply-to`` when the message is not a
    reply. ``Message(msg=str(m))`` parses that form back.

    Limitations of the format: ``:`` separates parameters and a newline
    separates the performative from the parameters, so neither may appear
    inside ``sender``, ``content`` or ``query``. Spaces are allowed in values.
    """

    performatives = ["request","inform"]
    parameters = ["content","sender","reply-with","in-reply-to"]

    def __init__(self,msg="",performative="",content="",sender="",query="q1",is_reply=True):
        """Build a new message, or parse ``msg`` when it is not empty.

        Args:
            msg: Encoded message to parse. When non-empty, every other
                argument is ignored and the fields come from ``decode``.
            performative: One of ``Message.performatives``.
            content: Message payload (a string).
            sender: Name of the sending agent.
            query: Query identifier placed in ``reply-with``/``in-reply-to``.
            is_reply: When true the identifier is emitted as ``in-reply-to``,
                otherwise as ``reply-with``.

        Raises:
            AssertionError: If the performative is not a known one.
        """
        self.empty = False
        self.request = False
        self.inform = False
        self.msg = msg

        if msg == "":
            self.is_reply = is_reply
            self.query = query
            assert performative in Message.performatives , f"Performaive: {performative}"
            self.performative = performative
            self.content = content
            self.sender = sender
        else:
            self.decode()

        # Convenience flags mirroring the performative. ``empty`` can only
        # become True when the assertions above are disabled.
        if self.performative == "request":
            self.request = True
        elif self.performative == "inform":
            self.inform = True
        else:
            self.empty = True

    def decode(self):
        """Parse ``self.msg`` (the format written by ``__str__``) into fields.

        Raises:
            AssertionError: If the performative is not a known one.
            IndexError: If the message has no parameter line.
            KeyError: If ``content``, ``sender`` or the query id is missing.
        """
        # Drop the surrounding parentheses, then split the performative line
        # from the parameter line.
        lines = self.msg[1:-1].split("\n")
        self.performative = lines[0]
        assert self.performative in Message.performatives , f"Performaive: {self.performative}"

        fields = {}
        # The text before the first ':' is empty, hence the [1:].
        for parameter in lines[1].split(":")[1:]:
            # Split on the FIRST space only so that values containing spaces
            # are kept whole instead of being cut after their first word.
            key, _, value = parameter.partition(" ")
            fields[key] = value

        if "in-reply-to" in fields:
            self.query = fields["in-reply-to"]
            self.is_reply = True
        else:
            self.query = fields["reply-with"]
            self.is_reply = False
        self.content = fields["content"]
        self.sender = fields["sender"]

    def __str__(self):
        """Return the message encoded in the KQML-like string format."""
        query_key = ":in-reply-to " if self.is_reply else ":reply-with "
        return "".join([
            "(",
            self.performative + "\n",
            ":sender " + self.sender,
            ":content " + self.content,
            query_key + self.query,
            ")",
        ])


### Agents

**AgentType**
- Truck = 0
- Deposit = 1
- Camera = 2
- Obstacle = 3
- Security = 4
- Exit = 5

#### Truck Agent

In [None]:
# Truck Agent Class

class TruckAgent(ap.Agent):
    """Truck that drives to a deposit, loads material and then drives to an exit.

    Beliefs are kept on an ontology ``Truck`` individual (``self.this_truck``).
    The first ``step`` only initialises the beliefs; every later step runs
    ``filter`` to choose an intention and ``execute`` to act on it.
    """

    # ------------------------------------------ Main Functions ------------------------------------------

    def setup(self):
        """Create the ontology individual and tag the agent as a truck."""
        self.agentType = 0  # 0 = Truck; other agents filter neighbours on this value
        self.firstStep = True
        self.this_truck = Truck()

    def step(self):
        """Initialise beliefs on the first call, afterwards run filter + execute."""
        if self.firstStep:
            position = self.model.environment.positions[self]
            self.initBeliefs(position)
            self.initIntentions()
            self.firstStep = False
        else:
            intention = self.filter()
            self.execute(intention)

    def update(self): pass

    def end(self): pass

    # ------------------------------------------ Functions ------------------------------------------

    def initBeliefs(self, position):
        """Initialise the belief system from the truck's grid ``position`` (x, y)."""
        self.this_truck.x_coordinate = position[0]
        self.this_truck.y_coordinate = position[1]
        self.this_truck.z_coordinate = 0  # 2D animation
        # NOTE(review): the ontology declares ``is_full`` but this class and
        # CameraAgent use ``isFull``; the spelling is kept so they stay in sync.
        self.this_truck.isFull = False
        self.this_truck.loading_step_count = 0
        # Keep a currency that was set manually before the first step.
        if self.this_truck.currency is None:
            self.this_truck.currency = 200
        self.this_truck.targetDeposit = None
        self.this_truck.targetExit = None
        self.this_truck.canProceed = True

    def initIntentions(self):
        """Start with the goal not yet reached."""
        self.this_truck.at_destination = False

    def find_nearest_deposit(self, environment):
        """Collect the deposits in view and store the first one as the target.

        Despite the name no distance ranking is done: the first deposit among
        the neighbours within distance 100 is used.

        Returns:
            The list of deposit agents found (empty when there is none, in
            which case the target belief is set to ``None``).
        """
        deposits = [a for a in environment.neighbors(self, distance=100) if a.agentType == 1]
        self.this_truck.targetDeposit = deposits[0].this_deposit if deposits else None
        return deposits

    def find_nearest_exit(self, environment):
        """Collect the exits in view and store the first one as the target.

        Despite the name no distance ranking is done: the first exit among the
        neighbours within distance 100 is used.

        Returns:
            The list of exit agents found (empty when there is none, in which
            case the target belief is set to ``None``).
        """
        exits = [a for a in environment.neighbors(self, distance=100) if a.agentType == 5]
        self.this_truck.targetExit = exits[0].this_exit if exits else None
        return exits

    def getExitCoordinates(self, exit):
        """Return the (x, y) stored on the ontology individual of an exit agent."""
        x = exit.this_exit.x_coordinate
        y = exit.this_exit.y_coordinate
        return x, y

    def getDepositCoordinates(self, deposit):
        """Return the (x, y) stored on the ontology individual of a deposit agent."""
        x = deposit.this_deposit.x_coordinate
        y = deposit.this_deposit.y_coordinate
        return x, y

    def brf(self, coordintates):
        """Move one cell towards ``coordintates`` and refresh ``at_destination``.

        At most one axis-aligned step is taken per call. The candidate steps
        (one per axis that still differs) are tried in random order and the
        first one that is not blocked by an obstacle is used; when all are
        blocked the truck stays where it is.

        Args:
            coordintates: Target position as an (x, y) sequence.
        """
        target_x, target_y = coordintates[0], coordintates[1]
        dx = target_x - self.this_truck.x_coordinate
        dy = target_y - self.this_truck.y_coordinate

        # Already at the destination: nothing to move.
        if dx == 0 and dy == 0:
            self.this_truck.at_destination = True
            return

        # Potential moves on the grid
        potential_moves = []
        if dx != 0:
            potential_moves.append((1 if dx > 0 else -1, 0))
        if dy != 0:
            potential_moves.append((0, 1 if dy > 0 else -1))

        # Shuffle moves to avoid directional bias
        random.shuffle(potential_moves)

        for move in potential_moves:
            new_x = self.this_truck.x_coordinate + move[0]
            new_y = self.this_truck.y_coordinate + move[1]

            # Obstacle information comes from the model
            if not self.model.is_obstacle_at((new_x, new_y)):
                # Take the first unobstructed move and mirror it in the ontology
                self.model.environment.move_by(self, move)
                self.this_truck.x_coordinate = new_x
                self.this_truck.y_coordinate = new_y
                break

        # Compare the position AFTER the move with the target. (Reusing the
        # dx/dy computed before the move would always give False here.)
        self.this_truck.at_destination = (
            self.this_truck.x_coordinate == target_x
            and self.this_truck.y_coordinate == target_y
        )

    def load_material(self):
        """Count one loading step; after 5 of them the truck is full.

        Only counts while the truck is at its destination and has a target
        deposit. Once full, ``at_destination`` is cleared so that the next
        goal (the exit) is treated as not yet reached.
        """
        if self.this_truck.at_destination and self.this_truck.targetDeposit:
            self.this_truck.loading_step_count += 1
            if self.this_truck.loading_step_count >= 5:
                self.this_truck.isFull = True
                self.this_truck.at_destination = False

    def filter(self):
        """Update ``canProceed`` from the deposit message and pick an intention.

        Returns:
            One of ``'load_material'``, ``'navigate_to_deposit'``,
            ``'wait_for_deposit'``, ``'navigate_to_exit'`` or
            ``'leave_simulation'``.
        """
        if self.model.depositMessage:
            message_obj = Message(msg=self.model.depositMessage)
            if message_obj.performative == "inform" and message_obj.sender == "DepositAgent":
                # Only the truck named in the message may proceed
                self.this_truck.canProceed = (message_obj.content == self.this_truck.name)
        else:
            self.this_truck.canProceed = True

        if self.this_truck.isFull:
            if self.this_truck.at_destination:
                return 'leave_simulation'
            return 'navigate_to_exit'

        if not self.this_truck.canProceed:
            return 'wait_for_deposit'
        if self.this_truck.at_destination:
            return 'load_material'
        return 'navigate_to_deposit'

    def execute(self, intention):
        """Carry out ``intention``; unknown intentions (e.g. waiting) do nothing."""
        environment = self.model.environment
        if intention == 'load_material':
            self.load_material()
            if self.this_truck.isFull:
                self.this_truck.targetDeposit = None
                # find_nearest_exit stores the new target in the beliefs
                self.find_nearest_exit(environment)
        elif intention == 'navigate_to_deposit':
            deposits = self.find_nearest_deposit(environment)
            if deposits:  # stay put when there is no deposit to drive to
                self.brf(self.getDepositCoordinates(deposits[0]))
        elif intention == 'navigate_to_exit':
            exits = self.find_nearest_exit(environment)
            if exits:  # stay put when there is no exit to drive to
                self.brf(self.getExitCoordinates(exits[0]))
        elif intention == 'leave_simulation':
            self.remove_from_simulation()

    def remove_from_simulation(self):
        """Take the truck off the grid once it has left the quarry.

        The intention stays ``'leave_simulation'`` on later steps, so this is
        called repeatedly; the membership check makes the repeats a no-op
        without hiding unrelated errors.
        """
        environment = self.model.environment
        if self in environment.positions:
            environment.remove_agents(self)




#### Deposit Agent

In [None]:
class DepositAgent(ap.Agent):
    """Deposit that reports which truck is currently standing on its cell.

    Beliefs are kept on an ontology ``Deposit`` individual
    (``self.this_deposit``). The first ``step`` only initialises the beliefs;
    later steps look for trucks and publish ``model.depositMessage``.
    """

    # ------------------------------------------ Main Functions ------------------------------------------

    def setup(self):
        """Create the ontology individual and tag the agent as a deposit."""
        self.agentType = 1  # 1 = Deposit
        self.firstStep = True
        self.this_deposit = Deposit()  # Ontology entity

    def step(self):
        """Initialise beliefs on the first call, afterwards run ``execute``."""
        if self.firstStep:
            position = self.model.environment.positions[self]
            self.initBeliefs(position)
            self.firstStep = False
        else:
            self.execute()

    def update(self): pass

    def end(self): pass

    # ------------------------------------------ Functions ------------------------------------------

    def initBeliefs(self, position):
        """Initialise the belief system from the deposit's grid ``position`` (x, y).

        The individual created in ``setup`` is reused; creating another one
        here would leave an unused ``Deposit`` individual in the ontology.
        """
        self.this_deposit.x_coordinate = position[0]
        self.this_deposit.y_coordinate = position[1]
        self.this_deposit.z_coordinate = 0  # 2D animation
        self.this_deposit.occupied = False  # Initial belief state

    def detect_truck(self):
        """Update ``occupied`` and, when occupied, announce the truck being loaded.

        The announcement is an ``inform`` message whose content is the name of
        the first truck found; it is stored in ``model.depositMessage``.
        """
        # distance=0 restricts the search to the deposit's own cell
        trucks_at_deposit = [a for a in self.model.environment.neighbors(self, distance=0) if a.agentType == 0]  # 0 is for trucks
        self.this_deposit.occupied = bool(trucks_at_deposit)
        if self.this_deposit.occupied:
            occupied_message = Message(performative="inform",
                                       content=trucks_at_deposit[0].this_truck.name,
                                       sender="DepositAgent")
            self.model.depositMessage = str(occupied_message)
            print(f"<< Deposit Agent>> Currently loading {occupied_message.content}")

    def execute(self):
        """Run the deposit's only action: detecting trucks."""
        self.detect_truck()


#### Camera Agent

In [None]:
# Camera Agent Class

class CameraAgent(ap.Agent):
    """Camera that counts the full trucks passing through its view radius.

    Beliefs are kept on an ontology ``Camera`` individual
    (``self.this_camera``). Every step the running total is published as an
    ``inform`` message in ``model.broadcastMessage``.
    """

    # ------------------------------------------ Main Functions ------------------------------------------

    def setup(self):
        """Create the ontology individual and tag the agent as a camera."""
        self.agentType = 2  # 2 = Camera
        self.firstStep = True
        self.this_camera = Camera()

    def step(self):
        """Initialise beliefs on the first call, then observe (every call)."""
        if self.firstStep:
            position = self.model.environment.positions[self]
            self.initBeliefs(position)
            self.firstStep = False
        self.execute()

    def update(self): pass

    def end(self): pass

    # ------------------------------------------ Functions ------------------------------------------

    def initBeliefs(self, position):
        """Initialise the belief system from the camera's grid ``position`` (x, y).

        The individual created in ``setup`` is reused; creating another one
        here would leave an unused ``Camera`` individual in the ontology.
        """
        self.this_camera.x_coordinate = position[0]
        self.this_camera.y_coordinate = position[1]
        self.this_camera.z_coordinate = 0  # 2D animation
        self.this_camera.trucksInView = 0
        self.this_camera.fullTruckCount = 0
        self.this_camera.viewRadius = 5

    def detect_trucks(self, environment):
        """Return the truck agents within the view radius that are full."""
        full_trucks = [a for a in environment.neighbors(self, distance=self.this_camera.viewRadius)
                       if a.agentType == 0 and a.this_truck.isFull]
        return full_trucks

    def brf(self, full_trucks):
        """Update the beliefs from the full trucks currently in view.

        ``trucksInView`` is the current count; ``fullTruckCount`` grows by one
        for each truck that has not been observed before.
        """
        self.this_camera.trucksInView = len(full_trucks)

        for truck in full_trucks:
            if truck.this_truck not in self.this_camera.observedTrucks:
                # observedTrucks lists every truck seen so far in the simulation
                self.this_camera.observedTrucks.append(truck.this_truck)
                self.this_camera.fullTruckCount += 1

    def execute(self):
        """Observe, update the beliefs and broadcast the total of full trucks."""
        full_trucks = self.detect_trucks(self.model.environment)
        self.brf(full_trucks)
        truck_count_message = Message(performative="inform",
                                      content=str(self.this_camera.fullTruckCount),
                                      sender="CameraAgent")
        self.model.broadcastMessage = str(truck_count_message)
        print("<< Camera Agent >> Full trucks currently in view : ", self.this_camera.trucksInView)
        print("<< Camera Agent >> Total full trucks observed : ", self.this_camera.fullTruckCount)



#### Obstacle Agent

In [None]:
# Obstacle Agent Class

class ObstacleAgent(ap.Agent):
    """Static obstacle; it only records its position and never acts.

    Beliefs are kept on an ontology ``Obstacle`` individual
    (``self.this_obstacle``).
    """

    # ------------------------------------------ Main Functions ------------------------------------------

    def setup(self):
        """Create the ontology individual and mark it as blocking."""
        self.agentType = 3  # 3 = Obstacle
        self.firstStep = True
        self.this_obstacle = Obstacle()
        self.this_obstacle.isBlocking = True

    def step(self):
        """Record the grid position on the first call; later calls do nothing."""
        if self.firstStep:
            # Reuse the individual from setup(): replacing it here would
            # discard the isBlocking belief set there and leave an unused
            # individual in the ontology.
            position = self.model.environment.positions[self]
            self.this_obstacle.x_coordinate = position[0]
            self.this_obstacle.y_coordinate = position[1]
            self.this_obstacle.z_coordinate = 0  # 2D animation
            self.firstStep = False

    def update(self): pass

    def end(self): pass

#### Security Agent

In [None]:
#Security Agent Class

class SecurityAgent(ap.Agent):
    """Checkpoint guard that charges trucks and cross-checks the camera count.

    Beliefs are kept on an ontology ``Security`` individual
    (``self.this_security``). Every step the guard looks for a truck on its
    cell, reads the camera broadcast, then picks and executes an intention.
    """

    # ------------------------------------------ Main Functions ------------------------------------------

    def setup(self):
        """Create the ontology individual and set the fee charged per truck."""
        self.agentType = 4
        self.firstStep = True
        self.this_security = Security()
        self.charge_amount = 200

    def step(self):
        """Run one BDI cycle (beliefs are initialised on the first call)."""
        grid = self.model.environment
        if self.firstStep:
            self.initBeliefs(grid.positions[self])
            self.firstStep = False

        trucks_here = self.detect_truck(grid)
        self.brf(trucks_here)
        self.execute(self.filter())

    def update(self): pass

    def end(self): pass

    # ------------------------------------------ Functions ------------------------------------------

    def initBeliefs(self, position):
        """Store the grid ``position`` (x, y) and zero every counter."""
        beliefs = self.this_security
        beliefs.x_coordinate, beliefs.y_coordinate = position[0], position[1]
        for counter in ("z_coordinate",
                        "totalMoneyReceived",
                        "cameraTruckCount",
                        "currentTruckPayment",
                        "numberTrucksInCenter",
                        "totalTrucksAllowedCount"):
            setattr(beliefs, counter, 0)

    def detect_truck(self, environment):
        """Record the truck standing on the checkpoint cell, if any.

        Returns:
            The list of truck agents found on the guard's own cell.
        """
        incoming_truck = []
        for neighbour in environment.neighbors(self, distance=0):
            if neighbour.agentType == 0:
                incoming_truck.append(neighbour)

        if incoming_truck:
            self.this_security.truckAtCheckpoint = incoming_truck[0].this_truck
        else:
            self.this_security.truckAtCheckpoint = None
        self.this_security.currentTruckPayment = 0
        return incoming_truck

    def process_broadcast(self, message_obj):
        """Copy the camera's truck count from an ``inform`` message into the beliefs."""
        # Anything that is not an inform from the camera is ignored
        if message_obj.performative != "inform" or message_obj.sender != "CameraAgent":
            return
        try:
            truck_count = int(message_obj.content)
            self.this_security.cameraTruckCount = truck_count
        except ValueError:
            print("Error: Invalid truck count received in message.")

    def check_truck_count(self):
        """Sound the alarm when the camera has seen more trucks than were registered."""
        seen_by_camera = self.this_security.cameraTruckCount
        if seen_by_camera > self.this_security.totalTrucksAllowedCount:
            self.sound_alarm()

    def sound_alarm(self):
        """ Sounds an alarm """
        print("%%%% Alarm! Truck count discrepancy detected! Security Guard compromised! %%%%")

    def brf(self, truck_detected):
        """Read the pending broadcast (if any), then verify the truck counts."""
        pending = self.model.broadcastMessage
        if pending:
            self.process_broadcast(Message(msg=pending))
        self.check_truck_count()

    def filter(self):
        """Choose an intention from the presence and funds of a truck at the checkpoint.

        Returns:
            ``'empty_checkpoint'``, ``'payment_collect'`` or ``'payment_ignore'``.
        """
        truck = self.this_security.truckAtCheckpoint
        if truck is None:
            return 'empty_checkpoint'

        if truck.currency >= self.charge_amount:
            print("<< Security agent >> Detected truck at checkpoint with available funds. Collecting payment...")
            return 'payment_collect'

        # The guard lets underfunded trucks through on purpose (simulation scenario)
        print("<< Security agent >> Truck doesn't have enough money... Eh, I'll let them through...")
        return 'payment_ignore'

    def execute(self, intention):
        """Collect the fee when asked to, then report the registered-truck total."""
        if intention == 'payment_collect':
            self.this_security.truckAtCheckpoint.currency -= self.charge_amount
            self.this_security.totalMoneyReceived += self.charge_amount
            self.this_security.totalTrucksAllowedCount += 1
            print(f"<< Security agent >> Total funds: {self.this_security.totalMoneyReceived}")
        # 'payment_ignore' and 'empty_checkpoint' need no action
        print(f"<< Security Agent >> Total trucks registered at checkpoint: {self.this_security.totalTrucksAllowedCount}")


#### Exit Agent

In [None]:
# Exit Agent Class

class ExitAgent(ap.Agent):
    """Exit cell; it only records its position and never acts.

    Beliefs are kept on an ontology ``Exit`` individual (``self.this_exit``).
    """

    # ------------------------------------------ Main Functions ------------------------------------------

    def setup(self):
        """Create the ontology individual and tag the agent as an exit."""
        self.agentType = 5  # 5 = Exit
        self.firstStep = True
        self.this_exit = Exit()

    def step(self):
        """Record the grid position on the first call; later calls do nothing."""
        if self.firstStep:
            # Reuse the individual from setup(); creating another one here
            # would leave an unused Exit individual in the ontology.
            position = self.model.environment.positions[self]
            self.this_exit.x_coordinate = position[0]
            self.this_exit.y_coordinate = position[1]
            self.this_exit.z_coordinate = 0  # 2D animation
            self.firstStep = False

    def update(self): pass

    def end(self): pass

### Simulation

In [None]:
import agentpy as ap

class TruckingSimulation(ap.Model):
    """Model that places all agents on a 15x15 grid and steps them in a fixed order.

    Two single-slot string mailboxes, ``broadcastMessage`` and
    ``depositMessage``, carry the agents' messages; both are emptied at the
    end of every step.
    """

    def setup(self):
        """Build the grid, create every agent list and place the agents."""
        self.step_counter = 0

        # --------------------- Initialize environment ---------------------
        self.environment = ap.Grid(self, (15, 15), track_empty=True)

        # --------------------- Positions ---------------------
        truck_cells = [(0, y) for y in range(0, 10, 2)]
        deposit_cells = [(8, 10)]
        camera_cells = [(9, 8)]
        obstacle_cells = [(2, 9), (0, 11)]
        obstacle_cells += [(1, y) for y in range(10)]      # (1, 0) .. (1, 9)
        obstacle_cells += [(1, 11)]
        obstacle_cells += [(x, 9) for x in range(3, 15)]   # (3, 9) .. (14, 9)
        obstacle_cells += [(x, 11) for x in range(2, 14)]  # (2, 11) .. (13, 11)
        obstacle_cells += [(13, 12), (13, 13), (13, 14)]
        security_cells = [(1, 10)]
        exit_cells = [(14, 14)]

        # Mailboxes start empty
        self.broadcastMessage = ""
        self.depositMessage = ""

        # --------------------- Initialize Agents ---------------------
        self.deposits = ap.AgentList(self, len(deposit_cells), DepositAgent)
        self.cameras = ap.AgentList(self, len(camera_cells), CameraAgent)
        self.obstacles = ap.AgentList(self, len(obstacle_cells), ObstacleAgent)
        self.security_agents = ap.AgentList(self, len(security_cells), SecurityAgent)
        self.exits = ap.AgentList(self, len(exit_cells), ExitAgent)
        self.trucks = ap.AgentList(self, len(truck_cells), TruckAgent)
        # The second truck starts with only 100 currency
        self.trucks[1].this_truck.currency = 100

        # --------------------- Addition of agents ---------------------
        placements = (
            (self.trucks, truck_cells),
            (self.deposits, deposit_cells),
            (self.cameras, camera_cells),
            (self.obstacles, obstacle_cells),
            (self.security_agents, security_cells),
            (self.exits, exit_cells),
        )
        for group, cells in placements:
            self.environment.add_agents(group, positions=cells)

    def step(self):
        """Print the step banner, step every agent group, then clear the mailboxes."""
        self.step_counter += 1
        print()
        print(f"============ Step {self.step_counter:3} ============ ")

        stepping_order = (
            self.deposits,
            self.trucks,
            self.cameras,
            self.obstacles,
            self.security_agents,
            self.exits,
        )
        for group in stepping_order:
            group.step()

        self.broadcastMessage = ""
        self.depositMessage = ""

    def update(self): pass

    def end(self): pass

    def is_obstacle_at(self, position):
        """Return True when some obstacle agent occupies ``position``."""
        grid_positions = self.environment.positions
        return any(grid_positions[obstacle] == position for obstacle in self.obstacles)


### Plotting and animation

In [None]:
# Animation function

def animation_plot(model, ax):
    """Draw one animation frame: the grid of ``agentType`` values with the step in the title."""
    cell_types = model.environment.attr_grid('agentType')
    ap.gridplot(cell_types, ax=ax, cmap=color_map)
    title = f"Bank of Material Model \n Time-step: {model.t}"
    ax.set_title(title)

# Model parameters passed to TruckingSimulation
parameters = {
    "steps" : 62
}

# One colour per agentType value, in this order (index 0 to 5):
color_map = ListedColormap(["tab:blue", "tab:green", "tab:cyan", "tab:gray", "tab:orange", "tab:red"])
# truck, deposit, camera, obstacle, checkpoint, exit


# Figure and axes that animation_plot draws on
fig, ax = plt.subplots()

model = TruckingSimulation(parameters)

# animation_plot is the per-frame drawing callback
animation = ap.animate(model, fig, ax, animation_plot)

# Last expression of the cell: the animation wrapped as HTML for display
IPython.display.HTML(animation.to_jshtml())


<< Camera Agent >> Full trucks currently in view :  0
<< Camera Agent >> Total full trucks observed :  0
<< Security Agent >> Total trucks registered at checkpoint: 0

<< Camera Agent >> Full trucks currently in view :  0
<< Camera Agent >> Total full trucks observed :  0
<< Security Agent >> Total trucks registered at checkpoint: 0

<< Camera Agent >> Full trucks currently in view :  0
<< Camera Agent >> Total full trucks observed :  0
<< Security Agent >> Total trucks registered at checkpoint: 0

<< Camera Agent >> Full trucks currently in view :  0
<< Camera Agent >> Total full trucks observed :  0
<< Security agent >> Detected truck at checkpoint with available funds. Collecting payment...
<< Security agent >> Total funds: 200
<< Security Agent >> Total trucks registered at checkpoint: 1

<< Camera Agent >> Full trucks currently in view :  0
<< Camera Agent >> Total full trucks observed :  0
<< Security Agent >> Total trucks registered at checkpoint: 1

<< Camera Agent >> Full tru