In [1]:
from dataclasses import dataclass, field
from vi import Agent, Config, Simulation, Vector2
import pygame as pg
import os
import numpy as np
import random

pygame 2.6.1 (SDL 2.28.4, Python 3.12.3)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
# Module-level state for the frame dump: a running frame index and the
# directory that receives the PNG files (created up front if missing).
frame_count = 0
frame_dir = "frames"
os.makedirs(frame_dir, exist_ok=True)


def save_frame(screen):
    """Write ``screen`` to the next numbered PNG inside ``frame_dir``.

    The file is named ``frame_<index>.png`` with the index zero-padded to five
    digits. The module-level ``frame_count`` is advanced by one after the
    image has been saved.

    Args:
        screen: Surface passed straight to ``pg.image.save``.
    """
    global frame_count
    target_path = os.path.join(frame_dir, f"frame_{frame_count:05d}.png")
    pg.image.save(screen, target_path)
    frame_count += 1

In [6]:
class RecordingSimulation(Simulation):
    """Simulation that counts frames, can dump them to disk, and decays pheromones.

    ``after_update`` draws and presents each frame, optionally saves it as a
    PNG, then calls ``config.pheromone_grid_decay()``; the config passed in
    must therefore provide that method.
    """

    def __init__(self, config, save_frames: bool = False, frame_dir: str = "frames"):
        """Initialise the simulation and make sure the frame directory exists.

        Args:
            config: Simulation config; must expose ``pheromone_grid_decay()``.
            save_frames: When True, every rendered frame is written as a PNG
                into ``frame_dir``. Off by default, so nothing is written.
            frame_dir: Directory for saved frames; created if missing.
        """
        super().__init__(config)
        self.frame_count = 0
        self.save_frames = save_frames
        self.frame_dir = frame_dir
        os.makedirs(frame_dir, exist_ok=True)

    def after_update(self) -> None:
        """Render the frame, cap the FPS, record metrics, and decay the grid."""
        # Draw everything to the screen
        self._all.draw(self._screen)

        if self.config.visualise_chunks:
            # Writing ``self.__visualise_chunks()`` inside this class is
            # name-mangled to ``_RecordingSimulation__visualise_chunks``, which
            # does not exist. Call the parent's private helper by its mangled
            # name instead.
            # NOTE(review): assumes the helper is defined on ``Simulation``.
            self._Simulation__visualise_chunks()

        # Optionally save the current frame as an image (numbered by frame_count).
        if self.save_frames:
            pg.image.save(
                self._screen,
                os.path.join(self.frame_dir, f"frame_{self.frame_count:05d}.png"),
            )

        # Update the screen with the new image
        pg.display.flip()

        self._clock.tick(self.config.fps_limit)

        current_fps = self._clock.get_fps()
        if current_fps > 0:
            self._metrics.fps._push(current_fps)

            if self.config.print_fps:
                print(f"FPS: {current_fps:.1f}")  # noqa: T201

        # One more frame has been rendered (counter is initialised in __init__).
        self.frame_count += 1

        # Decay the shared pheromone grid once per rendered frame.
        self.config.pheromone_grid_decay()


In [14]:
@dataclass
class AggregationConfig(Config):
    """Simulation settings plus the pheromone grid shared by all agents.

    The grid is indexed as ``pheromone_grid[grid_x, grid_y]``; a coordinate
    maps to a cell index by floor-dividing it by ``cell_size``.
    """

    # Amount subtracted from each cell by pheromone_grid_decay().
    pheromone_decay: float = 0.01
    # Side length of one square grid cell, in position units.
    cell_size: int = 50
    # Pheromone level per cell; every config instance gets its own 15x15 zero grid.
    pheromone_grid: np.ndarray = field(default_factory=lambda: np.zeros((15, 15)))
    # Weight of the pheromone direction in FlockingAgent.change_position().
    attraction_strength: float = 0.5
    # Weight of the random direction in FlockingAgent.change_position().
    random_movement: float = 0.05
    # FlockingAgent skips movement while its own cell exceeds this level.
    stopping_threshold: float = 10

    def _cell_index(self, x, y):
        """Return the (grid_x, grid_y) cell index for a position; may be out of bounds."""
        return int(x // self.cell_size), int(y // self.cell_size)

    def update_pheromone_grid(self, x, y, amount: float = 0.01):
        """Deposit pheromone in the cell containing position (x, y).

        Positions that fall outside the grid are ignored.

        Args:
            x: Horizontal position.
            y: Vertical position.
            amount: Pheromone added to the cell; defaults to 0.01.
        """
        grid_x, grid_y = self._cell_index(x, y)
        rows, cols = self.pheromone_grid.shape[:2]
        if 0 <= grid_x < rows and 0 <= grid_y < cols:
            self.pheromone_grid[grid_x, grid_y] += amount

    def pheromone_grid_decay(self):
        """Lower every cell by ``pheromone_decay`` without going below zero.

        Cells already below 0.001 are snapped to zero. The subtraction is
        clamped at zero so that a cell holding less than ``pheromone_decay``
        does not turn negative for a tick.
        """
        decayed = np.maximum(self.pheromone_grid - self.pheromone_decay, 0)
        self.pheromone_grid = np.where(self.pheromone_grid < 0.001, 0, decayed)

    def get_pheromone_gradient(self, x, y):
        """Return a vector pointing at the strongest cell around position (x, y).

        The 3x3 neighbourhood of the containing cell is scanned, including the
        cell itself and skipping out-of-grid cells. The result is the
        ``(dx, dy)`` offset of the strongest cell scaled by that cell's level,
        so it is the zero vector when the containing cell itself is selected.
        Ties go to the first cell in scan order (dx ascending, then dy
        ascending).

        Returns:
            A ``Vector2``; ``Vector2(0, 0)`` when no neighbour lies in the grid.
        """
        grid_x, grid_y = self._cell_index(x, y)
        rows, cols = self.pheromone_grid.shape[:2]

        neighbors = [
            (dx, dy, self.pheromone_grid[grid_x + dx, grid_y + dy])
            for dx in (-1, 0, 1)
            for dy in (-1, 0, 1)
            if 0 <= grid_x + dx < rows and 0 <= grid_y + dy < cols
        ]

        if not neighbors:
            return Vector2(0, 0)  # return Vector2 instead of numpy array

        best_dx, best_dy, best_level = max(neighbors, key=lambda cell: cell[2])
        return Vector2(best_dx, best_dy) * best_level

class FlockingAgent(Agent):
    """Agent that lays pheromone, heads for the strongest nearby cell, and
    pauses while its own cell is above the stopping threshold.

    Expects ``self.config`` to provide ``cell_size``, ``pheromone_grid``,
    ``stopping_threshold``, ``attraction_strength``, ``random_movement``,
    ``update_pheromone_grid`` and ``get_pheromone_gradient``.
    """

    def on_spawn(self):
        """Start in the moving state, then defer to the base-class hook."""
        self.stopped = False
        return super().on_spawn()

    def change_position(self):
        """Advance the agent by one step, or hold position in a saturated cell.

        ``self.stopped`` is refreshed on every call so that it always reflects
        whether the agent skipped movement on the latest update. Previously it
        was set to True once and never cleared, even though the agent resumes
        moving as soon as the cell's level drops back to the threshold.
        """
        self.there_is_no_escape()

        config = self.config
        grid = config.pheromone_grid
        grid_x = int(self.pos[0] // config.cell_size)
        grid_y = int(self.pos[1] // config.cell_size)
        inside_grid = 0 <= grid_x < grid.shape[0] and 0 <= grid_y < grid.shape[1]

        self.stopped = bool(
            inside_grid and grid[grid_x, grid_y] > config.stopping_threshold
        )
        if self.stopped:
            # Saturated cell: no deposit and no movement this update.
            return

        config.update_pheromone_grid(self.pos[0], self.pos[1])

        gradient = config.get_pheromone_gradient(self.pos[0], self.pos[1])

        # Draw the jitter from the simulation's shared generator rather than
        # the global ``random`` module, so the config's ``seed`` also governs
        # this movement.
        # NOTE(review): relies on ``self.shared.prng_move`` from the vi Agent API.
        prng = self.shared.prng_move
        random_dir = Vector2(prng.uniform(-1, 1), prng.uniform(-1, 1))

        new_direction = (
            gradient * config.attraction_strength
            + random_dir * config.random_movement
        )

        # normalize() is only valid for a non-zero vector.
        if new_direction.length() > 0:
            new_direction = new_direction.normalize()

        self.move = new_direction * config.movement_speed
        self.there_is_no_escape()
        self.pos += self.move

# Build the config, wrap it in the simulation, spawn 100 agents, and run.
(
    RecordingSimulation(
        AggregationConfig(
            image_rotation=True,
            movement_speed=5.0,
            radius=25,
            seed=777,
            duration=10000,
            fps_limit=30,
        )
    )
    .batch_spawn_agents(
        100,
        FlockingAgent,
        images=["images/tarakan.png"],
    )
    .run()
)

KeyboardInterrupt: 