# Notebook for _Nyquist's Ranked Choice Voting Tabulator_

This is my sandbox for exploring the results of [ranked choice voting](https://en.wikipedia.org/wiki/Ranked_voting) elections. Learn more at the [GitHub Repository](https://github.com/KalebNyquist/ranked-choice-tabulator).

> 💡 Looking for data to experiment with? Try the [Ranked Choice Voting Resource Center's Data Clearinghouse](https://www.rcvresources.org/data-clearinghouse).

## Navigation
* [Functional Code](#Functional-Code)
    * [Import Libraries & At-Large Functions](#Import-Libraries-&-At-Large-Functions)
    * [Select Configuration for Reading Cast Vote Records](#Select-Configuration-for-Reading-Cast-Vote-Records)
        * [Configuration Wizard `cfg_wizard()`](#Configuration-Wizard-cfg_wizard())
        * [Load and Save Configurations from Memory `load_cfgs()`](#Load-and-Save-Configurations-from-Memory-load_cfgs())
    * [Tabulate Election Results](#Tabulate-Election-Results)
    * [Visualize Election Results](#Visualize-Election-Results)
    * [Narrate Election Results](#Narrate-Election-Results) `narrate_results()`
    * [Check for Non-Monotonicity](#Check-for-Non-Monotonicity) `monotonicity_check()`
* [Execution Code](#Execution-Code)

# Functional Code
This section defines the Python functions, global variables, and libraries/packages the notebook relies on. Run every cell in this section first; nothing significant happens until [the execution code](#Execution-Code) is run afterwards.

## Import Libraries & At-Large Functions

When using Google Colab, uncomment the lines below to install the necessary libraries.

In [None]:
# !pip install parse
# !pip install num2words

In [None]:
import pandas as pd
import numpy as np
from parse import parse
import json
import os
from math import ceil, floor
from collections import Counter
from num2words import num2words

# cfg_wizard
import pathlib

# presentation
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
import matplotlib.patches as patches

# non-monotonicity check
from itertools import permutations, combinations

## Select Configuration for Reading Cast Vote Records
### Configuration Wizard `cfg_wizard()`
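
The wizard asks which of three record structures (`field`, `value`, or `array`) the cast vote records use. As a rough illustration, the same two ballots might look like this under each structure. The column names, candidate IDs, and helper functions are hypothetical, and the `array` parsing here uses plain string methods rather than the `parse` templates used later in the notebook.

```python
# Hypothetical rows: column names and candidate IDs are made up for illustration.

# `field`: one column per rank, with the candidate ID as the value
field_rows = [
    {"Ballot": 1, "1st Place": "A", "2nd Place": "B"},
    {"Ballot": 2, "1st Place": "B", "2nd Place": "A"},
]

# `value`: rankings and candidate IDs stored in paired columns
value_rows = [
    {"Ballot": 1, "Ranking A": 1, "Candidate A": "A", "Ranking B": 2, "Candidate B": "B"},
    {"Ballot": 2, "Ranking A": 1, "Candidate A": "B", "Ranking B": 2, "Candidate B": "A"},
]

# `array`: all rankings and candidates packed into one delimited column
array_rows = [
    {"Ballot": 1, "Votes": "(1) A, (2) B"},
    {"Ballot": 2, "Votes": "(1) B, (2) A"},
]


def ranks_from_field(row):
    """The column itself encodes the rank."""
    return {1: row["1st Place"], 2: row["2nd Place"]}


def ranks_from_value(row):
    """Each ranking column is paired with a candidate column."""
    return {row[f"Ranking {s}"]: row[f"Candidate {s}"] for s in ("A", "B")}


def ranks_from_array(row):
    """Split the packed column, then split each '(rank) candidate' item."""
    ranks = {}
    for item in row["Votes"].split(","):
        rank, candidate = item.strip().split(") ")
        ranks[int(rank.lstrip("("))] = candidate
    return ranks


# All three structures carry the same rank-to-candidate information.
for f_row, v_row, a_row in zip(field_rows, value_rows, array_rows):
    print(ranks_from_field(f_row), ranks_from_value(v_row), ranks_from_array(a_row))
```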

In [None]:
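def get_path_to_election_data():
    """Returns the path to the `election_data` folder as a string with forward slashes."""
    # - [ ] get source
    try:
        current_location = pathlib.Path(__file__).parent.resolve()
    except NameError:
        # `__file__` is undefined inside a notebook, so fall back to the working directory
        current_location = pathlib.Path().absolute()
    # Convert to a string before concatenating; `Path + str` raises a TypeError
    return current_location.as_posix() + '/election_data'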


def get_data_location():
    """Guides user where to copy and input file containing cast vote record data."""
    
    print("To start, add the file or folder containing the data to the `election_data` folder found here:")
    print("     ", get_path_to_election_data(), "\n")
    
    data_storage = input("Did you add a (1) file or (2) folder? ")
    while data_storage not in ["1", "2"]:
        print(data_storage, "is invalid input")
        data_storage = input("Did you add a (1) file or (2) folder? ")
        
    if data_storage == "1":
        print("Great! What is the file name (including extension)?")
        file_name = input("File Name: ")
        return file_name, None
        
    elif data_storage == "2":
        print("Alright! What is the folder name?")
        folder_name = None
        while folder_name is None:
            tentative_folder_name = input("Folder Name: ")
            if tentative_folder_name in [x.name for x in os.scandir('election_data') if x.is_dir()]:
                folder_name = tentative_folder_name
                break
            else:
                print(tentative_folder_name, "not found in /election_data folder. Try again.")
        print("Sounds good. What is the extension of the data files in the folder?")
        
        files_found = []
        while len(files_found) == 0: 
            extension = input("File Extension (for example, .csv): ")
            extension = f".{extension}".replace("..", ".")
            file_name = folder_name + "/*" + extension

            file_path, extension = file_name.split("*")

            for file in os.listdir("election_data/{}".format(file_path)):
                if file.endswith(extension):
                    files_found.append(file)
            if len(files_found) == 0:
                print(f"No files found with extension {extension}. Make sure you inputted the extension correctly (case-sensitive!).")
                
        print(len(files_found), "files found:", ", ".join(files_found))
        
        return file_name, files_found
    

def get_record_structure():
    """Guides user on how to identify & specify the structure of the cast vote record data."""
    
    record_structures = {
        "field" : "When there are '1st Place', '2nd Place', etc. columns with candidate IDs as the value in each.",
        "value" : "When there are 'Ranking A' + 'Candidate A' columns, then 'Ranking B' + 'Candidate B', columns, etc. as rankings and candidate IDs are paired as separate but corresponding columns.",
        "array" : "When all the rankings + candidates information are stored together as a list. For example: ['(1) Candidate A', '(2) Candidate B', etc.]"
    }
    
    print("\nOkay, here comes the tricky part. There are three known ways to structure RCV data:")
    
    for format_, description in record_structures.items():
        print("    `{}` — {}".format(format_, description))

    record_structure = input("Enter how your data is structured [field, value, or array]: ").lower()
    while record_structure not in record_structures:
        print(record_structure, "is invalid input")
        record_structure = input("Enter how your data is structured [field, value, or array]: ").lower()
        
    return record_structure


def get_header_row():
    """Asks user to identify a valid header row"""
    
    print("\nIs there a header row that contains column names?")
    
    header_row = None
    
    while header_row is None:
        header_row = input("Instructions: Enter header row (leave empty for none, 0 for top row, 1 for second row, etc.): ")
        if header_row == "":
            pass
        else:
            try:
                header_row = int(header_row)
            except ValueError:
                print(header_row, "is invalid input. Enter an integer (0, 1, 2, etc.) or leave blank.")
                header_row = None
    
    if header_row == "":
        header_row = None
    
    return header_row


def get_unique_id(header_row):
    """Asks user to select a column that identifies each ballot uniquely (if it exists)"""
    
    # - [ ] change to the input system used for other columns
    
    print("\n**OPTIONAL**: Is there a column that identifies each ballot uniquely? (i.e., no duplicates)")
    
    unique_id = None
    
    if header_row is None:
        print("Instructions: Because you identified no header row, column names will be 0-indexed integers.")
        while unique_id is None:
            unique_id = input("Enter unique id as integer (or leave empty for none): ")
            if unique_id == "":
                pass
            else:
                try:
                    unique_id = int(unique_id)
                except ValueError:
                    print(unique_id, "is invalid input. Enter an integer (0, 1, 2, etc.) or leave blank.")
                    unique_id = None
    if type(header_row) is int:
        print("Instructions: Because you identified a header row, use column names from the header row.")
        unique_id = input("Enter unique id (or leave empty for none): ")
        
    if unique_id == "":
        unique_id = None
    
    return unique_id


def get_tied_symbol():
    """Asks user to select a character that identifies tied candidates within the same field value."""
    
    print("\nIs there a symbol that identifies candidates that are equally ranked on the same ballot?")
    print("For example, if 'A=B' denotes a tie between Candidate A & Candidate B, then the `tied symbol` is '='.")
    
    tied_symbol = input("Tied symbol (leave blank for none): ")
    if tied_symbol == "":
        tied_symbol = None
    
    return tied_symbol


def get_file_delimiter():
    """Asks user to identify the spreadsheet's delimiter."""
    print("\nWhat is the delimiter character for the spreadsheet? (often, but not always, a comma)")
    file_delimiter = input("Spreadsheet delimiter: ")
    return file_delimiter


def load_data(file_path, header_row, file_delimiter):
    """Loads cast vote record data using the appropriate `pandas` module according to the extension"""
    
    extension = file_path.split(".")[-1].lower()
    check_and_create_dirs()
    
    if extension == "csv":
        print("Loading .csv file…", end="\r")
        election_df = pd.read_csv(file_path,
                                  low_memory=False,
                                  header=header_row,
                                 )
    elif extension in ['xls', 'xlsx']:
        print("Loading Excel file…", end="\r")
        election_df = pd.read_excel(file_path,
                                    header=header_row,
                                   )
    elif extension in ['txt', 'prm']:
        print("Loading text table file…", end="\r")
        election_df = pd.read_table(file_path,
                                    header = header_row,
                                    delimiter = file_delimiter,
                                   )
    else:
        raise ValueError(f"Unsupported file extension: .{extension}")
        
    return election_df


def print_columns(columns_dict):
    """Prints a dictionary of column index numbers and names to ease selecting the appropriate column"""
    print("\nThe following columns are in the data:")
    for i, column in columns_dict.items():
        print(f"    [{i}] {column}")
        
        
def get_field_columns(columns_dict):
    """Guides the user to identify the appropriate columns when encountering a `field` record structure"""
    
    print_columns(columns_dict)
    field_columns = []

    i = 1
    while True:
        w = num2words(i, ordinal = True)
        field_col_input = input(f"Enter the column # where the {w} place candidate is listed for each row (or leave blank to finish): ")
        
        if field_col_input == "":
            break
        
        try:
            field_col = columns_dict[int(field_col_input)]
        except Exception:
            print("Error: enter listed value")
            continue
        
        field_columns.append(field_col)
        i += 1

    return field_columns


def get_value_columns(columns_dict):
    """Guides the user to identify the appropriate columns when encountering a `value` record structure"""
    
    print_columns(columns_dict)
    field_columns = []
    value_columns = []
    
    i = 1
    while True:
        w = num2words(i, ordinal = True)
        value_col_input = input(f"Enter the column # where a candidate is {w}-listed (in terms of data structure, not actual rank) for each row (or leave blank to finish): ")
        if value_col_input == "":
            break
        
        try:
            value_col = columns_dict[int(value_col_input)]
        except Exception:
            print("Error: enter listed value")
            continue
        
        while True:
            field_col_input = input(f"Enter the column # where the candidate in column `{value_col}` is ranked for each row.")
            try:
                field_col = columns_dict[int(field_col_input)]
                break
            except Exception:
                print("Error: enter listed value")
        
        value_columns.append(value_col)
        field_columns.append(field_col)
        
        i += 1
        
    return field_columns, value_columns


def cfg_wizard():
    """Text-based guide written in conversational English to aid setup of a configuration for reading cast vote record data (`read_cfg`)."""
    
    print("🧙 Welcome to the Configuration Wizard!")
    print("I am a step-by-step guide for setting the parameters necessary for reading your RCV data into the program.\n")
    
    check_and_create_dirs()
    file_name, files_found = get_data_location()
    file_delimiter = get_file_delimiter()
    header_row = get_header_row()
    
    
    if "*" in file_name:
        file_path = "election_data/" + file_name.split("*")[0] + files_found[0]
    elif "*" not in file_name:
        file_path = "election_data/" + file_name
        
    election_df = load_data(file_path, header_row, file_delimiter)
    columns_dict = {i + 1 : col for i, col in enumerate(election_df.columns)}
    
    record_structure = get_record_structure()
    unique_id = get_unique_id(header_row)
    tied_symbol = get_tied_symbol()

    read_cfg = {"File Name" : file_name, 
            "Record Structure" : record_structure,
            "File Delimiter" : file_delimiter,
            "Header Row" : header_row,
            "Unique Record ID" : unique_id,
            "Tied Symbol" : tied_symbol}
    
    if record_structure == "field":
        field_columns = get_field_columns(columns_dict)
        read_cfg.update({"Ranking Field Values" : field_columns})
    elif record_structure == "value":
        field_columns, value_columns = get_value_columns(columns_dict)
        read_cfg.update({"Ranking Field Values" : field_columns,
                         "Candidate Field Values" : value_columns})
    elif record_structure == "array":
        # - [ ] Make same quality as above
        array_column = input("Type name of array column: ")
        array_delimiter = input("Type delimiter used for array elements: ")
        array_template = input("Type template used for array element using template tags, e.g. '{candidate}.[{rank}]' = 'CandidateB.[3]' : ")
        
        if "{candidate}" not in array_template:
            print("Invalid array template. Needs `{candidate}` in string.")
            raise Exception
        if "{rank}" not in array_template:
            print("Invalid array template. Needs `{rank}` in string.")
            raise Exception
        
        read_cfg.update({"Array Column" : array_column,
                         "Array Delimiter" : array_delimiter,
                         "Array Template" : array_template})
    
    
    # - [ ] add ballots to skip
    # - [ ] add candidates to skip
    
    cfg_nickname = input("Give this configuration a nickname (leave blank to skip): ")
    if cfg_nickname != "":
        read_cfg.update({"Configuration Nickname" : cfg_nickname})
        
    
    return read_cfg


def candidate_name_wizard(ballots):
    """Text-based guide written in conversational English to aid setup of a map between candidate IDs and their names as they should be displayed."""
    
    print('🧙‍♂️ Welcome to the candidate name wizard!\nI am going to provide you the `ids` of candidates (i.e., as they are shown in the data), and you are going to give me their "real name" (i.e., what you want shown in the visual/verbal outputs).\n')
    
    candidate_names = {}
    print("Loading candidate names...", end="\r")
    
    for candidate_id in determine_all_candidates(ballots):
        candidate_names.update({candidate_id : input(f"Name for `{candidate_id}`?")})
        
    return candidate_names


def get_candidate_to_name_map(use_candidate_ids_as_names = True):
    """Creates a candidate ID to name map, with the option of having the names for presentation be equal to IDs used for internal processing."""

    if use_candidate_ids_as_names:
        name_map = {x : x for x in determine_all_candidates(ballots)}
    if use_candidate_ids_as_names is False:
        candidate_names = candidate_name_wizard(ballots)
        view_cfg = {"Election Name" : input("Election Name: "),
                "Candidate Names" : candidate_names}
        save_cfg(view_cfg, which='view')
        name_map = view_cfg['Candidate Names']
        
    return name_map
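
For orientation, here is a sketch of what a finished `read_cfg` might look like for a `field` record structure. The keys mirror the ones `cfg_wizard()` collects above; every value (file name, column names, nickname) is a hypothetical placeholder.

```python
import json

# Hypothetical values: only the keys mirror what `cfg_wizard()` collects.
example_read_cfg = {
    "File Name": "my_election.csv",
    "Record Structure": "field",
    "File Delimiter": ",",
    "Header Row": 0,
    "Unique Record ID": "Ballot ID",
    "Tied Symbol": None,
    "Ranking Field Values": ["1st Place", "2nd Place", "3rd Place"],
    "Configuration Nickname": "My Election (example)",
}

# The configuration only holds strings, integers, lists, and None,
# so it survives a round trip through JSON unchanged.
round_tripped = json.loads(json.dumps(example_read_cfg))
print(round_tripped == example_read_cfg)
```

Keeping the configuration to plain JSON types is what lets it be stored in a local `.json` file and reloaded later.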


### Load and Save Configurations from Memory `load_cfgs()`

In [None]:
def check_and_create_dirs(check_for = ['election_data']):
    """Makes sure the necessary directories exist, creating any that are missing."""
    for required_dir in check_for:
        if required_dir not in os.listdir():
            os.mkdir(required_dir)

            
def save_cfg(cfg_to_save, which = "read"):
    """Saves a configuration as a subdocument in a JSON file that stores all local configurations. A backup file is also created to mitigate loss when an error or crash is encountered in the process of saving."""
    
    try:
        with open(f'{which}_cfgs.json', mode='r') as load_cfgs:
            cfgs = json.load(load_cfgs)
    except FileNotFoundError:
        cfgs = []
    
    with open(f'{which}_cfgs_backup.json', mode="w") as backup_cfgs:
        json.dump(cfgs, backup_cfgs)
    
    if cfg_to_save in cfgs:
        print("⚠️ Identical configuration already exists. No change made.")
    elif cfg_to_save not in cfgs:
        cfgs.append(cfg_to_save)
    
        with open(f'{which}_cfgs.json', mode='w') as save_cfgs:
            json.dump(cfgs, save_cfgs)

    return cfgs


def load_cfgs(which = "read"):
    """Guides the user to load a locally stored configuration. The `cfg_wizard()` can also be accessed through this interface."""
    try:
        with open(f'{which}_cfgs.json') as read_cfgs:
            cfgs = json.load(read_cfgs)
    except FileNotFoundError:
        cfgs = []
    
    cfgs_dict = {}
    cfg_selection = "0"
    no_name = "????? (No Nickname or File Name)"
    
    left_margin = len(str(len(cfgs)))
    
    print("# |".rjust(left_margin + 3), "Saved Configurations\n—————————————————————————————————————————————————————————————————————————")
    for i, cfg in enumerate(cfgs, start = 1):
        str_i = f"{i} |".rjust(left_margin + 3)
        cfgs_dict.update({str(i) : cfg})
        if 'Configuration Nickname' in cfg:
            print(str_i, cfg['Configuration Nickname'])
        elif 'Election Name' in cfg:
            print(str_i, cfg['Election Name'])
        elif 'File Name' not in cfg:
            print(str_i, no_name)
        elif 'Configuration Nickname' not in cfg:
            print(str_i, cfg['File Name'])
    print("+ |".rjust(left_margin + 3), "🧙 Create new configuration using wizard")
    
    print("\n", end="")
    cfg_selection = input("Load Configuration File #: ")
    while cfg_selection not in cfgs_dict.keys():
        if cfg_selection == "+":
            print("\n", end="")
            read_cfg = cfg_wizard()
            save_cfg(read_cfg)
            cfgs_dict.update({"N" : read_cfg})
            cfg_selection = "N"
            break
        
        if len(cfgs_dict) > 0:
            # Keys are strings, so `min()`/`max()` would compare them alphabetically ("9" > "10")
            print(f"File # not found. Select # between 1-{len(cfgs_dict)}, or select `+` to create new configuration.")
        elif len(cfgs_dict) == 0:
            print("File # not found. Select `+` to create new configuration.")
        
        cfg_selection = input("Load Configuration File #: ")
        
    read_cfg = cfgs_dict[cfg_selection]
    print("✔", read_cfg.get('Configuration Nickname',
                            read_cfg.get('File Name',
                                         read_cfg.get('Election Name',
                                         no_name))), "configuration selected.")
    
    return read_cfg
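
The save logic above boils down to "load the existing list, append only if the entry is new, write it back". A stripped-down sketch of that pattern, run against a temporary directory, is shown here. The helper name `append_unique` and the sample entries are hypothetical, and the backup-file step is left out for brevity.

```python
import json
import os
import tempfile


def append_unique(path, item):
    """Append `item` to the JSON list stored at `path` unless an identical entry exists."""
    try:
        with open(path) as f:
            items = json.load(f)
    except FileNotFoundError:
        items = []  # first save: start from an empty list

    if item not in items:
        items.append(item)
        with open(path, "w") as f:
            json.dump(items, f)

    return items


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "read_cfgs.json")
    append_unique(path, {"File Name": "a.csv"})
    append_unique(path, {"File Name": "a.csv"})  # duplicate, ignored
    saved = append_unique(path, {"File Name": "b.csv"})

print(len(saved))
```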

## Tabulate Election Results
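
The functions in this section tabulate instant-runoff rounds: each ballot counts toward its highest-ranked candidate who has not been eliminated, and a candidate wins a round by reaching `floor(total_votes / 2) + 1` votes. A simplified, self-contained sketch of that loop follows. It assumes no tied rankings on a ballot and breaks elimination ties arbitrarily; the helper names and sample ballots are hypothetical rather than taken from the notebook.

```python
from collections import Counter
from math import floor


def top_choice(ballot, eliminated):
    """Return the highest-ranked candidate still in the race, or None if exhausted."""
    for candidate in ballot:
        if candidate not in eliminated:
            return candidate
    return None


def run_instant_runoff(ballots):
    """Eliminate the last-place candidate each round until someone reaches the threshold."""
    eliminated = set()
    while True:
        tally = Counter(top_choice(ballot, eliminated) for ballot in ballots)
        tally.pop(None, None)  # exhausted ballots do not count toward the total
        if not tally:
            return None, {}

        total_votes = sum(tally.values())
        win_threshold = floor(total_votes / 2) + 1

        leader, leader_votes = tally.most_common(1)[0]
        if leader_votes >= win_threshold or len(tally) == 1:
            return leader, dict(tally)

        # Simplification: ties for last place are broken arbitrarily here.
        eliminated.add(min(tally, key=tally.get))


# Each ballot is a list of candidate IDs in ranked order (hypothetical data).
example_ballots = [["A", "B", "C"]] * 4 + [["B", "C", "A"]] * 3 + [["C", "B", "A"]] * 2
winner, final_round = run_instant_runoff(example_ballots)
print(winner, final_round)
```

In this example nobody reaches the five-vote threshold in the first round, so `C` is eliminated and its two ballots transfer to `B`.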

In [None]:
def load_election_data(file_path):
    """Loads cast vote record data using the appropriate `pandas` module according to the extension"""
    # - [ ] Similar to load data for configuration wizard, worthwhile to merge
    
    extension = file_path.split(".")[-1].lower()
    check_and_create_dirs()
    
    if extension == "csv":
        print(f"Loading .csv file: {file_path}".ljust(20), end="\r")
        election_df = pd.read_csv(file_path,
                                  low_memory=False,
                                  header=read_cfg['Header Row']
                                 )
    elif extension in ['xls', 'xlsx']:
        print(f"Loading Excel file: {file_path}".ljust(20), end="\r")
        election_df = pd.read_excel(file_path,
                                    header=read_cfg['Header Row']
                                   )
    elif extension in ['txt', 'prm']:
        print(f"Loading text table file: {file_path}".ljust(20), end="\r")
        election_df = pd.read_table(file_path,
                                    header = read_cfg['Header Row'],
                                    delimiter = read_cfg.get('File Delimiter'),
                                    skiprows = read_cfg.get('Skip Rows'),
                                   )
    #elif json
    else:
        raise ValueError(f"Unsupported file extension: .{extension}")
        
    return election_df


def clean_election_dataframe(read_cfg, election_df):
    """Cleans the dataframe of cast vote records by removing non-unique records and ballots that the read configuration designates to skip"""
    if read_cfg['Unique Record ID'] is not None:
        election_df = election_df[election_df[read_cfg["Unique Record ID"]].notna()]

        skip_ballots = read_cfg.get('Skip Ballots', [])
        for skip_ballot in skip_ballots:
            election_df = election_df[election_df[read_cfg["Unique Record ID"]] != skip_ballot]

    election_df = election_df.fillna(np.nan).replace([np.nan], [None])
    return election_df


def get_election_data_as_dataframe(read_cfg):
    """Gets the cast vote records as a pandas dataframe"""
    if "*" in read_cfg['File Name']:

        file_paths = []

        file_path, extension = read_cfg['File Name'].split("*")
        file_path = f"election_data/{file_path}"

        for file in os.listdir(file_path):
            if file.endswith(extension):
                file_paths.append("{}{}".format(file_path, file))

        election_df = pd.concat([load_election_data(file_path) for file_path in file_paths])

    elif type(read_cfg['File Name']) == str:
        file_path = f"election_data/{read_cfg['File Name']}"
        election_df = load_election_data(file_path)
    print("✔ Loaded!".ljust(80))
    
    election_df = clean_election_dataframe(read_cfg, election_df)
    print("✔ Cleaned!".ljust(30))
    
    return election_df


def generate_by_value_columns_of_interest(read_cfg):
    """Gets the names of columns used to extract relevant data from the dataframe when encountering a `value` data structure. Note that `cand` (used throughout) is short for 'candidate'."""
    rank_columns = {}
    cand_columns = {}
    
    for i, rank_value in enumerate(read_cfg['Ranking Field Values']):
        if read_cfg.get('Ranking Field Template', False):
            rank_columns.update({read_cfg['Ranking Field Template'].format(ranking_field_value = rank_value) : f"Rank {i}"})
        else:
            rank_columns.update({rank_value : f"Rank {i}"})
        
        # Default to an empty string so a missing template does not raise a TypeError
        if "{ranking_field_value}" in read_cfg.get('Candidate Field Template', ""):
            cand_columns.update({read_cfg['Candidate Field Template'].format(ranking_field_value = rank_value) : f"Cand {i}"})

    if "{candidate_field_value}" in read_cfg.get('Candidate Field Template', ""):
        for i, cand_value in enumerate(read_cfg['Candidate Field Values']):
            cand_columns.update({read_cfg['Candidate Field Template'].format(candidate_field_value = cand_value) : f"Cand {i}"})
    
    return rank_columns, cand_columns
        

def generate_by_field_columns_of_interest(read_cfg):
    """Gets the names of columns used to extract relevant data from the dataframe when encountering a `field` data structure."""
    rank_scores = {}
    cand_columns = {}
    
    for i, rank_value in enumerate(read_cfg['Ranking Field Values'], start = 1):
        
        rank_scores.update({f"Rank {i}" : i})
        if read_cfg.get('Ranking Field Template', False):
            cand_columns.update({read_cfg['Ranking Field Template'].format(ranking_field_value = rank_value) : f"Cand {i}"})
        else:
            cand_columns.update({rank_value : f"Cand {i}"})
        
    return rank_scores, cand_columns


longest_array = 0

def generate_ballot_by_array(election_record, read_cfg):
    """Creates a ballot when encountering an `array` data structure, according to the ballot data schema used internally."""
    global longest_array
    
    if read_cfg['Record Structure'] != "array":
        raise Exception
    
    ballot = {}
    
    if "Unique Record ID" in read_cfg:
        try:
            ballot.update({"Record ID" : election_record[read_cfg["Unique Record ID"]]})
        except KeyError:
            ballot.update({"Record ID" : election_record[int(read_cfg["Unique Record ID"])]})
    
    try:
        raw_vote = election_record[read_cfg["Array Column"]]
    except KeyError:
        raw_vote = election_record[int(read_cfg["Array Column"])]
    
    if type(raw_vote) == list:
        vote_array = raw_vote
    elif type(raw_vote) == str:
        vote_array = raw_vote.split(read_cfg["Array Delimiter"])
    elif raw_vote is None:
        vote_array = []
    else:
        raise Exception(ballot.get("Record ID"), type(raw_vote))
    
    i = 0
    for vote in vote_array:
        vote = vote.strip()
        deconstructed_vote = parse(read_cfg['Array Template'], vote)
        if deconstructed_vote is None:
            continue
        ballot.update({f"Rank {i}" : deconstructed_vote['rank']})
        ballot.update({f"Cand {i}" : deconstructed_vote['candidate']})

        i += 1
        if i > longest_array:
            longest_array = i
    
    return ballot


def get_ballots_and_column_maps(read_cfg, election_df):
    """Creates dictionaries used to map voter rankings with corresponding candidate ids."""
    columns_of_interest = {}

    if read_cfg['Record Structure'] == "value":
        rank_columns, cand_columns = generate_by_value_columns_of_interest(read_cfg)    
        for cols in [rank_columns, cand_columns]:
            columns_of_interest.update(cols)
        if read_cfg.get("Unique Record ID",) is not None: 
            columns_of_interest.update({read_cfg["Unique Record ID"] : "Record ID"})
        ballots = election_df[columns_of_interest].rename(columns = columns_of_interest).to_dict("records")

    elif read_cfg['Record Structure'] == "field":
        rank_scores, cand_columns = generate_by_field_columns_of_interest(read_cfg)
        rank_columns = dict(zip(rank_scores.keys(), rank_scores.keys()))
        for cols in [cand_columns]:
            columns_of_interest.update(cols)
        ballots = election_df[columns_of_interest].rename(columns = columns_of_interest).to_dict("records")
        for ballot in ballots:
            ballot.update(rank_scores)  # `dict.update()` mutates in place and returns None

    elif read_cfg['Record Structure'] == "array":
        election_records = election_df.to_dict("records")
        ballots = [generate_ballot_by_array(election_record, read_cfg) for election_record in election_records]
        rank_columns = {f"Rank {i}" : f"Rank {i}" for i in range(0, longest_array)}
        cand_columns = {f"Cand {i}" : f"Cand {i}" for i in range(0, longest_array)}

    rank_to_cand_map = dict(zip(rank_columns.values(), cand_columns.values()))
    cand_to_rank_map = {v : k for k, v in rank_to_cand_map.items()}

    print( "✔ Mapped!")
    
    return ballots, rank_to_cand_map, cand_to_rank_map, rank_columns, cand_columns


def get_readable_votes(ballot):
    """Adds a series of 'Vote #' fields to each ballot that combines the information from the 'Rank #' and 'Cand #' columns"""
    votes = {}
    
    for rank_column in rank_columns.values():

        try:
            rank_val = ballot.get(rank_column, None)
            if rank_val:
                rank_num = int(rank_val)
                # `votes` is keyed by rank number, so check the number (not the column name)
                if rank_num not in votes:
                    votes.update({rank_num : []})
            elif rank_val is None:
                continue
        except TypeError:
            print(rank_column, ballot)
            raise Exception
        except ValueError:
            continue

        candidate = ballot[rank_to_cand_map[rank_column]]
        
        if read_cfg['Tied Symbol']:
            if candidate is None:
                votes[rank_num].append(candidate)
            elif read_cfg['Tied Symbol'] in candidate:
                deconstructed_candidates = candidate.split(read_cfg['Tied Symbol'])
                for deconstructed_candidate in deconstructed_candidates:
                    votes[rank_num].append(deconstructed_candidate)
            elif read_cfg['Tied Symbol'] not in candidate:
                votes[rank_num].append(candidate)
                
        elif read_cfg['Tied Symbol'] is None:
            votes[rank_num].append(candidate)

    for vote, candidates in votes.items():
        ballot.update({"Vote {}".format(vote) : candidates})
    
    return ballot


def process_all_ballots(ballots):
    """Wraps the per-ballot `get_readable_votes()` function in a manner that processes all ballots."""
    
    for ballot in ballots:
        ballot = get_readable_votes(ballot)
    print("✔ Processed!")
    
    return ballots


def get_top_choice(ballot, elims):
    """Gets the top choice for a ballot after factoring in which candidates have been eliminated from consideration."""
    i = 1
    choice = None
    while choice is None:
        
        if f"Vote {i}" not in ballot:
            return None
        
        choices = ballot[f"Vote {i}"].copy()
        
        for elim in elims:
            if elim in choices:
                choices.remove(elim)
        
        if len(choices) >= 2:
            choice = choices
            return choice
        
        elif len(choices) == 1:
            [choice] = choices
            return choice
        
        elif len(choices) == 0:
            i += 1
            
        else:
            raise Exception("Stuck in 'No Choice' While Loop")
            

def get_total_votes(round_, include_nones = False):
    """Gets the total number of votes in a given round."""
    if include_nones:
        total_votes = sum(dict(round_).values())
    elif include_nones is False:
        total_votes = sum([votes for candidate, votes in dict(round_).items() if candidate is not None])
    return total_votes


def contest_round(round_):
    """Identify the top and bottom-ranked candidates for each ("instant-runoff") round to determine who should be eliminated and if anyone won the election in that round."""
    
    total_votes = get_total_votes(round_)
    win_threshold = floor((total_votes / 2) + 1)
    
    round_winner = {"candidate": None, "votes" : 0}
    round_loser = {"candidate": None, "votes" : total_votes}

    for candidate, votes in round_.items():

        if candidate is None:
            continue

        if votes > round_winner["votes"]:
            round_winner = {"candidate" : candidate, "votes" : votes}
        elif votes == round_winner["votes"]:
            if round_winner["candidate"] is None:
                round_winner = {"candidate" : candidate, "votes" : votes}
            elif type(round_winner["candidate"]) == list:
                round_winner["candidate"].append(candidate)
            else:
                round_winner["candidate"] = [round_winner["candidate"], candidate]

        if votes < round_loser["votes"]:
            round_loser = {"candidate" : candidate, "votes" : votes}
        elif votes == round_loser["votes"]:
            if round_loser["candidate"] is None:
                round_loser = {"candidate" : candidate, "votes" : votes}
            elif type(round_loser["candidate"]) == list:
                round_loser["candidate"].append(candidate)
            else:
                round_loser["candidate"] = [round_loser["candidate"], candidate]

    if round_winner["votes"] >= win_threshold:
        winner_to_crown = round_winner
    elif round_winner["votes"] < win_threshold:
        winner_to_crown = None

    loser_to_eliminate = round_loser
    
    return loser_to_eliminate, round_winner, winner_to_crown, total_votes, win_threshold


def contest_favorite_betrayal_round(round_, forced_elim, forced_elim_order):
    """Variation of `contest_round()` used for identifying possible non-monotonicity in `monotonicity_check()`."""
    
    total_votes = get_total_votes(round_)
    win_threshold = floor((total_votes / 2) + 1)
    
    round_winner = {"candidate": None, "votes" : 0}
    round_loser = {"candidate": None, "votes" : total_votes}
    
    for candidate, votes in round_.items():

        if candidate is None:
            continue  
        elif candidate == forced_elim:
            forced_loser = {"candidate" : candidate, "votes" : votes}
        elif candidate not in forced_elim_order:
            actual_winner = {"candidate" : candidate, "votes" : votes}

        if votes > round_winner["votes"]:
            round_winner = {"candidate" : candidate, "votes" : votes}
        elif votes == round_winner["votes"]:
            if round_winner["candidate"] is None:
                round_winner = {"candidate" : candidate, "votes" : votes}
            elif type(round_winner["candidate"]) == list:
                round_winner["candidate"].append(candidate)
            else:
                round_winner["candidate"] = [round_winner["candidate"], candidate]
        
        if votes < round_loser["votes"]:
            round_loser = {"candidate" : candidate, "votes" : votes}
        elif votes == round_loser["votes"]:
            if round_loser["candidate"] is None:
                round_loser = {"candidate" : candidate, "votes" : votes}
            elif type(round_loser["candidate"]) == list:
                round_loser["candidate"].append(candidate)
            else:
                round_loser["candidate"] = [round_loser["candidate"], candidate]

    if forced_elim_order[-1] == forced_elim:
        if actual_winner['votes'] > forced_loser['votes']:
            loser_to_eliminate = forced_loser
        elif actual_winner['votes'] == forced_loser['votes']:
            # stopgap
            loser_to_eliminate = forced_loser
        elif actual_winner['votes'] < forced_loser['votes']:
            loser_to_eliminate = actual_winner
    elif forced_elim in forced_elim_order[0:-1]:
        loser_to_eliminate = forced_loser

    if round_winner["votes"] >= win_threshold:
        winner_to_crown = round_winner
    elif round_winner["votes"] < win_threshold:
        winner_to_crown = None
    
    return loser_to_eliminate, round_winner, winner_to_crown, total_votes, win_threshold


def determine_all_first_round_candidates_on_board(ballots):
    """Determines every candidate who appears as a first choice ('Vote 1') on at least one ballot."""
    all_candidates = []
    for ballot in ballots:
        top_votes = ballot.get('Vote 1')
        if top_votes:
            for top_vote in top_votes:
                if top_vote:
                    all_candidates.append(top_vote)
            all_candidates = list(set(all_candidates))
    return all_candidates


def determine_candidates_eligibility_for_first_round(ballots):
    """Used to determine which candidates are (in)eligible for the first round of ("instant-runoff") voting."""
    
    # All Candidates
    all_candidates = set(determine_all_candidates(ballots))
    
    # All Candidates in the First Round (Including Ties)
    eligible = set(determine_all_first_round_candidates_on_board(ballots))
    # - [ ] what if a competitive candidate is also only tie-ranked for first?
    
    # Ineligible Candidates
    ineligible = all_candidates.difference(eligible)
    
    return all_candidates, eligible, ineligible


def contest_election(ballots, criteria_test = None, forced_elim_order = None):
    """Runs an ("instant-runoff") election in a series of consecutive rounds to determine the election winner."""
    
    rounds = {}
    results = {}
    round_i = 0
    
    elims = read_cfg.get("Pre-Eliminated Candidate Codes", []).copy()
    all_candidates, eligible, ineligible = determine_candidates_eligibility_for_first_round(ballots)
    
    if criteria_test in ["Favorite Betrayal"]:
        pass
    else:
        for ineligible_candidate in ineligible:
            elims.append(ineligible_candidate)
    
    winner_to_crown = None

    while winner_to_crown is None:
        round_i += 1
        print(f"Contesting Round {round_i}...", end="\r")
        rounds.update({round_i : Counter()})
        tied_ballots = 0

        for ballot in ballots:
            top_choice = get_top_choice(ballot, elims)
            if type(top_choice) == list:
                tied_ballots += 1
            elif (type(top_choice) in [str, float, int]) or (top_choice is None):
                rounds[round_i][top_choice] += 1
            else:
                raise Exception(f"Top Choice Error: {top_choice} {type(top_choice)}")
        
        if criteria_test in ["Favorite Betrayal"]:
            forced_elim = forced_elim_order[round_i - 1]
            loser_to_eliminate, round_winner, winner_to_crown, total_votes, win_threshold = contest_favorite_betrayal_round(rounds[round_i],
                                                                                                                            forced_elim,
                                                                                                                            forced_elim_order)
            elims.append(forced_elim)
            
        elif criteria_test is None:
            loser_to_eliminate, round_winner, winner_to_crown, total_votes, win_threshold = contest_round(rounds[round_i])

            if type(loser_to_eliminate['candidate']) == list:
                elims.extend(loser_to_eliminate['candidate'])
            else:
                elims.append(loser_to_eliminate['candidate'])
        
        ordered_vote_counts = dict(sorted(rounds[round_i].items(), key = lambda kv: kv[1], reverse=True))
        
        results[round_i] = {}
        results[round_i].update({"Tied Ballots" : tied_ballots})
        results[round_i].update({"Vote Counts" : ordered_vote_counts})
        results[round_i].update({"Total Votes" : total_votes})
        results[round_i].update({"Win Threshold" : win_threshold})
        results[round_i].update({"Round Winner" : round_winner['candidate']})
        results[round_i].update({"Eliminated" : loser_to_eliminate['candidate']})

    results[round_i].update({"Winner" : winner_to_crown['candidate']})
    print(f"✔ Contested! {name_map[winner_to_crown['candidate']]} won after {len(rounds)} rounds.".ljust(30))
    return rounds, results


def determine_all_candidates(ballots):
    """Determines who all the candidates in a given election are on the basis of whether or not they are in the ballot data."""
    all_candidates = []
    
    for ballot in ballots:
        #for cand_column in cand_columns:
        for cand_column in cand_columns.values():
            add_candidate = ballot.get(cand_column)
            if add_candidate:
                
                if read_cfg['Tied Symbol'] is not None:
                    deconstructed_add_candidates = add_candidate.split(read_cfg['Tied Symbol'])
                elif read_cfg['Tied Symbol'] is None:
                    deconstructed_add_candidates = [add_candidate]
                
                for dac in deconstructed_add_candidates:
                    if dac:
                        all_candidates.append(dac)
                all_candidates = list(set(all_candidates))
                
    return all_candidates


def calculate_vote_transfers(result_0, result_1):
    """Determines how many votes are transferred from a candidate eliminated in a given round to the candidates who are eligible for the next round."""
    transfers = {}
    for candidate in result_1['Vote Counts']:
        if candidate not in result_0['Vote Counts']:
            continue
        transfer = result_1['Vote Counts'][candidate] - result_0['Vote Counts'][candidate]
        transfers.update({candidate : transfer})
    return transfers


def calculate_percentages(result, total_ballots):
    """Determines the percentage of votes earned by each candidate in a given round."""
    percentages = {}
    
    total_votes = result['Total Votes']
    
    for candidate, votes in result['Vote Counts'].items():
        ballot_percentage = (votes / total_ballots) # i.e., includes undervotes
        vote_percentage = (votes / total_votes) # i.e., does not include undervotes
        percentages.update({candidate : {"Of Ballots" : ballot_percentage,
                                         "Of Votes" : vote_percentage}})
    
    return percentages


def enrich_results(results):
    """Adds additional statistics to each round's results."""
    
    for i, result in results.items():
    
        percentages = calculate_percentages(result, len(ballots))
        result.update({'Percentages' : percentages})

        if 'Winner' in result:
            continue

        elif 'Eliminated' in result:
            transfers = calculate_vote_transfers(results[i], results[i + 1])
            result.update({'Vote Transfers' : transfers})
        
    print("✔ Enriched!")
    return results
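
To make the round logic above concrete, here is a minimal, self-contained sketch of a single instant-runoff round on a toy tally. The helper name `sketch_round` and the candidate labels are hypothetical and not part of the tabulator. The sketch ignores ties for simplicity, whereas `contest_round()` collects tied candidates into a list.

```python
from collections import Counter
from math import floor


def sketch_round(tally):
    """Hypothetical helper: return (leader, trailer, threshold) for one round.

    `tally` maps candidate -> votes; a `None` key (exhausted ballots) is ignored.
    """
    active = {c: v for c, v in tally.items() if c is not None}
    total_votes = sum(active.values())
    threshold = floor(total_votes / 2 + 1)  # smallest strict majority of active votes
    leader = max(active, key=active.get)    # most votes this round
    trailer = min(active, key=active.get)   # fewest votes this round
    return leader, trailer, threshold


toy_tally = Counter({"A": 45, "B": 35, "C": 20, None: 5})
leader, trailer, threshold = sketch_round(toy_tally)
print(leader, trailer, threshold)
```

In this toy tally the leader falls short of the threshold, so the trailing candidate would be eliminated and another round contested.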

## Visualize Election Results

### Horizontal Bar Graph

In [None]:
def get_color_to_candidate_map(ballots, results, palette_choice = "tableau_remix"):
    """Get map of candidate IDs to a color from desired palette."""
    
    def get_palette_choice(palette_choice):
        """Get desired palette. Learn more about these palettes at https://gist.github.com/KalebNyquist/965e2c2b5512026817ed9aec69bbe278"""

        # - [ ] add greyscale choice
        if palette_choice == "xgfs":
            palette = ['#ebac23', '#b80058', '#008cf9', '#006e00', '#00bbad', '#d163e6', '#b24502', '#ff9287', '#5954d6',
                       '#00c6f8', '#878500', '#00a76c']
            greys = ['#585858', '#eeeeee']
        elif palette_choice == "tableau_remix":
            palette = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b', '#e377c2', '#bcbd22', '#17becf', 
                       '#aec7e8', '#ffbb78', '#98df8a', '#ff9896', '#c5b0d5', '#c49c94', '#f7b6d2', '#dbdb8d', '#9edae5']
            greys = ['#7f7f7f', '#c7c7c7']
        else:
            raise ValueError(f"Unknown palette choice: {palette_choice!r}")

        return palette, greys
    
    
    palette, greys = get_palette_choice(palette_choice)

    all_candidates, eligible, ineligible = determine_candidates_eligibility_for_first_round(ballots)
    eligible_first_round_vote_counts = {e : results[1]['Vote Counts'].get(e, 0) for e in eligible}
    eligible_ordered = dict(sorted(eligible_first_round_vote_counts.items(), key = lambda kv: kv[1], reverse=True))

    color_map = {}
    for c, candidate in enumerate(eligible_ordered):
        cr = c % len(palette)
        color_map.update({candidate : palette[cr]})
        
    return color_map, greys


def get_legend(color_map, greys):
    """Get user-friendly visual legend used for mapping candidate-specific colors to their names."""

    handles = [patches.Patch(color=v, label=name_map[k]) for k, v in color_map.items()]
    grey_handles = [patches.Patch(color=greys[i], label=["Tied Ballot", "Exhausted Ballot"][i]) for i in [0, 1]]
    handles = handles + grey_handles

    plt.legend(handles=handles, fontsize=12, frameon=False)
    plt.gca().set_axis_off()

    return plt


def get_horizontal_bar_graph_of_rounds(results):
    """Get results displayed in a horizontal bar graph."""

    fig, ax = plt.subplots(len(results), 1, figsize = (12,len(results) * 2.05))

    plt.subplots_adjust(left=0.1,
                        bottom=0.1, 
                        right=0.9, 
                        top=0.9, 
                        wspace=0.4, 
                        hspace=0.4)

    for i, result in results.items():

        ax_no = i-1

        barh_coordinates = []
        cumulative_percentage = 0


        exhausted_section = {}

        for candidate, percentage in result['Percentages'].items():

            if candidate:
                barh_section = {"Candidate" : name_map[candidate],
                                "Vote Share" : percentage["Of Ballots"],
                                "Cumulative" : cumulative_percentage,
                                "Color" : color_map[candidate]
                               }

                barh_coordinates.append(barh_section)
                cumulative_percentage += percentage["Of Ballots"]


            elif candidate is None:
                exhausted_percentage = (total_ballots - result["Total Votes"]) / total_ballots
                exhausted_section = {"Candidate" : "Exhausted Ballots",
                                     "Vote Share" : exhausted_percentage,
                                     "Color" : greys[1]}


        tied_percentage = result["Tied Ballots"] / total_ballots

        tied_section = {"Candidate" : "Tied Ballots",
                        "Vote Share" : tied_percentage,
                        "Cumulative" : cumulative_percentage,
                        "Color" : greys[0]}

        barh_coordinates.append(tied_section)
        cumulative_percentage_plus_tied = cumulative_percentage + tied_percentage

        if exhausted_section != {}:
            exhausted_section.update({"Cumulative" : cumulative_percentage_plus_tied})
            barh_coordinates.append(exhausted_section)

        eliminated_candidate = result['Eliminated']
        if type(eliminated_candidate) == list:
            eliminated_string = ", ".join(name_map[c] for c in result['Eliminated'])
            eliminated_vote = result['Percentages'][result['Eliminated'][0]]['Of Ballots'] * 100
        elif type(eliminated_candidate) != list:
            eliminated_string = name_map[result['Eliminated']]
            eliminated_vote = result['Percentages'][result['Eliminated']]['Of Ballots'] * 100

        leading_candidate = result['Round Winner']
        if type(leading_candidate) == list:
            leading_string = ", ".join(name_map[c] for c in result['Round Winner'])
            leading_vote = result['Percentages'][result['Round Winner'][0]]['Of Ballots'] * 100
        elif type(leading_candidate) != list:
            leading_string = name_map[result['Round Winner']]
            leading_vote = result['Percentages'][result['Round Winner']]['Of Ballots'] * 100

        win_condition = "{}%".format(round(100 * result['Win Threshold'] / total_ballots , 2))
        win_color = "black"  # defined unconditionally so the winning round does not raise a NameError
        if "Winner" in result:
            win_condition = "✔ {}".format(win_condition)

        # X Axis
        ax[ax_no].set_xticks([cumulative_percentage * .5], minor=False)
        ax[ax_no].set_xticks([0, cumulative_percentage], minor=True)
        ax[ax_no].set_xticklabels(["Required to Win Election:\n{}".format(win_condition)], minor=False, color = win_color)
        ax[ax_no].set_xticklabels(["This Round's Leader:\n{} • {:.2f}%".format(leading_string, leading_vote),
                                   "Eliminated This Round:\n{} • {:.2f}%".format(eliminated_string, eliminated_vote)],
                                  minor=True)
        ax[ax_no].set_xlim(-.01, 1)
        ax[ax_no].grid(True, axis="x", color="black", lw=2, alpha=.4)
        ax[ax_no].grid(True, axis="x", color="black", which="minor", lw=2, alpha=1)

        # TODO:
        # https://stackoverflow.com/questions/33678606/changing-color-of-single-characters-in-matplotlib-plot-labels
        # [ ] make all current xticks "major" / remove 'win_color' entirely
        # [ ] add a colored ◼ 
        
        # Y Axis
        ax[ax_no].set_yticks([0])
        label = "{} round".format(num2words(i, ordinal=True)).title()
        ax[ax_no].set_yticklabels([label], weight="bold")

        # Cosmetics
        ax[ax_no].set_frame_on(False)
        ax[ax_no].tick_params(axis="x", direction='in', length=0, pad=3, which="minor")
        ax[ax_no].tick_params(axis="x", direction='in', length=0, pad=3, which="major")
        ax[ax_no].tick_params(axis="y", direction='in', length=0, pad=12.5)

        for coordinate in barh_coordinates:
            ax[ax_no].barh(0,
                         coordinate["Vote Share"],
                         left=coordinate["Cumulative"],
                         label=coordinate["Candidate"],
                         color=coordinate["Color"]
                        )

        ax[ax_no].get_xticklabels(minor = True)[0].set_ha("left")
        ax[ax_no].get_xticklabels(minor = True)[1].set_ha("right")
        
    return fig
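
The stacked bar for each round is built by giving every segment a `left` offset equal to the sum of the shares drawn before it. A minimal sketch of that bookkeeping, without any plotting, is shown here. The helper name `sketch_bar_segments` and the toy shares are hypothetical.

```python
def sketch_bar_segments(shares):
    """Hypothetical helper: turn ordered vote shares into (label, left, width) triples."""
    segments = []
    cumulative = 0.0
    for label, share in shares.items():
        # Each segment starts where the previous one ended.
        segments.append((label, cumulative, share))
        cumulative += share
    return segments


toy_shares = {"A": 0.5, "B": 0.25, "Exhausted Ballots": 0.25}
segments = sketch_bar_segments(toy_shares)
for label, left, width in segments:
    print(f"{label}: left={left:.2f}, width={width:.2f}")
```

Each triple corresponds to one `barh(0, width, left=left)` call on the round's axis.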

## Narrate Election Results `narrate_results()`

In [None]:
relative_performance_words = {
    "+" : "rose to",
    "-" : "fell to",
    "=" : "remained at",
    "!" : "was"
}

leader_performance_words = {
    "+" : "took the lead",
    "-" : None,
    "=" : "held first place",
    "!" : "received the most highest rankings of any candidate"
}


def fvote(number_of_votes, singular = False, noun = "vote", extra = "", verb = None):
    """Parameter-guided string creation to display the number and kind of votes in a human-readable manner"""
    if singular:
        return "{:,} {noun}".format(number_of_votes, noun = noun)
    elif (number_of_votes >= 2) or (number_of_votes == 0):
        if verb == "are":
            return "{:,} {noun}s {extra}are".format(number_of_votes, extra = extra, noun = noun)
        else:
            return "{:,} {noun}s".format(number_of_votes, noun = noun)
    elif number_of_votes == 1:
        if verb == "are":
            return f"1 {noun} {extra}is"
        else:
            return f"1 {noun}"
    else:
        raise Exception

        
def narrate_result(result_i, results):
    """Gives a text-based account for what happened in a particular round of voting."""
            
    def candidate_relative_performance(candidate):
        """Evaluates a candidate's performance in the rankings relative to the previous round of voting."""
        
        if result_i - 1 not in results:
            return "!"
        
        this_round = list(result['Vote Counts']).index(candidate)
        try:
            last_round = list(results[result_i - 1]['Vote Counts']).index(candidate)
        except ValueError:
            raise Exception
        
        if this_round == last_round:
            return "="
        elif this_round > last_round:
            return "-"
        elif this_round < last_round:
            return "+"
    
    result = results[result_i]
    round_name = num2words(result_i, to="ordinal")
    candidates = [k for k, v in list(result['Vote Counts'].items()) if k]
    votes = [v for k, v in list(result['Vote Counts'].items()) if k]
    win_threshold = result['Win Threshold']
    
    ## Leading Candidate
    a = f"In the {round_name} round of voting, "
    
    # - [ ] replace with round winner
    if 'Winner' not in result:
        b = f"{name_map[candidates[0]]} {leader_performance_words[candidate_relative_performance(candidates[0])]} with {fvote(votes[0])}."
    elif 'Winner' in result:
        b = f"{name_map[candidates[0]]} won the election with {fvote(votes[0])}, {fvote(votes[0] - win_threshold)} more than the {fvote(win_threshold, singular=True)} threshold required to win."
    
    ## Intermediate Candidates
    c = ""
    
    if 'Winner' in result:
        listed_candidates = candidates[1:]
    if 'Winner' not in result:
        listed_candidates = candidates[1:-1]
    
    for rank, intermediate_candidate in enumerate(listed_candidates, start=1):
        
        if (rank == len(candidates) - 2) & (rank != 1):
            c = c + "and "
        
        rank_ordinal = num2words(rank + 1, to="ordinal")
        appendage = f"{name_map[intermediate_candidate]} {relative_performance_words[candidate_relative_performance(intermediate_candidate)]} {rank_ordinal} with {fvote(votes[rank])}, "
        c = c + appendage
    
        if (rank == len(candidates) - 2) or (len(listed_candidates) == 1):
            if c.endswith(", "):
                c = c[:-2] + ". "
            
    ## Eliminated/Winning Candidate
            # - [ ] add eliminated/winners and transfer votes
    if ('Eliminated' in result) & ('Winner' not in result):
        eliminated_candidate = result['Eliminated']
        if type(eliminated_candidate) == list:
            eliminated_votes = result['Vote Counts'][result['Eliminated'][0]]
        elif type(eliminated_candidate) != list:
            eliminated_votes = result['Vote Counts'][result['Eliminated']]
        
        if type(eliminated_candidate) == list:
            eliminated_candidates_name_mapped = [name_map[x] for x in eliminated_candidate[0:-1]]  # all but the last, which is joined with "and"
            d = f"Because {', '.join(eliminated_candidates_name_mapped)} and {name_map[eliminated_candidate[-1]]} tied for "
        if type(eliminated_candidate) != list:
            d = f"Because {name_map[eliminated_candidate]} received "
            
        d = d + f"the fewest highest rankings with only {fvote(eliminated_votes)}, they do not advance to the next round."
        
        if 'Vote Transfers' in result:
            
            if type(eliminated_candidate) == list:
                eliminated_candidates_name_mapped = [name_map[x] for x in eliminated_candidate[0:-1]]  # all but the last, which is joined with "and"
                e = f" Of {', '.join(eliminated_candidates_name_mapped)} and {name_map[eliminated_candidate[-1]]}'s combined {fvote(eliminated_votes * len(eliminated_candidate))}," 
            if type(eliminated_candidate) != list:
                e = f" Of {name_map[eliminated_candidate]}'s {fvote(eliminated_votes)},"
            
            recipients = [k for k, v in list(result['Vote Transfers'].items()) if k and v != 0]
            received_votes = [v for k, v in list(result['Vote Transfers'].items()) if k and v != 0]
            vote_transfers = dict(zip(recipients, received_votes))
            
            for i, (recipient, received_vote) in enumerate(vote_transfers.items()):
                if (i == len(vote_transfers) - 1) & (i != 0):
                    e = e + " and"
                e = e + f" {fvote(received_vote, verb='are')} transferred to {name_map[recipient]},"
                if i == len(vote_transfers) - 1:
                    if e.endswith(","):
                        e = e[:-1] + ". "
                
            undervotes = result['Vote Transfers'].get(None, 0)
            if undervotes > 0:
                f = f"{fvote(undervotes, noun='ballot', extra = 'did not rank any of the remaining candidates and', verb='are')} removed from consideration. "
                e = e + f
                
            d = d + e
            
        c = c + d
    
    next_round = num2words(result_i + 1, to="ordinal")
    if 'Winner' not in result:
        z = f"Because no candidate reached the {fvote(win_threshold, singular=True)} threshold to win the election, voting continues on to the {next_round} round.\n"
    elif 'Winner' in result:
        z = ""
    
    narration = a + b + " " + c + z
    
    return narration


def narrate_results(results):
    """Wrapper function that passes each round's result to `narrate_result()` to give a text-based account of what happened over the course of an instant-runoff election."""
    for result in results:
        print(narrate_result(result, results))
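
The narration picks its verb ("rose to", "fell to", "remained at", "was") by comparing a candidate's position in the ordered vote counts of two consecutive rounds. A minimal sketch of that comparison follows. The helper name `sketch_rank_change` and the toy rounds are hypothetical, and each dictionary is assumed to be ordered from most to fewest votes, as `Vote Counts` is.

```python
def sketch_rank_change(candidate, previous_counts, current_counts):
    """Hypothetical helper mirroring the "+", "-", "=", "!" codes used in the narration."""
    if previous_counts is None:
        return "!"  # first round: nothing to compare against
    previous_rank = list(previous_counts).index(candidate)
    current_rank = list(current_counts).index(candidate)
    if current_rank < previous_rank:
        return "+"  # moved up the ordering
    if current_rank > previous_rank:
        return "-"  # moved down the ordering
    return "="


round_1 = {"A": 40, "B": 35, "C": 25}
round_2 = {"B": 55, "A": 45}

print(sketch_rank_change("A", None, round_1))
print(sketch_rank_change("B", round_1, round_2))
print(sketch_rank_change("A", round_1, round_2))
```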

## Check for Non-Monotonicity `monotonicity_check()`

In [None]:
def determine_feasibility_by_subversive_elim_count(subversive_elim_orders):
    """Checks the number of 'subversive' (i.e., resulting in a different winner than expected) candidate elimination orders to determine if an investigation of non-monotonicity is warranted."""
    
    if len(subversive_elim_orders) > 0:
        print(f"✔ {len(subversive_elim_orders)} subversive elimination orders found. Continue with analysis.")
        return True
    elif len(subversive_elim_orders) == 0:
        print("🛇 Zero subversive elimination orders found. Non-monotonicity cannot be proven. Analysis complete.")
        return False

def get_pairwise_winners(elim_order, pairs):
    """Not a strict/Condorcet pairwise (i.e. % of first-place votes) but rather who places higher in terms of the elimination order."""
    pairwise_winners = {}
    
    for pairwise in pairs:
        home_team = pairwise[0]
        away_team = pairwise[1]

        home_team_score = elim_order[home_team]
        away_team_score = elim_order[away_team]

        if home_team_score > away_team_score:
            pairwise_winners.update({pairwise : home_team})
        elif home_team_score < away_team_score:
            pairwise_winners.update({pairwise : away_team})
        else:
            # - [ ] TODO: Manage tie situation
            raise Exception
    
    return pairwise_winners


def run_pairwise_tournament(pairs, real_elim_order, subversive_elim_orders):
    """Runs each pairing of candidates against their performance in the actual results and each of the subversive elimination orders identified."""
    
    def investigate_results_by_pairing_for_abnormalities(row):
        """Analyzes a row of the dataframe (representing a pairing of candidates) to see if there is something abnormal about the winners (i.e. different candidate is victorious depending on the elimination order)"""
        winners = [winner for elim_order, winner in row.items() if elim_order != "Analysis"]
        winners = list(set(winners))

        if len(winners) == 1:
            return "Consistent" # The same candidate will win regardless of the elimination order (real or subversive)

        # Determine distribution of multiple winners
        real_winner = []
        fake_winners = []

        for elim_order, winner in row.items():
            if elim_order == "Real":
                real_winner.append(winner)
            elif elim_order == "Analysis":
                continue
            else:
                fake_winners.append(winner)

        fake_winners = list(set(fake_winners))

        if len(fake_winners) >= 2:
            return "Mixed" # Multiple candidates can win depending on the subversive elimination order

        if len(fake_winners) == 1:
            if fake_winners != real_winner:
                return "Opposed" # A candidate who wins in each subversive elimination order is different than the candidate who wins given the real results
    
    pairwise_tournament = {}
    pairwise_tournament.update({"Real" : get_pairwise_winners(real_elim_order, pairs)})
    
    for z in subversive_elim_orders:
        forced_elim_order = {x : i + 1 for i, x in enumerate(z['elimination order'])}
        forced_elim_str = "-".join(list(forced_elim_order.keys()))
        pairwise_winners = get_pairwise_winners(forced_elim_order, pairs)
        pairwise_tournament.update({forced_elim_str : pairwise_winners})
    
    bracket = pd.DataFrame(pairwise_tournament)
    bracket["Analysis"] = bracket.apply(investigate_results_by_pairing_for_abnormalities, axis=1)
    
    print("✔ Pairwise matchings across various elimination orders simulated.")
    return bracket


def determine_opposed_pairings(bracket):
    """Distillation of the analyses from `investigate_results_by_pairing_for_abnormalities()` to determine number of opposed pairings."""
    bracket_analysis = dict(bracket['Analysis'].value_counts())
    num_opposed = bracket_analysis.get('Opposed', 0)
    
    opposed_pairing = bracket[bracket['Analysis'] == "Opposed"]

    if num_opposed == 0:
        print("🛇 Can't continue analysis.", num_opposed)
        return opposed_pairing, False
    elif num_opposed == 1:
        print(f"✔ 1 opposed pairing detected: {'-'.join(list(opposed_pairing['Real'].keys())[0])}. Continue analysis!")
        return opposed_pairing, True
    elif num_opposed >= 2:
        print("🛇 Can't continue analysis.", num_opposed)
        return opposed_pairing, False
    

def get_relative_ranks_of_opposed_pair(opposed_pairing):
    """Figure out which candidate in an opposed pairing performs better: this is the candidate who is on the cusp of elimination and whose voters favor the other candidate in the opposed pair over the actual winner."""

    [higher_rank] = opposed_pairing['Real'].values
    [pair_identities] = [opposed_pairing['Real'].keys()][0]
    [lower_rank] = [candidate for candidate in pair_identities if candidate != higher_rank]
    
    return higher_rank, lower_rank


def get_critical_result(results, lower_rank):
    """Determine the round in the real election where the (subversive) candidate is eliminated before a significant number of votes can be transferred to them from another candidate's elimination."""
    [g] = [(i, result) for i, result in results.items() if result['Eliminated'] == lower_rank]
    critical_i = g[0]
    critical_result = g[1]
    print("✔ Critical result detected: Round", critical_i)
    return critical_i, critical_result


def reverse_evaluate_ballot_for_key_candidates(ballot, num_candidates, higher_rank, lower_rank, winner):
    """Determine the order of the key candidates: the real winner, the subversive candidate (lower_rank), and the alternative elimination (higher_rank)"""
    evaluation_dict = {}

    for ranked_cand in [higher_rank, lower_rank, winner]:
        cand_to_i = {cand_a : cand_i for cand_i, cand_a in ballot.items() if cand_a == ranked_cand}

        if cand_to_i == {}:
            evaluation_dict.update({ranked_cand : num_candidates})
            continue

        [cand] = list(cand_to_i.keys())

        if ranked_cand != cand:
            raise Exception

        [cand_i] = list(cand_to_i.values())
        rank_i = cand_to_rank_map[cand_i]
        rank = ballot[rank_i]

        evaluation_dict.update({ranked_cand : int(rank)})
        
    return evaluation_dict

def get_possible_betrayals(ballots, lower_rank, higher_rank, winner):
    """Dictionary collecting all the ballots that represent a voter who can help ruin the election by choosing to rank the winner above the alternate elimination candidate"""

    all_candidates = determine_all_candidates(ballots)
    num_candidates = len(all_candidates)

    possible_betrayals = {
        "Winner-Above-Spoiler" : [],
        "Winner-Equal-Spoiler" : [],
        "Winner-Below-Spoiler" : []
    }
    
    for ballot in ballots:

        evaluation_dict = reverse_evaluate_ballot_for_key_candidates(ballot, num_candidates, higher_rank, lower_rank, winner)

        if evaluation_dict[higher_rank] < evaluation_dict[lower_rank] < evaluation_dict[winner]:
            possible_betrayals["Winner-Below-Spoiler"].append(ballot)
        elif evaluation_dict[higher_rank] < evaluation_dict[winner] == evaluation_dict[lower_rank]:
            possible_betrayals["Winner-Equal-Spoiler"].append(ballot)
        elif evaluation_dict[higher_rank] < evaluation_dict[winner] < evaluation_dict[lower_rank]:
            possible_betrayals["Winner-Above-Spoiler"].append(ballot)
        else:
            pass

    print("✔ Ballots sorted by defection possibility.")
    return possible_betrayals


def get_threshold_dict(possible_betrayals, betrayals_remaining):
    """Used as a way of marking how many voters defected, and of what preference profile, at a given point in a hypothetical vote rank swap. Used to mark the lower and upper bounds of a non-monotonic range."""
    
    threshold_dict = {
        "Simple Advance" : len(possible_betrayals['Winner-Above-Spoiler']) - betrayals_remaining["Simple Advance"],
        "Tie Broken + Advance" : len(possible_betrayals['Winner-Equal-Spoiler']) - betrayals_remaining["Tie Broken + Advance"],
        "Leapfrog + Advance" : len(possible_betrayals['Winner-Below-Spoiler']) - betrayals_remaining["Leapfrog + Advance"]
    }
    
    return threshold_dict


def get_betrayals_remaining(possible_betrayals):
    """Used to count how many voters may defect to cause a 'favorite betrayal'"""
    
    n_above = len(possible_betrayals['Winner-Above-Spoiler'])
    n_equal = len(possible_betrayals['Winner-Equal-Spoiler'])
    n_below = len(possible_betrayals['Winner-Below-Spoiler'])
    betrayals_remaining = {"Simple Advance" : n_above, "Tie Broken + Advance" : n_equal, "Leapfrog + Advance" : n_below}
    return betrayals_remaining


def determine_thresholds(possible_betrayals, critical_result, lower_rank, higher_rank, winner):
    """Moves voters who can defect (i.e., betray their favorite) to the winner one at a time, then checks whether the outcome of the election has changed."""
    
    spoiler_advantage, alternate_elim_threshold, spoiler_needs_to_win = nonmonotonicity_factors(possible_betrayals,
                                                                                                   critical_result,
                                                                                                   lower_rank,
                                                                                                   higher_rank,
                                                                                                   winner)

    betrayals_remaining = get_betrayals_remaining(possible_betrayals)

    for betrayal_type in betrayals_remaining:

        while betrayals_remaining[betrayal_type] > 0:

            if alternate_elim_threshold == 0:
                lower_threshold = get_threshold_dict(possible_betrayals, betrayals_remaining)

            if spoiler_advantage == spoiler_needs_to_win:
                higher_threshold = get_threshold_dict(possible_betrayals, betrayals_remaining)

            betrayals_remaining[betrayal_type] -= 1

            alternate_elim_threshold -= 1

            if betrayal_type in ['Tie Broken + Advance', 'Leapfrog + Advance']:
                spoiler_needs_to_win += 1

            if betrayal_type in ['Leapfrog + Advance']:
                spoiler_advantage -= 1
                
    print("✔ Non-monotonicity thresholds calculated.")
    return lower_threshold, higher_threshold


def describe_threshold(threshold, threshold_type, possible_betrayals, higher_rank, lower_rank, winner):
    """An English human-readable explanation of a non-monotonic bound"""
    
    higher_rank_total = len(possible_betrayals['Winner-Above-Spoiler']) + len(possible_betrayals['Winner-Equal-Spoiler']) + len(possible_betrayals['Winner-Below-Spoiler'])

    above_betrayals = threshold['Simple Advance']
    above_possible = len(possible_betrayals['Winner-Above-Spoiler'])

    equal_betrayals =  threshold['Tie Broken + Advance']
    equal_possible = len(possible_betrayals['Winner-Equal-Spoiler'])

    below_betrayals = threshold['Leapfrog + Advance']
    below_possible = len(possible_betrayals['Winner-Below-Spoiler'])

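    if threshold_type == "lower":
        keywords = ("begins", "minimum")
    elif threshold_type == "higher":
        keywords = ("ends", "maximum")
    else:
        # Fail early instead of hitting an unbound `keywords` below
        raise ValueError(f"threshold_type must be 'lower' or 'higher', got {threshold_type!r}")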
    
    threshold_english = f"""Non-monotonicity {keywords[0]} when, of the {higher_rank_total} voters who prefer {name_map[higher_rank]} over {name_map[winner]} & {name_map[lower_rank]}, at {keywords[1]}:
    • {above_betrayals} of the {above_possible} voters who **prefer {name_map[winner]} over {name_map[lower_rank]}** defect and place {name_map[winner]} above {name_map[higher_rank]}
    • {equal_betrayals} of the {equal_possible} voters who **equally rank {name_map[winner]} and {name_map[lower_rank]}** defect and place {name_map[winner]} above {name_map[higher_rank]} and {name_map[lower_rank]}
    • {below_betrayals} of the {below_possible} voters who **prefer {name_map[lower_rank]} over {name_map[winner]}** defect and place {name_map[winner]} above {name_map[higher_rank]} and {name_map[lower_rank]}"""

    return threshold_english


def nonmonotonicity_factors(possible_betrayals, critical_result, lower_rank, higher_rank, winner):
    """Key numbers used when calculating if an election is nonmonotonic"""
    spoiler_advantage = len(possible_betrayals['Winner-Below-Spoiler']) - len(possible_betrayals['Winner-Above-Spoiler'])
    alternate_elim_threshold = critical_result['Vote Counts'][higher_rank] - critical_result['Vote Counts'][lower_rank]
    spoiler_needs_to_win = critical_result['Vote Counts'][winner] - critical_result['Vote Counts'][lower_rank]
    return spoiler_advantage, alternate_elim_threshold, spoiler_needs_to_win


def monotonicity_check(results, ballots):
    """Runs a monotonicity check of the results and ballots"""
    
    def betrayal_factors(results, ballots):
        """Key numbers used when calculating hypothetical alternatives (i.e., deviant elimination orders) to the actual result"""
    
        winner = results[len(results)]['Winner']

        winner_set = set([winner])
        candidates_but_winner = set(determine_all_candidates(ballots)).difference(winner_set)

        if 'Pre-Eliminated Candidate Codes' in read_cfg:
            candidates_but_winner = candidates_but_winner.difference(read_cfg['Pre-Eliminated Candidate Codes'])

        # Map each eliminated candidate to the round in which they were eliminated.
        # A round may eliminate a single candidate or a list of candidates.
        real_elim_order = {}
        for round_i, result in results.items():
            eliminated = result['Eliminated']
            if isinstance(eliminated, list):
                for candidate in eliminated:
                    real_elim_order[candidate] = round_i
            else:
                real_elim_order[eliminated] = round_i

        # TODO: decide how many permutations is too many (n candidates yield n! orders);
        # see https://statisticsbyjim.com/probability/permutations-probabilities/
        from itertools import combinations, permutations  # local import, in case these are not already imported above

        elim_orders = list(permutations(candidates_but_winner))

        pairs = list(combinations(candidates_but_winner, 2))

        print("✔ Prerequisite calculations made.")
        return winner, real_elim_order, elim_orders, pairs
    
    
    def find_subversive_elimination_orders(elim_orders, ballots):
        """Find orders of candidate elimination that would dethrone the real winner."""

        subversive_elim_orders = []

        print("\n########## Beginning election simulations ##########\n")

        for elim_order in elim_orders:
            print("Election Simulation with Elimination Order:", elim_order)
            elim_rounds, elim_results = contest_election(ballots,
                                                         criteria_test="Favorite Betrayal",
                                                         forced_elim_order = elim_order
                                                        )

            elim_order_winner = elim_results[len(elim_results)]['Winner']

            if elim_order_winner != winner:
                print("‼ Simulated Winner different from Real Winner:", elim_order_winner, "≠", winner)
                subversive_elim_orders.append({"elimination order" : elim_order,
                                      "results" : elim_results})

            print("\n", end="")

        print("########## Concluding election simulations ##########\n")

        return subversive_elim_orders
    
    def narrative_numbers(results):
        """Get round number in human-readable English; other simple-to-calculate numbers that help describe the narrative can be added here in the future."""
        real_winning_round = "{} round".format(num2words(len(results), ordinal=True))
        return real_winning_round

    
    print("Beginning non-monotonicity check…", end="\r")
    winner, real_elim_order, elim_orders, pairs = betrayal_factors(results, ballots)
    subversive_elim_orders = find_subversive_elimination_orders(elim_orders, ballots)
    analysis_possible = determine_feasibility_by_subversive_elim_count(subversive_elim_orders)
    if analysis_possible is False:
        return None, None

    bracket = run_pairwise_tournament(pairs, real_elim_order, subversive_elim_orders)
    opposed_pairing, able_to_continue = determine_opposed_pairings(bracket)
    if able_to_continue is False:
        return None, None

    higher_rank, lower_rank = get_relative_ranks_of_opposed_pair(opposed_pairing)
    critical_i, critical_result = get_critical_result(results, lower_rank)

    possible_betrayals = get_possible_betrayals(ballots, lower_rank, higher_rank, winner)
    spoiler_advantage, alternate_elim_threshold, spoiler_needs_to_win = nonmonotonicity_factors(possible_betrayals,
                                                                                                   critical_result,
                                                                                                   lower_rank,
                                                                                                   higher_rank,
                                                                                                   winner)

    lower_threshold, higher_threshold = determine_thresholds(possible_betrayals, critical_result, lower_rank, higher_rank, winner)

    real_winning_round = narrative_numbers(results)
    
    print("\n########## Begin Reporting Results ##########\n")
    print(f"In this election, {name_map[winner]} was elected the winner in the {real_winning_round}. However, there exists a range of non-monotonicity where {name_map[lower_rank]} will paradoxically win the election if {name_map[winner]} receives additional votes in the form of higher rankings.¹\n")
    print(f"Before we start moving votes around, note that {name_map[lower_rank]} only needs {spoiler_needs_to_win} votes to beat {name_map[winner]}, and if {name_map[higher_rank]} is eliminated before {name_map[lower_rank]} that will give {name_map[lower_rank]} an additional {spoiler_advantage} votes over {name_map[winner]}. Only {alternate_elim_threshold} votes separate {name_map[higher_rank]} from {name_map[lower_rank]}, and every additional vote for {name_map[winner]} over {name_map[higher_rank]} decreases this gap.\n")
    print(describe_threshold(lower_threshold, "lower", possible_betrayals, higher_rank, lower_rank, winner), end="\n\n")
    # TODO: move this language into the `describe_threshold` function
    print(f"In this range of non-monotonicity {name_map[lower_rank]} will win the election instead of {name_map[winner]} despite not receiving any more votes due to {name_map[higher_rank]} having an earlier elimination.\n")
    print(describe_threshold(higher_threshold, "higher", possible_betrayals, higher_rank, lower_rank, winner), end="\n")
    print(f"\nAt this point, {name_map[winner]} will still be elected winner when receiving more votes.")
    print("\n¹: For simplicity, non-competitive minor candidates have been factored out.")
    print("\n########## End Reporting Results ##########\n")
    
    print("✔ Non-monotonicity check completed.")
    
    return lower_threshold, higher_threshold

# Execution Code

### Load Configuration for Reading "Cast Vote Record" Data

In [None]:
read_cfg = load_cfgs()

### Run Election to Determine Winner and Round-by-Round Results

In [None]:
election_df = get_election_data_as_dataframe(read_cfg)
ballots, rank_to_cand_map, cand_to_rank_map, rank_columns, cand_columns = get_ballots_and_column_maps(read_cfg, election_df)
name_map = get_candidate_to_name_map(use_candidate_ids_as_names=True)
ballots = process_all_ballots(ballots)
total_ballots = len(ballots)
rounds, results = contest_election(ballots)
results = enrich_results(results)
print("✔ Complete!")

### Create Name Map for Mapping Candidate IDs to Names for Presentation

In [None]:
name_map = get_candidate_to_name_map(use_candidate_ids_as_names=True)

### Visualize Results

In [None]:
color_map, greys = get_color_to_candidate_map(ballots, results)
fig = get_horizontal_bar_graph_of_rounds(results)

In [None]:
legend = get_legend(color_map, greys)

### Narrate Results

In [None]:
narrate_results(results)

### Check for Non-Monotonicity
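
The check simulates one election per elimination order of the non-winning candidates, so its cost grows factorially with the size of the field. The following is a minimal sketch, not part of the tabulator itself, for estimating that cost before running the cell below. `simulation_counts` is a hypothetical helper name and the candidate list is made up for illustration.

```python
from itertools import combinations, permutations
from math import comb, factorial


def simulation_counts(num_non_winners):
    """Return (elimination orders, candidate pairs) for a number of non-winning candidates."""
    return factorial(num_non_winners), comb(num_non_winners, 2)


# Cross-check the closed-form counts against itertools on a small hypothetical field.
candidates = ["A", "B", "C", "D"]
orders, pairs = simulation_counts(len(candidates))
assert orders == len(list(permutations(candidates)))
assert pairs == len(list(combinations(candidates, 2)))

# Show how quickly the number of simulated elections grows.
for n in range(2, 11):
    orders, pairs = simulation_counts(n)
    print(f"{n:>2} non-winning candidates: {orders:>9,} elimination orders, {pairs:>2} pairs")
```

If the count is impractically large, pre-eliminating non-competitive candidates (as the `'Pre-Eliminated Candidate Codes'` configuration key appears to allow) is one way to shrink the field.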

In [None]:
lower_threshold, higher_threshold = monotonicity_check(results, ballots)
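
For intuition about what this check looks for, here is a minimal, self-contained sketch of a non-monotonic outcome on a made-up three-candidate profile. `irv_winner` is a hypothetical toy counter, not the notebook's `contest_election`, and it assumes every ballot ranks every candidate and that no round ends in a tie.

```python
from collections import Counter


def irv_winner(ballots):
    """Toy instant-runoff count; each ballot is a tuple of candidates, best first."""
    remaining = {cand for ballot in ballots for cand in ballot}
    while True:
        # Each ballot counts for its highest-ranked candidate still in the race.
        tally = Counter()
        for ballot in ballots:
            for cand in ballot:
                if cand in remaining:
                    tally[cand] += 1
                    break
        leader, leader_votes = tally.most_common(1)[0]
        if leader_votes * 2 > sum(tally.values()):
            return leader
        # Eliminate the candidate with the fewest votes (ties are not handled here).
        loser = min(sorted(remaining), key=lambda cand: tally[cand])
        remaining.discard(loser)


# Hypothetical electorate: A wins after C is eliminated first.
original = [("A", "B", "C")] * 39 + [("B", "C", "A")] * 35 + [("C", "A", "B")] * 26

# Ten B-first voters defect and rank A first instead. A gains support, but now B
# is eliminated before C, and B's remaining ballots transfer to C.
shifted = [("A", "B", "C")] * 49 + [("B", "C", "A")] * 25 + [("C", "A", "B")] * 26

print("Winner before the shift:", irv_winner(original))
print("Winner after the shift: ", irv_winner(shifted))
```

In the vocabulary used by `monotonicity_check`, A plays the role of `winner`, B of `higher_rank`, and C of `lower_rank`. The ten defecting voters ranked C above A, so they fall in the "Leapfrog + Advance" group.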

_[Return to Navigation](#Navigation)_