
# Information Conditions - Environmental State and Action Histories

> Plot learning trajectories under different information conditions of the Ecological Public Goods Game: a) only the environmental state history is observable, b) only the action history is observable, c) both the environmental state and action histories are observable, d) no information is observable.


In [134]:
#| export
import numpy as np
import matplotlib.pyplot as plt
import copy
import plotly
import plotly.express
import plotly.graph_objects as go


from pyCRLD.Environments.SocialDilemma import SocialDilemma
from pyCRLD.Environments.EcologicalPublicGood import EcologicalPublicGood

from pyCRLD.Agents.StrategyActorCritic import stratAC
from pyCRLD.Agents.POStrategyActorCritic import POstratAC


from pyCRLD.Utils import FlowPlot as fp
from fastcore.utils import *
from jax import jit
import jax.numpy as jnp
from pyCRLD.Environments.HistoryEmbedding import HistoryEmbedded

from nbdev.showdoc import show_doc
from scipy.stats import kstest

from scipy.stats import qmc
import itertools as it
import pandas as pd

global_seed = 42
np.random.seed(global_seed)


In [4]:
#| export

# Generate the per-agent observation sets from the history-embedded state labels
def generate_action_history_observation_set(stateset, number_of_agents):
    """Build one observation set per agent from the action part of each state label.

    The first three characters of every label in `stateset` are taken as the
    action history (presumably labels look like ``"c,c,g|"`` for two agents --
    verify against the history-embedded environment).

    Parameters:
        stateset: Iterable of sliceable, hashable state labels.
        number_of_agents (int): Number of agents to create a set for.

    Returns:
        list: `number_of_agents` independent lists, each holding the sorted
        unique action-history prefixes.
    """
    distinct_prefixes = sorted({label[:3] for label in stateset})
    # Every agent gets its own list object so later edits do not leak across agents.
    return [list(distinct_prefixes) for _ in range(number_of_agents)]


def generate_state_observation_set(stateset, number_of_agents):
    """Build one observation set per agent from the environmental part of each state label.

    Everything from the fifth character onwards of every label in `stateset`
    is taken as the environmental-state history (presumably labels look like
    ``"c,c,g|"`` for two agents -- verify against the history-embedded
    environment).

    Parameters:
        stateset: Iterable of sliceable, hashable state labels.
        number_of_agents (int): Number of agents to create a set for.

    Returns:
        list: `number_of_agents` independent lists, each holding the sorted
        unique environmental-state suffixes.
    """
    distinct_suffixes = sorted({label[4:] for label in stateset})
    # Every agent gets its own list object so later edits do not leak across agents.
    return [list(distinct_suffixes) for _ in range(number_of_agents)]



In [5]:
#| export

#information conditions class
class Information_Conditions(HistoryEmbedded):
    """History-embedded environment with a selectable information condition.

    The wrapped environment is embedded with history lengths ``h=(1, 1, 1)``.
    Depending on `mode`, the observation tensor ``O`` and the observation sets
    ``Oset`` inherited from `HistoryEmbedded` are replaced so that agents see
    only part of the embedded state:

    * ``"only_state_information"``: the environmental part of the state label.
    * ``"only_action_history_information"``: the action part of the state label.
    * ``"no_information"``: a single uninformative observation ``'.'``.
    * ``"both_state_and_action_information"``: the parent's defaults are kept.

    ``Q`` is always reset to the size of the last axis of ``O``.
    """

    def __init__(self, ecopg , mode):
        """Embed `ecopg` with one step of history and apply the information condition `mode`."""
        super().__init__(ecopg, h=(1, 1, 1))

        self.mode = mode
        self.configure_information_condition()

    def configure_information_condition(self):
        """
        Configure the observation tensor ``O``, ``Oset`` and ``Q`` for ``self.mode``.

        Modes: 'only_state_information', 'only_action_history_information',
        'no_information', 'both_state_and_action_information'.

        Raises:
            ValueError: If ``self.mode`` is not one of the modes above.
        """
        configurators = {
            "only_state_information": self._configure_state,
            "only_action_history_information": self._configure_action,
            "no_information": self._configure_none,
            "both_state_and_action_information": self._configure_state_action,
        }
        try:
            configure = configurators[self.mode]
        except (KeyError, TypeError):
            # TypeError covers unhashable mode values; both are simply invalid modes.
            raise ValueError(
                f"Invalid mode {self.mode!r}; expected one of {sorted(configurators)}"
            ) from None
        configure()
        # self._print_configuration()

        # The number of observations is the size of the last axis of O.
        self.Q = self.O.shape[2]

    def _build_observation_tensor(self, observations, is_observed):
        """Return an int tensor of shape (N, number of states, number of observations).

        Entry ``[i, j, k]`` is 1 when ``is_observed(state_j, observation_k)`` is
        true and 0 otherwise; the same slice is used for every agent ``i``.
        """
        tensor = np.zeros((self.N, len(self.Sset), len(observations)), dtype=int)
        for j, state in enumerate(self.Sset):
            for k, observation in enumerate(observations):
                if is_observed(state, observation):
                    tensor[:, j, k] = 1
        return tensor

    def _configure_state(self):
        """Expose only the environmental part of each state label to the agents."""
        # Use self.N (not a literal 2) so Oset and O agree with the parent's agent count.
        self.Oset = generate_state_observation_set(self.Sset, self.N)
        self.O = self._build_observation_tensor(
            self.Oset[0],
            lambda state, observation: state.endswith(observation),
        )

    def _configure_action(self):
        """Expose only the action part (first three characters) of each state label."""
        self.Oset = generate_action_history_observation_set(self.Sset, self.N)
        self.O = self._build_observation_tensor(
            self.Oset[0],
            lambda state, action: action[:3] == state[:3],
        )

    def _configure_none(self):
        """Map every state to the single uninformative observation ``'.'``."""
        # Shape follows the environment instead of the literal (2, 8, 1).
        self.O = np.ones((self.N, len(self.Sset), 1), dtype=int)
        self.Oset = [['.'] for _ in range(self.N)]

    def _configure_state_action(self):
        """Keep the parent's observation setup (state and action information)."""
        # No modification needed; the default setup already uses state+action information.
        pass

    def _print_configuration(self):
        """Print the active mode followed by a separator line (debug helper)."""
        print(f"Mode: {self.mode}")
        # print("Observation Tensor:\n", self.O)
        # print("Observation Set:", self.Oset)
        # print("O shape", self.O.shape)
        # print("Q shape", self.Q)

        print("------\n")


### Experiments and Observations 

In [6]:
# np.set_printoptions(precision=4, suppress= True)


Complete state and action information - Monte Carlo analysis

In [7]:
def lhs_sampling(no_of_states, number_of_samples, agents, seed=None):
    """Sample joint initial policies with Latin hypercube sampling.

    Each Latin-hypercube sample is a vector ``p`` of length `no_of_states`; it
    is turned into a two-action policy of shape ``(no_of_states, 2)`` whose
    last axis holds ``(p, 1 - p)``. Joint policies are all combinations with
    replacement of `agents` such single-agent policies.

    Parameters:
        no_of_states (int): Dimension of the hypercube (entries per policy).
        number_of_samples (int): Number of single-agent policies to draw.
        agents (int): Number of agents in each joint policy.
        seed: Seed for the sampler; defaults to the module-level `global_seed`.

    Returns:
        list: Arrays of shape ``(agents, no_of_states, 2)``, one per
        combination with replacement of the sampled policies.
    """
    if seed is None:
        # Reading a module-level name needs no `global` declaration.
        seed = global_seed
    sampler = qmc.LatinHypercube(d=no_of_states, seed=seed)

    first_action_probabilities = sampler.random(number_of_samples)
    single_agent_policies = [
        np.stack((probabilities, 1 - probabilities), axis=-1)
        for probabilities in first_action_probabilities
    ]

    # Stack the whole combination so any number of agents works, not only two.
    return [
        np.stack(combination, axis=0)
        for combination in it.combinations_with_replacement(single_agent_policies, agents)
    ]


In [8]:
def make_degraded_state_cooperation_probablity_zero(initial_condition, Oset):
    """Overwrite the policy in degraded observations with ``(0, 1)``.

    Observations whose label contains ``'g'`` are treated as degraded. For
    those observations, action index 0 is set to probability 0 and action
    index 1 to probability 1, for every agent.

    Parameters:
        initial_condition: Mutable array indexed as [agent, observation, action];
            it is modified in place.
        Oset: Iterable of observation labels, one per entry of axis 1.

    Returns:
        The same `initial_condition` object, after modification.
    """
    is_degraded = jnp.array(['g' in observation_label for observation_label in Oset])

    # (action index, probability) pairs written in the original order.
    for action_index, probability in ((0, 0), (1, 1)):
        initial_condition[:, is_degraded, action_index] = probability

    return initial_condition

def make_degraded_state_cooperation_probablity_one(initial_condition, Oset):
    """Overwrite the policy in degraded observations with ``(1, 0)``.

    Observations whose label contains ``'g'`` are treated as degraded. For
    those observations, action index 0 is set to probability 1 and action
    index 1 to probability 0, for every agent.

    Parameters:
        initial_condition: Mutable array indexed as [agent, observation, action];
            it is modified in place.
        Oset: Iterable of observation labels, one per entry of axis 1.

    Returns:
        The same `initial_condition` object, after modification.
    """
    is_degraded = jnp.array(['g' in observation_label for observation_label in Oset])

    # (action index, probability) pairs written in the original order.
    for action_index, probability in ((0, 1), (1, 0)):
        initial_condition[:, is_degraded, action_index] = probability

    return initial_condition



In [45]:
def exclude_degraded_states_from_obsdist(obsdist, Oset):
    """Zero the probability of degraded observations without renormalising.

    Observations whose label contains ``'g'`` are treated as degraded and
    their entries are replaced by 0. The remaining entries are left untouched,
    so rows generally no longer sum to 1; use
    `exclude_degraded_states_from_obsdist_and_normalise` when a proper
    distribution is required.

    Parameters:
        obsdist: Array whose last axis runs over the observations in `Oset`.
        Oset: Iterable of observation labels.

    Returns:
        Array of the same shape as `obsdist` with degraded entries set to 0.
    """
    degraded_mask = jnp.array(['g' in label for label in Oset])
    # The mask broadcasts along the last axis, i.e. over all leading (agent) axes.
    return jnp.where(degraded_mask, 0, obsdist)

In [10]:
def exclude_degraded_states_from_obsdist_and_normalise(obsdist, Oset):
    """Drop degraded observations from `obsdist` and renormalise each row.

    Observations whose label contains ``'g'`` are treated as degraded and
    their entries are replaced by 0. Each row (axis 1) is then divided by its
    sum; rows whose remaining sum is not positive are returned as they are
    after masking.

    Parameters:
        obsdist: Two-dimensional array whose axis 1 runs over the observations
            in `Oset`.
        Oset: Iterable of observation labels.

    Returns:
        Array of the same shape as `obsdist`.
    """
    is_degraded = jnp.array(['g' in observation_label for observation_label in Oset])
    masked = jnp.where(is_degraded, 0, obsdist)

    totals = jnp.sum(masked, axis=1, keepdims=True)
    # Rows with zero total keep their masked values instead of being divided by zero.
    return jnp.where(totals > 0, masked / totals, masked)

In [11]:
def get_average_cooperativeness(policy, obsdist, mode, Oset, exclude_degraded_state_for_average_cooperation):
    """Return each agent's cooperation probability weighted by `obsdist`.

    The probability of action index 0 is taken as the cooperation probability
    and summed over observations with the weights in `obsdist`.

    Parameters:
        policy: Array indexed as [agent, observation, action].
        obsdist: Array indexed as [agent, observation] holding the weights.
        mode (str): Information condition. Degraded observations are only
            removed for 'only_state_information' and
            'both_state_and_action_information'.
        Oset: Observation labels passed to `exclude_degraded_states_from_obsdist`.
        exclude_degraded_state_for_average_cooperation (bool): Whether to zero
            the weights of degraded observations first (without renormalising).

    Returns:
        One-dimensional array with one weighted cooperation value per agent.
    """
    modes_with_state_labels = ('only_state_information', 'both_state_and_action_information')
    if exclude_degraded_state_for_average_cooperation and mode in modes_with_state_labels:
        obsdist = exclude_degraded_states_from_obsdist(obsdist, Oset)

    cooperation_probabilities = policy[:, :, 0]

    # a = agent, o = observation: multiply element-wise and sum over observations.
    return jnp.einsum('ao,ao->a', cooperation_probabilities, obsdist)

Extracting Final Strategies

1. State information active

In [12]:
def run_simulation_for_initial_condition(mae, mode, initial_condition, 
                                         exclude_degraded_state_for_average_cooperation):
    """
    Run one learning trajectory and summarise where it ends.

    Parameters:
        mae: Learner exposing `trajectory`, `obsdist` and `env.Oset`
            (a POstratAC instance in this notebook).
        mode (str): Information condition, forwarded to `get_average_cooperativeness`.
        initial_condition: Initial joint policy handed to `mae.trajectory`.
        exclude_degraded_state_for_average_cooperation (bool): Whether degraded
            observations are left out of the cooperation average.

    Returns:
        tuple: (average cooperation of the first agent at the last trajectory
        point, number of points in the trajectory)
    """
    # The trajectory is capped at 10000 steps with tolerance 1e-5; the second
    # return value (fixed point reached or not) is not used here.
    trajectory, _fixed_point_reached = mae.trajectory(initial_condition, Tmax=10000, tolerance=1e-5)
    last_policy = trajectory[-1]
    number_of_points = trajectory.shape[0]

    cooperation_per_agent = get_average_cooperativeness(
        policy=last_policy,
        obsdist=mae.obsdist(last_policy),
        mode=mode,
        Oset=mae.env.Oset[0],
        exclude_degraded_state_for_average_cooperation=exclude_degraded_state_for_average_cooperation,
    )

    # Only the first agent's value is reported.
    return cooperation_per_agent[0], number_of_points


def run_simulation_across_conditions(mae, mode, num_samples, 
                                     exclude_degraded_state_for_average_cooperation):
    """
    Run one simulation per sampled initial condition.

    Initial conditions come from `lhs_sampling(mae.Q, num_samples, mae.N)`, so
    the number of simulations is the number of joint policies it returns.

    Parameters:
        mae: Learner exposing `Q`, `N` and the interface used by
            `run_simulation_for_initial_condition`.
        mode (str): Information condition, forwarded unchanged.
        num_samples (int): Number of single-agent policies to sample.
        exclude_degraded_state_for_average_cooperation (bool): Whether degraded
            observations are left out of the cooperation average.

    Returns:
        list: (average cooperation, time-to-reach) tuples, one per initial condition.
    """
    return [
        run_simulation_for_initial_condition(
            mae, mode, sampled_condition,
            exclude_degraded_state_for_average_cooperation
        )
        for sampled_condition in lhs_sampling(mae.Q, num_samples, mae.N)
    ]


In [41]:


def compare_four_conditions(num_samples=5, degraded_choice = False, m_value = -6, discount_factor = 0.98, exclude_degraded_state_for_average_cooperation = True):
    """
    Simulate all four information conditions and summarise the outcomes.

    For each condition a two-agent EcologicalPublicGood environment is wrapped
    in `Information_Conditions`, a POstratAC learner is run from every sampled
    initial condition, and each run is classified by its final average
    cooperation: below 0.4 "Defection", above 0.6 "Cooperation", otherwise "Mixed".

    Parameters:
        num_samples (int): Number of single-agent policies to sample.
        degraded_choice (bool): Forwarded to EcologicalPublicGood.
        m_value: Forwarded to EcologicalPublicGood as `m`.
        discount_factor (float): Forwarded to POstratAC as `discount_factors`.
        exclude_degraded_state_for_average_cooperation (bool): Whether degraded
            observations are left out of the cooperation average.

    Returns:
        dict: Maps each mode name to a dict with
        "average_cooperation" (mean over runs, rounded to 3 decimals) and
        "basin_of_attraction_size" (DataFrame with one row per classification,
        holding the median time-to-reach and the percentage of runs).
    """
    # Keep this first: locals() holds exactly the call arguments at this point.
    print(locals())

    def classify(average_cooperation):
        if average_cooperation < 0.4:
            return "Defection"
        if average_cooperation > 0.6:
            return "Cooperation"
        return "Mixed"

    environment = EcologicalPublicGood(N=2,
                                       f=1.2,
                                       c=5,
                                       m=m_value,
                                       qc=0.02,
                                       qr=0.0001,
                                       degraded_choice=degraded_choice)

    results_by_mode = {}
    for mode in ('both_state_and_action_information',
                 'only_action_history_information',
                 'only_state_information',
                 'no_information'):
        learner = POstratAC(env=Information_Conditions(environment, mode=mode),
                            learning_rates=0.1,
                            discount_factors=discount_factor)

        outcomes = pd.DataFrame(
            run_simulation_across_conditions(
                mae=learner,
                mode=mode,
                num_samples=num_samples,
                exclude_degraded_state_for_average_cooperation=exclude_degraded_state_for_average_cooperation,
            ),
            columns=["AverageCooperation", "TimeToReach"],
        )
        number_of_runs = len(outcomes)

        mean_cooperation = outcomes['AverageCooperation'].agg('mean')
        outcomes['Classification'] = outcomes['AverageCooperation'].apply(classify)

        # Per classification: median convergence time and share of all runs in percent.
        basin_summary = outcomes.groupby('Classification')['TimeToReach'].agg(
            MedianTimetoReach='median',
            Percentage=lambda group: round((len(group) / number_of_runs) * 100, 1)
        ).reset_index()

        results_by_mode[mode] = {
            "average_cooperation": np.round(mean_cooperation, 3),
            "basin_of_attraction_size": basin_summary,
        }

    return results_by_mode
    

# Example usage:


In [59]:
data = compare_four_conditions()

{'num_samples': 5, 'degraded_choice': False, 'm_value': -6, 'discount_factor': 0.98, 'exclude_degraded_state_for_average_cooperation': True}


In [None]:
def get_cooperation_percentage(condition_data):
    """Return the 'Cooperation' basin percentage of one information condition.

    Parameters:
        condition_data (dict): Must hold a DataFrame under
            'basin_of_attraction_size' with 'Classification' and 'Percentage' columns.

    Returns:
        The 'Percentage' of the first row classified as 'Cooperation', or 0
        when no such row exists.
    """
    basin_table = condition_data.get('basin_of_attraction_size')
    is_cooperation = basin_table['Classification'] == 'Cooperation'
    percentages = basin_table[is_cooperation]['Percentage']

    if percentages.empty:
        return 0
    return percentages.iloc[0]


In [None]:

# Extract cooperation percentages correctly from the DataFrame

# Define the conditions in order
# Bar chart of the cooperation basin size (%) for each information condition.
# Relies on `data` (result of compare_four_conditions) from an earlier cell.
information_modes = [
    'both_state_and_action_information', 
    'only_action_history_information', 
    'only_state_information', 
    'no_information'
]

# Extract cooperation percentages
cooperation_percentages = [get_cooperation_percentage(data[condition]) for condition in information_modes]

# Debugging output
print("Extracted Cooperation Percentages:", cooperation_percentages)

# Display labels, in the same order as `information_modes`
conditions = [
    "Both Social and Ecological State Information", 
    "Only Social Information", 
    "Only Ecological State Information", 
    "No Information"
]

# Create DataFrame for plotting
plot_df = pd.DataFrame({
    'Information Condition': conditions,
    'Cooperation Percentage': cooperation_percentages
})

# Define a color palette
color_map = {
    "Both Social and Ecological State Information": "#4c72b0",  # Muted Blue
    "Only Social Information": "#c44e52",  # Muted Red
    "Only Ecological State Information": "#55a868",  # Muted Green
    "No Information": "#000000"  # Black
}

# Create figure
fig = go.Figure()

for _, row in plot_df.iterrows():
    condition = row['Information Condition']
    percentage = row['Cooperation Percentage']
    color = color_map[condition]
    
    if percentage == 0:
        # A zero-height bar is invisible, so it is split into two traces:
        # 1. Actual outline bar for the zero value (shown in the plot)
        fig.add_trace(go.Bar(
            x=[condition], 
            y=[percentage], 
            marker=dict(color='rgba(0,0,0,0)', line=dict(color=color, width=4)),
            text=f"{percentage:.1f}%",
            textposition='outside',
            textfont=dict(size=15, color='black'),  # Larger, darker percentage text
            showlegend=False,  # Don't show this in the legend
            legendgroup=condition  # Group legend with the solid bar
        ))

        # 2. Hidden solid legend bar (only for legend display)
        fig.add_trace(go.Bar(
            x=[None],  # Invisible bar in the plot
            y=[None],
            name=condition,
            marker=dict(color=color),  # Filled marker for the legend
            legendgroup=condition  # Matches legend with outline bar
        ))

    else:
        # Normal filled bars
        fig.add_trace(go.Bar(
            x=[condition], 
            y=[percentage], 
            name=condition,
            marker=dict(color=color),
            text=f"{percentage:.1f}%",
            textposition='outside',
            textfont=dict(size=15, color='black')  # Larger, darker percentage text
        ))

# Update layout for aesthetics
fig.update_layout(
    yaxis=dict(
        # `titlefont` is deprecated (and rejected by newer Plotly releases);
        # the font is set through the nested title object instead.
        title=dict(text="Cooperation Basin Size (%)", font=dict(size=18)),
        range=[0, 100],
    ),
    xaxis=dict(title='', showticklabels=False),
    plot_bgcolor='snow',  # Clean background
    width=500,
    height=675,
    bargap=0,  # Minimize gaps
    legend=dict(
        title="",
        orientation="h",
        yanchor="bottom",
        y=-0.3,
        xanchor="center",
        x=0.5,
        font=dict(size=13.5, color='black')
    )
)

# Show figure
fig.show()


Extracted Cooperation Percentages: [86.7, 0, 60.0, 0]


In [15]:
# Sweep the parameter `m` and the discount factor, storing one result per pair.
results_iteration_discount_factors_m_vals = []

# Discount factors 0.90, 0.92, ..., 0.98. The previous step of 0.3 produced the
# single value 0.9, contradicting the intended 0.02 step. Rounding removes
# floating-point artefacts such as 0.9200000000000002 from the stored values.
discount_factors = np.round(np.arange(0.9, 0.999, 0.02), 2)

for m_value in range(0, -7, -2):  # m = 0, -2, -4, -6
    for discount_factor in discount_factors:
        result = compare_four_conditions(m_value=m_value, discount_factor=discount_factor)
        
        results_iteration_discount_factors_m_vals.append({"m": m_value, "discount_factor": discount_factor, "result": result})


# Print or store results

