# California Recall Simulation

In [None]:
from pref_voting.voting_methods import *
from pref_voting.generate_spatial_profiles import *
from pref_voting.analysis import estimated_std_error, binomial_confidence_interval
from pref_voting.spatial_profiles import linear_utility

import numpy as np
import pandas as pd

from tqdm.notebook import tqdm

In [None]:
# Simulation Parameters
verbose = False

csvfile = './data/recall_data'

# recall_decision_type, replacement_rule, manipulation_level
recall_replacement_decisions =  [
    ('sincere', 'sincere', 1.0),
    ('strategic', 'strategic-top-two', 1.0),
    # ('strategic', 'strategic-top-two', 0.5),
    ('strategic', 'strategic-top-two', 0.0),
]

concentration_parameters = [
    0.1, 
    1.0, 
    10.0, 
    100.0, 
    1_000.0, 
    10_000.0, 
    100_000.0, 
]

min_num_samples_for_subjective_prob = 10  
max_num_samples_for_subjective_prob = 10  

num_cands_incumbent_party =  2 
num_cands_opposition_party = 4

num_voters = 1_000

concentration_param_for_num_voters_left = 50
concentration_param_for_num_voters_right = 50

binom_ci_threshold_for_events = 0.01
min_num_samples_for_events = 1_000
max_num_samples_for_events = 100_000

std = 0.5

num_dims = 2

In [None]:
def get_num_samples(min_num_samples, max_num_samples): 
    return np.random.randint(min_num_samples, max_num_samples + 1)

In [None]:
def largest_pl_score_event(pl_scores):

    max_pl_score = max(pl_scores)
    event = [int(score == max_pl_score) for score in pl_scores]
    return event

def top_two_pl_score_event(pl_scores): 
    sorted_pl_scores = sorted(pl_scores, reverse=True)
    event = [int(score in sorted_pl_scores[:2]) for score in pl_scores]
    return event

In [None]:

def find_candidate_distances(sprof): 
    dist = {c: {} for c in sprof.candidates}
    for c1 in sprof.candidates: 
        for c2 in sprof.candidates: 
            dist[c1][c2] = np.linalg.norm(sprof.candidate_position(c1) - sprof.candidate_position(c2))
    return dist

In [None]:
def generate_strategic_vote_shares(
        vote_shares, top_two_cands, sprof, manipulation_level): 
    """
    Return a vector of vote shares taking into account the voters that strategize
    """
    cand_distances = find_candidate_distances(sprof)
    updated_vote_shares = vote_shares.copy()

    top_two_c1, top_two_c2 = top_two_cands
    for c  in vote_shares.keys(): 
        v = vote_shares[c]
        if c in top_two_cands: 
            continue

        if cand_distances[c][top_two_c1] < cand_distances[c][top_two_c2]: 
            updated_vote_shares[top_two_c1] += manipulation_level * vote_shares[c]
            updated_vote_shares[c] -= manipulation_level * vote_shares[c]
        else: 
            updated_vote_shares[top_two_c2] += manipulation_level * vote_shares[c]
            updated_vote_shares[c] -= manipulation_level * vote_shares[c]

    return updated_vote_shares

In [None]:
def generate_probabilities(
    pl_scores, # dictionary of plurality scores
    num_voters, # number of voters
    concentration_param, # concentration parameter for Dirichlet distribution
    manipulation_level, # proportion of votes to transfer in strategic manipulation
    sprof,
    num_samples = 1000 # number of samples to generate from Dirichlet distribution
    ):

    """
    This function simulates a set of possible election outcomes using Dirichlet distributions over plurality scores to estimate:
      - The probability of each candidate winning the election.
      - The probability of each candidate ranking in the top two.
      - The probability of each candidate winning under strategic manipulation, where an opposition candidate is strategically weakened, and another is bolstered.

    The function returns the following dictionaries: 
        - winner_prob: Probability of each candidate winning the election.
        - top_two_prob: Probability of each candidate ranking in the top two.
        - strategic_winner_prob: Probability of each candidate winning under strategic manipulation.
    """

    candidates = sorted(list(pl_scores.keys()))
    cand_to_cidx = {c: candidates.index(c) for c in candidates}
    winner_events = [[] for c in candidates]
    top_two_events = [[] for c in candidates]

    pl_samples = np.random.dirichlet(
        [((pl_scores[c] + 1) / (num_voters + len(candidates))) * concentration_param # Laplace smoothing
        for c in candidates], 
        size = num_samples)

    num_samples = 0 

    for pl_sample in pl_samples:

        # check if any new_pl_scores is nan
        assert not np.isnan(pl_sample).any(), "Dirichlet sample contains nan values"
            
        new_pl_scores  = list(pl_sample)

        winner_event = largest_pl_score_event(new_pl_scores)
        top_two_event = top_two_pl_score_event(new_pl_scores)

        for cidx, _ in enumerate(candidates):
            winner_events[cidx].append(winner_event[cidx])
            top_two_events[cidx].append(top_two_event[cidx])

    winner_prob = {c: np.mean(winner_events[cidx]) 
                   for cidx, c in enumerate(candidates)}
    top_two_prob = {c: np.mean(top_two_events[cidx]) 
                    for cidx, c in enumerate(candidates)}

    sorted_top_two_probs = sorted(list(set(top_two_prob.values())), reverse=True)
    if len(sorted_top_two_probs) == 1: 
        top_two = random.sample(candidates, k=2)   
    else: 
        top1 = random.choice([c for c in candidates if top_two_prob[c] == sorted_top_two_probs[0]]) 
        top2 = random.choice([c for c in candidates if top_two_prob[c] == sorted_top_two_probs[1]]) 
        top_two = [top1, top2]

    strategic_winner_events = [[] for c in candidates]

    for pl_sample in pl_samples:
        
        pl_scores  = {c: pl_sample[cand_to_cidx[c]] for c in candidates}
        updated_pl_scores = generate_strategic_vote_shares(pl_scores, top_two, sprof, manipulation_level)
        strategic_winner_event = largest_pl_score_event([updated_pl_scores[c] for c in candidates])

        for cidx, _ in enumerate(candidates):
            strategic_winner_events[cidx].append(strategic_winner_event[cidx])

    strategic_winner_prob = {c: np.mean(strategic_winner_events[cidx]) for cidx, c in enumerate(candidates)}

    if verbose: 
        print(f"plurality score vector: {pl_scores}")
        print("\twinner_prob: ", winner_prob)
        print("\ttop_two_prob: ", top_two_prob)
        print("\tstrategic_winner_prob: ", strategic_winner_prob)
    return winner_prob, top_two_prob, strategic_winner_prob

In [None]:
def should_recall_naive(util, prob, candidates, incumbent):
    ranking = sorted(candidates, key=lambda c: util(c), reverse=True) 
    return ranking.index(incumbent) >= len(candidates)/2
    
def should_recall_avg(util, prob, candidates, incumbent): 
    avg_util = np.mean([util(c) for c in candidates])
    return util(incumbent) < avg_util

def should_recall_exp_utility(util, prob, candidates, incumbent): 
    return util(incumbent) < sum([util(c) * prob[c] for c in candidates if c != incumbent])

recall_decision_functions = {
    "expected utility": should_recall_exp_utility,
    "naive": should_recall_naive,
    "average": should_recall_avg
}

def get_probability(probs, prob_type):
    if prob_type == 'sincere':
        return probs[0]
    elif prob_type == 'strategic-top-two':
        return probs[1]
    elif prob_type == 'strategic':
        return probs[2]
    else:
        print("Invalid recall decision type")
        return None

In [None]:
def get_ballot(
    replacement_rule,
    util, 
    prob_for_recall,
    prob_for_vote, 
    incumbent, 
    incumbent_party_candidates,
    opposition_party_candidates,): 

    """
    Get a ballot from a voter based on their recall decision function, utility function, and subjective probabilities.
    """

    available_candidates = [
        c for c in incumbent_party_candidates + opposition_party_candidates 
        if c != incumbent
    ]

    # Make the recall decision
    recall_decision = should_recall_exp_utility(util, prob_for_recall, available_candidates, incumbent)

    # Rank the candidates according to the utility function
    ranking = sorted(available_candidates, key=util, reverse=True)

    # Rank the candidates according prob_for_vote
    prob_ranking = sorted(available_candidates, key=lambda c: prob_for_vote[c], reverse=True)

    if not ranking:
        # No candidates to choose from
        return recall_decision, None

    # Decide on the candidate to vote for
    if replacement_rule == 'sincere':
        # vote for the candidate with the highest utility
        candidate_choice = ranking[0]
    elif replacement_rule == 'strategic-party': # Strategic voting

        if ranking[0] in incumbent_party_candidates:
            candidate_choice = ranking[0]
        else:
            candidate_choice = [c for c in prob_ranking if c in opposition_party_candidates][0]
    elif replacement_rule == 'strategic-top-two':
        top_two = prob_ranking[:2]
        if ranking.index(top_two[0]) < ranking.index(top_two[1]):
            candidate_choice = top_two[0]
        else:
            candidate_choice = top_two[1]
    else:
        raise ValueError("Invalid replacement rule")
        
    return recall_decision, candidate_choice

def election_outcome(ballots, candidates): 
    """
    Determine whether the recall passes and the would-be winner of the election if the recall passes.
    """

    recall_outcome = sum([b[0] for b in ballots]) / len(ballots) > 0.5

    pl_scores = {c: sum([1 for b in ballots if b[1] == c]) for c in candidates}

    max_pl_score = max(pl_scores.values())

    new_winner = random.choice([c for c in candidates if pl_scores[c] == max_pl_score])

    return recall_outcome, new_winner

## The Simulation

In [None]:
for recall_decision_type, replacement_rule, manipulation_level in recall_replacement_decisions: 
    print("manipulation_level", manipulation_level) 
    print("recall_decision_type", recall_decision_type)
    print("replacement_rule", replacement_rule)
    # Iterate over uncertainty degrees
    for concentration_param in concentration_parameters: 
        print(f"Concentration Parameter: {concentration_param}")
        # Initialize data storage
        data_for_df = {
            "pr_recall": [],
            "err_recall_events": [],
            "pr_incumbent_maj_pref_winner_after_recall": [],
            "err_incumbent_maj_pref_winner_after_recall_events": [],
            "pr_incumbent_maj_pref_winner": [],
            "pr_new_winner_incumbent_party": [],
            "pr_new_winner_maj_pref_incumbent_conditional_winner_opposition_party": [],
            "pr_new_winner_maj_pref_incumbent_conditional_winner_incumbent_party_event": [],
            "pr_incumbent_condorcet_winner": [],
            "conditional_pr_incumbent_maj_pref_winner_after_recall": [],
            "err_conditional_pr_incumbent_maj_pref_winner_after_recall": [],
            "num_trials": [],
            "num_samples_for_prob": [],
            "num_cands_incumbent_party": [],
            "num_cands_opposition_party": [],
            "total_num_cands": [],
            "num_voters": [],
            "concentration_param_for_num_voters_left": [],
            "concentration_param_for_num_voters_right": [],
            "num_dims": [],
            "recall_decision_type": [],
            "replacement_rule": [],
            "manipulation_level": [],
            "concentration_parameter": [],
            "min_num_samples_for_subjective_prob": [],
            "max_num_samples_for_subjective_prob": [],
            "binom_ci_threshold_for_events": [],
            "min_num_samples_for_events": [],
            "max_num_samples_for_events": [],
            "std": [],
        }
        # main events
        recall_events = []
        incumbent_maj_pref_winner_after_recall_events = []

        # events for analysis
        new_winner_incumbent_party_event = []

        # Initialize error metrics
        err_recall_events = float('inf')
        err_incumbent_maj_pref_winner_after_recall_events = float('inf')

        # events for analysis
        new_winner_from_incumbent_party_event = []
        new_winner_maj_pref_incumbent_conditional_winner_opposition_party_event = []
        new_winner_maj_pref_incumbent_conditional_winner_incumbent_party_event = []
        num_incumbent_maj_pref_winner = 0
        num_incumbent_condorcet_winner = 0
        n_trials = 0 

        # Run simulations until convergence criteria are met
        with tqdm(total=max_num_samples_for_events, desc="Simulation", unit="trial") as pbar:
            while (n_trials < min_num_samples_for_events or 
                ((err_recall_events > binom_ci_threshold_for_events or 
                    err_incumbent_maj_pref_winner_after_recall_events > binom_ci_threshold_for_events) and 
                    n_trials < max_num_samples_for_events)):
                

                # Generate voter clusters
                v_size_incumbent, v_size_opposition = np.random.dirichlet([concentration_param_for_num_voters_left, concentration_param_for_num_voters_right])

                # Generate spatial profile

                # Generate candidate clusters
                cand_clusters = [
                    ((-1, 0), generate_covariance(num_dims, std, 0), num_cands_incumbent_party),  # Incumbent party
                    ((1, 0), generate_covariance(num_dims, std, 0), num_cands_opposition_party),  # Opposition party
                ]

                voter_clusters = [
                    ((-1, 0), generate_covariance(num_dims, std, 0), int(num_voters * v_size_incumbent)),  # Incumbent voters
                    ((1, 0), generate_covariance(num_dims, std, 0), int(num_voters * v_size_opposition)),  # Opposition voters
                ]

                # Generate spatial and utility profiles
                sprof = generate_spatial_profile_polarized(cand_clusters, voter_clusters, cluster_types = ["incumbent party", "opposition party"])
                uprof = sprof.to_utility_profile(utility_function=linear_utility)
                prof = uprof.to_ranking_profile()

                ### end generate spatial profile

                ### 
                incumbent = prof.candidates[0]

                prof_restricted = prof.remove_candidates([incumbent])
                pl_scores_restricted_profile = prof_restricted.plurality_scores()
                
                all_num_samples_for_prob = [get_num_samples(min_num_samples_for_subjective_prob, max_num_samples_for_subjective_prob) for _ in uprof.utilities]

                if verbose: 
                    print("the incumbent candidate is ", incumbent)
                    print("Incumbent party candidates: ", [c for c in prof.candidates if sprof.candidate_type(c) == "incumbent party"])
                    print("Opposition cumbent party candidates: ", [c for c in prof.candidates if sprof.candidate_type(c) == "opposition party"])
                # find_subjective_prob
                probs_utilities = [(generate_probabilities(
                    pl_scores_restricted_profile, 
                    prof.num_voters, 
                    concentration_param, 
                    manipulation_level,
                    sprof,
                    num_samples=all_num_samples_for_prob[uidx],
                ), u) for uidx, u in enumerate(uprof.utilities)]
                
                num_samples_for_prob = np.average(all_num_samples_for_prob) 
                
                recall_passes, new_winner = election_outcome([
                    get_ballot(
                        replacement_rule,
                        util, 
                        get_probability(probs, recall_decision_type),
                        get_probability(probs, replacement_rule), 
                        incumbent, 
                        [c for c in prof.candidates if sprof.candidate_type(c) == "incumbent party"], 
                        [c for c in prof.candidates if sprof.candidate_type(c) == "opposition party"], 
                    ) for probs, util in probs_utilities
                ], prof_restricted.candidates)

                if verbose: 
                    print(f"the recall passed: {recall_passes}")
                    print(f"the new winner is ", new_winner)
                num_incumbent_maj_pref_winner += int(prof.majority_prefers(incumbent, new_winner))
                cw = prof.condorcet_winner()    
                num_incumbent_condorcet_winner += int(cw == incumbent)
                

                # Record recall events
                if recall_passes:
                    recall_events.append(1)
                    incumbent_maj_pref_winner_after_recall_events.append(
                        int(prof.majority_prefers(incumbent, new_winner))
                    )

                else:
                    recall_events.append(0)
                    incumbent_maj_pref_winner_after_recall_events.append(0)
                
                new_winner_incumbent_party_event.append(int(sprof.candidate_type(new_winner) == "incumbent party"))

                if sprof.candidate_type(new_winner) != "incumbent party": 
                    new_winner_maj_pref_incumbent_conditional_winner_opposition_party_event.append(int(prof.majority_prefers(new_winner, incumbent)))

                if sprof.candidate_type(new_winner) == "incumbent party": 
                    new_winner_maj_pref_incumbent_conditional_winner_incumbent_party_event.append(int(prof.majority_prefers(new_winner, incumbent)))

                # Update error estimates
                low_err_recall_events, upper_err_recall_events = binomial_confidence_interval(recall_events)
                err_recall_events = upper_err_recall_events - low_err_recall_events

                if incumbent_maj_pref_winner_after_recall_events:
                    low_err_incumbent, upper_err_incumbent = binomial_confidence_interval(
                        incumbent_maj_pref_winner_after_recall_events
                    )
                    err_incumbent_maj_pref_winner_after_recall_events = upper_err_incumbent - low_err_incumbent
                else:
                    err_incumbent_maj_pref_winner_after_recall_events = float('inf')

                n_trials += 1
                pbar.update(1)

        # Record parameters and results
        data_for_df["pr_recall"].append(np.mean(recall_events))
        data_for_df["err_recall_events"].append(err_recall_events)

        data_for_df["pr_incumbent_maj_pref_winner_after_recall"].append(
            np.mean(incumbent_maj_pref_winner_after_recall_events) if len(incumbent_maj_pref_winner_after_recall_events) > 0 else 0
        )
        
        data_for_df["err_incumbent_maj_pref_winner_after_recall_events"].append(err_incumbent_maj_pref_winner_after_recall_events)

        conditional_events = [i for r, i in zip(recall_events, incumbent_maj_pref_winner_after_recall_events) if r == 1]

        data_for_df["conditional_pr_incumbent_maj_pref_winner_after_recall"].append(np.mean(conditional_events))
        low_err_cond_event, upper_err_cond_event = binomial_confidence_interval(conditional_events)
        data_for_df["err_conditional_pr_incumbent_maj_pref_winner_after_recall"].append(upper_err_cond_event - low_err_cond_event)

        data_for_df["pr_new_winner_incumbent_party"].append(np.mean(new_winner_incumbent_party_event))

        data_for_df["pr_new_winner_maj_pref_incumbent_conditional_winner_opposition_party"].append(np.mean(new_winner_maj_pref_incumbent_conditional_winner_opposition_party_event))

        data_for_df["pr_new_winner_maj_pref_incumbent_conditional_winner_incumbent_party_event"].append(np.mean(new_winner_maj_pref_incumbent_conditional_winner_incumbent_party_event))
        data_for_df["pr_incumbent_maj_pref_winner"].append(num_incumbent_maj_pref_winner / n_trials)
        data_for_df["pr_incumbent_condorcet_winner"].append(num_incumbent_condorcet_winner / n_trials)
        data_for_df["num_trials"].append(n_trials)
        data_for_df["num_samples_for_prob"].append(num_samples_for_prob)
        data_for_df["num_cands_incumbent_party"].append(num_cands_incumbent_party)
        data_for_df["num_cands_opposition_party"].append(num_cands_opposition_party)
        data_for_df["total_num_cands"].append(len(prof.candidates))
        data_for_df["num_voters"].append(prof.num_voters)
        data_for_df["concentration_param_for_num_voters_left"].append(concentration_param_for_num_voters_left)
        data_for_df["concentration_param_for_num_voters_right"].append(concentration_param_for_num_voters_right)
        data_for_df["num_dims"].append(num_dims)
        data_for_df["recall_decision_type"].append(recall_decision_type)
        data_for_df["replacement_rule"].append(replacement_rule)
        data_for_df["manipulation_level"].append(manipulation_level)
        data_for_df["concentration_parameter"].append(concentration_param)
        data_for_df["min_num_samples_for_subjective_prob"].append(min_num_samples_for_subjective_prob)
        data_for_df["max_num_samples_for_subjective_prob"].append(max_num_samples_for_subjective_prob)
        data_for_df["binom_ci_threshold_for_events"].append(binom_ci_threshold_for_events)
        data_for_df["min_num_samples_for_events"].append(min_num_samples_for_events)
        data_for_df["max_num_samples_for_events"].append(max_num_samples_for_events)
        data_for_df["std"].append(std)

        df = pd.DataFrame(data_for_df)

        if not verbose:
            df.to_csv(f"{csvfile}_{manipulation_level}_{recall_decision_type}_{replacement_rule}_{concentration_param}_{std}_{num_cands_incumbent_party}_{num_cands_opposition_party}.csv", index=False)
        else: 
            print("file not saved since verbose is on")