## A simple swarm

In [3]:
%load_ext autoreload 
%autoreload 2
import sys
sys.tracebacklimit = 0

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [4]:
from core.agent import *
from core.world import * 
from core.map import *
import numpy as np
from core.render import * 
from core.env_params import * 
from core.skill import * 

In [5]:
def initialize_swarm(world : World):
    swarm = [Agent() for i in range(SWARM_SIZE)]
    skill_initializer = DefaultSkillInitializer(num_skills = PRODUCT_TYPES)
    for agent in swarm:
        skill_initializer.forward(agent)
        world.add_agent(agent)
    swarm = initialize_positions_randomly(world, swarm)
    

SyntaxError: invalid syntax (3151286362.py, line 3)

In [8]:
world = World(dims = WORLD_DIMS,
              swarm_initialzier= initialize_swarm,
              resource_generator= RandomMapGenerator(RESOURCE_TYPES),
              energy_model=EnergyModel()
              )
world.reset()

In [9]:
# Test movements by doing random actions
def update():
    world.update()
    # Comms protocol (receiving)
    for agent in world.agents: 
        received_messages = agent.get_messages()
        for msg in received_messages: 
            agent.add_relation(msg.sender, msg.message)
        agent.clear_messages()

    for agent in world.agents:
        action_choice = np.random.randint(0, 4)
        choice = np.random.randint(1, 5)

        match (action_choice): 
            case 1: agent.move(choice)
            case 2: agent.pick_up(choice)
            case 3: agent.put_down(choice)
    
    # Comms protocol (sending )
    for agent in world.agents:
        neighbors = agent.agents_in_range 

        # Send agents
        if len (neighbors) > 0:
            choice = np.random.choice(neighbors)
            tgt_agent = world.get_agent(choice)
            agent.talk_to(tgt_agent)

update()

In [10]:
render_world(world, (800, 800), update_fn=update, delay_s=0)

# Data Visualization

## Social Network 

In [7]:
import networkx as nx 
import matplotlib.pyplot as plt

In [8]:
network = nx.graph.Graph()

for agent in world.agents:
    for neighbor in agent._current_state.relations:
        network.add_edge(agent.id, neighbor, weight = agent._current_state.relations[neighbor])

In [None]:
edges,weights = zip(*nx.get_edge_attributes(network ,'weight').items())
nx.draw_kamada_kawai(network, node_size = 10, edge_color = weights, edge_cmap = plt.cm.Spectral)

## Verify certain things about the simulation

In [10]:
swarm = world.agents

In [None]:
# Validate the observation space provided is sensible
obs = swarm[123].local_observation

print(obs.nearby_agents, obs.nearby_agents.shape)
print(obs.resource_types, obs.resource_types.shape)

In [None]:
# Validate that all swarm agents landed in different positions
positions = set()
for agent in swarm:
    pos = agent.current_position
    positions.add((pos[0], pos[1]))

print(len(positions), len(world.agents))

In [None]:
# Check for resources
rsrc_qty = world._resource_map._resource_quantity_map
qtys = []

for i in range(rsrc_qty.shape[0]):
    for j in range(rsrc_qty.shape[1]):
        if rsrc_qty[i, j] > 0:
            qtys.append(rsrc_qty[i, j])

print(qtys)

In [None]:
# Check for agent state
state = swarm[0]._current_state.inventory
print(state)


# Perftest

In [90]:
import cProfile
def stress_test():
    np.random.seed(42)
    world.reset()
    for _ in range(0, 1000):
        update()

In [None]:

cProfile.run('stress_test()', sort = 'time')

In [None]:
cProfile.run('world.update()', sort = 'time')