# OSCCAI Simulation Notebook

## Usage Instructions

This notebook simulates output from an Open Source Collective Constitutional AI (OSCCAI) tool.
Please ensure you have the necessary libraries installed as specified in the `requirements.txt` file.
You will need an OpenAI API key to run this notebook. Set it as an environment variable or enter it when prompted.

**Required Setup:**
- Install required libraries using `pip install -r requirements.txt`
- Set your OpenAI API key:
  - Option 1: Set as an environment variable `OPENAI_API_KEY`
  - Option 2: Enter it when prompted in the notebook

## 1. Setup and Imports

In [None]:
# Uncomment and run this cell to install required packages
!pip install numpy pandas matplotlib openai tenacity asyncio nest_asyncio

In [None]:
# Import required libraries
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import json
from getpass import getpass
from IPython.display import display, Markdown
import pandas as pd
import numpy as np
from scipy.stats import beta, nbinom
import random
from openai import OpenAI
import openai

# Try to import from google.colab if available, otherwise use a fallback
try:
    from google.colab import files, userdata
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

    # Fallback function for file upload in Jupyter Notebook
    def upload_files():
        """Display a CSV upload widget and save the uploaded file to disk.

        The file is written to the current working directory under the name
        reported by the widget, as soon as the widget's value changes.

        Returns:
            The ``FileUpload`` widget, so callers can inspect ``.value``.
        """
        from ipywidgets import FileUpload
        from IPython.display import display

        uploader = FileUpload(accept='.csv', multiple=False)
        display(uploader)

        def on_upload_change(change):
            """Persist the uploaded file whenever the widget value changes."""
            if change['type'] != 'change' or change['name'] != 'value':
                return
            new_value = change['new']
            # An empty value carries no file; indexing it would raise.
            if not new_value:
                return
            if isinstance(new_value, dict):
                # Mapping form: {filename: {'content': ...}} (ipywidgets 7.x)
                filename, file_info = next(iter(new_value.items()))
            else:
                # Sequence form: entries carrying 'name' and 'content' (ipywidgets 8.x)
                file_info = new_value[0]
                filename = file_info['name']
            with open(filename, 'wb') as f:
                f.write(file_info['content'])
            print(f"Uploaded file: {filename}")

        uploader.observe(on_upload_change, names='value')
        return uploader

# Set up OpenAI API key.
# Lookup order: OPENAI_API_KEY environment variable, then the Colab secret of the
# same name (when running in Colab), then an interactive prompt.
_api_key = os.getenv("OPENAI_API_KEY")
if not _api_key and IN_COLAB:
    # Add OPENAI_API_KEY as secret in google colab to skip manual entry
    try:
        _api_key = userdata.get('OPENAI_API_KEY')
    except Exception as e:
        # NOTE(review): Colab's userdata.get is expected to raise (not return None)
        # when the secret is missing or notebook access is not granted; fall back
        # to the prompt instead of aborting the cell.
        print(f"Could not read OPENAI_API_KEY from Colab secrets: {e}")
        _api_key = None
if not _api_key:
    _api_key = getpass("Please enter your OpenAI API key: ")

client = OpenAI(api_key=_api_key)
# Do not keep the raw key around in the notebook namespace.
del _api_key

In [None]:
# Use cache to efficiently store and reuse LLM responses
import hashlib
from functools import lru_cache

# In-memory store of LLM responses, keyed by the digest of the prompt
llm_cache = {}


def generate_cache_key(prompt):
    """Return the hexadecimal MD5 digest of the prompt's string form."""
    prompt_bytes = format(prompt).encode()
    digest = hashlib.md5(prompt_bytes)
    return digest.hexdigest()

## 2. Data Upload and Preprocessing

In [None]:
import pandas as pd
import requests
from io import StringIO

# Define flags to control whether to use default files
USE_DEFAULT_DATA = True
USE_DEFAULT_COMMUNITY = True
DATA_FILE_URL = "https://raw.githubusercontent.com/collect-intel/osccai-simulation/refs/heads/main/data/ccai_polis_data_voters.csv"
COMMUNITY_FILE_URL = "https://raw.githubusercontent.com/collect-intel/osccai-simulation/refs/heads/main/data/simulation_community_inputs.json"

# Function to curl the CSV from GitHub
def curl_github_file(url, timeout=30):
    """Download a text file over HTTP.

    Args:
        url: Address of the file to fetch.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the notebook.

    Returns:
        The response body as text when the status code is 200, otherwise
        ``None`` (including when the request itself fails).
    """
    try:
        response = requests.get(url, timeout=timeout)
    except requests.exceptions.RequestException as e:
        # Callers treat None as "fall back to manual input", so report and continue.
        print(f"Request to {url} failed: {e}")
        return None
    return response.text if response.status_code == 200 else None

# Get actual data to base distributions on
if USE_DEFAULT_DATA:
    print("Attempting to load data from GitHub...")
    csv_content = curl_github_file(DATA_FILE_URL)
    if csv_content:
        data = pd.read_csv(StringIO(csv_content))
    else:
        print("Failed to load default data. Please upload manually.")
        USE_DEFAULT_DATA = False

if not USE_DEFAULT_DATA:
    print("Please upload your Polis data CSV file.")
    if IN_COLAB:
        uploaded = files.upload()
        filename = list(uploaded.keys())[0]
    else:
        uploader = upload_files()
        # Wait for the user to upload a file
        # NOTE(review): this loop spins without yielding; confirm the widget value
        # can actually update while the cell is still executing.
        while not uploader.value:
            pass
        uploaded_value = uploader.value
        if isinstance(uploaded_value, dict):
            # Mapping form: keyed by filename (ipywidgets 7.x)
            filename = next(iter(uploaded_value))
        else:
            # Sequence form: entries carrying a 'name' field (ipywidgets 8.x)
            filename = uploaded_value[0]['name']

    # Load the uploaded CSV into a DataFrame
    data = pd.read_csv(filename)

# Verify that data was loaded
if data is not None:
    print("Data loaded successfully.")
    print(f"Shape of the data: {data.shape}")
else:
    print("Failed to load data. Please check your input or try uploading manually.")

# Load community inputs
if USE_DEFAULT_COMMUNITY:
    print("Attempting to load community inputs from GitHub...")
    community_json = curl_github_file(COMMUNITY_FILE_URL)
    community_data = None
    if community_json:
        try:
            community_data = json.loads(community_json)[0]  # Assuming we want the first community
        except (json.JSONDecodeError, IndexError, KeyError, TypeError) as e:
            # A malformed or empty payload falls back to manual input instead of crashing.
            print(f"Could not parse community inputs: {e}")
    if community_data is not None:
        print("Community inputs loaded successfully.")
    else:
        print("Failed to load default community inputs. Will prompt for manual input.")
        USE_DEFAULT_COMMUNITY = False

In [None]:
# Derive per-participant pass counts and vote shares

# Pass votes are whatever remains once agrees and disagrees are removed
data['n-pass'] = data['n-votes'] - (data['n-agree'] + data['n-disagree'])

# Share of each vote type among a participant's total votes
for vote_kind in ('agree', 'disagree', 'pass'):
    data[f'% {vote_kind}'] = data[f'n-{vote_kind}'] / data['n-votes']

## 3. Define Simulation Functions

In [None]:
def fit_beta_distribution(data):
    """Return the (a, b) shape parameters of a beta distribution fitted to `data`.

    Values are first clipped into [0.001, 0.999]; location and scale are
    held fixed at 0 and 1 during the fit.
    """
    bounded = data.clip(lower=0.001, upper=0.999)
    fitted = beta.fit(bounded, floc=0, fscale=1)
    shape_a, shape_b, _, _ = fitted
    return shape_a, shape_b

def fit_negative_binomial(data):
    """Fit a negative binomial distribution to the given data by matching moments.

    Args:
        data: Object exposing ``mean()`` and ``var()`` (e.g. a pandas Series
            of counts).

    Returns:
        Tuple ``(n, p)`` in the parameterisation used by
        ``np.random.negative_binomial``: ``n`` successes and success
        probability ``p``, for which the mean is ``n * (1 - p) / p``.
    """
    mean = data.mean()
    var = data.var()
    # Over-dispersed data gives n = mean^2 / (var - mean); otherwise use a fixed n.
    n = (mean ** 2) / (var - mean) if var > mean else 10
    # Solve mean = n * (1 - p) / p for p. The previous mean / (mean + n) was the
    # complement of this value, so samples did not reproduce the observed mean.
    p = n / (n + mean)
    return n, p
    
def calculate_statements_per_participant(num_participants):
    """Draw a statement count for each participant from a fattened negative binomial.

    The distribution is fitted to the global `data['n-comments']` column
    (shifted up by one), and every draw is bounded to the range 1..20.
    """
    tail_divisor = 3  # dividing n by this fattens the right tail of the skew
    statement_cap = 20
    # Shift all observed counts up by one so zeros become a minimum of 1
    shifted_counts = data['n-comments'] + 1
    fitted_n, _ = fit_negative_binomial(shifted_counts)

    # Recompute both parameters from the reduced n
    wide_n = fitted_n / tail_divisor
    wide_p = wide_n / (wide_n + shifted_counts.mean())

    draws = np.random.negative_binomial(n=wide_n, p=wide_p, size=num_participants)

    # At least 1 statement per participant, at most statement_cap
    bounded = []
    for draw in draws:
        bounded.append(min(max(1, draw), statement_cap))
    return bounded

def sample_statements_per_participant(num_participants):
    """Resample statement counts, with replacement, from the global `data['n-comments']`.

    Observed counts below 1 are raised to 1 before sampling, and each
    sampled value is capped at 20.
    """
    statement_cap = 20

    # Every participant contributes at least one comment
    observed_counts = np.maximum(data['n-comments'], 1)

    draws = np.random.choice(observed_counts, size=num_participants, replace=True)

    capped = []
    for draw in draws:
        capped.append(min(draw, statement_cap))
    return capped

def calculate_votes_per_participant(num_participants, max_statements):
    """Draw a vote count per participant from a negative binomial fitted to `data['n-votes']`.

    Each draw is raised to at least 1 and then capped at `max_statements`.
    """
    fitted_n, fitted_p = fit_negative_binomial(data['n-votes'])
    vote_counts = []
    for _ in range(num_participants):
        draw = np.random.negative_binomial(fitted_n, fitted_p)
        vote_counts.append(min(max(draw, 1), max_statements))
    return vote_counts

def calculate_vote_distribution_per_participant(num_participants):
    """Draw an [agree, disagree, pass] share triple for each participant.

    Agree and disagree shares come from beta distributions fitted to the
    global `data['% agree']` and `data['% disagree']` columns. The pass share
    is the non-negative remainder, and the triple is normalised by its sum
    and rounded to three decimals.
    """
    agree_params = fit_beta_distribution(data['% agree'])
    disagree_params = fit_beta_distribution(data['% disagree'])

    def draw_shares():
        # Agree is sampled before disagree for every participant
        agree_share = np.random.beta(*agree_params)
        disagree_share = np.random.beta(*disagree_params)
        pass_share = max(0, 1 - (agree_share + disagree_share))
        norm = agree_share + disagree_share + pass_share
        return [
            round(agree_share / norm, 3),
            round(disagree_share / norm, 3),
            round(pass_share / norm, 3),
        ]

    return [draw_shares() for _ in range(num_participants)]

## 3.1 Define OpenAI Functions

In [None]:
import time
import json
from tenacity import retry, stop_after_attempt, wait_random_exponential


def _retry_after_seconds(error, default=10):
    """Return the Retry-After delay (whole seconds) carried by `error`, or `default`.

    Headers are looked up on `error.headers` first and then on
    `error.response.headers`. A missing or non-integer value yields `default`.
    """
    headers = getattr(error, 'headers', None)
    if not headers:
        # NOTE(review): the 1.x OpenAI client is expected to attach the HTTP
        # response (and its headers) as `error.response`; confirm against the client docs.
        headers = getattr(getattr(error, 'response', None), 'headers', None)
    if not headers:
        return default
    try:
        return max(0, int(float(headers.get("Retry-After", default))))
    except (TypeError, ValueError):
        # Retry-After may also be an HTTP date; fall back rather than fail in the handler.
        return default


@retry(wait=wait_random_exponential(min=1, max=10), stop=stop_after_attempt(3))
def get_openai_completion(prompt):
    """Send a prompt to the OpenAI API and return the completion text, using the cache if available.

    Args:
        prompt: User message content; also the input to the cache key.

    Returns:
        The content of the first choice's message (or the cached copy).

    Raises:
        Any exception from the API call is re-raised after being reported, so
        the surrounding ``@retry`` decorator can re-invoke the function.
    """
    cache_key = generate_cache_key(prompt)

    if cache_key in llm_cache:
        print("Using cached response")
        return llm_cache[cache_key]

    try:
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are an assistant that outputs JSON-formatted data."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7
        )
        completion = response.choices[0].message.content
    except Exception as e:
        if "RateLimitError" in str(type(e)):
            retry_after = _retry_after_seconds(e)
            print(f"Rate limit exceeded. Retrying after {retry_after} seconds.")
            time.sleep(retry_after)
            raise
        print(f"Error during OpenAI API call: {e}")
        raise  # Re-raise the exception to stop execution

    # Cache the response
    llm_cache[cache_key] = completion
    print("Received response from OpenAI")
    return completion

import nest_asyncio
import asyncio
from asyncio import TimeoutError

# Apply nest_asyncio to run with an event loop already running in the notebook
nest_asyncio.apply()
from concurrent.futures import ThreadPoolExecutor

async def get_openai_completion_async(prompt):
    """Run `get_openai_completion` in a worker thread and await it with a 120 s timeout.

    Args:
        prompt: Prompt text forwarded to `get_openai_completion`.

    Returns:
        The completion text, or ``None`` when the call times out or raises.
    """
    print(f"Sending prompt to OpenAI asynchronously (length: {len(prompt)})")
    loop = asyncio.get_running_loop()
    try:
        # Use the loop's default executor (None). A per-call
        # `with ThreadPoolExecutor()` waits for its worker when the block exits,
        # so a timed-out call would still block until the thread finished.
        return await asyncio.wait_for(
            loop.run_in_executor(None, get_openai_completion, prompt),
            timeout=120  # 120 seconds timeout
        )
    except asyncio.TimeoutError:
        print("API call timed out")
        return None
    except Exception as e:
        print(f"Error during API call: {e}")
        return None

# Returns a dict of the form {"data": [...]} (already parsed, not a JSON string)
async def get_openai_completion_chunked(prompt, data, chunk_size=10):
    """Send `data` to the LLM in chunks concurrently and merge the parsed results.

    Each chunk is serialised with ``json.dumps`` and appended to `prompt`.
    Every response is expected to be a JSON object whose ``"data"`` key holds
    a list; those lists are concatenated in chunk order.

    Args:
        prompt: Instruction text shared by all chunks.
        data: Sliceable sequence of JSON-serialisable items.
        chunk_size: Maximum number of items per request.

    Returns:
        ``{"data": [...]}`` with the items of every chunk that parsed
        successfully. Failed or malformed chunks are reported and skipped.
    """
    print(f"Starting chunked completion with {len(data)} items, chunk size {chunk_size}")
    full_response = {"data": []}

    chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
    print(f"Prepared {len(chunks)} chunks for processing")

    async def process_chunk(chunk_index, chunk):
        chunk_prompt = f"{prompt}\nProcess the following chunk:\n{json.dumps(chunk)}"
        try:
            response = await get_openai_completion_async(chunk_prompt)
            print(f"Received response for chunk {chunk_index + 1}")
            return response
        except Exception as e:
            print(f"Error processing chunk {chunk_index + 1}: {e}")
            return None

    async_responses = await asyncio.gather(*[process_chunk(i, chunk) for i, chunk in enumerate(chunks)])

    for chunk_number, chunk_response in enumerate(async_responses, start=1):
        if not chunk_response:
            continue
        try:
            chunk_items = json.loads(chunk_response)["data"]
            if not isinstance(chunk_items, list):
                raise TypeError(f'"data" is {type(chunk_items).__name__}, expected list')
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON from chunk response: {e}")
            continue
        except (KeyError, TypeError) as e:
            # One malformed chunk must not discard the chunks that did parse.
            print(f"Unexpected structure in chunk {chunk_number} response: {e!r}")
            continue
        full_response["data"].extend(chunk_items)

    print(f"Processed {len(async_responses)} chunks")

    return full_response

# Example usage of get_openai_completion_chunked
async def process_data_async():
    """Illustrative placeholder showing how to await the chunked helper.

    NOTE(review): `prompt` and `data` are resolved from the enclosing (global)
    scope when this runs; no global named `prompt` is defined in this notebook,
    so calling this as-is would presumably raise NameError. The result is
    assigned but neither used nor returned.
    """
    # ... other code ...
    chunked_response = await get_openai_completion_chunked(prompt, data, chunk_size=10)
    # ... process chunked_response ...
# In the main execution:
# asyncio.run(process_data_async())

## 4. User Input Collection

In [4]:
def _ask_positive_int(question, fallback, complaint):
    """Prompt until a positive integer is entered; blank input selects `fallback`."""
    while True:
        try:
            answer = int(input(question) or fallback)
            if answer <= 0:
                raise ValueError(complaint)
            return answer
        except ValueError as e:
            print(f"Invalid input: {e}")


print("### Community Information Collection ###")
if not USE_DEFAULT_COMMUNITY:
    community_name = input("Enter the community name: ").strip()
    community_description = input("Enter the community description: ").strip()
    community_goals = input("Enter the community goals for the AI model: ").strip()
else:
    community_name = community_data['community_name']
    community_description = community_data['community_description']
    community_goals = community_data['community_goals_for_ai_model']
    print(f"Using default community: {community_name}")

# Description and goals are merged into one text block for the prompts
full_community_description = f"{community_description}\nGoals: {community_goals}"

print("\n### Simulation Parameters Collection ###")
if not USE_DEFAULT_COMMUNITY:
    num_subgroups = _ask_positive_int(
        "Enter the number of subgroups (G) [default: 2]: ",
        "2",
        "Number of subgroups must be positive.",
    )
    num_participants = _ask_positive_int(
        "Enter the total number of participants (P) [default: 20]: ",
        "20",
        "Number of participants must be positive.",
    )
else:
    num_subgroups = 2
    num_participants = 20
    print(f"Using default values: {num_subgroups} subgroups, {num_participants} participants")

# Blank input keeps the default statement stem
statement_format = (
    input("Enter the statement format (default: 'The best response is one that...'): ").strip()
    or "The best response is one that..."
)

# upload a saved llm_cache file to resume a previous simulation
use_saved_cache = input("Do you want to use a saved LLM cache? (y/n): ").lower().strip() == 'y'
if use_saved_cache:
    if IN_COLAB:
        uploaded = files.upload()
        cache_file = list(uploaded.keys())[0]
    else:
        uploader = upload_files()
        while not uploader.value:
            pass
        uploaded_value = uploader.value
        if isinstance(uploaded_value, dict):
            # Mapping form: keyed by filename (ipywidgets 7.x)
            cache_file = next(iter(uploaded_value))
        else:
            # Sequence form: entries carrying a 'name' field (ipywidgets 8.x)
            cache_file = uploaded_value[0]['name']

    # The cache is exported with json.dump at the end of this notebook, so read it
    # back with json.load (no load_llm_cache helper is defined in this notebook).
    with open(cache_file, 'r') as f:
        saved_cache = json.load(f)
    if not isinstance(saved_cache, dict):
        raise ValueError(f"Expected a JSON object in {cache_file}, got {type(saved_cache).__name__}")

    llm_cache.update(saved_cache)
    print(f"Loaded {len(llm_cache)} items into LLM cache")

### Community Information Collection ###


NameError: name 'USE_DEFAULT_COMMUNITY' is not defined

## 5. Simulation Setup Calculations

In [None]:
def calculate_participants_per_group(num_subgroups, num_participants):
    """Split participants across subgroups with small random variation.

    Starts from an even split, nudges every group by a random amount of
    roughly a tenth of the base size (never dropping below 1), then walks the
    groups round-robin adding or removing one participant at a time to move
    the total back towards `num_participants`.
    """
    base, leftover = divmod(num_participants, num_subgroups)
    # The first `leftover` groups take one extra participant
    sizes = [base + 1 if idx < leftover else base for idx in range(num_subgroups)]

    # Slight variability, identical bounds for every group
    low, high = -base // 10, base // 10 + 1
    for idx, size in enumerate(sizes):
        sizes[idx] = max(1, size + np.random.randint(low, high))

    # Compensate for whatever the variability did to the total
    shortfall = num_participants - sum(sizes)
    for step in range(abs(shortfall)):
        idx = step % num_subgroups
        if shortfall > 0:
            sizes[idx] += 1
        else:
            sizes[idx] = max(1, sizes[idx] - 1)
    return sizes

# Split the requested participants across the subgroups and show the resulting group sizes
participants_per_group = calculate_participants_per_group(num_subgroups, num_participants)
print(f"\nParticipants per group: {participants_per_group}")

## 6. LLM Interaction for Group and Participant Generation

In [None]:
# Generate subgroups
# Prompt asking the LLM for one description per subgroup. It interpolates
# full_community_description and num_subgroups; the doubled braces are literal
# braces in the f-string, used to show the expected JSON shape.
subgroup_generation_prompt = f"""
You are tasked with creating realistic subgroups for a community simulation. Use the following information to generate detailed descriptions:

Community Description: {full_community_description}
Number of Subgroups: {num_subgroups}

For each subgroup:
1. Provide a 2-sentence description of the subgroup. This description must start with the words: "This group consists of individuals who..."

Ensure descriptions are diverse and realistic within the context of the community.

Return your response in the following JSON format:
{{"subgroups": [
    {{"description": "This group consists of individuals who <rest of description>"}},
    ...]
}}
"""

# Ask the LLM for the subgroups, then parse its JSON reply
subgroup_generation_response = get_openai_completion(subgroup_generation_prompt)

try:
    subgroup_data = json.loads(subgroup_generation_response)
    subgroups = subgroup_data['subgroups']
    if not isinstance(subgroups, list):
        raise TypeError(f"'subgroups' is {type(subgroups).__name__}, expected list")
    print("\nGenerated Subgroups:")
    print(json.dumps(subgroups, indent=4))
except json.JSONDecodeError as e:
    print(f"Error parsing JSON: {e}")
    subgroups = []
except (KeyError, TypeError) as e:
    # Valid JSON without the expected {"subgroups": [...]} shape gets the same
    # empty fallback as unparseable JSON.
    print(f"Unexpected subgroup response structure: {e!r}")
    subgroups = []

In [None]:
# One record per subgroup: its position, its share of participants, and its description
indexed_subgroup_data = [
    dict(
        subgroup_index=position,
        num_participants=group_size,
        description=entry['description'],
    )
    for position, (entry, group_size) in enumerate(zip(subgroups, participants_per_group))
]

async def generate_participants():
    """Request participant descriptions for every subgroup, one subgroup per LLM call.

    Returns the merged ``{"data": [...]}`` dict produced by
    `get_openai_completion_chunked` for `indexed_subgroup_data`.
    """
    prompt_template = """
    Generate unique, 1-sentence descriptions of individuals who could belong to the following subgroups:
    
    For each subgroup in the input data, generate the specified number of participant descriptions.
    Ensure descriptions are diverse and realistic within the context of the community.
    
    The description of each participant should take this form: "A <demographic description> who is <extended description of situation, personality, or goals>."
    Do not include the name of the participant.

    Return your response in the following JSON format:
    {"data": [{ "subgroup_index": 0, "participants": ["Participant 1 description", "Participant 2 description", ...] },
              { "subgroup_index": 1, "participants": [...] },
              ...]}
    """

    # chunk_size=1 sends each subgroup in its own request; adjust as needed
    return await get_openai_completion_chunked(
        prompt_template,
        indexed_subgroup_data,
        chunk_size=1,
    )

# Run the async function to generate participants and keep the merged result
# in all_participants ({"data": [...]}).
try:
    # Drive the coroutine on the current event loop
    # (nest_asyncio.apply() was called earlier in this notebook).
    loop = asyncio.get_event_loop()
    all_participants = loop.run_until_complete(generate_participants())
except RuntimeError:
    # Fallback for environments where get_event_loop() might fail.
    # NOTE(review): a RuntimeError raised while the coroutine runs also lands here
    # and starts the generation over again.
    all_participants = asyncio.run(generate_participants())


In [None]:
# Combine the subgroup descriptions from indexed_subgroup_data with the participants
# from all_participants (matched on subgroup_index) into group_data
participants_by_subgroup = {}
for item in all_participants['data']:
    try:
        # The LLM may echo the index as a string, so normalise it to int
        subgroup_key = int(item["subgroup_index"])
    except (KeyError, TypeError, ValueError):
        print(f"Skipping participant entry without a usable subgroup_index: {item!r}")
        continue
    # Keep the first entry seen for an index
    participants_by_subgroup.setdefault(subgroup_key, item.get("participants", []))

group_data = {"subgroups": []}
for entry in indexed_subgroup_data:
    subgroup_key = entry["subgroup_index"]
    if subgroup_key not in participants_by_subgroup:
        # A failed or malformed chunk leaves a subgroup empty instead of aborting the cell
        print(f"Warning: no participants were generated for subgroup {subgroup_key}")
    group_data["subgroups"].append({
        "description": entry['description'],
        "participants": participants_by_subgroup.get(subgroup_key, []),
    })

print("\nGenerated Subgroups and Participants:")
print(json.dumps(group_data, indent=4))

## 7. LLM Interaction for Statement Generation

In [None]:
# Count every generated participant across all subgroups
total_participants = 0
for subgroup in group_data['subgroups']:
    total_participants += len(subgroup['participants'])

# One sampled statement count per participant, consumed in order below
statements_per_participant = sample_statements_per_participant(total_participants)
statements_iter = iter(statements_per_participant)

# Build the per-participant request records for chunked processing;
# participant ids run from 1 upwards across all subgroups
chunkable_data = []
participant_id_counter = 1

for subgroup_index, subgroup in enumerate(group_data['subgroups']):
    for participant in subgroup['participants']:
        record = {
            'participant_id': participant_id_counter,
            'subgroup_index': int(subgroup_index),
            'num_statements': int(next(statements_iter)),
            'participant': participant,
        }
        chunkable_data.append(record)
        participant_id_counter += 1


In [None]:
# Verify that all statements_per_participant values were used.
# any(...) pulls at most one more item from statements_iter: a leftover item means
# more counts were sampled than participants were processed.
if any(True for _ in statements_iter):
    print("Warning: Not all statements_per_participant values were used.")

# Prepare subgroup descriptions: one "Subgroup <i>: <description>" line per subgroup
subgroup_descriptions = [
    f"Subgroup {i}: {subgroup['description']}"
    for i, subgroup in enumerate(group_data['subgroups'])
]

# Prompt shared by every statement-generation chunk. It interpolates the community
# description, the statement format and the subgroup descriptions; doubled braces
# are literal braces in the f-string.
# NOTE(review): the prompt describes the input as a bracketed list of four fields,
# while the chunks appended later are JSON dumps of the dicts in chunkable_data —
# confirm the model copes with that mismatch.
statement_generation_prompt = f"""
Generate statements for a community AI model alignment survey. Use the following information:

Community Description: {full_community_description}
Statement Format: "{statement_format}"

Subgroup Descriptions:
{json.dumps(subgroup_descriptions, indent=2)}

For each participant in the input data, generate the specified number of statements that align with their subgroup and individual characteristics. Ensure statements are diverse and relevant to the community's goals.

The input data format is:
[participant_id, subgroup_index, number_of_statements_to_generate, participant_description]

Return your response in the following JSON format:
{{
    "data": [
        {{
            "participant_id": "<participant_id>",
            "statements": ["Statement 1", "Statement 2", ...]
        }},
        ...
    ]
}}
"""

async def generate_statements():
    """Request statements for every participant record, ten records per LLM call.

    Returns the merged ``{"data": [...]}`` dict produced by
    `get_openai_completion_chunked` for `chunkable_data`.
    """
    # chunk_size=10 keeps each request to ten participants; adjust as needed
    return await get_openai_completion_chunked(
        statement_generation_prompt,
        chunkable_data,
        chunk_size=10,
    )

# Run the async function to generate statements
try:
    # This will work in both Jupyter and Colab
    loop = asyncio.get_event_loop()
    all_statements = loop.run_until_complete(generate_statements())
except RuntimeError:
    # Fallback for environments where get_event_loop() might fail
    all_statements = asyncio.run(generate_statements())


def _normalize_participant_id(raw_id):
    """Return `raw_id` as an int when possible, otherwise unchanged."""
    try:
        return int(raw_id)
    except (TypeError, ValueError):
        return raw_id


# Convert [{"participant_id": <id>, "statements": [...]}, ...] into {<id>: [...]}.
# The prompt shows the id as a quoted value, so the model may return strings; ids are
# normalised to int because the export cell looks participants up by integer id.
participant_statements = {}
for item in all_statements['data']:
    try:
        participant_statements[_normalize_participant_id(item['participant_id'])] = item['statements']
    except (KeyError, TypeError) as e:
        print(f"Skipping malformed statement entry {item!r}: {e!r}")

print("\nGenerated Participant Statements:")
print(json.dumps(participant_statements, indent=4))

In [None]:
# Verify that the number of statements per participant is consistent.
# The chained comparison requires four lists to be equal element-for-element and in
# the same order: the sampled counts, the lengths stored in participant_statements,
# the lengths in the raw LLM response, and the counts requested in chunkable_data.
# Raises AssertionError on any mismatch.
assert statements_per_participant == [len(s) for s in participant_statements.values()] == [len(i['statements']) for i in  all_statements['data']] == [i['num_statements'] for i in chunkable_data]

## 8. LLM Interaction for Vote Simulation

In [None]:
## 8. LLM Interaction for Vote Simulation

print("Starting vote simulation process...")

# Number every statement sequentially (starting at 1) across all participants
statements_list = []
for statements in participant_statements.values():
    for stmt in statements:
        statements_list.append({'statement_id': len(statements_list) + 1, 'text': stmt})
statement_id = len(statements_list) + 1

print(f"Prepared {len(statements_list)} statements for {len(participant_statements)} participants")

numbered_statements_list = json.dumps(statements_list, indent=4)

# Sample how many votes each participant casts and their target vote mix
max_statements = len(statements_list)
votes_per_participant = calculate_votes_per_participant(len(participant_statements), max_statements)
vote_distributions = calculate_vote_distribution_per_participant(len(participant_statements))

print("Calculated votes per participant and vote distributions")

# Every participant draws from the same pool of statement ids
available_statements = [s['statement_id'] for s in statements_list]
vote_labels = ['agree', 'disagree', 'pass']

chunkable_data = []
for participant_id, num_votes, distribution in zip(
    participant_statements, votes_per_participant, vote_distributions
):
    assigned_statements = random.sample(
        available_statements, min(num_votes, len(available_statements))
    )
    chunkable_data.append({
        'participant_id': participant_id,
        'statements_assigned': assigned_statements,
        'vote_distribution': dict(zip(vote_labels, distribution)),
    })

print(f"Prepared chunkable data for {len(chunkable_data)} participants")

# Prompt shared by every vote-simulation chunk. It interpolates the community
# description and the full numbered statement list; doubled braces are literal
# braces in the f-string.
# NOTE(review): the prompt describes the input as a bracketed list of three fields,
# while the chunks appended later are JSON dumps of the dicts in chunkable_data —
# confirm the model copes with that mismatch.
vote_simulation_prompt = f"""
Simulate voting patterns for a community AI model alignment survey. Use the following information:

Community Description: {full_community_description}
Statements:
{numbered_statements_list}

For each participant in the input data, determine how they would likely vote on their assigned statements. Use the following voting options:
1 = Agree
-1 = Disagree
0 = Pass

Ensure that each participant's voting pattern closely matches their target vote distribution.

The input data format is:
[participant_id, statements_assigned, vote_distribution]

Return your response in the following JSON format:
{{
    "data": [
        {{
            "participant_id": "Unique identifier",
            "votes": {{"statement_id": vote, ...}}
        }},
        ...
    ]
}}
"""

print("Prepared vote simulation prompt")

async def simulate_votes():
    """Request simulated votes for every participant record, five records per LLM call.

    Returns the merged ``{"data": [...]}`` dict produced by
    `get_openai_completion_chunked` for `chunkable_data`.
    """
    print("Starting vote simulation...")
    # chunk_size=5 keeps each request to five participants; adjust as needed
    merged_votes = await get_openai_completion_chunked(
        vote_simulation_prompt, chunkable_data, chunk_size=5
    )
    print("Completed vote simulation")
    return merged_votes

# Run the async function to simulate votes
print("Initiating async vote simulation...")
try:
    loop = asyncio.get_event_loop()
    vote_simulation_response = loop.run_until_complete(simulate_votes())
    print("Async vote simulation completed")
except RuntimeError:
    print("RuntimeError occurred, falling back to asyncio.run()")
    vote_simulation_response = asyncio.run(simulate_votes())
except Exception as e:
    print(f"An unexpected error occurred: {e}")
    vote_simulation_response = None


def _normalize_participant_id(raw_id):
    """Return `raw_id` as an int when possible, otherwise unchanged."""
    try:
        return int(raw_id)
    except (TypeError, ValueError):
        return raw_id


# participant_votes = { <participant_id>: {<statement_id>: <vote>, ...}, ... }
participant_votes = {}
if vote_simulation_response:
    for item in vote_simulation_response.get('data', []):
        try:
            # Statement ids arrive as JSON object keys (strings), so convert them to
            # int. Participant ids are normalised to int as well, because the export
            # cell looks participants up by integer id.
            participant_votes[_normalize_participant_id(item['participant_id'])] = {
                int(stmt_id): vote for stmt_id, vote in item['votes'].items()
            }
        except (KeyError, TypeError, ValueError, AttributeError) as e:
            # Skip only the malformed entry instead of discarding every vote
            print(f"Error processing vote data: {e!r}")
    print("\nSimulated Votes:")
    print(json.dumps(participant_votes, indent=4))
else:
    print("No vote simulation response received")

print("Vote simulation process completed")

## 9. Vote Matrix Generation

In [None]:
# Rows are participants (in dict order), columns are all voted-on statement ids (sorted)
participant_ids = list(participant_votes)
statement_ids = sorted({sid for ballot in participant_votes.values() for sid in ballot})

# Start from an empty frame and fill in one cell per recorded vote
vote_matrix = pd.DataFrame(index=participant_ids, columns=statement_ids)
for voter, ballot in participant_votes.items():
    for sid, cast_vote in ballot.items():
        vote_matrix.at[voter, sid] = cast_vote

# Column-wise numeric conversion; anything non-numeric becomes NaN
vote_matrix = vote_matrix.apply(pd.to_numeric, errors='coerce')

## 10. Results Visualization and Analysis

In [None]:
import matplotlib.colors as mcolors

# Create a custom colormap: three colour bands whose boundaries sit halfway
# between the vote codes, so -1 -> red, 0 -> gray, 1 -> green
cmap = mcolors.ListedColormap(['red', 'gray', 'green'])
bounds = [-1.5, -0.5, 0.5, 1.5]
norm = mcolors.BoundaryNorm(bounds, cmap.N)

plt.figure(figsize=(12, 8))
# NOTE(review): fillna(0) draws statements a participant never voted on with the
# same gray as an explicit Pass (0) — confirm this is intended.
plt.imshow(vote_matrix.fillna(0), aspect='auto', cmap=cmap, norm=norm, interpolation='none')

# Create a custom colorbar with one labelled tick per vote code
cbar = plt.colorbar(ticks=[-1, 0, 1])
cbar.set_ticklabels(['Disagree', 'Pass', 'Agree'])
cbar.set_label('Vote')

plt.xlabel('Statements')
plt.ylabel('Participants')
plt.title('Vote Matrix Heatmap')
plt.show()

# Calculate and display vote distribution statistics.
# stack() flattens the matrix to a single Series before counting each vote code.
vote_counts = vote_matrix.stack().value_counts()
print("\n### Vote Distribution Statistics ###")
print(vote_counts)

In [None]:
# Nested Python lists of the matrix: ints for recorded votes, None where no vote exists
vote_matrix_list = [
    [int(value) if not pd.isna(value) else None for value in row]
    for row in vote_matrix.values.tolist()
]

# Emit the bracketed, one-row-per-line listing
print("Vote matrix as a list of lists:")
print("\n".join(["[", *(f"    {row}," for row in vote_matrix_list), "]"]))

In [None]:
## 11. Data Export and LLM Cache Management

import json
from IPython.display import FileLink

# Create a dictionary mapping statement_id to statement text
statement_dict = {item['statement_id']: item['text'] for item in statements_list}

# Create a reverse mapping of statement text to id
# (identical texts resolve to the last id that carries them)
text_to_id = {text: stmt_id for stmt_id, text in statement_dict.items()}


def _lookup_by_participant(mapping, participant_id, default):
    """Return mapping[participant_id], accepting either int or str keys."""
    for key in (participant_id, str(participant_id)):
        if key in mapping:
            return mapping[key]
    return default


# Participant ids were assigned by a single counter running across all subgroups
# (starting at 1) when the statement requests were built, so the same running
# counter is used here. Restarting at 1 inside every subgroup would attach the
# first subgroup's statements and votes to the later subgroups.
export_subgroups = []
next_participant_id = 1
for i, subgroup in enumerate(group_data["subgroups"]):
    exported_participants = []
    for participant in subgroup["participants"]:
        participant_id = next_participant_id
        next_participant_id += 1
        own_statements = _lookup_by_participant(participant_statements, participant_id, [])
        own_votes = _lookup_by_participant(participant_votes, participant_id, {})
        exported_participants.append({
            "id": participant_id,
            "description": participant,
            "statements": [
                {"id": text_to_id[stmt], "text": stmt}
                for stmt in own_statements
                if stmt in text_to_id
            ],
            "votes": {int(k): v for k, v in own_votes.items()},
        })
    export_subgroups.append({
        "index": i,
        "description": subgroup["description"],
        "participants": exported_participants,
    })

# Prepare the data structure
export_data = {
    "community_info": {
        "name": community_name,
        "description": community_description,
        "goals": community_goals
    },
    "user_inputs": {
        "num_subgroups": num_subgroups,
        "num_participants": num_participants
    },
    "statement_format": statement_format,
    "statements": [
        {"id": stmt_id, "text": text} for stmt_id, text in statement_dict.items()
    ],
    "subgroups": export_subgroups
}

# Write the simulation data and the LLM cache as indented JSON files
exports = (
    ('simulation_data.json', export_data, "Simulation data"),
    ('llm_cache.json', llm_cache, "LLM cache"),
)
for export_path, payload, label in exports:
    with open(export_path, 'w') as f:
        json.dump(payload, f, indent=2)
    print(f"{label} exported to '{export_path}'")

# Offer the files for download: browser download in Colab, file links otherwise
if IN_COLAB:
    from google.colab import files
    for export_path, _, _ in exports:
        files.download(export_path)
else:
    from IPython.display import FileLink
    for export_path, _, _ in exports:
        display(FileLink(export_path))