In [1]:
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.colors import LinearSegmentedColormap
import matplotlib.colors as mcolors
from matplotlib.animation import FuncAnimation

from IPython.display import HTML

# Set the limit to, for example, 100 MB
plt.rcParams['animation.embed_limit'] = 100
%matplotlib inline

import numpy as np
import random
import pandas as pd
import time

#import warnings
#warnings.filterwarnings('ignore', category=FutureWarning)

In [2]:
class Environment:
    def __init__(self, size, max_capacity=4, agent_vision_range=(1, 6), agent_metabolism_range=(1, 4), enable_seasons=True, tau=0.5, d=0.1):
        self.size = size
        self.enable_seasons = enable_seasons  # Control seasonal effects
        self.eco_crisis = []  # Store active ecological evils here
        self.active_diseases = []  # Initialize the list for active diseases
        self.tau = tau  # Resource allocation bias
        self.d = d  # Ensure d is a float for deadweight loss

        self.max_capacity = max_capacity
        self.agent_vision_range = agent_vision_range
        self.agent_metabolism_range = agent_metabolism_range
        
        self.northern_mountain_cells = []  # Stores coordinates of cells in the northern mountain
        self.southern_mountain_cells = []  # Stores coordinates of cells in the southern mountain
        
        # Initialization of other attributes...
        self.grid = np.zeros(size, dtype=int)
        self.max_sugar_capacity = np.zeros(size, dtype=int)
        self.initialize_sugar_capacity(max_capacity)
        self.initialize_sugar()
        self.agents = []
        self.occupied_locations = set()
        self.history = [np.copy(self.grid)]
        self.agents_history = []  # Make sure this is initialized here
        
        # Season tracking
        self.current_step = 0
        self.season_length = 50
        # Initially, northern half starts in summer, southern in winter
        self.northern_summer = True
        
        # Initialize counters
        self.replacements = 0
        self.deaths_due_to_age = 0
        self.deaths_due_to_starvation = 0

    def initialize_sugar_capacity(self, max_capacity):
        """Defines the maximum sugar capacity for each cell, including mountains."""
        # Initialize all cells with a default capacity
        self.max_sugar_capacity.fill(0)  # Assuming wasteland has minimal sugar

        # Define peaks for the sugar mountains
        peak1 = (int(self.size[0] / 4), int(self.size[1] / 4))  # Northern peak
        peak2 = (int(3 * self.size[0] / 4), int(3 * self.size[1] / 4))  # Southern peak

        # Define the sugar mountains
        for i in range(self.size[0]):
            for j in range(self.size[1]):
                distance1 = np.sqrt((i - peak1[0])**2 + (j - peak1[1])**2)
                distance2 = np.sqrt((i - peak2[0])**2 + (j - peak2[1])**2)

                # Determine if the cell is closer to the northern or southern peak
                if distance1 < distance2:
                    self.northern_mountain_cells.append((i, j))
                else:
                    self.southern_mountain_cells.append((i, j))

                # The further from the peak, the lower the maximum capacity
                capacity1 = max(max_capacity - int(distance1 / 3), 0)
                capacity2 = max(max_capacity - int(distance2 / 3), 0)

                # Assign the higher of the two capacities, capped at max_capacity
                self.max_sugar_capacity[i, j] = min(max(capacity1, capacity2), max_capacity)

    def initialize_sugar(self):
        """Initializes the grid with sugar according to max capacity."""
        self.grid = np.copy(self.max_sugar_capacity)  # Start with the max capacity as initial sugar levels

    def grow_sugar(self):
        """Simulates the regrowth of sugar on the grid, respecting each cell's max capacity."""
        # Reset grid for non-seasonal growth or inactive mountain areas
        self.grid = np.zeros(self.size, dtype=int)  # Resetting to zero if you want to simulate 'dying off'

        severity = 0  # Default to no crisis effect if none are active
        for evil in self.eco_crisis:
            if evil.active:
                severity = max(severity, evil.severity)  # Assume the worst severity applies

        """
        Simulates the regrowth of sugar on the grid, respecting each cell's max capacity and considering seasonality.
        Each cell can grow by 1 unit per step up to its adjusted (seasonal) maximum capacity.
        """    
        if self.enable_seasons:
            active_mountain_cells = self.northern_mountain_cells if self.northern_summer else self.southern_mountain_cells
            for x, y in active_mountain_cells:
                # Ensure the adjusted max sugar capacity doesn't drop below zero
                adjusted_max_capacity = max(self.max_sugar_capacity[y, x] - severity, 0)
                self.grid[y, x] = min(self.grid[y, x] + 6, adjusted_max_capacity)
        else:
            for y in range(self.size[1]):
                for x in range(self.size[0]):
                    # Ensure the adjusted max sugar capacity doesn't drop below zero
                    adjusted_max_capacity = max(self.max_sugar_capacity[y, x] - severity, 0)
                    self.grid[y, x] = min(self.grid[y, x] + 6, adjusted_max_capacity)
        
    def switch_seasons(self):
        if self.enable_seasons and self.current_step % self.season_length == 0:
            self.northern_summer = not self.northern_summer

    def update_eco_crisis(self):
        """Updates the status of all active ecological evils."""
        for crisis in list(self.eco_crisis):  # Use a copy of the list for safe modification
            if crisis.active:
                crisis.duration -= 1
                if crisis.duration <= 0:
                    crisis.deactivate()
                    self.eco_crisis.remove(crisis)
    
    def trigger_crisis(self, crisis):
        """Activates and adds an evil to the environment."""
        crisis.activate()
        self.eco_crisis.append(crisis)
    
    def update_diseases(self):
        """Updates the status of all active diseases."""
        for disease in list(self.active_diseases):
            if disease.active:
                disease.update(self)
                if disease.is_over():
                    disease.deactivate()
                    self.active_diseases.remove(disease)

    def trigger_disease(self, disease):
        """Activates a disease and initially infects a random agent if possible."""
        disease.activate()
        if disease.active:  # Only proceed if the disease is successfully activated
            self.active_diseases.append(disease)
            if self.agents:  # Ensure there are agents to infect
                initial_infected_agent = random.choice(self.agents)
                initial_infected_agent.infect(disease.severity, disease.duration)
    
    def capture_agents_initial_data(self):
        """Captures the initial data of all agents."""
        # Make sure this method is called after all agents are added
        self.agents_history.append([(agent.location, agent.type, agent.sugar, agent.metabolism, agent.vision, agent.age, agent.is_infected, agent.alive) for agent in self.agents])

    def add_agent(self, agent):
        self.agents.append(agent)
        self.occupied_locations.add(agent.location)

    def remove_agent(self, agent):
        """Removes an agent from the simulation."""
        self.agents.remove(agent)
        self.occupied_locations.discard(agent.location)

    def is_occupied(self, location):
        """Checks if a location is occupied by an agent."""
        return location in self.occupied_locations

    def move_agent(self, agent, new_location):
        """Moves an agent to a new location within the grid."""
        self.occupied_locations.discard(agent.location)
        agent.location = new_location
        self.occupied_locations.add(new_location)

    def harvest_sugar(self, location):
        """Allows an agent to harvest sugar from its current location."""
        sugar = self.grid[location]
        self.grid[location] = 0
        return sugar

    def generate_random_unoccupied_location(self):
        """Finds a random unoccupied location within the environment."""
        while True:
            location = (np.random.randint(0, self.size[0]), np.random.randint(0, self.size[1]))
            if not self.is_occupied(location):
                return location
    
    def find_adjacent_unoccupied_location(self, location):
        directions = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # Up, down, left, right
        random.shuffle(directions)  # Randomize directions to avoid bias
        for dx, dy in directions:
            new_x = (location[0] + dx) % self.size[0]
            new_y = (location[1] + dy) % self.size[1]
            new_location = (new_x, new_y)
            if not self.is_occupied(new_location):
                return new_location
        return None  # No unoccupied adjacent locations available
    
    def get_von_neumann_neighbors(self, location):
        """Returns the Von Neumann neighbors (up, down, left, right) of a given location."""
        x, y = location
        neighbors = []
        directions = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # Up, down, left, right
        for dx, dy in directions:
            nx, ny = (x + dx) % self.size[0], (y + dy) % self.size[1]
            if self.is_occupied((nx, ny)):
                neighbor = self.get_agent_at((nx, ny))
                if neighbor:
                    neighbors.append(neighbor)
        return neighbors

    def get_agent_at(self, location):
        """Returns the agent at the specified location, if any."""
        for agent in self.agents:
            if agent.location == location:
                return agent
        return None
    
    def play_resource_game(self, agent1, agent2):

        total_resources = agent1.sugar + agent2.sugar
        adjusted_resources = round(total_resources * (1 - self.d))  # Account for deadweight loss when both are selfish, and round it

        if agent1.type == agent2.type:
            # Homogeneous pair (either both altruistic or both selfish)
            if agent1.type == "selfish":
                agent1.sugar = round(adjusted_resources / 2)
                agent2.sugar = round(adjusted_resources / 2)
            else:
                agent1.sugar = round(total_resources / 2)
                agent2.sugar = round(total_resources / 2)
        else:
            # Heterogeneous pair (one altruistic, one selfish)
            if agent1.type == "selfish":
                agent1.sugar = round(total_resources / 2 + self.tau * total_resources)
                agent2.sugar = round(total_resources / 2 - self.tau * total_resources)
            else:
                agent1.sugar = round(total_resources / 2 - self.tau * total_resources)
                agent2.sugar = round(total_resources / 2 + self.tau * total_resources)

        # Ensure that sugar does not drop below zero due to rounding errors
        agent1.sugar = max(0, agent1.sugar)
        agent2.sugar = max(0, agent2.sugar)
    
    def step(self):
        """Advances the simulation by one step, including managing eco crises and diseases."""
        self.current_step += 1
        self.switch_seasons()
        self.update_eco_crisis()
        self.update_diseases()  # Make sure diseases are updated each step
        self.grow_sugar()

        random.shuffle(self.agents)

        for agent in list(self.agents):
            if not agent.alive:
                continue  # Skip dead agents for processing
            
            agent_initial_sugar = agent.sugar
            agent.step(self)  # Internal agent state management including disease effects
            
            # Play the resource game with neighboring agents
            neighbors = self.get_von_neumann_neighbors(agent.location)
            for neighbor in neighbors:
                self.play_resource_game(agent, neighbor)

            if agent.sugar < 1 and agent_initial_sugar > 0:
                self.deaths_due_to_starvation += 1
            if agent.age > agent.max_age:
                self.deaths_due_to_age += 1

        self.update_visualization_and_data()

    def update_visualization_and_data(self):
        """Updates the state of the grid and agents for visualization and collects data."""
        # Store the current grid state in history
        self.history.append(np.copy(self.grid))

        # Store the current state of agents in agents_history
        current_agents_state = [
            (
                agent.location,
                agent.type,
                agent.sugar,
                agent.metabolism,
                agent.vision,
                agent.age,
                agent.is_infected,
                agent.alive  # Include alive status
            ) for agent in self.agents
        ]
        # Collect data from the current agents state, which includes both living and dead agents at this step
        self.agents_history.append(current_agents_state)

In [3]:
class Agent:
    def __init__(self, location, sugar=20, metabolism=1, vision=2, max_age=100, age=0, type=None):
        self.location = location
        self.sugar = sugar
        self.original_metabolism = metabolism  # Store original metabolism for disease effect
        self.metabolism = max(1, metabolism)
        self.vision = max(1, vision)
        self.age = age
        self.max_age = max_age
        self.alive = True  # Initialize the agent as alive
        self.is_infected = False
        self.infection_duration = 0
        self.disease_severity = 0
        self.is_immune = False  # Track immunity status
        self.just_infected = False  # Track if just infected in the current step
        self.just_recovered = False  # Track if just recovered in the current step
        self.death_cause = None  # Initialize with None
        
        # Assign type randomly if not provided, otherwise use the provided type
        self.type = type if type is not None else np.random.choice(["selfish", "altruistic"])

    def infect(self, severity, duration):
        if severity > 0 and duration > 0 and not self.is_immune and not self.is_infected:
            self.is_infected = True
            self.disease_severity = severity
            self.infection_duration = duration
            self.just_infected = True  # Mark as just infected
            self.adjust_metabolism()

    def adjust_metabolism(self):
        if self.is_infected:
            self.metabolism = 4 * self.original_metabolism

    def manage_disease(self):
        if self.is_infected:
            self.infection_duration -= 1
            if self.infection_duration <= 0:
                self.recover()

    def recover(self):
        self.is_infected = False
        self.just_recovered = True  # Mark as just recovered
        self.reset_metabolism()
        self.is_immune = True  # Grant immunity after recovery

    def reset_metabolism(self):
        self.metabolism = self.original_metabolism
        
    def check_and_infect_neighbors(self, neighbors, severity):
        """Check neighboring agents and potentially infect them based on disease severity."""
        if self.is_infected:  # Only spread the disease if this agent is infected
            for neighbor in neighbors:
                if not neighbor.is_immune and not neighbor.is_infected:
                    infection_chance = random.random()
                    if infection_chance < severity:
                        neighbor.infect(severity, self.infection_duration)
                else:
                    status = 'immune' if neighbor.is_immune else 'already infected'
                    # This else block now does nothing with 'status', but you can use it for debugging if needed
                    
    def look_around(self, environment):
        """
        Agent decides where to move based on the surrounding sugar.
        """
        best_sugar_level = -1
        best_location = self.location

        # Explore surrounding cells within vision
        for dx in range(-self.vision, self.vision + 1):
            for dy in range(-self.vision, self.vision + 1):
                if dx == 0 and dy == 0:
                    continue  # Skip the agent's current location

                # Calculate the target location, considering wrap-around
                target_x = (self.location[0] + dx) % environment.size[0]
                target_y = (self.location[1] + dy) % environment.size[1]
                target_location = (target_x, target_y)

                # Ensure the target location is not occupied and within the grid
                if not environment.is_occupied(target_location):
                    sugar_level = environment.grid[target_location]
                    if sugar_level > best_sugar_level:
                        best_sugar_level = sugar_level
                        best_location = target_location

        return best_location

    def move(self, environment, new_location):
        """
        Moves the agent to a new location.
        """
        environment.move_agent(self, new_location)

    def harvest(self, environment):
        """
        Harvests sugar from the current location.
        """
        self.sugar += environment.harvest_sugar(self.location)

    def step(self, environment):
        if not self.alive:  # Skip processing for dead agents
            return

        # Reset infection and recovery markers at the beginning of each step
        self.just_infected = False
        self.just_recovered = False

        # Aging and checking for natural death due to old age
        self.age += 1
        if self.age > self.max_age:
            self.alive = False
            self.death_cause = 'age'
            environment.remove_agent(self)
            return

        # Disease management within the step function
        if self.is_infected:
            self.manage_disease()

        # Check for death due to running out of sugar
        if self.sugar <= 0:
            self.alive = False
            self.death_cause = 'starvation'
            environment.remove_agent(self)
            return

        # Movement and harvesting if alive
        if self.alive:
            new_location = self.look_around(environment)
            if new_location != self.location:
                self.move(environment, new_location)
            self.harvest(environment)

        # Subtract metabolism and check for death due to starvation
        self.sugar -= self.metabolism
        if self.sugar <= 0:
            self.alive = False
            self.death_cause = 'starvation'
            environment.remove_agent(self)
            return

        # Reproduction attempt only if still alive and conditions are met
        if self.alive:
            self.try_to_reproduce(environment)

    def try_to_reproduce(self, environment):
        # Set different sugar thresholds based on infection status
        sugar_threshold = 99 if self.is_infected else 49

        if self.age > 11 and self.age < 51 and self.sugar > sugar_threshold:
            new_location = environment.find_adjacent_unoccupied_location(self.location)
            if new_location:
                offspring = self.reproduce(new_location, environment)
                if offspring:  # Check if reproduction was successful
                    environment.add_agent(offspring)

    def reproduce(self, new_location, environment):
        # Adjust the sugar cost and offspring's sugar based on infection status
        if self.is_infected:
            reproduction_sugar_cost = int(0.75 * self.sugar)
            offspring_sugar = self.sugar // 10
        else:
            reproduction_sugar_cost = self.sugar // 2
            offspring_sugar = self.sugar // 5

        # Only proceed with reproduction if there's enough sugar
        if self.sugar > reproduction_sugar_cost:
            self.sugar -= reproduction_sugar_cost  # Parent loses sugar based on infection status

            # Define specific mutation functions for each attribute
            mutate_metabolism = lambda x: max(1, x + random.choice([-1, 0, 1]))
            mutate_vision = lambda x: max(1, x + random.choice([-2, 0, 2]))
            mutate_max_age = lambda x: x + random.choice([-5, 0, 5])

            # Apply these mutations to create offspring
            offspring = Agent(
                    location=new_location,
                    sugar=offspring_sugar,
                    metabolism=mutate_metabolism(self.metabolism),
                    vision=mutate_vision(self.vision),
                    max_age=mutate_max_age(self.max_age),
                    age=0,  # Newborn starts at age 0
                    type=self.type  # Inherit type from the parent
            )
            return offspring
        else:
            # Not enough sugar to reproduce, return None or similar to indicate failure
            return None

In [4]:
class EcoCrisis:
    def __init__(self, severity=0, duration=0, frequency=1):
        self.severity = severity
        self.duration = duration
        self.active = False  # Track whether the evil is currently active

    def activate(self):
        if self.severity > 0 and self.duration > 0:
            self.active = True
            # print(f"Crisis activated with severity {self.severity} and duration {self.duration}")
        else:
            # print("Crisis parameters set to zero, not activating disease.")
            pass

    def deactivate(self):
        self.active = False  # Mark the evil as inactive

    def apply_effect(self, environment):
        """Apply the effect to the environment; specifics should be implemented in subclasses."""
        pass

In [5]:
class Disease:
    def __init__(self, severity=0, duration=0, frequency=1):
        self.severity = severity
        self.duration = duration
        self.active = False

    def activate(self):
        if self.severity > 0 and self.duration > 0:
            self.active = True
            # print(f"Disease activated with severity {self.severity} and duration {self.duration}")
        else:
            # print("Disease parameters set to zero, not activating disease.")
            pass

    def deactivate(self):
        self.active = False

    def update(self, environment):
        """Update the disease status and apply effects to the environment."""
        if not self.active or self.duration <= 0:
            return  # Skip updating if the disease is not active or the duration has expired

        # Apply effects such as spreading the disease to agents
        for agent in environment.agents:
            if agent.is_infected:
                neighbors = environment.get_von_neumann_neighbors(agent.location)
                agent.check_and_infect_neighbors(neighbors, self.severity)

        # Decrement the duration only if the disease is active
        self.duration -= 1

    def is_over(self):
        """Check if the disease's active period has ended."""
        return self.duration <= 0

In [6]:
class DataCollection:
    def __init__(self):
        self.simulation_data = []

    def process_agent_history(self, agent_history, config_id, simulation_id, replacements, deaths_due_to_age, deaths_due_to_starvation):
        # Flatten the agent_history list and add simulation metadata
        flattened_data = [
            (config_id, simulation_id, step, *agent, replacements, deaths_due_to_age, deaths_due_to_starvation)
            for step, agents in enumerate(agent_history)
            for agent in agents
        ]
        # Append to the data list
        self.simulation_data.extend(flattened_data)

    def get_simulation_data(self):
        # Create a DataFrame with the collected data
        columns = [
            'config_id', 'simulation_id', 'step', 'location', 'type', 'sugar', 'metabolism', 'vision', 'age', 'is_infected', 'alive', 
            'replacements', 'deaths_due_to_age', 'deaths_due_to_starvation'
        ]
        return pd.DataFrame(self.simulation_data, columns=columns)

    def calculate_population_data(self):
        df = self.get_simulation_data()
        population_data = []

        for config_id in df['config_id'].unique():
            config_data = df[df['config_id'] == config_id]
            for simulation_id in config_data['simulation_id'].unique():
                simulation_data = config_data[config_data['simulation_id'] == simulation_id]
                for step in simulation_data['step'].unique():
                    step_data = simulation_data[simulation_data['step'] == step]
                    total_population = len(step_data)
                    altruistic_count = len(step_data[step_data['type'] == 'altruistic'])
                    selfish_count = len(step_data[step_data['type'] == 'selfish'])
                    population_data.append((config_id, simulation_id, step, total_population, altruistic_count, selfish_count))

        population_df = pd.DataFrame(population_data, columns=['config_id', 'simulation_id', 'step', 'total_population', 'altruistic_count', 'selfish_count'])
        return population_df

In [7]:
class Visualization:
    def __init__(self, environment, max_capacity=6):
        self.environment = environment
        self.max_capacity = max_capacity
        self.fig, self.ax = plt.subplots()
        self.colors = {'selfish': 'red', 'altruistic': 'blue'}
        
        # Dynamically create colormap and bounds based on max_capacity
        self.cmap = self.create_colormap(self.max_capacity)
        self.bounds = list(range(self.max_capacity + 1))
        self.norm = mcolors.BoundaryNorm(self.bounds, self.cmap.N)

    def create_colormap(self, max_capacity):
        base_colors = ['#c2b280', '#ccffcc', '#99ff99', '#66cc66', '#339933', '#004d00', '#f0e68c']
        return mcolors.ListedColormap(base_colors[:max_capacity + 1])

    def visualize_state(self, step=None):
        """Visualizes the state of the grid with agents at a given timestep."""
        self.ax.clear()
        grid_state = self.environment.grid if step is None else self.environment.history[step]
        self.ax.imshow(grid_state, cmap=self.cmap, norm=self.norm, interpolation='nearest')

        # Annotate each cell with its sugar level
        for (j, i), label in np.ndenumerate(grid_state):
            self.ax.text(i, j, label, ha='center', va='center', color='black')

        # Overlay agents as colored dots based on their type
        if step is None:
            agents = self.environment.agents
        else:
            agents = self.environment.agents_history[step]
        
        for agent in agents:
            location, agent_type, sugar, metabolism, vision, age, is_infected = agent
            color = self.colors[agent_type]
            self.ax.plot(location[1], location[0], 'o', color=color, markersize=5)
        
        self.ax.set_title('Grid State at Step {}'.format(step))
        self.ax.set_xticks([])
        self.ax.set_yticks([])
        plt.show()

    def animate_simulation(self, interval=200):
        def update(frame):
            self.ax.clear()
            grid_state = self.environment.history[frame]
            self.ax.imshow(grid_state, cmap=self.cmap, norm=self.norm, interpolation='nearest')
            
            agents_state = self.environment.agents_history[frame]
            for agent in agents_state:
                (agent_location, agent_type, sugar, metabolism, vision, age, is_infected, alive) = agent
                agent_color = self.colors[agent_type] if alive else 'gray'
                self.ax.plot(agent_location[1], agent_location[0], 'o', color=agent_color, markersize=5, alpha=1 if alive else 0.5)
            
            self.ax.set_xticks([])
            self.ax.set_yticks([])
            self.ax.set_title(f'Step {frame}')

        frames = len(self.environment.history)
        anim = FuncAnimation(self.fig, update, frames=frames, interval=interval, repeat=False)
        plt.close(self.fig)
        return anim

In [8]:
class Experiment:
    def __init__(self, num_agents, size, max_capacity, num_simulations, steps_per_simulation, 
                 agent_vision_range, agent_metabolism_range, configurations, 
                 agent_lifespan_range=(60, 100), enable_seasons=True, tau=0.5, d=0.1):
        self.num_agents = num_agents
        self.size = size
        self.max_capacity = max_capacity
        self.num_simulations = num_simulations
        self.steps_per_simulation = steps_per_simulation
        self.agent_vision_range = agent_vision_range
        self.agent_metabolism_range = agent_metabolism_range
        self.agent_lifespan_range = agent_lifespan_range
        self.enable_seasons = enable_seasons
        self.configurations = configurations
        self.tau = tau
        self.d = d
        self.data_collection = DataCollection()
        self.baseline_run = False

    def run_simulation(self, config_id, simulation_id):
        eco_part, disease_part = config_id.split('_')
        eco_severity, eco_duration, eco_frequency = map(int, eco_part.replace('Eco-', '').split('-'))
        disease_severity, disease_duration, disease_frequency = map(float, disease_part.replace('Disease-', '').split('-'))

        environment = Environment(
            self.size,
            self.max_capacity,
            self.agent_vision_range,
            self.agent_metabolism_range,
            self.enable_seasons,
            self.tau,
            self.d
        )

        crisis_steps = sorted(random.sample(range(self.steps_per_simulation), eco_frequency)) if eco_frequency > 0 else []
        disease_steps = sorted(random.sample(range(self.steps_per_simulation), int(disease_frequency))) if disease_frequency > 0 else []

        for _ in range(self.num_agents):
            location = environment.generate_random_unoccupied_location()
            vision = np.random.randint(self.agent_vision_range[0], self.agent_vision_range[1] + 1)
            metabolism = np.random.randint(self.agent_metabolism_range[0], self.agent_metabolism_range[1] + 1)
            sugar = np.random.randint(5, 26)
            max_age = np.random.randint(self.agent_lifespan_range[0], self.agent_lifespan_range[1] + 1)
            agent = Agent(location=location, sugar=sugar, metabolism=metabolism, vision=vision, max_age=max_age)
            environment.add_agent(agent)

        environment.capture_agents_initial_data()

        for step in range(self.steps_per_simulation):
            if step in crisis_steps:
                crisis = EcoCrisis(eco_severity, eco_duration)
                environment.trigger_crisis(crisis)
            if step in disease_steps:
                disease = Disease(disease_severity, disease_duration)
                environment.trigger_disease(disease)
            environment.step()

        self.data_collection.process_agent_history(
            environment.agents_history,
            config_id,
            simulation_id,
            environment.replacements,
            environment.deaths_due_to_age,
            environment.deaths_due_to_starvation
        )

    def run_experiment(self):
        futures = []
        for config_id in self.configurations:
            print(f"Running experiment with Configuration ID: {config_id}")

            eco_config, disease_config = config_id.split('_')
            eco_severity, eco_duration, eco_frequency = map(int, eco_config.replace('Eco-', '').split('-'))
            disease_severity, disease_duration, disease_frequency = map(float, disease_config.replace('Disease-', '').split('-'))

            is_baseline = (eco_severity == 0 and eco_duration == 0 and eco_frequency == 0 and
                           disease_severity == 0 and disease_duration == 0 and disease_frequency == 0)

            if is_baseline:
                if not self.baseline_run:
                    self.baseline_run = True
                for simulation in range(self.num_simulations):
                    self.run_simulation(config_id, simulation + 1)
                continue

            if ((eco_severity == 0 or eco_duration == 0 or eco_frequency == 0) and 
                (disease_severity == 0 or disease_duration == 0 or disease_frequency == 0)):
                continue

            for simulation in range(self.num_simulations):
                self.run_simulation(config_id, simulation + 1)

        population_data = self.data_collection.calculate_population_data()

        for config_id in self.configurations:
            print(f"\nPopulation data for Configuration ID: {config_id}")
            config_population_data = population_data[population_data['config_id'] == config_id]

            for simulation_id in config_population_data['simulation_id'].unique():
                print(f"\nSimulation {simulation_id}:")
                simulation_data = config_population_data[config_population_data['simulation_id'] == simulation_id]

                for step in sorted(simulation_data['step'].unique()):
                    step_data = simulation_data[simulation_data['step'] == step]
                    total_population = step_data['total_population'].values[0]
                    altruistic_count = step_data['altruistic_count'].values[0]
                    selfish_count = step_data['selfish_count'].values[0]
                    print(f"Step {step}: Total = {total_population}, Altruistic = {altruistic_count}, Selfish = {selfish_count}")

        return population_data

In [9]:
# Set random seed for reproducibility
np.random.seed(42)

# Experiment parameters
num_agents = 400
environment_size = (50, 50)
max_capacity = 6
num_simulations = 5
steps_per_simulation = 250
enable_seasons = False
vision_range = (1, 6)
metabolism_range = (1, 4)
agent_lifespan_range = (60, 100)
tau = 0.2
d = 0

configurations = {
    'Eco-0-0-0_Disease-0-0-0': {'eco': (0, 0, 0), 'disease': (0, 0, 0)},
    'Eco-4-25-5_Disease-1-15-9': {'eco': (4, 25, 5), 'disease': (1, 15, 9)},
    'Eco-8-25-5_Disease-1-15-9': {'eco': (8, 25, 5), 'disease': (1, 15, 9)}
}

# Create an instance of the Experiment class using Ray's remote method
experiment = Experiment(num_agents=num_agents, size=environment_size, max_capacity=max_capacity,
                               num_simulations=num_simulations, steps_per_simulation=steps_per_simulation,
                               agent_vision_range=vision_range, agent_metabolism_range=metabolism_range,
                               agent_lifespan_range=agent_lifespan_range, enable_seasons=enable_seasons,
                               configurations=configurations, tau=tau, d=d)

# Timing the experiment
start_time = time.time()

# Run the experiment in parallel
experiment.run_experiment()

# Calculate the total experiment time
end_time = time.time()
duration_seconds = end_time - start_time
duration_minutes = duration_seconds / 60

# Output the duration of the experiment
print(f"The experiment took {duration_minutes:.2f} minutes to complete.")

Running experiment with Configuration ID: Eco-0-0-0_Disease-0-0-0
Running experiment with Configuration ID: Eco-4-25-5_Disease-1-15-9
Running experiment with Configuration ID: Eco-8-25-5_Disease-1-15-9

Population data for Configuration ID: Eco-0-0-0_Disease-0-0-0

Simulation 1:
Step 0: Total = 400, Altruistic = 198, Selfish = 202
Step 1: Total = 399, Altruistic = 197, Selfish = 202
Step 2: Total = 399, Altruistic = 197, Selfish = 202
Step 3: Total = 396, Altruistic = 196, Selfish = 200
Step 4: Total = 393, Altruistic = 194, Selfish = 199
Step 5: Total = 388, Altruistic = 190, Selfish = 198
Step 6: Total = 386, Altruistic = 190, Selfish = 196
Step 7: Total = 384, Altruistic = 189, Selfish = 195
Step 8: Total = 380, Altruistic = 187, Selfish = 193
Step 9: Total = 379, Altruistic = 186, Selfish = 193
Step 10: Total = 377, Altruistic = 185, Selfish = 192
Step 11: Total = 375, Altruistic = 184, Selfish = 191
Step 12: Total = 416, Altruistic = 184, Selfish = 232
Step 13: Total = 433, Altrui

In [None]:
# List all available Configuration IDs
# print("Available Configuration IDs:", list(experiment.data_collections.keys()))
print("Total Available Configurations:", len(experiment.data_collections.keys()))

In [None]:
# Access the DataCollection instance for the chosen configuration
# Choose a configuration ID to examine
config_id_to_check = 'Eco-0-0-0_Disease-0-0-0'  # Example configuration ID

# Access the DataCollection instance for the chosen configuration from your Experiment instance
data_collector = experiment.data_collections[config_id_to_check]

if config_id_to_check in data_collector.data:
    # Population Data
    population_data = data_collector.data[config_id_to_check]['population_data']
    plt.figure(figsize=(10, 6))
    sns.lineplot(data=population_data, x='Timestep', y='Population', hue='Type', style='Type', markers=True, dashes=False, estimator='mean', ci='sd')
    plt.title('Population Over Time with Mean and Standard Deviation')
    plt.xlabel('Timestep')
    plt.ylabel('Population')
    plt.legend(title='Agent Type')
    plt.grid(True)
    plt.show()

    # Metabolism Data
    metabolism_data = data_collector.data[config_id_to_check]['metabolism_data']
    plt.figure(figsize=(10, 6))
    sns.lineplot(data=metabolism_data, x='Timestep', y='Metabolism', hue='Type', style='Type', markers=True, dashes=False, estimator='mean', ci='sd')
    plt.title('Metabolism Over Time with Mean and Standard Deviation')
    plt.xlabel('Timestep')
    plt.ylabel('Metabolism')
    plt.legend(title='Agent Type')
    plt.grid(True)
    plt.show()

    # Vision Data
    vision_data = data_collector.data[config_id_to_check]['vision_data']
    plt.figure(figsize=(10, 6))
    sns.lineplot(data=vision_data, x='Timestep', y='Vision', hue='Type', style='Type', markers=True, dashes=False, estimator='mean', ci='sd')
    plt.title('Vision Over Time with Mean and Standard Deviation')
    plt.xlabel('Timestep')
    plt.ylabel('Vision')
    plt.legend(title='Agent Type')
    plt.grid(True)
    plt.show()

    # Wealth Data
    wealth_data = data_collector.data[config_id_to_check]['wealth_data']
    plt.figure(figsize=(10, 6))
    sns.lineplot(data=wealth_data, x='Timestep', y='Wealth', hue='Type', style='Type', markers=True, dashes=False, estimator='mean', ci='sd')
    plt.title('Wealth Over Time with Mean and Standard Deviation')
    plt.xlabel('Timestep')
    plt.ylabel('Wealth')
    plt.legend(title='Agent Type')
    plt.grid(True)
    plt.show()

else:
    print("No data available for this configuration ID.")

In [None]:
# Assuming 'age_distribution' is structured with columns ['Timestep', 'Age', 'Count', 'Type']
age_data = data_collector.data[config_id_to_check]['age_distribution']

if not age_data.empty and age_data['Count'].sum() > 0:
    # Ensure proper data types
    age_data['Age'] = pd.to_numeric(age_data['Age'], errors='coerce')
    age_data['Count'] = pd.to_numeric(age_data['Count'], errors='coerce')
    age_data.dropna(subset=['Age', 'Count'], inplace=True)  # Drop rows where 'Age' or 'Count' could not be converted

    # Calculate the weighted age and group by Timestep and Type
    age_data['Weighted_Age'] = age_data['Age'] * age_data['Count']
    grouped = age_data.groupby(['Timestep', 'Type']).agg(
        Total_Count=('Count', 'sum'),
        Sum_Weighted_Age=('Weighted_Age', 'sum')
    )

    # Calculate the average age for each agent type
    grouped['Average_Age'] = grouped['Sum_Weighted_Age'] / grouped['Total_Count']

    # Calculate variance for standard deviation
    age_data = age_data.merge(grouped[['Average_Age']], on=['Timestep', 'Type'])
    age_data['Variance_Term'] = age_data['Count'] * (age_data['Age'] - age_data['Average_Age'])**2
    variance_grouped = age_data.groupby(['Timestep', 'Type'])['Variance_Term'].sum() / grouped['Total_Count']
    grouped['Std_Dev_Age'] = variance_grouped.apply(lambda x: np.sqrt(x) if x >= 0 else 0)

    # Reset index for plotting
    final_data = grouped.reset_index()

    # Plotting with standard deviation
    plt.figure(figsize=(10, 6))
    sns.lineplot(data=final_data, x='Timestep', y='Average_Age', hue='Type', marker='o', style='Type')
    for _, row in final_data.iterrows():
        plt.fill_betweenx([row['Average_Age'] - row['Std_Dev_Age'], row['Average_Age'] + row['Std_Dev_Age']], row['Timestep'] - 0.2, row['Timestep'] + 0.2, color='blue', alpha=0.2)
    plt.title('Average Age Over Time with Standard Deviation by Agent Type')
    plt.xlabel('Timestep')
    plt.ylabel('Average Age')
    plt.legend(title='Agent Type')
    plt.grid(True)
    plt.show()
else:
    print("No age data available for plotting.")

In [None]:
# Assuming 'infection_data' is structured with columns ['Timestep', 'Infections', 'Recoveries', 'Type']
infection_data = data_collector.data[config_id_to_check]['infection_data']

if not infection_data.empty and (infection_data['Infections'].sum() > 0 or infection_data['Recoveries'].sum() > 0):
    plt.figure(figsize=(10, 6))
    # Plotting the data for infections and recoveries separately by agent type
    sns.lineplot(data=infection_data, x='Timestep', y='Infections', hue='Type', style='Type', markers=True, dashes=False, palette='viridis')
    sns.lineplot(data=infection_data, x='Timestep', y='Recoveries', hue='Type', style='Type', markers=True, dashes=False, palette='viridis')
    plt.title('Infection and Recovery Over Time by Agent Type')
    plt.xlabel('Timestep')
    plt.ylabel('Count')
    plt.legend(title='Type')
    plt.grid(True)
    plt.show()
else:
    print("No infection data available for plotting.")

In [None]:
# Assuming you've chosen a specific configuration ID and have data available
wealth_data = data_collector.data[config_id_to_check]['wealth_data']

# Check if there's wealth data to plot
if not wealth_data.empty:
    # Define the steps you want to plot
    total_steps = wealth_data['Timestep'].max() + 1  # Assuming continuous steps from 0 to max
    step_skip = 31  # Define the number of steps to skip between plots
    steps_to_plot = range(0, total_steps, step_skip)
    steps_to_plot = list(steps_to_plot)[:9]  # Take only the first 9 steps to fit a 3x3 grid

    # Setting up the subplot grid
    fig, axes = plt.subplots(3, 3, figsize=(15, 12))
    axes = axes.flatten()  # Flatten the 2D array of axes to iterate easily

    # Plot histograms for each selected timestep, filtering by type
    for i, step in enumerate(steps_to_plot):
        # Filter the data for the current step and each type
        data_at_step = wealth_data[wealth_data['Timestep'] == step]
        sns.histplot(data=data_at_step, x='Wealth', hue='Type', ax=axes[i], kde=True, palette='viridis', bins=25, element='step', stat='density')
        axes[i].set_title(f'Wealth Distribution at Timestep {step}')
        axes[i].set_xlabel('Wealth')
        axes[i].set_ylabel('Density')

    # Adjust layout to prevent overlap
    plt.tight_layout()
    plt.show()
else:
    print("No wealth data available for plotting.")

In [None]:
# Assuming you've chosen a specific configuration ID and have data available
wealth_data = data_collector.data[config_id_to_check]['wealth_data']

# Check if there's wealth data to plot
if not wealth_data.empty:
    # Define the steps you want to plot
    total_steps = wealth_data['Timestep'].max() + 1  # Assuming continuous steps from 0 to max
    step_skip = 31  # Define the number of steps to skip between plots
    steps_to_plot = range(0, total_steps, step_skip)
    steps_to_plot = list(steps_to_plot)[:9]  # Take only the first 9 steps to fit a 3x3 grid

    # Setting up the subplot grid
    fig, axes = plt.subplots(3, 3, figsize=(15, 12))
    axes = axes.flatten()  # Flatten the 2D array of axes to iterate easily

    # Plot Lorenz curves and calculate Gini index for each selected timestep
    for i, step in enumerate(steps_to_plot):
        # Filter the data for the current step
        data_at_step = wealth_data[wealth_data['Timestep'] == step]
        if data_at_step.empty:
            axes[i].text(0.5, 0.5, 'No data available', horizontalalignment='center', verticalalignment='center', transform=axes[i].transAxes)
            axes[i].set_title(f'Timestep {step}')
            continue

        for type in data_at_step['Type'].unique():
            # Filter data by type
            type_data = data_at_step[data_at_step['Type'] == type]['Wealth']

            # Sort data and normalize
            sorted_wealth = np.sort(type_data)
            cumulative_wealth = np.cumsum(sorted_wealth)
            total_wealth = cumulative_wealth[-1]
            normalized_wealth = cumulative_wealth / total_wealth

            # Create Lorenz curve points
            x = np.linspace(0, 1, len(sorted_wealth) + 1)  # Include zero point
            y = np.concatenate([[0], normalized_wealth])  # Include the zero point to start the curve

            # Plot the curve
            axes[i].plot(x, y, label=f'Lorenz Curve - {type}')
            axes[i].plot([0, 1], [0, 1], 'k--', label='Line of Equality')

            # Calculate and annotate Gini index
            gini_index = 1 - 2 * np.trapz(y, x)
            axes[i].set_title(f'Wealth Distribution at Timestep {step}\nGini Index for {type}: {gini_index:.2f}')

        axes[i].set_xlabel('Fraction of Population')
        axes[i].set_ylabel('Fraction of Wealth')
        axes[i].legend()

    # Adjust layout to prevent overlap
    plt.tight_layout()
    plt.show()
else:
    print("No wealth data available for plotting.")

In [None]:
def calculate_gini(arr):
    """Calculate the Gini coefficient of a numpy array."""
    # Ensure array is sorted.
    arr = np.sort(arr)
    n = arr.size
    cumulative_sum = np.cumsum(arr)
    # Compute the Lorenz curve using the cumulative sum.
    lorenz_curve = cumulative_sum / cumulative_sum[-1]
    # Linear space to represent population share
    uniform_distribution = np.linspace(0, 1, n+1)
    # Gini index calculation using trapezoidal rule
    area_under_lorenz = np.trapz(lorenz_curve, uniform_distribution[1:])
    area_under_uniform = 0.5
    gini_index = (area_under_uniform - area_under_lorenz) / area_under_uniform
    return gini_index

# Assuming 'wealth_data' is a DataFrame with columns ['Timestep', 'Wealth', 'Type']
# Check if there's wealth data for the initial timestep and calculate Gini indices for each type
initial_wealth = wealth_data[wealth_data['Timestep'] == 0]

if not initial_wealth.empty:
    for agent_type in initial_wealth['Type'].unique():
        type_specific_wealth = initial_wealth[initial_wealth['Type'] == agent_type]['Wealth']
        gini_index = calculate_gini(type_specific_wealth.values)
        print(f"Gini Index at Step 0 for {agent_type} agents:", gini_index)
else:
    print("No wealth data available for Gini calculation.")

In [None]:
np.random.seed(42)

environment = Environment(size=(50, 50), max_capacity=6, enable_seasons=False, tau=0.2, d=0)

eco_crisis = EcoCrisis(severity=0, duration=0, frequency=0)
disease = Disease(severity=0, duration=0, frequency=0)

environment.trigger_crisis(eco_crisis)
environment.trigger_disease(disease)

occupied_locations = set()
for _ in range(400):
    while True:
        location = (np.random.randint(0, 50), np.random.randint(0, 50))
        if location not in occupied_locations:
            occupied_locations.add(location)
            break
    vision = np.random.randint(1, 7)
    metabolism = np.random.randint(1, 5)
    sugar = np.random.randint(5, 26)
    age = np.random.randint(60, 101)
    agent = Agent(location=location, sugar=sugar, metabolism=metabolism, vision=vision, max_age=age)
    environment.add_agent(agent)

environment.capture_agents_initial_data()

# Run the simulation for 250 steps
for step in range(250):
    environment.step()

    # Print the agent counts from agent_history and calculate average traits
    agent_history = environment.agents_history[-1]
    total_count = len(agent_history)
    altruistic_count = len([agent for agent in agent_history if agent[1] == 'altruistic'])
    selfish_count = len([agent for agent in agent_history if agent[1] == 'selfish'])

    # Calculate average traits for all agents
    if total_count > 0:
        avg_sugar = sum(agent[2] for agent in agent_history) / total_count
        avg_metabolism = sum(agent[3] for agent in agent_history) / total_count
        avg_vision = sum(agent[4] for agent in agent_history) / total_count
        avg_age = sum(agent[5] for agent in agent_history) / total_count
    else:
        avg_sugar = avg_metabolism = avg_vision = avg_age = 0

    print(f"Step {step}: Total agents: {total_count}, Altruistic agents: {altruistic_count}, Selfish agents: {selfish_count}")
    print(f"Step {step}: Avg sugar: {avg_sugar:.2f}, Avg metabolism: {avg_metabolism:.2f}, Avg vision: {avg_vision:.2f}, Avg age: {avg_age:.2f}")

visualization = Visualization(environment)
anim = visualization.animate_simulation(interval=200)
HTML(anim.to_jshtml())