---
title: "The Condorcet Jury Theorem and Democratic Rationality"
subtitle: "Sensitivity Analysis of Failure Conditions"
categories: ["theory", "simulation", "causal inference", "sensitivity analysis"]
keep-ipynb: true
self-contained: true
draft: true
toc: true
execute: 
  freeze: auto 
  execute: true
  eval: true
jupyter: applied-bayesian-regression-modeling-env
image: 'evolving_dag.png'
author:
    - url: https://nathanielf.github.io/
    - affiliation: PyMC dev
citation: true
---

In [None]:
import pandas as pd
import numpy as np
import pymc as pm
import matplotlib.pyplot as plt
import arviz as az
import seaborn as sns

In [None]:
# Set random seed for reproducibility
np.random.seed(42)

# ============================================================================
# SIMULATE DATA
# ============================================================================
# Simulate some jury voting data
n_cases = 50  # number of cases
n_jurors = 15  # number of jurors


def make_ground_truth(n_cases, n_jurors, blocks=False, *, return_truth=False):
    """Simulate jury votes from heterogeneous juror competences.

    Draws use the global ``np.random`` state, in this order: true case
    states, juror logit-competences, optional block effects, then votes.

    Parameters
    ----------
    n_cases : int
        Number of cases to simulate.
    n_jurors : int
        Number of jurors voting on every case.
    blocks : bool, default False
        If True, add a shared faction effect (logit scale) to each juror
        using a fixed 15-juror layout of three blocks (5, 6 and 4 jurors).
    return_truth : bool, default False
        If True, also return the simulated true states so callers can score
        votes against the actual truth.

    Returns
    -------
    votes : ndarray of shape (n_cases, n_jurors)
        1 = guilty vote, 0 = not-guilty vote (float dtype).
    p_jurors : ndarray of shape (n_jurors,)
        Probability that each juror votes in line with the true state.
    true_states : ndarray of shape (n_cases,)
        Only returned when ``return_truth`` is True.

    Raises
    ------
    ValueError
        If ``blocks`` is True and ``n_jurors`` does not match the fixed
        block layout.
    """
    # True states (1 = guilty, 0 = not guilty)
    true_states = np.random.binomial(1, 0.5, n_cases)

    # Individual juror competencies (for simulation)
    true_p = 0.7  # average competence
    true_discrimination = 0.5  # how much competencies vary (sd in logit space)

    # Simulate heterogeneous competencies
    logit_p_jurors = np.random.normal(
        np.log(true_p / (1 - true_p)),
        true_discrimination,
        n_jurors,
    )

    if blocks:
        block_id = np.array([
            0, 0, 0, 0, 0,
            1, 1, 1, 1, 1, 1,
            2, 2, 2, 2
        ])
        if n_jurors != len(block_id):
            raise ValueError(
                f"blocks=True uses a fixed layout of {len(block_id)} jurors, "
                f"got n_jurors={n_jurors}"
            )
        n_blocks = len(np.unique(block_id))

        true_sigma_block = 1.2
        block_effect = np.random.normal(0.0, true_sigma_block, n_blocks)
        logit_p_jurors = logit_p_jurors + block_effect[block_id]

    p_jurors = 1 / (1 + np.exp(-logit_p_jurors))

    # Simulate votes. A juror votes "guilty" with probability p when the case
    # is guilty and 1 - p otherwise. The scalar draws are kept in (case, juror)
    # order so the seeded random stream is unchanged.
    votes = np.zeros((n_cases, n_jurors))
    for i in range(n_cases):
        for j in range(n_jurors):
            prob_guilty = p_jurors[j] if true_states[i] == 1 else 1 - p_jurors[j]
            votes[i, j] = np.random.binomial(1, prob_guilty)

    if return_truth:
        return votes, p_jurors, true_states
    return votes, p_jurors


# Keep the simulated truth at module level: later cells score posterior
# predictive votes against it.
votes, p_jurors, true_states = make_ground_truth(
    n_cases, n_jurors, return_truth=True
)

print(f"Data simulated: {n_cases} cases, {n_jurors} jurors")
print(f"True average competence: {p_jurors.mean():.3f}")
# Accuracy means the majority verdict matches the true state, not merely the
# share of cases with a guilty majority.
print(f"Majority vote accuracy: {((votes.mean(axis=1) > 0.5) == (true_states == 1)).mean():.3f}")


In [None]:
# Candidate Beta(alpha, beta) priors on the shared juror competence, keyed by
# scenario name; `desc` is the label printed while fitting.
prior_specs = dict(
    weakly_informative=dict(alpha=3, beta=2, desc='Weakly informative (centered at 0.6)'),
    strong_competence=dict(alpha=10, beta=5, desc='Strong prior (p ~ 0.67)'),
    barely_competent=dict(alpha=6, beta=5, desc='Skeptical prior (p ~ 0.55)'),
    incompetent=dict(alpha=5, beta=10, desc='Incompetent prior (p ~ 0.33)'),
)

In [None]:
# | output: false
    
def fit_condorcet_base_model(votes, spec, jury_sizes_eval=(3, 7, 15)):
    """Fit the single-competence Condorcet model under one Beta prior.

    Parameters
    ----------
    votes : array-like of shape (n_cases, n_jurors)
        Observed binary votes.
    spec : dict
        Prior specification; ``spec['alpha']`` and ``spec['beta']`` are the
        Beta parameters for the shared competence ``p``.
    jury_sizes_eval : iterable of int, default (3, 7, 15)
        Jury sizes for which a simulated majority-correct indicator is added
        to the model.

    Returns
    -------
    idata : InferenceData
        Prior predictive, posterior and posterior predictive samples.
    model : pm.Model
        The model that was fitted.
    """
    # Size the latent truth vector from the data itself rather than from a
    # module-level constant, so the function works for any vote matrix.
    n_cases = np.asarray(votes).shape[0]

    with pm.Model() as model:
        # SENSITIVITY PARAMETER: Prior on competence
        # This is our first example of treating assumptions as parameters
        p = pm.Beta('p', alpha=spec['alpha'], beta=spec['beta'])

        # True state of each case (latent)
        true_state = pm.Bernoulli('true_state', p=0.5, shape=n_cases)

        # Likelihood: a juror votes 1 with probability p when the case state
        # is 1, and with probability 1 - p otherwise. The trailing axis on
        # true_state lets the per-case probability broadcast across jurors.
        vote_prob = pm.Deterministic('vote_prob', pm.math.switch(
            pm.math.eq(true_state[:, None], 1),
            p,
            1 - p
        ))

        pm.Bernoulli('votes', p=vote_prob, observed=votes)

        # Majority vote accuracy for different jury sizes
        for size in jury_sizes_eval:
            # Simulate votes for a new case (truth = 1)
            votes_sim = pm.Bernoulli(f'sim_votes_{size}', p=p, shape=size)
            pm.Deterministic(
                f'majority_correct_{size}',
                pm.math.sum(votes_sim) > size / 2
            )

        # Sample
        idata = pm.sample_prior_predictive()
        idata.extend(pm.sample(2000, tune=1000, random_seed=42,
                               target_accept=0.95, return_inferencedata=True))
        idata.extend(pm.sample_posterior_predictive(idata))

    return idata, model

# Fit the base model once per prior specification. Each fitted model is stored
# next to its trace under the key '<prior_name>_model'.
traces = {}

for prior_name, spec in prior_specs.items():
    print(f"\nFitting with {spec['desc']}...")
    idata, model = fit_condorcet_base_model(votes, spec)
    traces.update({prior_name: idata, prior_name + '_model': model})


In [None]:
# Mean of the simulated majority-correct indicator in the *prior* group, for
# every prior specification and jury size.
print("\n" + "=" * 70, "PRIOR SENSITIVITY RESULTS", "=" * 70, sep="\n")

ests = {}
for prior_name in prior_specs:
    prior_draws = traces[prior_name]['prior']
    ests[prior_name] = [
        prior_draws[f'majority_correct_{i}'].mean().item() for i in [3, 7, 15]
    ]

prior_estimates = pd.DataFrame(
    ests,
    index=[
        'Correct % for Majority of 3',
        'Correct % for Majority of 7',
        'Correct % for Majority of 15',
    ],
)
prior_estimates


In [None]:
# Mean of the simulated majority-correct indicator in the *posterior* group,
# for every prior specification and jury size.
print("\n" + "=" * 70, "PRIOR SENSITIVITY RESULTS", "=" * 70, sep="\n")

ests = {}
for prior_name in prior_specs:
    posterior_draws = traces[prior_name]['posterior']
    ests[prior_name] = [
        posterior_draws[f'majority_correct_{i}'].mean().item() for i in [3, 7, 15]
    ]

posterior_estimates = pd.DataFrame(
    ests,
    index=[
        'Correct % for Majority of 3',
        'Correct % for Majority of 7',
        'Correct % for Majority of 15',
    ],
)
posterior_estimates

In [None]:
# Side-by-side line plots: prior estimates on the left, posterior on the right,
# one line per prior specification.
fig, axs = plt.subplots(1, 2, figsize=(20, 5))
axs = axs.flatten()
jitters = np.linspace(-0.4, 0.4, 3)

panels = (
    (axs[0], prior_estimates, "Prior Estimates for Majority Accuracy"),
    (axs[1], posterior_estimates, "Posterior Estimates for Majority Accuracy"),
)
for ax, estimates, panel_title in panels:
    for prior_name in prior_specs:
        ax.plot(
            estimates.index,
            estimates[prior_name],
            label=prior_name + ' prior',
            marker='o',
        )
    ax.legend()
    ax.set_title(panel_title)


In [None]:
# HalfNormal scales for the between-juror spread of competence (logit scale),
# keyed by scenario name; `desc` is the label printed while fitting.
# NOTE(review): the σ quoted in `desc` differs from `sigma` for the first two
# entries (0.25 vs 0.5, 0.5 vs 1.0) - confirm which value is intended.
discrimination_priors = dict(
    weak_discrimination=dict(sigma=0.5, desc='Weak discrimination prior (σ ~ 0.25)'),
    moderate_discrimination=dict(sigma=1.0, desc='Moderate discrimination prior (σ ~ 0.5)'),
    strong_discrimination=dict(sigma=2.0, desc='Strong discrimination prior (σ ~ 2)'),
)


def fit_discrimination_binomial_model(votes, n_jurors, priors):
    """Fit the hierarchical (per-juror competence) binomial model.

    Votes are collapsed to the number of cases in which each juror agreed
    with the majority verdict; that count is modelled as Binomial with a
    juror-specific probability drawn from a logit-normal population.

    Parameters
    ----------
    votes : array-like of shape (n_cases, n_jurors)
        Observed binary votes.
    n_jurors : int
        Number of jurors (columns of ``votes``) to model.
    priors : dict
        ``priors['sigma']`` is the HalfNormal scale for the between-juror
        standard deviation on the logit scale.

    Returns
    -------
    idata : InferenceData
        Prior predictive, posterior (with log-likelihood) and posterior
        predictive samples.
    model : pm.Model
        The model that was fitted.
    """
    votes = np.asarray(votes)
    # Take the number of trials from the data, not from a module-level global.
    n_cases = votes.shape[0]

    majority_votes = (votes.mean(axis=1) > 0.5).astype(int)
    agreements_per_juror = np.array(
        [(votes[:, j] == majority_votes).sum() for j in range(n_jurors)]
    )

    with pm.Model() as sensitivity_model:
        # Hyperpriors for the population distribution
        mu_logit_p = pm.Normal('mu_logit_p', mu=0.6, sigma=0.5)

        # KEY SENSITIVITY PARAMETER: individual discrimination.
        # Read the scale from the `priors` argument; the previous code read a
        # global loop variable and ignored the argument.
        sigma_logit_p = pm.HalfNormal('sigma_logit_p', sigma=priors['sigma'])

        # NON-CENTERED PARAMETERIZATION for better sampling
        # Use: logit_p_juror = mu + sigma * z, where z ~ Normal(0, 1)
        z_juror = pm.Normal('z_juror', mu=0, sigma=1, shape=n_jurors)
        logit_p_juror = pm.Deterministic('logit_p_juror',
                                         mu_logit_p + sigma_logit_p * z_juror)
        p_juror = pm.Deterministic('p_juror', pm.math.invlogit(logit_p_juror))

        # Mean competence (tracked in the trace only)
        pm.Deterministic('mean_p', pm.math.invlogit(mu_logit_p))

        # Likelihood
        pm.Binomial('agreements',
                    n=n_cases,
                    p=p_juror,
                    observed=agreements_per_juror)

        # Sample with non-centered parameterization
        idata = pm.sample_prior_predictive()
        idata.extend(pm.sample(
            1000,
            tune=2000,
            random_seed=42,
            target_accept=0.95,
            return_inferencedata=True,
            idata_kwargs={"log_likelihood": True}
        ))
        idata.extend(pm.sample_posterior_predictive(idata))

    # Return the model built here, not an unrelated module-level `model`.
    return idata, sensitivity_model


# Fit the hierarchical model once per discrimination prior. Each fitted model
# is stored next to its trace under the key '<prior_name>_model'.
traces_discrimination = {}

for prior_name, spec in discrimination_priors.items():
    print(f"\nFitting with {spec['desc']}...")
    idata, model = fit_discrimination_binomial_model(votes, n_jurors, spec)
    traces_discrimination.update({prior_name: idata, prior_name + '_model': model})


In [None]:
# `idata` is whatever the preceding fitting loop assigned last, i.e. the fit
# for the final entry of `discrimination_priors`.
az.summary(idata)


In [None]:
import numpy as np

# ============================================================
# CONSOLIDATED POSTERIOR PREDICTIVE WORKFLOW
# ============================================================

# Required inputs:
#   idata        : InferenceData from collapsed (binomial) model
#   n_cases      : number of cases
#   n_jurors     : number of jurors
#   jury_sizes   : list of jury sizes to evaluate (e.g. [3, 5, 7, 10, 15])

# Jury sizes reported in the "majority accuracy by jury size" summaries below.
jury_sizes = [3, 5,  7, 10, 15]

# ------------------------------------------------------------
# 1. Generative expansion: truth -> votes
# ------------------------------------------------------------

def simulate_votes(p_juror, n_cases, truth=None):
    """
    Simulate one vote matrix from per-juror competences.

    p_juror : (n_jurors,) probability that each juror votes with the truth
    n_cases : number of cases to simulate
    truth   : optional (n_cases,) array of true states (1/0); drawn from
              Bernoulli(0.5) with the global ``np.random`` state when None
    returns:
        truth : (n_cases,)
        votes : (n_cases, n_jurors) integer array of 0/1 votes
    raises:
        ValueError if a supplied ``truth`` does not have ``n_cases`` entries
    """
    # The number of jurors comes from the competence vector itself, not from
    # a module-level global.
    p_juror = np.asarray(p_juror, dtype=float)

    if truth is None:
        truth = np.random.binomial(1, 0.5, size=n_cases)
    else:
        truth = np.asarray(truth)
        if truth.shape[0] != n_cases:
            raise ValueError(
                f"truth has {truth.shape[0]} entries but n_cases={n_cases}"
            )

    # A juror votes 1 with probability p when the truth is 1 and 1 - p
    # otherwise; all votes are drawn in a single vectorised call.
    prob_vote_one = np.where(
        truth[:, None] == 1,
        p_juror[None, :],
        1.0 - p_juror[None, :],
    )
    votes = np.random.binomial(1, prob_vote_one).astype(int)

    return truth, votes


# ------------------------------------------------------------
# 3. Diagnostic functions
# ------------------------------------------------------------

def majority_accuracy(votes, truth):
    """Return the fraction of cases whose strict-majority verdict equals truth.

    A case counts as a "1" verdict only when more than half of its votes are
    1, so an exact tie is treated as a "0" verdict.
    """
    verdict_is_one = np.mean(votes, axis=1) > 0.5
    matches = verdict_is_one == truth
    return np.mean(matches)


def unanimity_rate(votes):
    """Return the fraction of cases in which every juror cast the same vote."""
    ones_per_case = votes.sum(axis=1)
    panel_size = votes.shape[1]
    all_zero = ones_per_case == 0
    all_one = ones_per_case == panel_size
    return np.mean(all_zero | all_one)


def juror_agreement_rates(votes, truth):
    """Return, per juror, the fraction of cases where the vote equals truth."""
    # Add a trailing axis so the per-case truth broadcasts across jurors.
    truth_column = truth[:, None]
    agrees = votes == truth_column
    return agrees.mean(axis=0)


def error_correlation(votes, truth):
    """Return the juror-by-juror correlation matrix of error indicators.

    An error is a vote that differs from the true state of its case. Jurors
    whose error indicator is constant across cases produce NaN entries.
    """
    mistakes = np.not_equal(votes, truth[:, None])
    # corrcoef treats rows as variables, so put jurors on the rows.
    per_juror = np.transpose(mistakes)
    return np.corrcoef(per_juror)


def majority_accuracy_for_size(votes, truth, jury_size):
    """Return majority accuracy when each case is judged by a random sub-jury.

    For every case, ``jury_size`` distinct jurors are drawn with the global
    ``np.random`` state and their strict-majority verdict is compared with
    the true state of that case.
    """
    total_cases, total_jurors = votes.shape
    hits = np.zeros(total_cases, dtype=int)

    for case_idx, case_votes in enumerate(votes):
        panel = np.random.choice(total_jurors, size=jury_size, replace=False)
        verdict = case_votes[panel].mean() > 0.5
        hits[case_idx] = verdict == truth[case_idx]

    return hits.mean()


# ------------------------------------------------------------
# 4. Run posterior predictive simulations
# ------------------------------------------------------------

def run_post_fit_ppc(p_juror_samples, n_cases, truth=None, jury_sizes=(3, 5, 7, 10, 15)):
    """Simulate votes for every competence draw and collect diagnostics.

    Parameters
    ----------
    p_juror_samples : ndarray of shape (n_jurors, n_samples)
        One column of per-juror competences per draw.
    n_cases : int
        Number of cases simulated per draw.
    truth : array-like of shape (n_cases,), optional
        Fixed true states shared by all draws; when None a fresh truth
        vector is simulated for every draw.
    jury_sizes : iterable of int
        Sub-jury sizes for which majority accuracy is computed. The default
        is an immutable tuple so it cannot be altered between calls.

    Returns
    -------
    dict
        ``majority_accuracy_15`` and ``unanimity_rate_15`` (n_samples,),
        ``agreement_rates`` (n_samples, n_jurors),
        ``error_corr`` (n_samples, n_jurors, n_jurors) and
        ``majority_accuracy_by_size`` (dict: size -> (n_samples,)).
        The ``_15`` keys are computed on the full simulated jury, whatever
        its size.
    """
    jury_sizes = list(jury_sizes)
    n_jurors, n_samples = p_juror_samples.shape

    # Storage
    ppc_results = {
        "majority_accuracy_15": np.zeros(n_samples),
        "unanimity_rate_15": np.zeros(n_samples),
        "agreement_rates": np.zeros((n_samples, n_jurors)),
        "error_corr": np.zeros((n_samples, n_jurors, n_jurors)),
        "majority_accuracy_by_size": {
            k: np.zeros(n_samples) for k in jury_sizes
        }
    }

    for s in range(n_samples):
        truth_s, votes_s = simulate_votes(
            p_juror_samples[:, s],
            n_cases,
            truth
        )

        # Full jury diagnostics
        ppc_results["majority_accuracy_15"][s] = majority_accuracy(votes_s, truth_s)
        ppc_results["unanimity_rate_15"][s] = unanimity_rate(votes_s)
        ppc_results["agreement_rates"][s] = juror_agreement_rates(votes_s, truth_s)
        ppc_results["error_corr"][s] = error_correlation(votes_s, truth_s)

        # Sub-jury diagnostics
        for k in jury_sizes:
            ppc_results["majority_accuracy_by_size"][k][s] = (
                majority_accuracy_for_size(votes_s, truth_s, k)
            )

    return ppc_results

def summarise_error_corr(ppc_results):
    """Summarise the off-diagonal juror error correlations across draws.

    Reads ``ppc_results["error_corr"]`` of shape (samples, jurors, jurors)
    and pools the strict upper triangle of every draw.

    NaN correlations (a juror whose error indicator is constant within a
    draw) are excluded from the statistics instead of turning every summary
    into NaN; their share is reported as ``nan_fraction``.

    Returns
    -------
    dict
        ``mean_off_diag``, ``sd_off_diag``, ``p95_abs_corr`` and
        ``nan_fraction``.
    """
    corr = ppc_results["error_corr"]  # (samples, jurors, jurors)
    n = corr.shape[1]

    # Strict upper triangle of every draw, flattened draw by draw.
    rows, cols = np.triu_indices(n, k=1)
    off_diag = corr[:, rows, cols].ravel()

    return {
        "mean_off_diag": np.nanmean(off_diag),
        "sd_off_diag": np.nanstd(off_diag),
        "p95_abs_corr": np.nanpercentile(np.abs(off_diag), 95),
        "nan_fraction": np.isnan(off_diag).mean(),
    }


# ------------------------------------------------------------
# 5. Summaries (example outputs)
# ------------------------------------------------------------

def summarise_post_fit_ppc(ppc_results):
    """Print PPC summaries and return majority-accuracy percentiles by jury size.

    The jury sizes are taken from ``ppc_results["majority_accuracy_by_size"]``
    itself, so the summary always matches the sizes that were simulated.

    Returns
    -------
    pandas.DataFrame
        Rows ``percentile_5``, ``percentile_50``, ``percentile_95``; one
        column ``majority_accuracy_<size>`` per simulated jury size.
    """
    percentiles = [5, 50, 95]

    print("\n=== Majority accuracy (full jury) ===")
    print(np.percentile(ppc_results["majority_accuracy_15"], percentiles))

    print("\n=== Unanimity rate (full jury) ===")
    print(np.percentile(ppc_results["unanimity_rate_15"], percentiles))

    print("\n=== Majority accuracy by jury size ===")
    summaries = {}
    for k, accuracies in ppc_results["majority_accuracy_by_size"].items():
        print(f"Jury size {k}:")
        size_percentiles = np.percentile(accuracies, percentiles)
        print(size_percentiles)
        summaries[f'majority_accuracy_{k}'] = size_percentiles

    # Agreement rates are printed only; they are not part of the returned table.
    print("\n=== Mean juror agreement rates ===")
    print(ppc_results["agreement_rates"].mean(axis=0))

    return pd.DataFrame(
        summaries,
        index=[f'percentile_{q}' for q in percentiles],
    )


def compare_prior_posterior(idata, true_states=None):
    """Run the vote-level PPC on prior and posterior competence draws.

    Parameters
    ----------
    idata : InferenceData
        Must contain ``p_juror`` in both the ``prior`` and ``posterior``
        groups.
    true_states : array-like, optional
        Fixed true case states used for every simulated draw. When omitted,
        a module-level ``true_states`` is used if one exists; otherwise each
        draw simulates its own truth vector.

    Returns
    -------
    summary : pandas.DataFrame
        Prior and posterior percentile tables stacked under the outer index
        keys ``'prior'`` and ``'posterior'``.
    ppc_result_posterior : dict
        Raw PPC results for the posterior draws.

    Notes
    -----
    The number of simulated cases is read from the module-level ``n_cases``.
    """
    if true_states is None:
        # Look the notebook-level truth up defensively: referencing the bare
        # name would raise NameError when it has not been defined.
        true_states = globals().get("true_states")

    # Stack (chain, draw) into one trailing "sample" axis -> (n_jurors, n_samples).
    p_juror_samples_prior = (
        idata.prior["p_juror"].stack(sample=("chain", "draw")).values
    )
    p_juror_samples_posterior = (
        idata.posterior["p_juror"].stack(sample=("chain", "draw")).values
    )

    ppc_result_prior = run_post_fit_ppc(p_juror_samples_prior, n_cases, true_states)
    ppc_result_posterior = run_post_fit_ppc(p_juror_samples_posterior, n_cases, true_states)
    summaries_prior = summarise_post_fit_ppc(ppc_result_prior)
    summaries_posterior = summarise_post_fit_ppc(ppc_result_posterior)

    summary = pd.concat({'prior': summaries_prior, 'posterior': summaries_posterior})
    return summary, ppc_result_posterior


In [None]:
# | code-fold: true

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

def plot_differences(df):
    """Plot prior vs posterior majority accuracy against jury size.

    Parameters
    ----------
    df : pandas.DataFrame
        Indexed by (``'prior'`` | ``'posterior'``, ``'percentile_5'`` |
        ``'percentile_50'`` | ``'percentile_95'``), with one column per jury
        size named ``..._<size>``.
    """
    # 1. Derive the x-axis from the column suffixes so the plot follows
    # whatever jury sizes were summarised. Fall back to the original default
    # sizes when the names do not end in an integer.
    try:
        x_values = [int(str(col).rsplit('_', 1)[-1]) for col in df.columns]
    except ValueError:
        x_values = [3, 5, 7, 10, 15]

    # 2. Extract the specific rows for plotting
    prior_median = df.loc[('prior', 'percentile_50')]
    prior_low = df.loc[('prior', 'percentile_5')]
    prior_high = df.loc[('prior', 'percentile_95')]

    post_median = df.loc[('posterior', 'percentile_50')]
    post_low = df.loc[('posterior', 'percentile_5')]
    post_high = df.loc[('posterior', 'percentile_95')]

    # 3. Create the plot
    plt.figure(figsize=(10, 6))

    # Plot Prior
    plt.plot(x_values, prior_median, label='Prior Median', color='blue', marker='o')
    plt.fill_between(x_values, prior_low, prior_high, color='blue', alpha=0.2, label='Prior (5th-95th)')

    # Plot Posterior
    plt.plot(x_values, post_median, label='Posterior Median', color='red', marker='o')
    plt.fill_between(x_values, post_low, post_high, color='red', alpha=0.2, label='Posterior (5th-95th)')

    # Formatting
    plt.title('Majority Accuracy: Prior vs Posterior Distributions')
    plt.xlabel('Number of Jurors (n) in Majority Calculation')
    plt.ylabel('Majority Accuracy Score')
    plt.xticks(x_values)
    plt.legend()
    plt.grid(True, linestyle='--', alpha=0.6)

    plt.show()

def plot_error_corr_heatmap(ppc_results, title="Error correlation"):
    """Show a heatmap of the juror error-correlation matrix averaged over draws."""
    # Average the per-draw matrices over the leading (draw) axis.
    averaged = ppc_results["error_corr"].mean(axis=0)

    heatmap_options = dict(
        vmin=-0.3,
        vmax=0.3,
        cmap="coolwarm",
        square=True,
        cbar_kws={"label": "Error correlation"},
    )

    plt.figure(figsize=(6, 5))
    sns.heatmap(averaged, **heatmap_options)
    plt.title(title)
    plt.tight_layout()
    plt.show()


In [None]:
# Prior-vs-posterior predictive summary for the 'weak_discrimination' fit.
# The second return value holds the raw posterior PPC results.
summaries_weak_discrimination,  ppc_result_posterior_weak_discrimination = compare_prior_posterior(traces_discrimination['weak_discrimination'])

# Bare expression: displayed as the cell output.
summaries_weak_discrimination


In [None]:
# Mean error-correlation heatmap for the 'weak_discrimination' posterior PPC.
plot_error_corr_heatmap(ppc_result_posterior_weak_discrimination)

# Summary of the off-diagonal error correlations, shown as the cell output.
summarise_error_corr(ppc_result_posterior_weak_discrimination)


In [None]:
# Prior vs posterior majority accuracy by jury size ('weak_discrimination').
plot_differences(summaries_weak_discrimination)

In [None]:
# Prior-vs-posterior predictive summary for the 'moderate_discrimination' fit.
# The second return value holds the raw posterior PPC results.
summaries_moderate_discrimination, ppc_result_posterior_moderate_discrimination = compare_prior_posterior(traces_discrimination['moderate_discrimination'])

# Bare expression: displayed as the cell output.
summaries_moderate_discrimination

In [None]:
# Mean error-correlation heatmap for the 'moderate_discrimination' posterior PPC.
plot_error_corr_heatmap(ppc_result_posterior_moderate_discrimination)

# Summary of the off-diagonal error correlations, shown as the cell output.
summarise_error_corr(ppc_result_posterior_moderate_discrimination)


In [None]:
# Prior vs posterior majority accuracy by jury size ('moderate_discrimination').
plot_differences(summaries_moderate_discrimination)

In [None]:
# Prior-vs-posterior predictive summary for the 'strong_discrimination' fit.
# The second return value holds the raw posterior PPC results.
summaries_strong_discrimination, ppc_result_posterior_strong_discrimination = compare_prior_posterior(traces_discrimination['strong_discrimination'])

# Bare expression: displayed as the cell output.
summaries_strong_discrimination


In [None]:
# Mean error-correlation heatmap for the 'strong_discrimination' posterior PPC.
plot_error_corr_heatmap(ppc_result_posterior_strong_discrimination)

# Summary of the off-diagonal error correlations, shown as the cell output.
summarise_error_corr(ppc_result_posterior_strong_discrimination)


In [None]:
# Prior vs posterior majority accuracy by jury size ('strong_discrimination').
plot_differences(summaries_strong_discrimination)

### Case Difficulty


In [None]:
def ppc_with_case_difficulty(
    idata,
    n_cases,
    sigma_case,
    rng=None,
    true_states=None
):
    """
    PPC generator with shared case-level shocks.

    For every posterior draw and case, one shock ``delta ~ Normal(0,
    sigma_case)`` is added to the logit of every juror's probability of
    voting 1, so all jurors on a case are pushed in the same direction.

    Parameters
    ----------
    idata : InferenceData
        Must contain ``logit_p_juror`` in the ``posterior`` group.
    n_cases : int
        Number of cases simulated per posterior draw.
    sigma_case : float
        Standard deviation of the shared case-level shock.
    rng : numpy.random.Generator, optional
        Random generator. When None a new ``default_rng(123)`` is created on
        every call, so calls are reproducible and independent of each other
        (a Generator built in the signature would be shared across calls).
    true_states : array-like of shape (n_cases,), optional
        Fixed true states; drawn from Bernoulli(0.5) when None.

    Returns
    -------
    dict
        ``votes`` (n_samples, n_cases, n_jurors) integers,
        ``true_state`` (n_samples, n_cases) and
        ``delta_case`` (n_samples, n_cases).
    """
    if rng is None:
        rng = np.random.default_rng(123)

    # Stack (chain, draw) into one trailing axis -> (n_jurors, n_samples).
    logit_p = (idata.posterior['logit_p_juror']
               .stack(sample=("chain", "draw")).values)

    n_jurors, n_samples = logit_p.shape

    if true_states is None:
        true_states = rng.binomial(1, 0.5, size=n_cases)
    # The same truth vector is used for every posterior draw.
    truth = np.zeros((n_samples, n_cases))
    truth[:] = true_states

    delta_case = rng.normal(0.0, sigma_case, size=(n_samples, n_cases))

    # Probability of voting 1: the juror's logit-competence enters with a
    # positive sign when the truth is 1 and a negative sign otherwise, plus
    # the shared shock. Broadcast to (n_samples, n_cases, n_jurors).
    sign = np.where(truth == 1, 1.0, -1.0)
    logits = sign[:, :, None] * logit_p.T[:, None, :] + delta_case[:, :, None]
    p = 1 / (1 + np.exp(-logits))
    votes = rng.binomial(1, p).astype(int)

    return {
        "votes": votes,
        "true_state": truth,
        "delta_case": delta_case,
    }


# Sensitivity sweep over the case-difficulty scale: simulate votes with shared
# case shocks and report majority accuracy and juror error correlations.

# Reuse the notebook-level ground truth when it exists. Otherwise pass None so
# the PPC generator draws its own truth instead of failing with a NameError.
try:
    shared_truth = true_states
except NameError:
    shared_truth = None

for sigma in [0.0, 0.2, 0.5, 1]:
    ppc = ppc_with_case_difficulty(
        traces_discrimination['weak_discrimination'],
        n_cases=n_cases,  # keep in step with the simulated data set
        sigma_case=sigma,
        true_states=shared_truth
    )

    n_samples = ppc['votes'].shape[0]

    # One (jurors x jurors) error-correlation matrix per posterior draw.
    corrs = np.stack([
        error_correlation(ppc["votes"][s], ppc["true_state"][s])
        for s in range(n_samples)
    ])
    # Keep only off-diagonal entries; the diagonal is not informative.
    off_diag = corrs[:, ~np.eye(corrs.shape[1], dtype=bool)]

    acc = [
        majority_accuracy(ppc["votes"][s], ppc["true_state"][s])
        for s in range(n_samples)
    ]

    # nan-aware statistics: a juror with a constant error indicator in a draw
    # yields NaN correlations, whose share is reported separately.
    print(f"\nσ_case = {sigma}")
    print("Mean majority accuracy:", np.mean(acc))
    print("mean_corr", np.nanmean(off_diag))
    print("median_corr", np.nanmedian(off_diag))
    print("p95_abs_corr", np.nanpercentile(np.abs(off_diag), 95))
    print("nan_fraction", np.isnan(off_diag).mean())


## Final Model


In [None]:
# Regenerate the data with faction ("block") effects and collapse the votes to
# the number of cases in which each juror agreed with the majority verdict.
votes, p_jurors = make_ground_truth(n_cases, n_jurors, blocks=True)
majority_votes = (votes.mean(axis=1) > 0.5).astype(int)
agreements_per_juror = np.array([(votes[:, j] == majority_votes).sum() for j in range(n_jurors)])

# Block membership of each juror (three blocks of 5, 6 and 4 jurors).
block_id = np.array([0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2])

with pm.Model() as final_model:

    # --------------------------------------------------
    # Juror skill (individual discrimination)
    # --------------------------------------------------
    mu_alpha = pm.Normal("mu_alpha", mu=0.0, sigma=0.5)
    sigma_alpha = pm.HalfNormal("sigma_alpha", sigma=1.0)

    # Non-centered parameterization: alpha_j = mu + sigma * z
    z_juror = pm.Normal("z_juror", 0.0, 1.0, shape=n_jurors)
    alpha_j = pm.Deterministic(
        "alpha_j",
        mu_alpha + sigma_alpha * z_juror
    )

    # --------------------------------------------------
    # Block / faction effects
    # --------------------------------------------------
    n_blocks = len(np.unique(block_id))
    sigma_block = pm.HalfNormal("sigma_block", sigma=1.0)

    block_effect = pm.Normal(
        "block_effect",
        mu=0.0,
        sigma=sigma_block,
        shape=n_blocks
    )

    # Map each juror to the effect of their block.
    beta_block_j = block_effect[block_id]

    # --------------------------------------------------
    # Case difficulty (asymmetric, shared)
    # --------------------------------------------------
    mu_case = pm.Normal("mu_case", mu=0.0, sigma=0.5)
    sigma_case = pm.HalfNormal("sigma_case", sigma=1.0)

    # Expected difficulty effect (collapsed over cases)
    delta_bar = pm.Normal(
        "delta_bar",
        mu=mu_case,
        sigma=sigma_case / pm.math.sqrt(n_cases)
    )

    # --------------------------------------------------
    # Effective correctness probability
    # --------------------------------------------------
    # NOTE(review): mu_alpha and delta_bar are both added to every juror's
    # logit, so the likelihood only sees their sum - confirm this is intended.
    logit_p_correct = alpha_j + beta_block_j + delta_bar
    p_correct = pm.Deterministic(
        "p_correct",
        pm.math.sigmoid(logit_p_correct)
    )

    # --------------------------------------------------
    # Binomial likelihood (collapsed)
    # --------------------------------------------------
    pm.Binomial(
        "agreements",
        n=n_cases,
        p=p_correct,
        observed=agreements_per_juror
    )

    # Seeded like the other fits in this notebook so results are reproducible.
    idata = pm.sample(
        2000,
        tune=2000,
        target_accept=0.975,
        random_seed=42,
        return_inferencedata=True
    )


In [None]:
# Summary table for `idata`, which at this point holds the final model's samples.
az.summary(idata)
