In [1]:
import mesa
import numpy as np
from mesa.discrete_space import OrthogonalMooreGrid, CellAgent
from mesa.visualization import SolaraViz, make_plot_component, make_space_component
import solara
from mesa.visualization import make_plot_component
import matplotlib.pyplot as plt


In [2]:
# some data collecting functions
def count_agents_in_bar(model):
    """Count the number of agents in the bar zone."""
    count = 0
    for cell in model.grid.all_cells.cells:
        if model.zones.get(cell.coordinate) == "bar":
            count += len(cell.agents)
    return count

# Set agent up
class BarGoingAgent(CellAgent):
    """An agent that can move around the grid."""
    
    def __init__(self, model, cell, memory_size=1, number_of_strategies=1, overcrowding_threshold=20):
        """Initialize the agent."""
        super().__init__(model)
        self.cell = cell
        self.memory_size = memory_size
        self.number_of_strategies = number_of_strategies
        self.overcrowding_threshold = overcrowding_threshold

        # Add further agent attributes
        self.strategies = []
        self.best_strategy = None
        self.attend = False     # plans to attend the bar?
        self.prediction = 0     # current prediction of the bar attendance

        # generate a random strategy
        self.generate_strategies()

        # we set the first strategy as the best one
        self.best_strategy = 0
    
    def generate_strategies(self):
        """Generate random strategies for this agent (equivalent to NetLogo's random-strategy)."""
        # Each strategy is a list of weights from -1.0 to 1.0
        # The first weight is a constant, the rest are weights for previous time periods
        self.strategies = []
        for _ in range(self.number_of_strategies):
            strategy = []
            # Generate memory_size + 1 weights (constant + memory weights)
            for _ in range(self.memory_size + 1):
                weight = 1.0 - self.model.random.random() * 2.0  # Random float from -1.0 to 1.0
                strategy.append(weight)
            self.strategies.append(strategy)

    def predict_attendance(self, strategy_index, history_subset):
        """
        Predict attendance using a specific strategy and history subset.
        
        Formula: p(t) = x(t-1)*a(t-1) + x(t-2)*a(t-2) + ... + x(t-memory)*a(t-memory) + c*100
        where:
        - p(t) is prediction at time t
        - x(t) is attendance at time t  
        - a(t) is weight for time t
        - c is constant (first element of strategy)
        """
        if strategy_index >= len(self.strategies):
            return 0
            
        strategy = self.strategies[strategy_index]
        
        # Step 3a: Start with constant term (first element * 100)
        prediction = 100 * strategy[0]
        
        # Step 3b: Add weighted sum of historical data
        for i, attendance in enumerate(history_subset):
            if i + 1 < len(strategy):  # Make sure we have a weight for this history point
                prediction += strategy[i + 1] * attendance
                
        return prediction

    def make_decision(self):
        """
        Decide whether to attend bar based on prediction.
        """
        # Step: Get recent history for prediction
        history = self.model.history
        if len(history) >= self.memory_size:
            recent_history = history[:self.memory_size]
        else:
            recent_history = history + [0] * (self.memory_size - len(history))
        
        # Step: Predict attendance using best strategy
        self.prediction = self.predict_attendance(self.best_strategy, recent_history)
        
        # Step: Decide to attend if prediction is below threshold
        self.attend = (self.prediction <= self.overcrowding_threshold)

    def get_zone(self):
        """Get the zone this agent is currently in."""
        return self.model.zones.get(self.cell.coordinate, "unknown")
    
    def move_to_zone(self, zonetogo):
        """Move the agent to a random cell in the specified zone."""
        # Find all cells that belong to the target zone
        zone_cells = []
        for cell in self.model.grid.all_cells.cells:
            zone_name = self.model.zones.get(cell.coordinate, "unknown")
            if zone_name == zonetogo:
                zone_cells.append(cell)
        
        if not zone_cells:
            return False
        
        # Try to find empty cells first
        empty_cells = [cell for cell in zone_cells if len(cell.agents) == 0]
        
        if empty_cells:
            new_cell = self.model.random.choice(empty_cells)
            self.cell = new_cell
            return True
        else:
            # If no empty cells, move to least crowded
            least_crowded = min(zone_cells, key=lambda cell: len(cell.agents))
            self.cell = least_crowded
            return True
    
    def update_strategies(self):
        """
        Update which strategy is currently best.
        Choose strategy with smallest sum of prediction errors over memory period.
        """
        history = self.model.history
        if len(history) < self.memory_size + 1:
            return  # Not enough history yet
        
        best_score = float('inf')  # Start with maximum possible score
        
        # STest each strategy
        for strategy_idx in range(len(self.strategies)):
            score = 0
            
            # Step 4b: Test strategy performance over memory period
            for week in range(1, self.memory_size + 1):
                # Get history subset for this week
                if week + self.memory_size <= len(history):
                    history_subset = history[week:week + self.memory_size]
                    # Predict what attendance would have been
                    prediction = self.predict_attendance(strategy_idx, history_subset)
                    # Compare with actual attendance and add to score
                    actual_attendance = history[week - 1]
                    score += abs(actual_attendance - prediction)
            
            # Step 4c: Update best strategy if this one performed better
            if score <= best_score:
                best_score = score
                self.best_strategy = strategy_idx
    
    def step(self):
        """Agent step - each agent's individual actions."""
        # Step 1: Update strategies based on past performance
        self.update_strategies()
        
        # Step 2: Make attendance decision based on prediction
        self.make_decision()
        
        # Step 3: Move to appropriate zone based on decision
        if self.attend:
            self.move_to_zone("bar")
        else:
            self.move_to_zone("home")


In [3]:
class ElfarolModel(mesa.Model):
    """A model with a grid of cells and agents that can occupy them."""

    def __init__(self, n=100, width=20, height=20, seed=None, memory_size=5, 
                 number_of_strategies=10, overcrowding_threshold=60):
        """Initialize the model instance"""
        super().__init__(seed=seed)
        self.num_agents = n
        self.width = width
        self.height = height
        self.memory_size = memory_size
        self.number_of_strategies = number_of_strategies
        self.overcrowding_threshold = overcrowding_threshold
        self.grid = OrthogonalMooreGrid((width, height), torus=False, random=self.random)

        # Initialize history with random values like NetLogo does
        history_length = self.memory_size * 2  # Twice memory size for enough data
        self.history = [self.random.randint(0, 99) for _ in range(history_length)]
        self.attendance = 0  # Current attendance
        self.is_crowded = False

        # Create zone mapping for tracking
        self.zones = {}
        for cell in self.grid.all_cells.cells:
            x, y = cell.coordinate
            # Define bar area (top-right quadrant)
            if x >= width//2 and y >= height//2:
                self.zones[(x, y)] = "bar"
            else:
                self.zones[(x, y)] = "home"

        # Create bar-going agents
        BarGoingAgent.create_agents(
            self,
            self.num_agents, 
            self.random.choices(self.grid.all_cells.cells, k=self.num_agents),
            memory_size=memory_size,
            number_of_strategies=number_of_strategies,
            overcrowding_threshold=overcrowding_threshold
        )

        # Data collection setup - EXACTLY like MoneyModel
        self.datacollector = mesa.DataCollector(
            model_reporters={
                "agents_in_bar": count_agents_in_bar,
                "attendance": lambda m: m.attendance,
                "is_crowded": lambda m: m.is_crowded
            },
            agent_reporters={
                "zone": "get_zone",
                "prediction": "prediction",
                "attend": "attend"
            }
        )
        self.datacollector.collect(self)

    def step(self):
        """Model step - coordinates all agents."""
        # Reset crowded status
        self.is_crowded = False
        
        # Each agent does their own step (update strategies, decide, move)
        self.agents.shuffle_do("step")
        
        # Count actual attendance
        self.attendance = count_agents_in_bar(self)
        
        # Check if crowded
        if self.attendance > self.overcrowding_threshold:
            self.is_crowded = True
        
        # Update history
        self.history = [self.attendance] + self.history[:-1]
        
        # Collect data
        self.datacollector.collect(self)

In [4]:
@solara.component
def CustomSpaceVisualization(model):
    """Custom space visualization with colored background zones"""
    # This is required to update the visualization when the model steps
    from mesa.visualization.components.matplotlib_components import update_counter
    update_counter.get()  # This triggers re-rendering on model updates
    
    # Note: you must initialize a figure using this method instead of
    # plt.figure(), for thread safety purpose
    fig = plt.Figure(figsize=(8, 8))
    ax = fig.subplots()
    
    # Draw background zones using patches
    from matplotlib.patches import Rectangle
    
    width, height = model.width, model.height
    
    # Draw home area (light blue background)
    home_patch = Rectangle((0, 0), width, height, 
                          linewidth=0, facecolor='lightblue', alpha=0.3)
    ax.add_patch(home_patch)
    
    # Draw bar area (light coral background) - top-right quadrant
    bar_patch = Rectangle((width//2, height//2), width//2, height//2,
                         linewidth=0, facecolor='lightcoral', alpha=0.2)
    ax.add_patch(bar_patch)

    # load image for the background of the bar patch
    img = plt.imread("barbackground.png")
    ax.imshow(img, extent=[width//2, width, height//2, height], zorder=1, alpha=0.5, aspect='auto')
    # plot a label on the bar patch if crowded
    if model.is_crowded:
        ax.text(width * 0.75, height * 0.75, "Crowded", fontsize=20, color='black', ha='center', va='center', zorder=2)
    # Draw agents on top
    bar_agents = model.agents_by_type[BarGoingAgent]
    
    for agent in bar_agents:
        x, y = agent.cell.coordinate
        zone = agent.get_zone()
        
        if zone == "bar":
            color = "red"
            size = 100
        else:
            color = "blue"
            size = 80
            
        # Use matplotlib's OOP API instead of plt.scatter for thread safety
        ax.scatter(x + 0.5, y + 0.5, c=color, s=size, zorder=10, alpha=0.8)
    
    # Set up the plot
    ax.set_xlim(0, width)
    ax.set_ylim(0, height)
    ax.set_aspect('equal')
    ax.grid(True, alpha=0.3)
    ax.set_title('El Farol Model - Red Area=Bar, Blue Area=Home')
    ax.set_xlabel('X coordinate')
    ax.set_ylabel('Y coordinate')
    
    # This is required to render the visualization
    solara.FigureMatplotlib(fig)

# add sliders
model_params = {
    "memory_size": {
        "type": "SliderInt",
        "value": 5,
        "label": "Memory size:",
        "min": 1,
        "max": 10,
        "step": 1,
    },
    "number_of_strategies": {
        "type": "SliderInt",
        "value": 10,
        "label": "Number of strategies:",
        "min": 1,
        "max": 20,
        "step": 1,
    },
    "overcrowding_threshold": {
        "type": "SliderInt",
        "value": 60,
        "label": "Overcrowding threshold:",
        "min": 10,
        "max": 100,
        "step": 1,
    },
    "width": 10, # Width of the grid
    "height": 10, # Height of the grid

}

In [5]:
model = ElfarolModel(width=20, height=20)

# Create visualization with custom component
page = SolaraViz(
    model,
    components=[CustomSpaceVisualization, make_plot_component("agents_in_bar")], # source code for plot components https://mesa.readthedocs.io/latest/_modules/mesa/visualization/components/matplotlib_components.html
    name="El Farol Model",
    model_params=model_params
)

# To run the visualization, uncomment the next line:
page