In [1]:
! pip install pygame



DEPRECATION: uiutil 1.38.0 has a non-standard dependency specifier future>=0.16.0attrs>=16.3.0. pip 24.0 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of uiutil or contact the author to suggest that they release a version with a conforming dependency specifiers. Discussion can be found at https://github.com/pypa/pip/issues/12063


### Scenario 1

# Swarm Chaining Simulation Overview

This simulation models a swarm of bots attempting to find a goal using indirect communication through a central server.

## Key Concepts

- **Bots**: Each bot has position, direction, velocity, and a field of view (FOV).
- **FOV**: Bots can detect other bots and the goal only when the target lies inside their 90° cone of vision and within sensing range (`FOV_LENGTH`).
- **Goal**: A red square that can be moved by the user.
- **Informed Bots**: Bots that see the goal, or see another informed bot.
- **Server**: Gathers visibility data from all bots and builds a "relay chain" to spread goal awareness.

## Behavior

1. Bots that see the goal are marked as informed.
2. Any bot that sees an informed bot becomes informed.
3. The server sends movement instructions toward the goal or the closest informed bot.
4. Uninformed bots wander randomly.
5. The user can drag the goal or individual bots.

This creates emergent, coordinated movement toward the goal without direct bot-to-bot communication.


In [1]:
import pygame
import math
import random

# === CONFIG ===
# Window dimensions in pixels; bot and goal positions are clamped to this area.
SCREEN_WIDTH, SCREEN_HEIGHT = 800, 600
NUM_BOTS = 30        # number of Bot instances created at start-up
BOT_SIZE = 12        # side length (pixels) of the square drawn for each bot
FOV_ANGLE = 90       # full width of a bot's vision cone, in degrees
FOV_LENGTH = 160     # maximum sensing distance, in pixels
SPEED_LIMIT = 2.0    # upper bound on the speed accepted by Bot.set_move (pixels per frame)

# Colour table. Entries are RGB tuples except 'fov', which carries a fourth
# (alpha) component because it is drawn onto an SRCALPHA surface.
COLORS = {
    'background': (0, 0, 0),
    'bot': (80, 170, 255),
    'fov': (80, 200, 255, 24),
    'goal': (255, 0, 0),
    'informed': (0, 255, 120)
}

# Module-level side effects: initialise pygame and open the window.
# pygame.init() must run before the display is created.
pygame.init()
screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
pygame.display.set_caption("Swarm Chaining: 100% Honest")
clock = pygame.time.Clock()  # used by the main loop to pace frames

def clamp(val, minv, maxv):
    """Limit ``val`` to the closed interval ``[minv, maxv]``.

    The upper bound is applied first and the lower bound second, so when
    ``minv > maxv`` the result is ``minv``.
    """
    capped = maxv if maxv < val else val
    return capped if capped > minv else minv

class Bot:
    """A swarm member with a position, heading, velocity and a cone-shaped FOV.

    Positions and distances are in screen pixels; ``angle`` and ``fov`` are in
    degrees. ``vx``/``vy`` are the per-frame displacement applied by ``move``.
    """

    def __init__(self, bot_id, x, y):
        """Create a bot at ``(x, y)`` with a random heading and speed."""
        self.bot_id = bot_id
        self.x = x
        self.y = y
        self.angle = random.uniform(0, 360)
        # Draw the speed once: using two independent random speeds for the
        # x and y components would make the velocity point somewhere other
        # than along ``self.angle``.
        speed = random.uniform(1, SPEED_LIMIT)
        heading = math.radians(self.angle)
        self.vx = math.cos(heading) * speed
        self.vy = math.sin(heading) * speed
        self.fov = FOV_ANGLE
        self.fov_len = FOV_LENGTH

    def move(self):
        """Advance one frame, bouncing off the window edges.

        The position is clamped to the window; when it lands exactly on an
        edge the matching velocity component is reversed. The heading is then
        re-derived from the (possibly reflected) velocity.
        """
        self.x = clamp(self.x + self.vx, 0, SCREEN_WIDTH)
        self.y = clamp(self.y + self.vy, 0, SCREEN_HEIGHT)
        if self.x == 0 or self.x == SCREEN_WIDTH:
            self.vx *= -1
        if self.y == 0 or self.y == SCREEN_HEIGHT:
            self.vy *= -1
        self.angle = math.degrees(math.atan2(self.vy, self.vx))

    def _relative_bearing(self, target_x, target_y):
        """Return ``(distance, signed_angle)`` from this bot to a point.

        ``signed_angle`` is the bearing relative to the current heading,
        normalised to the range [-180, 180) degrees.
        """
        dx, dy = target_x - self.x, target_y - self.y
        dist = math.hypot(dx, dy)
        angle = (math.degrees(math.atan2(dy, dx)) - self.angle + 360) % 360
        angle_diff = ((angle + 180) % 360) - 180
        return dist, angle_diff

    def _in_fov(self, dist, angle_diff):
        """True when a target at that distance/bearing is inside the cone."""
        return dist <= self.fov_len and abs(angle_diff) <= self.fov / 2

    def sense(self, bots, goal_pos):
        """Build this bot's sensor report.

        Args:
            bots: iterable of bots; the entry with this bot's own id is skipped.
            goal_pos: ``(x, y)`` position of the goal.

        Returns:
            A dict with keys ``'bot_id'``, ``'visible_bots'`` (list of dicts
            with ``'bot_id'``, ``'distance'``, ``'angle'``) and
            ``'goal_detection'`` (dict with ``'distance'``/``'angle'``, or
            ``None`` when the goal is outside the cone).
        """
        visible_bots = []
        for bot in bots:
            if bot.bot_id == self.bot_id:
                continue
            dist, angle_diff = self._relative_bearing(bot.x, bot.y)
            if self._in_fov(dist, angle_diff):
                visible_bots.append({
                    'bot_id': bot.bot_id,
                    'distance': dist,
                    'angle': angle_diff
                })

        distg, angle_diffg = self._relative_bearing(goal_pos[0], goal_pos[1])
        goal_info = None
        if self._in_fov(distg, angle_diffg):
            goal_info = {'distance': distg, 'angle': angle_diffg}

        return {
            'bot_id': self.bot_id,
            'visible_bots': visible_bots,
            'goal_detection': goal_info
        }

    def set_move(self, angle, speed):
        """Set the velocity from a turn relative to the current heading.

        Args:
            angle: turn in degrees, added to ``self.angle``.
            speed: requested speed; clamped to ``[0, SPEED_LIMIT]``.
        """
        theta = math.radians(self.angle + angle)
        limited = clamp(speed, 0, SPEED_LIMIT)
        self.vx = math.cos(theta) * limited
        self.vy = math.sin(theta) * limited

    def draw(self, surface, informed=False):
        """Draw the bot's square and its translucent FOV cone on ``surface``."""
        color = COLORS['informed'] if informed else COLORS['bot']
        pygame.draw.rect(surface, color,
            (self.x - BOT_SIZE//2, self.y - BOT_SIZE//2, BOT_SIZE, BOT_SIZE))

        # Draw FOV. The cone never extends more than ``fov_len`` from the bot,
        # so a scratch surface of that radius is enough; allocating a
        # full-screen alpha surface per bot per frame is needlessly expensive.
        reach = int(math.ceil(self.fov_len))
        origin_x = math.floor(self.x) - reach
        origin_y = math.floor(self.y) - reach
        side = 2 * reach + 2  # +2 covers the fractional part of the position
        fov_surface = pygame.Surface((side, side), pygame.SRCALPHA)

        # Polygon vertices are expressed relative to the scratch surface.
        local_x = self.x - origin_x
        local_y = self.y - origin_y
        points = [(local_x, local_y)]
        for a in range(-self.fov//2, self.fov//2 + 1, 3):
            rad = math.radians(self.angle + a)
            points.append((
                local_x + self.fov_len * math.cos(rad),
                local_y + self.fov_len * math.sin(rad),
            ))
        pygame.draw.polygon(fov_surface, COLORS['fov'], points)
        surface.blit(fov_surface, (origin_x, origin_y))


# === SERVER (Relay Chaining) ===
class Server:
    """Central relay that turns per-bot sensor reports into movement targets.

    Bots never talk to each other directly: each one submits the report
    produced by ``Bot.sense`` and the server works out which bots are
    "informed" (they see the goal, or see a bot that is already informed).
    """

    def __init__(self, num_bots):
        self.num_bots = num_bots
        self.bot_reports = {}  # latest report per bot_id, refreshed by process()

    def process(self, senses):
        """Compute the informed set and a steering target for each informed bot.

        Args:
            senses: iterable of report dicts with keys ``'bot_id'``,
                ``'visible_bots'`` and ``'goal_detection'``.

        Returns:
            ``(informed, goal_direction)`` where ``informed`` is a set of bot
            ids and ``goal_direction`` maps each informed bot id to a dict
            with ``'distance'`` and ``'angle'``: the goal itself for direct
            observers, otherwise the nearest informed bot in view.
        """
        self.bot_reports = {d['bot_id']: d for d in senses}
        informed = set()
        goal_direction = {}

        # First: direct goal observers.
        for bot_id, report in self.bot_reports.items():
            detection = report['goal_detection']
            if detection:
                informed.add(bot_id)
                goal_direction[bot_id] = detection

        # Relay: repeat until a full pass adds nobody new, so knowledge can
        # travel along chains of any length regardless of report order.
        changed = True
        while changed:
            changed = False
            for bot_id, report in self.bot_reports.items():
                if bot_id in informed:
                    continue
                relays = [vb for vb in report['visible_bots']
                          if vb['bot_id'] in informed]
                if not relays:
                    continue
                # Steer toward the closest informed bot in view rather than
                # whichever one happens to come first in the report.
                nearest = min(relays, key=lambda vb: vb['distance'])
                informed.add(bot_id)
                goal_direction[bot_id] = {
                    'distance': nearest['distance'],
                    'angle': nearest['angle']
                }
                changed = True
        return informed, goal_direction

    def get_instruction(self, bot_id, goal_direction):
        """Return the move command for one bot.

        Informed bots turn toward their target at a speed that grows with
        distance, capped at ``SPEED_LIMIT``. Every other bot gets a random
        turn of up to 60 degrees either way at speed 1.0.

        Returns:
            A dict with keys ``'move_angle'`` (degrees, relative to the bot's
            heading) and ``'speed'``.
        """
        if bot_id in goal_direction:
            target = goal_direction[bot_id]
            speed = min(SPEED_LIMIT, target['distance'] * 0.2 + 0.8)
            return {'move_angle': target['angle'], 'speed': speed}
        # Random wander
        return {'move_angle': random.uniform(-60, 60), 'speed': 1.0}

# === INIT ===
goal_pos = (random.randint(120, SCREEN_WIDTH - 120), random.randint(120, SCREEN_HEIGHT - 120))
# Bot ids equal their index in ``bots``; the drag code relies on that.
bots = [Bot(i, random.randint(60, SCREEN_WIDTH - 60), random.randint(60, SCREEN_HEIGHT - 60)) for i in range(NUM_BOTS)]
server = Server(NUM_BOTS)

# === MAIN LOOP ===
running = True
dragging_goal = False
dragging_bot_id = None
drag_offset = (0, 0)  # cursor position relative to the dragged object's centre

# try/finally guarantees the window is closed even if a frame raises
# (otherwise an error leaves a hung pygame window behind).
try:
    while running:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False

            elif event.type == pygame.MOUSEBUTTONDOWN:
                mx, my = event.pos
                gx, gy = goal_pos

                # Check if goal is clicked (the goal takes priority over bots)
                if abs(mx - gx) <= 14 and abs(my - gy) <= 14:
                    dragging_goal = True
                    drag_offset = (mx - gx, my - gy)
                else:
                    # Check if any bot is clicked; the first match wins
                    for bot in bots:
                        if abs(mx - bot.x) <= BOT_SIZE and abs(my - bot.y) <= BOT_SIZE:
                            dragging_bot_id = bot.bot_id
                            drag_offset = (mx - bot.x, my - bot.y)
                            break

            elif event.type == pygame.MOUSEBUTTONUP:
                dragging_goal = False
                dragging_bot_id = None

            elif event.type == pygame.MOUSEMOTION:
                mx, my = event.pos
                if dragging_goal:
                    goal_pos = (
                        clamp(mx - drag_offset[0], 0, SCREEN_WIDTH),
                        clamp(my - drag_offset[1], 0, SCREEN_HEIGHT)
                    )
                elif dragging_bot_id is not None:
                    bot = bots[dragging_bot_id]
                    bot.x = clamp(mx - drag_offset[0], 0, SCREEN_WIDTH)
                    bot.y = clamp(my - drag_offset[1], 0, SCREEN_HEIGHT)
                    bot.vx, bot.vy = 0, 0  # Freeze movement during drag

        screen.fill(COLORS['background'])
        pygame.draw.rect(screen, COLORS['goal'], (goal_pos[0]-7, goal_pos[1]-7, 14, 14))

        # Sense -> server decides -> bots act: all reports are collected
        # before any bot moves, so every decision uses the same snapshot.
        senses = [bot.sense(bots, goal_pos) for bot in bots]
        informed, goal_direction = server.process(senses)

        for bot in bots:
            if dragging_bot_id == bot.bot_id:
                continue  # Skip movement for bot being dragged
            instr = server.get_instruction(bot.bot_id, goal_direction)
            bot.set_move(instr['move_angle'], instr['speed'])
            bot.move()
            bot.draw(screen, informed=bot.bot_id in informed)

        # Draw dragged bot on top
        if dragging_bot_id is not None:
            bots[dragging_bot_id].draw(screen, informed=bots[dragging_bot_id].bot_id in informed)

        pygame.display.flip()
        clock.tick(30)
finally:
    pygame.quit()


pygame 2.3.0 (SDL 2.24.2, Python 3.9.18)
Hello from the pygame community. https://www.pygame.org/contribute.html


### Scenario 2


# Swarm Simulation with Persistent Knowledge

This simulation extends swarm behavior with **persistent, sticky knowledge sharing** using a union-find structure (DSU). Bots share goal information permanently once connected.

## Key Concepts

- **Bots**: Each bot explores and remembers visited grid cells. Bots have position, velocity, heading, and a 90° FOV.
- **FOV Mapping**: Each bot marks a grid-based memory of what it has seen.
- **Goal Detection**: If a bot sees the goal, it stores the goal grid and stops when reaching it.
- **Union-Find (DSU)**: Two bots are merged into the same group as soon as either one has the other inside its FOV. Knowledge spreads instantly and permanently within each group.
- **Sticky Knowledge**: Once a bot in a group learns the goal, **all bots in that group become informed**, even if they never saw the goal themselves.
- **Exploration Overlay**: The combined visible regions of all bots are visualized on screen.

This setup simulates how persistent memory and indirect visual contact can accelerate cooperative behavior in decentralized systems.


In [3]:
import pygame
import math
import random
import numpy as np

# === CONFIGURATION ===
# Window dimensions in pixels.
SCREEN_WIDTH, SCREEN_HEIGHT = 800, 600
NUM_BOTS = 20        # number of Bot instances (and DSU elements) created
BOT_SIZE = 13        # side length (pixels) of the square drawn for each bot
FOV_ANGLE = 90       # full width of a bot's vision cone, in degrees
FOV_LENGTH = 120     # sensing range in pixels
SPEED_LIMIT = 2.2    # distance a bot travels per frame, in pixels

# The window is divided into square cells used for the exploration memory
# and for storing the goal location.
GRID_SIZE = 18                       # cell side length in pixels
GRID_W = SCREEN_WIDTH // GRID_SIZE   # number of whole cells horizontally
GRID_H = SCREEN_HEIGHT // GRID_SIZE  # number of whole cells vertically

# Colour table. 'fov' and 'explored_overlay' carry a fourth (alpha) component
# because they are drawn onto SRCALPHA surfaces.
COLORS = {
    'background': (0, 0, 0),
    'bot': (80, 170, 255),
    'stopped': (120, 255, 80),
    'informed': (220, 200, 40),
    'fov': (80, 200, 255, 28),
    'goal': (255, 0, 0),
    'explored_overlay': (60, 140, 200, 20)
}

# Module-level side effects: initialise pygame and open the window.
# pygame.init() must run before the display is created.
pygame.init()
screen = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
pygame.display.set_caption("Swarm: True Lifetime Sticky Knowledge")
clock = pygame.time.Clock()  # used by the main loop to pace frames

# Sample rays used by mark_fov_explored: angular offsets from the heading in
# 7-degree steps starting at -FOV_ANGLE//2, and distances along each ray in
# steps of half a grid cell (values that would exceed the limits are omitted
# by range()).
FOV_ANGLES = list(range(-FOV_ANGLE//2, FOV_ANGLE//2+1, 7))
FOV_DISTANCES = list(range(0, FOV_LENGTH+1, GRID_SIZE//2))

def clamp(val, minv, maxv):
    """Limit ``val`` to the closed interval ``[minv, maxv]``.

    The upper bound is applied first and the lower bound second, so when
    ``minv > maxv`` the result is ``minv``.
    """
    capped = maxv if maxv < val else val
    return minv if minv > capped else capped

def mark_fov_explored(x, y, heading_deg, explored_map):
    """Flag every grid cell touched by the sampled FOV rays as explored.

    Rays are cast from ``(x, y)`` at ``heading_deg`` plus each offset in
    ``FOV_ANGLES``, and sampled at each distance in ``FOV_DISTANCES``.
    Samples that fall outside the grid are ignored. ``explored_map`` is
    indexed ``[column, row]`` and is modified in place.
    """
    for offset in FOV_ANGLES:
        ray = math.radians(heading_deg + offset)
        step_x, step_y = math.cos(ray), math.sin(ray)
        for reach in FOV_DISTANCES:
            col = int((x + reach * step_x) // GRID_SIZE)
            row = int((y + reach * step_y) // GRID_SIZE)
            if col < 0 or col >= GRID_W or row < 0 or row >= GRID_H:
                continue  # sample landed outside the grid
            explored_map[col, row] = True

def robots_see_each_other(botA, botB):
    """Return True when ``botB`` lies strictly inside ``botA``'s FOV cone.

    Despite the name, the check is one-directional: it only tests whether
    ``botA`` sees ``botB``. ``botA.heading`` is in radians.
    """
    off_x = botB.x - botA.x
    off_y = botB.y - botA.y
    # Compare squared distances to avoid a square root.
    if off_x*off_x + off_y*off_y >= FOV_LENGTH * FOV_LENGTH:
        return False
    bearing = math.degrees(math.atan2(off_y, off_x) - botA.heading)
    bearing = ((bearing + 180) % 360) - 180  # normalise to [-180, 180)
    half_cone = FOV_ANGLE/2
    return -half_cone < bearing < half_cone

class Bot:
    """An exploring bot with a grid memory of what its FOV has covered.

    ``heading`` is in radians; positions are in screen pixels. ``goal_grid``
    holds the goal's grid cell once known, and ``stopped`` becomes True when
    the bot has arrived there.
    """

    def __init__(self, bot_id, start_pos):
        """Create a bot at ``start_pos`` with a random heading."""
        self.bot_id = bot_id
        self.x, self.y = start_pos
        self.heading = random.uniform(0, 2*math.pi)
        self.vx = math.cos(self.heading) * SPEED_LIMIT
        self.vy = math.sin(self.heading) * SPEED_LIMIT
        # Boolean grid indexed [column, row]; True where this bot has looked.
        self.local_explored = np.zeros((GRID_W, GRID_H), dtype=bool)
        self.goal_grid = None
        self.stopped = False

    def sees_goal(self, goal_world):
        """Return True when ``goal_world`` (x, y) is strictly inside the FOV cone."""
        dx, dy = goal_world[0] - self.x, goal_world[1] - self.y
        dist_sq = dx*dx + dy*dy
        if dist_sq >= FOV_LENGTH * FOV_LENGTH:
            return False
        theta = math.atan2(dy, dx)
        rel_angle = math.degrees(theta - self.heading)
        rel_angle = ((rel_angle + 180) % 360) - 180  # normalise to [-180, 180)
        return abs(rel_angle) < FOV_ANGLE/2

    def update_goal_status(self, goal_pos):
        """Record the goal's grid cell if this bot currently sees the goal.

        Returns:
            True when the goal was seen this call; False otherwise, including
            whenever the bot has already stopped.
        """
        if self.stopped:
            return False
        if self.sees_goal(goal_pos):
            gx = clamp(int(goal_pos[0] // GRID_SIZE), 0, GRID_W - 1)
            gy = clamp(int(goal_pos[1] // GRID_SIZE), 0, GRID_H - 1)
            self.goal_grid = (gx, gy)
            return True
        return False

    def move(self, informed, goal_grid):
        """Advance one frame.

        Args:
            informed: whether this bot's group knows where the goal is.
            goal_grid: ``(gx, gy)`` grid cell of the goal, or ``None``.

        An informed bot with a known goal cell heads straight for that
        cell's centre and stops once within 2 pixels of it. Any other bot
        wanders with a small random heading change each frame and is steered
        back inwards near the window edges. Stopped bots do nothing.
        """
        if self.stopped:
            return
        if informed and goal_grid:
            gx, gy = goal_grid
            cx, cy = gx*GRID_SIZE + GRID_SIZE//2, gy*GRID_SIZE + GRID_SIZE//2
            dx, dy = cx - self.x, cy - self.y
            dist = math.hypot(dx, dy)
            if dist > 2:
                self.heading = math.atan2(dy, dx)
                speed = min(SPEED_LIMIT, dist)
                # Scale the direction vector to ``speed``. Clamping dx and dy
                # independently would let the bot move up to sqrt(2) times
                # faster than the limit and off its heading.
                self.x += dx / dist * speed
                self.y += dy / dist * speed
            else:
                # Arrived at goal
                self.stopped = True
        else:
            # Wander
            margin = 8
            near_wall = (
                self.x < margin or self.x > SCREEN_WIDTH - margin or
                self.y < margin or self.y > SCREEN_HEIGHT - margin
            )
            if near_wall:
                # Point back into the window; a vertical wall overrides a
                # horizontal one when the bot is in a corner.
                if self.x < margin:
                    self.heading = 0
                elif self.x > SCREEN_WIDTH - margin:
                    self.heading = math.pi
                if self.y < margin:
                    self.heading = math.pi/2
                elif self.y > SCREEN_HEIGHT - margin:
                    self.heading = -math.pi/2
                self.heading += random.uniform(-0.7, 0.7)
            else:
                self.heading += random.uniform(-0.15, 0.15)
            self.vx = math.cos(self.heading) * SPEED_LIMIT
            self.vy = math.sin(self.heading) * SPEED_LIMIT
            self.x += self.vx
            self.y += self.vy
            self.x = clamp(self.x, margin, SCREEN_WIDTH-margin)
            self.y = clamp(self.y, margin, SCREEN_HEIGHT-margin)

    def draw(self, surface, informed):
        """Draw the bot's square (colour-coded by state) and its FOV cone."""
        if self.stopped:
            color = COLORS['stopped']
        elif informed:
            color = COLORS['informed']
        else:
            color = COLORS['bot']
        pygame.draw.rect(surface, color, (int(self.x)-BOT_SIZE//2, int(self.y)-BOT_SIZE//2, BOT_SIZE, BOT_SIZE))

        # Draw FOV. The cone never extends more than FOV_LENGTH from the bot,
        # so a scratch surface of that radius is enough; allocating a
        # full-screen alpha surface per bot per frame is needlessly expensive.
        reach = int(math.ceil(FOV_LENGTH))
        origin_x = math.floor(self.x) - reach
        origin_y = math.floor(self.y) - reach
        side = 2 * reach + 2  # +2 covers the fractional part of the position
        fov_surface = pygame.Surface((side, side), pygame.SRCALPHA)

        # Polygon vertices are expressed relative to the scratch surface.
        local_x = self.x - origin_x
        local_y = self.y - origin_y
        points = [(local_x, local_y)]
        for a in range(-FOV_ANGLE//2, FOV_ANGLE//2 + 1, 15):
            rad = self.heading + math.radians(a)
            points.append((
                local_x + FOV_LENGTH * math.cos(rad),
                local_y + FOV_LENGTH * math.sin(rad),
            ))
        pygame.draw.polygon(fov_surface, COLORS['fov'], points)
        surface.blit(fov_surface, (origin_x, origin_y))

def draw_overlayed_explored(surface, bots):
    """Tint every grid cell that at least one bot has explored.

    The per-bot ``local_explored`` grids are OR-ed together and one
    translucent tile is blitted onto ``surface`` for each explored cell.
    """
    seen_by_any = np.zeros((GRID_W, GRID_H), dtype=bool)
    for bot in bots:
        np.logical_or(seen_by_any, bot.local_explored, out=seen_by_any)

    tile = pygame.Surface((GRID_SIZE, GRID_SIZE), pygame.SRCALPHA)
    tile.fill(COLORS['explored_overlay'])

    # argwhere yields (column, row) pairs in the same column-then-row order
    # as a nested loop; tolist() converts them to plain Python ints.
    for gx, gy in np.argwhere(seen_by_any).tolist():
        surface.blit(tile, (gx*GRID_SIZE, gy*GRID_SIZE))

# === Disjoint-Set/Union-Find for sticky lifetime merging ===
class DSU:
    """Disjoint-set (union-find) over bot indices that also carries knowledge.

    Besides the usual ``parent``/``size`` arrays, each element has an
    ``informed`` flag and a ``goal_grid`` value. The entries stored at a
    set's root are the ones that count for the whole set.
    """

    def __init__(self, N):
        self.parent = [i for i in range(N)]
        self.size = [1 for _ in range(N)]
        # Per-set knowledge, read through the root: informed flag and goal cell.
        self.informed = [False for _ in range(N)]
        self.goal_grid = [None for _ in range(N)]

    def find(self, x):
        """Return the root of ``x``'s set, compressing the path on the way."""
        root = x
        while self.parent[root] != root:
            root = self.parent[root]
        # Second pass: point every node on the path straight at the root.
        node = x
        while node != root:
            above = self.parent[node]
            self.parent[node] = root
            node = above
        return root

    def union(self, x, y):
        """Merge the sets containing ``x`` and ``y`` and pool their knowledge.

        The smaller set is attached under the larger one. If either side was
        informed, both roots are marked informed. A known goal cell is copied
        to the side that lacks one; if both sides know a cell, each keeps its
        own, so the surviving root's value wins.
        """
        keep = self.find(x)
        absorb = self.find(y)
        if keep == absorb:
            return
        if self.size[keep] < self.size[absorb]:
            keep, absorb = absorb, keep
        self.parent[absorb] = keep
        self.size[keep] += self.size[absorb]

        if self.informed[keep] or self.informed[absorb]:
            self.informed[keep] = self.informed[absorb] = True

        keep_goal = self.goal_grid[keep]
        absorb_goal = self.goal_grid[absorb]
        if keep_goal is None and absorb_goal is not None:
            self.goal_grid[keep] = absorb_goal
        elif absorb_goal is None and keep_goal is not None:
            self.goal_grid[absorb] = keep_goal

    def set_informed(self, x, goal_grid):
        """Mark ``x``'s set as informed; store ``goal_grid`` unless it is None."""
        root = self.find(x)
        self.informed[root] = True
        if goal_grid is None:
            return
        self.goal_grid[root] = goal_grid

    def get_informed(self, x):
        """Return the informed flag of ``x``'s set."""
        root = self.find(x)
        return self.informed[root]

    def get_goal_grid(self, x):
        """Return the goal cell known to ``x``'s set, or None."""
        root = self.find(x)
        return self.goal_grid[root]

# === INIT ===
margin = 40  # keep the goal and starting positions away from the window edge
goal_pos = (
    random.randint(margin, SCREEN_WIDTH-margin),
    random.randint(margin, SCREEN_HEIGHT-margin)
)
bots = [Bot(i, (
    random.randint(margin, SCREEN_WIDTH-margin),
    random.randint(margin, SCREEN_HEIGHT-margin)
)) for i in range(NUM_BOTS)]

# DSU element i corresponds to bots[i].
dsu = DSU(NUM_BOTS)

running = True
# try/finally guarantees the window is closed even if a frame raises
# (otherwise an error leaves a hung pygame window behind).
try:
    while running:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False

        screen.fill(COLORS['background'])

        # Mark explored by each bot's FOV
        for bot in bots:
            mark_fov_explored(bot.x, bot.y, math.degrees(bot.heading), bot.local_explored)

        # Union-find: Merge sets for all FOV links (even stopped bots).
        # A link exists when either bot has the other in its cone.
        for i in range(NUM_BOTS):
            for j in range(i+1, NUM_BOTS):
                if robots_see_each_other(bots[i], bots[j]) or robots_see_each_other(bots[j], bots[i]):
                    dsu.union(i, j)

        # Let bots who see the goal directly update their set as informed
        for i, bot in enumerate(bots):
            if bot.update_goal_status(goal_pos):
                dsu.set_informed(i, bot.goal_grid)

        # After any merges, propagate knowledge within each set forever
        for i in range(NUM_BOTS):
            if dsu.get_informed(i):
                # Ensure any bot in this set gets the correct goal_grid
                if dsu.get_goal_grid(i) is not None:
                    bots[i].goal_grid = dsu.get_goal_grid(i)

        # Move and stop if reached
        for i, bot in enumerate(bots):
            bot.move(dsu.get_informed(i), dsu.get_goal_grid(i))

        pygame.draw.circle(screen, COLORS['goal'], (goal_pos[0], goal_pos[1]), 12)
        draw_overlayed_explored(screen, bots)
        for i, bot in enumerate(bots):
            bot.draw(screen, dsu.get_informed(i))

        pygame.display.flip()
        clock.tick(30)
finally:
    pygame.quit()
