In [1]:
# dependencias
%pip install agentpy IPython

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
import agentpy as ap
import IPython
import numpy as np
import heapq
import matplotlib.pyplot as plt

class HarvestModel(ap.Model):
    def a_star(self, grid, start, goal, invalid_values):
      def heuristic(a, b):
          return abs(a[0] - b[0]) + abs(a[1] - b[1])

      directions = [(0, 1), (1, 0), (0, -1), (-1, 0)]
      open_list = []
      heapq.heappush(open_list, (0, start))

      g_score = {start: 0}
      f_score = {start: heuristic(start, goal)}
      came_from = {}

      while open_list:
          _, current = heapq.heappop(open_list)
          if current == goal:
              path = []
              while current in came_from:
                  path.append(current)
                  current = came_from[current]
              path.append(start)
              path.reverse()
              return path

          for direction in directions:
              neighbor = (current[0] + direction[0], current[1] + direction[1])
                          
              if 0 <= neighbor[0] < len(grid) and 0 <= neighbor[1] < len(grid[0]) and grid[neighbor[0]][neighbor[1]] not in invalid_values:
                      tentative_g_score = g_score[current] + 1

                      if neighbor not in g_score or tentative_g_score < g_score[neighbor]:
                          came_from[neighbor] = current
                          g_score[neighbor] = tentative_g_score
                          f_score[neighbor] = tentative_g_score + heuristic(neighbor, goal)
                          heapq.heappush(open_list, (f_score[neighbor], neighbor))
      return None

    def setup(self):
        self.harvesters = []
        self.collectors = []
        exclude_coords = []

        for i in range(self.p['tractors']):
          harvester_start = (self.p['size'][0] - 2, 1 + i*2)
          collector_start = (self.p['size'][0] - 2, 2 + i*2)

          self.harvesters.append(Harvester(self, harvester_start))
          self.collectors.append(Collector(self, collector_start))

          exclude_coords.extend([harvester_start, collector_start])

        # MATRIX 
        # 1 = crops
        # -1 = obstacles
        # 5 = tractor
        # 10 = collector
        # 0 = empty space
        self.aux_grid_matrix = np.zeros(self.p.size, dtype=int)

        self.paths = [[] for _ in range(self.p['tractors'])]  
        self.path_indices = [0] * self.p['tractors'] 
        self.collecting = [False] * self.p['tractors']  
        self.crops_collected = set()  
        self.total_crops = 0  

        self.custom_t = 0

        # lista de coordenadas en donde hay cultivos        
        self.lista_coordenadas = []

        for i in range(1, (self.p['size'][0] - 1), 1):
          if i % 2 != 0:
            for j in range((self.p['size'][0] - 2), 0, -1):
              self.lista_coordenadas.append((j, i))
          else:
            for j in range(1, (self.p['size'][0] - 1), 1):
              self.lista_coordenadas.append((j, i))

        self.lista_obstaculos = []

        while len(self.lista_obstaculos) < self.p['obstacles']:
            x = np.random.randint(0, self.p['size'][0] - 1)
            y = np.random.randint(0, self.p['size'][1] - 1)
            coord = (x, y)
            if coord not in exclude_coords and coord not in self.lista_obstaculos:
                self.lista_obstaculos.append(coord)
           
        for collector in self.collectors:
            collector.capacity = self.p['capacity']

        for i in range(len(self.lista_obstaculos)):
            if self.lista_obstaculos[i] in self.lista_coordenadas:
                self.lista_coordenadas.remove(self.lista_obstaculos[i])

        self.total_crops = len(self.lista_coordenadas)

        for i in range(len(self.lista_obstaculos)):
            self.aux_grid_matrix[self.lista_obstaculos[i]] = -1
        
        for i in range(len(self.lista_coordenadas)):
            self.aux_grid_matrix[self.lista_coordenadas[i]] = 1

    def step(self):
        for i, (harvester, collector) in enumerate(zip(self.harvesters, self.collectors)):
            if self.collecting[i]:
                if self.path_indices[i] < len(self.paths[i]):
                    step = self.paths[i][self.path_indices[i]]

                    if self.aux_grid_matrix[step] == 1:
                        collector.next_index_was1 = True
                    else:
                        collector.next_index_was1 = False

                    collector.last_index = collector.index
                    collector.index = step

                    if collector.last_index != collector.index:
                        if collector.next_index_was1:
                            self.aux_grid_matrix[collector.last_index] = 1
                        else:
                            self.aux_grid_matrix[collector.last_index] = 0

                    self.aux_grid_matrix[collector.index] = 10

                    self.path_indices[i] += 1

                if self.path_indices[i] == len(self.paths[i]):
                    collector.capacity = self.p['capacity']
                    self.collecting[i] = False

            else:
                if collector.capacity == 0 and self.custom_t > 0:
                    start = collector.index
                    goal = (0, self.p['size'][0] - 1)
                    path_to_goal = self.a_star(self.aux_grid_matrix, start, goal, {5, -1}) or []
                    path_back = self.a_star(self.aux_grid_matrix, goal, start, {5, -1}) or []

                    self.paths[i] = path_to_goal + path_back
                    if len(self.paths[i]) > 1:
                        self.paths[i] = self.paths[i][1:]

                    self.path_indices[i] = 0
                    self.collecting[i] = True
                else:
                   
                    path_to_harvester = self.a_star(self.aux_grid_matrix, collector.index, harvester.index, {-1})
                    collector.last_index = collector.index

                    if self.lista_coordenadas:  
                        distances = [abs(harvester.index[0] - crop[0]) + abs(harvester.index[1] - crop[1]) for crop in self.lista_coordenadas]
                        if distances:
                            closest_crop_index = distances.index(min(distances))

                            path_to_crop = self.a_star(self.aux_grid_matrix, harvester.index, self.lista_coordenadas[closest_crop_index], {-1, 10, 5})
                            
                            harvester.last_index = harvester.index

                            if path_to_crop:
                                if len(path_to_crop) > 1:
                                    harvester.index = path_to_crop[1]

                            if harvester.index in self.lista_coordenadas:
                                self.crops_collected.add(harvester.index)
                                self.lista_coordenadas.remove(harvester.index)

                            self.aux_grid_matrix[harvester.last_index] = 0
                            self.aux_grid_matrix[harvester.index] = 5

                    if path_to_harvester:
                        if len(path_to_harvester) > 1 and path_to_harvester[1] != harvester.index:
                            collector.index = path_to_harvester[1]

                    self.aux_grid_matrix[collector.last_index] = 0
                    self.aux_grid_matrix[collector.index] = 10
                    collector.capacity -= 1


    def update(self):
        if self.total_crops == len(self.crops_collected):
            self.stop()

class Harvester(ap.Agent):
    def setup(self, index):
        self.index = index
        self.last_index = index

class Collector(ap.Agent):
    def setup(self, index):
        self.index = index
        self.last_index = index
        self.capacity = 0
        self.next_index_was1 = False

parameters = {
    'steps': 397,
    'size': (15, 15),
    'capacity': 20,
    'obstacles': 20,
    'tractors': 3
}

def my_plot(model, ax):
    ax.set_title(f'Step: {model.t}')
    ax.grid(True)
    ax.clear()

    ax.imshow(model.aux_grid_matrix, cmap='ocean', interpolation='nearest')

fig, ax = plt.subplots()
model = HarvestModel(parameters)

animation = ap.animate(model, fig, ax, my_plot)
IPython.display.HTML(animation.to_jshtml(fps=6))
