In [1]:
import mesa
import mesa.space as space
import mesa.time as time
import random
import numpy as np
from collections import defaultdict
import pandas as pd

In [2]:
# For Gambit game theory analysis (you'll need to install gambit-python)
try:
    import gambit
    GAMBIT_AVAILABLE = True
except ImportError:
    GAMBIT_AVAILABLE = False
    print("Gambit not available. Install for advanced game theory analysis.")

Gambit not available. Install for advanced game theory analysis.


In [3]:
class DualProcessAgent(mesa.Agent):
    """
    An agent with dual-process decision-making capabilities.
    
    This agent models the dual-process theory of cognition:
    - System 1: Fast, intuitive, emotional thinking (veil of ignorance)
    - System 2: Slow, deliberative, analytical thinking
    
    Under the veil of ignorance, agents don't know their future position
    in society and make decisions from an impartial standpoint.
    """
    
    def __init__(self, unique_id, model, system1_bias=0.7, 
                 fairness_preference=0.5, learning_rate=0.1):
        """
        Initialize a dual-process agent.
        
        Parameters:
        -----------
        unique_id : int
            Unique identifier for the agent
        model : mesa.Model
            The model instance the agent belongs to
        system1_bias : float (0-1)
            Probability of using System 1 thinking vs System 2
        fairness_preference : float (0-1)
            Strength of fairness preference (1 = max fairness)
        learning_rate : float (0-1)
            How quickly the agent updates its strategies
        """
        super().__init__(unique_id, model)
        self.system1_bias = system1_bias
        self.fairness_preference = fairness_preference
        self.learning_rate = learning_rate
        self.payoff_history = []
        self.cooperation_history = []
        self.system_used = []  # Track which system was used
        
        # System 2 deliberation parameters
        self.deliberation_level = 0.0
        self.analytical_capacity = random.uniform(0.5, 1.0)
        
    def step(self):
        """Advance the agent by one step in the simulation."""
        # Update deliberation level based on context
        self.update_deliberation()
        
        # Make decisions based on current game
        if self.model.game_type == "prisoners_dilemma":
            self.play_prisoners_dilemma()
        elif self.model.game_type == "resource_allocation":
            self.play_resource_allocation()
    
    def update_deliberation(self):
        """Update the agent's level of deliberative thinking."""
        # Stress or time pressure reduces deliberation
        stress_factor = 1.0 - (self.model.schedule.steps / self.model.max_steps)
        self.deliberation_level = (1 - self.system1_bias) * self.analytical_capacity * stress_factor
    
    def make_decision_under_veil(self, context):
        """
        Make a decision under the veil of ignorance.
        
        The veil of ignorance forces agents to consider decisions
        without knowing their own position, promoting impartiality.
        
        Parameters:
        -----------
        context : dict
            Contextual information about the decision
            
        Returns:
        --------
        decision : object
            The agent's decision
        """
        if random.random() < self.deliberation_level:
            # Use System 2: Deliberative thinking
            decision = self.system2_deliberative_decision(context)
            self.system_used.append(2)
        else:
            # Use System 1: Intuitive thinking with veil of ignorance
            decision = self.system1_veil_decision(context)
            self.system_used.append(1)
        
        return decision
    
    def system1_veil_decision(self, context):
        """
        System 1 decision-making under veil of ignorance.
        
        Fast, intuitive thinking that considers fairness from an
        impartial perspective.
        """
        # Under veil of ignorance, consider all possible positions
        if context.get('game_type') == 'prisoners_dilemma':
            # More likely to cooperate under veil of ignorance
            cooperation_prob = 0.3 + (0.7 * self.fairness_preference)
            return random.random() < cooperation_prob
        
        elif context.get('game_type') == 'resource_allocation':
            # Fair distribution under veil
            fair_share = context.get('total_resources', 100) / context.get('num_agents', 10)
            base_demand = fair_share * (0.8 + 0.4 * self.fairness_preference)
            # Add some noise
            return max(0, base_demand * random.uniform(0.8, 1.2))
    
    def system2_deliberative_decision(self, context):
        """
        System 2 deliberative decision-making.
        
        Slow, analytical thinking that calculates expected payoffs
        and strategic considerations.
        """
        if context.get('game_type') == 'prisoners_dilemma':
            # Calculate expected payoff of cooperation vs defection
            coop_payoff = self.calculate_expected_payoff(cooperate=True, context=context)
            defect_payoff = self.calculate_expected_payoff(cooperate=False, context=context)
            
            # Include fairness consideration
            fairness_bonus = 0.5 * self.fairness_preference
            return (coop_payoff + fairness_bonus) > defect_payoff
        
        elif context.get('game_type') == 'resource_allocation':
            # Strategic resource demand calculation
            total_resources = context.get('total_resources', 100)
            num_agents = context.get('num_agents', 10)
            others_demand = context.get('others_demand_estimate', total_resources/num_agents)
            
            # Calculate optimal demand considering others' behavior
            optimal = min(total_resources - (num_agents - 1) * others_demand, 
                         total_resources * 0.3)  # Don't be too greedy
            
            # Adjust for fairness
            fair_adjustment = self.fairness_preference * (total_resources/num_agents - optimal)
            return max(0, optimal + fair_adjustment * 0.5)
    
    def calculate_expected_payoff(self, cooperate, context):
        """
        Calculate expected payoff for a decision.
        
        Parameters:
        -----------
        cooperate : bool
            Whether the agent cooperates
        context : dict
            Decision context
            
        Returns:
        --------
        expected_payoff : float
            Expected payoff value
        """
        if context.get('game_type') == 'prisoners_dilemma':
            # Prisoner's dilemma payoff calculation
            if cooperate:
                # If both cooperate: R, if opponent defects: S
                prob_coop = context.get('expected_cooperation_rate', 0.5)
                return prob_coop * 3 + (1 - prob_coop) * 0
            else:
                # If opponent cooperates: T, if opponent defects: P
                prob_coop = context.get('expected_cooperation_rate', 0.5)
                return prob_coop * 5 + (1 - prob_coop) * 1
        
        return 0
    
    def play_prisoners_dilemma(self):
        """Play a prisoner's dilemma game with another agent."""
        partner = random.choice([a for a in self.model.schedule.agents if a != self])
        
        context = {
            'game_type': 'prisoners_dilemma',
            'partner_id': partner.unique_id,
            'expected_cooperation_rate': self.model.get_cooperation_rate()
        }
        
        # Make decision under veil of ignorance
        my_decision = self.make_decision_under_veil(context)
        partner_decision = partner.make_decision_under_veil(context)
        
        # Calculate payoffs
        payoff = self.calculate_pd_payoff(my_decision, partner_decision)
        self.payoff_history.append(payoff)
        self.cooperation_history.append(1 if my_decision else 0)
        
        # Update model metrics
        self.model.cooperation_count += 1 if my_decision else 0
        self.model.defection_count += 0 if my_decision else 1
    
    def play_resource_allocation(self):
        """Play a resource allocation game."""
        context = {
            'game_type': 'resource_allocation',
            'total_resources': self.model.total_resources,
            'num_agents': self.model.num_agents,
            'others_demand_estimate': self.model.get_average_demand()
        }
        
        # Make decision under veil of ignorance
        demand = self.make_decision_under_veil(context)
        
        # Store demand for model-level calculation
        self.current_demand = demand
        self.model.current_demands.append(demand)
    
    def calculate_pd_payoff(self, my_choice, partner_choice):
        """
        Calculate prisoner's dilemma payoff.
        
        Standard PD payoffs:
        - Both cooperate: 3,3
        - Both defect: 1,1  
        - I defect, partner cooperates: 5,0
        - I cooperate, partner defects: 0,5
        """
        if my_choice and partner_choice:  # Both cooperate
            return 3
        elif not my_choice and not partner_choice:  # Both defect
            return 1
        elif not my_choice and partner_choice:  # I defect, partner cooperates
            return 5
        else:  # I cooperate, partner defects
            return 0


class DualProcessModel(mesa.Model):
    """
    Model for simulating dual-process decision-making under veil of ignorance.
    
    This model explores how different cognitive processes affect
    fairness, cooperation, and welfare in society.
    """
    
    def __init__(self, num_agents=50, width=20, height=20, 
                 game_type="prisoners_dilemma", max_steps=1000,
                 system1_distribution="uniform"):
        """
        Initialize the model.
        
        Parameters:
        -----------
        num_agents : int
            Number of agents in the model
        width, height : int
            Dimensions of the space (not used in all games)
        game_type : str
            Type of game being played ("prisoners_dilemma" or "resource_allocation")
        max_steps : int
            Maximum number of steps to run
        system1_distribution : str
            How System 1 bias is distributed ("uniform", "skewed", "bimodal")
        """
        super().__init__()
        self.num_agents = num_agents
        self.grid = space.MultiGrid(width, height, True)
        self.schedule = time.RandomActivation(self)
        self.game_type = game_type
        self.max_steps = max_steps
        self.current_step = 0
        
        # Game-specific parameters
        if game_type == "resource_allocation":
            self.total_resources = 1000
            self.current_demands = []
        
        # Metrics tracking
        self.cooperation_count = 0
        self.defection_count = 0
        self.total_payoffs = 0
        self.fairness_scores = []
        self.welfare_scores = []
        self.equality_scores = []
        
        # Create agents with specified distribution of System 1 bias
        self.create_agents(system1_distribution)
        
        # Data collection
        self.datacollector = mesa.DataCollector(
            model_reporters={
                "Cooperation_Rate": "cooperation_rate",
                "Average_Payoff": "average_payoff", 
                "Gini_Coefficient": "gini_coefficient",
                "Total_Welfare": "total_welfare",
                "Fairness_Score": "fairness_score",
                "System1_Usage_Rate": "system1_usage_rate"
            },
            agent_reporters={
                "Payoff": "payoff_history",
                "Cooperation": "cooperation_history",
                "System_Used": "system_used",
                "Fairness_Preference": "fairness_preference",
                "System1_Bias": "system1_bias"
            }
        )
    
    def create_agents(self, distribution):
        """Create agents with specified distribution of cognitive traits."""
        for i in range(self.num_agents):
            # Determine System 1 bias based on distribution
            if distribution == "uniform":
                system1_bias = random.uniform(0.1, 0.9)
            elif distribution == "skewed":
                # Most agents have high System 1 bias
                system1_bias = min(0.9, max(0.1, random.betavariate(2, 5)))
            elif distribution == "bimodal":
                # Polarized population
                system1_bias = random.uniform(0.1, 0.4) if random.random() < 0.5 else random.uniform(0.6, 0.9)
            else:
                system1_bias = random.uniform(0.1, 0.9)
            
            # Create agent
            agent = DualProcessAgent(
                unique_id=i,
                model=self,
                system1_bias=system1_bias,
                fairness_preference=random.uniform(0.2, 0.8),
                learning_rate=random.uniform(0.05, 0.2)
            )
            
            self.schedule.add(agent)
            
            # Place agent on grid
            x = random.randrange(self.grid.width)
            y = random.randrange(self.grid.height)
            self.grid.place_agent(agent, (x, y))
    
    def step(self):
        """Advance the model by one step."""
        # Reset game-specific trackers
        if self.game_type == "resource_allocation":
            self.current_demands = []
        
        # Advance all agents
        self.schedule.step()
        
        # Calculate resource allocation if applicable
        if self.game_type == "resource_allocation":
            self.calculate_resource_allocation()
        
        # Calculate metrics
        self.calculate_metrics()
        
        # Collect data
        self.datacollector.collect(self)
        
        self.current_step += 1
    
    def calculate_resource_allocation(self):
        """Calculate resource allocation based on demands."""
        total_demand = sum(self.current_demands)
        
        if total_demand <= self.total_resources:
            # All demands can be satisfied
            for i, agent in enumerate(self.schedule.agents):
                payoff = min(agent.current_demand, self.total_resources * (agent.current_demand / total_demand))
                agent.payoff_history.append(payoff)
                self.total_payoffs += payoff
        else:
            # Demands exceed resources - proportional reduction
            for i, agent in enumerate(self.schedule.agents):
                payoff = self.total_resources * (agent.current_demand / total_demand)
                agent.payoff_history.append(payoff)
                self.total_payoffs += payoff
    
    def calculate_metrics(self):
        """Calculate various social metrics."""
        # Cooperation rate (for PD)
        if self.game_type == "prisoners_dilemma":
            total_interactions = self.cooperation_count + self.defection_count
            self.cooperation_rate = self.cooperation_count / total_interactions if total_interactions > 0 else 0
        else:
            self.cooperation_rate = 0
        
        # Average payoff
        all_payoffs = []
        for agent in self.schedule.agents:
            if agent.payoff_history:
                all_payoffs.append(agent.payoff_history[-1])
        
        self.average_payoff = np.mean(all_payoffs) if all_payoffs else 0
        
        # Gini coefficient (inequality measure)
        self.gini_coefficient = self.calculate_gini(all_payoffs) if all_payoffs else 0
        
        # Total welfare
        self.total_welfare = sum(all_payoffs) if all_payoffs else 0
        
        # Fairness score (based on Rawls' difference principle)
        self.fairness_score = min(all_payoffs) / max(all_payoffs) if all_payoffs and max(all_payoffs) > 0 else 0
        
        # System 1 usage rate
        system_uses = []
        for agent in self.schedule.agents:
            if agent.system_used:
                system_uses.append(agent.system_used[-1])
        self.system1_usage_rate = sum(1 for x in system_uses if x == 1) / len(system_uses) if system_uses else 0
    
    def calculate_gini(self, values):
        """Calculate Gini coefficient for inequality measurement."""
        if len(values) == 0:
            return 0
        
        values = sorted(values)
        n = len(values)
        index = np.arange(1, n + 1)
        return (np.sum((2 * index - n - 1) * values)) / (n * np.sum(values))
    
    def get_cooperation_rate(self):
        """Get current cooperation rate in the population."""
        return self.cooperation_rate if hasattr(self, 'cooperation_rate') else 0.5
    
    def get_average_demand(self):
        """Get average resource demand."""
        if hasattr(self, 'current_demands') and self.current_demands:
            return np.mean(self.current_demands)
        return self.total_resources / self.num_agents
    
    def run_model(self, n_steps=None):
        """Run the model for a specified number of steps."""
        if n_steps is None:
            n_steps = self.max_steps
        
        for _ in range(n_steps):
            self.step()


class GameTheoryAnalyzer:
    """
    Analyzer for game-theoretic properties using Gambit.
    
    This class provides tools to analyze the emergent game-theoretic
    properties of the agent-based model.
    """
    
    def __init__(self):
        """Initialize the game theory analyzer."""
        if not GAMBIT_AVAILABLE:
            print("Warning: Gambit not available. Game theory analysis limited.")
    
    def analyze_nash_equilibrium(self, payoffs):
        """
        Analyze Nash equilibria of the emergent game.
        
        Parameters:
        -----------
        payoffs : array-like
            Payoff matrix from the simulation
            
        Returns:
        --------
        equilibria : list
            List of Nash equilibria
        """
        if not GAMBIT_AVAILABLE:
            return ["Gambit required for Nash equilibrium analysis"]
        
        try:
            # Create a strategic game
            game = gambit.Game.new_table([2, 2])  # 2x2 game for demonstration
            
            # Set payoffs (example for prisoner's dilemma)
            game[0, 0][0] = 3  # Both cooperate
            game[0, 0][1] = 3
            game[0, 1][0] = 0  # I cooperate, opponent defects  
            game[0, 1][1] = 5
            game[1, 0][0] = 5  # I defect, opponent cooperates
            game[1, 0][1] = 0
            game[1, 1][0] = 1  # Both defect
            game[1, 1][1] = 1
            
            # Solve for Nash equilibria
            solver = gambit.nash.ExternalEnumMixedSolver()
            equilibria = solver.solve(game)
            
            return equilibria
            
        except Exception as e:
            return [f"Analysis failed: {str(e)}"]
    
    def calculate_social_welfare(self, payoffs):
        """
        Calculate social welfare metrics.
        
        Parameters:
        -----------
        payoffs : list
            List of agent payoffs
            
        Returns:
        --------
        metrics : dict
            Dictionary of welfare metrics
        """
        if not payoffs:
            return {}
        
        payoffs = np.array(payoffs)
        
        metrics = {
            'utilitarian': np.sum(payoffs),  # Sum of utilities
            'rawlsian': np.min(payoffs),     # Welfare of worst-off
            'nash': np.prod(payoffs),        # Nash social welfare
            'egalitarian': np.std(payoffs)   # Equality measure (inverse)
        }
        
        return metrics


def run_experiment():
    """Run a complete experiment with different parameter settings."""
    print("Running Dual-Process Decision Making Experiment")
    print("=" * 50)
    
    results = []
    
    # Test different scenarios
    scenarios = [
        ("High System 1 Bias", "skewed", "prisoners_dilemma"),
        ("Balanced Cognition", "uniform", "prisoners_dilemma"), 
        ("Resource Allocation", "uniform", "resource_allocation"),
        ("Polarized Cognition", "bimodal", "prisoners_dilemma")
    ]
    
    for scenario_name, system1_dist, game_type in scenarios:
        print(f"\nRunning scenario: {scenario_name}")
        
        # Initialize and run model
        model = DualProcessModel(
            num_agents=50,
            game_type=game_type,
            system1_distribution=system1_dist,
            max_steps=500
        )
        model.run_model()
        
        # Collect results
        model_data = model.datacollector.get_model_vars_dataframe()
        agent_data = model.datacollector.get_agent_vars_dataframe()
        
        # Calculate summary statistics
        final_cooperation = model_data['Cooperation_Rate'].iloc[-1] if game_type == "prisoners_dilemma" else 0
        final_welfare = model_data['Total_Welfare'].iloc[-1]
        final_equality = 1 - model_data['Gini_Coefficient'].iloc[-1]  # Convert to equality measure
        final_fairness = model_data['Fairness_Score'].iloc[-1]
        
        scenario_result = {
            'scenario': scenario_name,
            'system1_distribution': system1_dist,
            'game_type': game_type,
            'cooperation_rate': final_cooperation,
            'total_welfare': final_welfare,
            'equality': final_equality,
            'fairness': final_fairness,
            'system1_usage': model_data['System1_Usage_Rate'].iloc[-1]
        }
        
        results.append(scenario_result)
        
        print(f"  Cooperation: {final_cooperation:.3f}")
        print(f"  Welfare: {final_welfare:.2f}")
        print(f"  Equality: {final_equality:.3f}")
        print(f"  Fairness: {final_fairness:.3f}")
        print(f"  System 1 Usage: {scenario_result['system1_usage']:.3f}")
    
    # Convert to DataFrame for analysis
    results_df = pd.DataFrame(results)
    
    print("\n" + "=" * 50)
    print("Experiment Complete")
    print("=" * 50)
    
    return results_df, model_data, agent_data

In [4]:
if __name__ == "__main__":
    # Run the experiment
    results_df, model_data, agent_data = run_experiment()
    
    # Display summary of results
    print("\nSummary Results:")
    print(results_df.to_string(index=False))
    
    # Save results to files
    # results_df.to_csv("dual_process_results.csv", index=False)
    # model_data.to_csv("model_timeseries.csv")
    # agent_data.to_csv("agent_data.csv")
    
    print("\nResults saved to CSV files:")
    print(" - dual_process_results.csv (summary)")
    print(" - model_timeseries.csv (model-level time series)")
    print(" - agent_data.csv (agent-level data)")

Running Dual-Process Decision Making Experiment

Running scenario: High System 1 Bias
  Cooperation: 0.472
  Welfare: 133.00
  Equality: 0.612
  Fairness: 0.000
  System 1 Usage: 1.000

Running scenario: Balanced Cognition
  Cooperation: 0.531
  Welfare: 142.00
  Equality: 0.654
  Fairness: 0.000
  System 1 Usage: 1.000

Running scenario: Resource Allocation
  Cooperation: 0.000
  Welfare: 1000.00
  Equality: 0.916
  Fairness: 0.578
  System 1 Usage: 1.000

Running scenario: Polarized Cognition
  Cooperation: 0.560
  Welfare: 137.00
  Equality: 0.623
  Fairness: 0.000
  System 1 Usage: 1.000

Experiment Complete

Summary Results:
           scenario system1_distribution           game_type  cooperation_rate  total_welfare  equality  fairness  system1_usage
 High System 1 Bias               skewed   prisoners_dilemma           0.47164          133.0  0.612481  0.000000            1.0
 Balanced Cognition              uniform   prisoners_dilemma           0.53124          142.0  0.653803 