# Single Transferable Vote
## Counting Tool using Droop and GSheets
Based on https://github.com/jklundell/droop

And https://github.com/custozza/droop_from_gspreadsheet

Before starting, check out the submodules to get Droop: `git submodule update --init --recursive`

## Setup

In [None]:
# install packages, once should be enough
!pip install requests pandas numpy

In [None]:

import requests
import pandas as pd
import numpy as np
import re
import subprocess
import os
import itertools

In [None]:
# Notebook configuration -- update these before running a count.
# SPREADSHEET_ID = '1ZaN_eCVB2kFFkmeShoGR--mXGxCig28xbw_maABR7zQ' # ID from your public spreadsheet URL
SPREADSHEET_ID = '1-cwFOFbDAMAmhBQCnq40DfnqgV6E17Od3w3GxY2A0ig' # ID from your public spreadsheet URL
NUM_SEATS = 14
IS_PREFERENCES_LIST = True  # enable if ordering is important, STV itself doesn't produce a ranking
IGNORE_CANDIDATES = []  # candidate names (first CSV column) to drop before counting

PATH_TO_DROOP = "deps/droop"
PYTHON_COMMAND = "python"
ELECTION_TITLE = "S8_Konferenz_2023"

# Plain literal: the original f-string had no placeholders.
VOTE_CSV = 'logs/vote.csv'

In [None]:
# Seat totals to count: every total from 1 to NUM_SEATS when a ranking is
# wanted, otherwise only the full NUM_SEATS election.
eval_seats = list(range(1, NUM_SEATS + 1)) if IS_PREFERENCES_LIST else [NUM_SEATS]

## Reading and Parsing

### Fetch Result from Spreadsheet

In [None]:
# CSV export URL of the spreadsheet identified by SPREADSHEET_ID
url = f"https://docs.google.com/spreadsheets/d/{SPREADSHEET_ID}/export?exportFormat=csv"

# Without a timeout, requests waits indefinitely if the server never answers.
response = requests.get(url, timeout=30)

# Check if the request was successful (status code 200)
if response.status_code == 200:
    # Get the content of the CSV
    csv_data = response.text

    # Store the download so the following cells can read it from VOTE_CSV
    os.makedirs('logs', exist_ok=True)
    with open(VOTE_CSV, 'w', encoding="utf-8") as f:
        print(csv_data, file=f)
    print(f'Downloaded vote data from {url} to {VOTE_CSV}')
else:
    # NOTE(review): on failure an older VOTE_CSV, if present, is left in place
    # and will be read by the next cell -- confirm this is acceptable.
    print(f'Failed to fetch data. Status code: {response.status_code}')

### Convert Ballots Matrix to Sorted Candidates ID per Ballot

In [None]:
# CONVERT BALLOTS MATRIX TO SORTED CANDIDATES ID PER BALLOT

# Load the downloaded vote sheet. The code below treats column 0 as the
# candidate name and every further column as one ballot holding a numeric
# preference per candidate.
# NOTE(review): assumes an empty cell means "not ranked" and lower numbers mean
# higher preference -- confirm against the sheet layout.
df = pd.read_csv(VOTE_CSV)

# remove votes for ignored candidates
if IGNORE_CANDIDATES:
    print('before dropping ignored candidates:')
    # widen pandas' display limits so the whole matrix is printed
    pd.set_option('display.max_columns', 500)
    pd.set_option('display.width', 500)
    print(df)
    # fail early on names that do not appear in the first column
    unknown_ignored_candidates = [c for c in IGNORE_CANDIDATES if c not in df.values[:, 0]]
    if unknown_ignored_candidates:
        raise ValueError(f"Unknown candidates: {unknown_ignored_candidates}")
    # row positions of the candidates to ignore
    ignore_ids = [i for i, c in enumerate(df.values[:, 0]) if c in IGNORE_CANDIDATES]
    # shift the indexing, large->small to avoid changing the index of the next candidate
    ignore_ids_desc = sorted(ignore_ids, reverse=True)
    for candidate_to_ignore in ignore_ids_desc:
        # preference the ignored candidate received on each ballot (NaN where unranked)
        position = df.iloc[candidate_to_ignore, 1:].to_numpy().astype(np.float32)
        # repeat that row for every candidate so it can be compared element-wise
        position_bc = np.broadcast_to(position, (len(df), len(position)))
        # on each ballot, entries ranked after the ignored candidate move up by one;
        # comparisons against NaN are False, so unranked cells are left untouched
        needs_shift = df.iloc[:, 1:] > position_bc
        position_shift = np.where(needs_shift, -1, 0)
        df.iloc[:, 1:] += position_shift
    # set rows with ignored candidates to NaN
    df.iloc[ignore_ids, 1:] = np.nan
    # remove ignored candidates
    df = df.drop(df.index[ignore_ids])

    print('\nafter dropping ignored candidates:')
    print(df)

# Column 0 holds the candidate names; every remaining column is one ballot.
candidates = df.values[:, 0]
ballots = df.values[:, 1:]

# Per ballot (one column of the matrix): (candidate row index, preference)
# pairs for every candidate that received a preference on that ballot.
ballots_with_candidate_preferences = [
    [(candidate, pref)
     for candidate, pref in enumerate(ballot)
     if not np.isnan(pref)]
    for ballot in ballots.T]

# Per ballot: zero-based candidate indices ordered by ascending preference value.
ballots_with_candidates_sorted_by_preference = [
    [candidate
     for candidate, preference in sorted(ballot, key=lambda x: x[1])]
    for ballot in ballots_with_candidate_preferences]

print(f'Number of candidates: {len(candidates)}, Number of ballots: {len(ballots_with_candidates_sorted_by_preference)}')
# One line per ballot with the space-separated candidate indices. Joining
# str(b) directly would iterate over the characters of the list's repr.
print('\n'.join(' '.join(str(c) for c in b) for b in ballots_with_candidates_sorted_by_preference))


### Write Droop's BLT Files

In [None]:
def get_blt_file(seats: int):
    """Return the path of the BLT file used for an election with `seats` seats."""
    return 'logs/vote_seats_{}.blt'.format(seats)

def write_blt(seats: int):
    """Write the BLT ballot file for an election with `seats` seats and echo it.

    The content is built from the module-level `candidates`,
    `ballots_with_candidates_sorted_by_preference` and `ELECTION_TITLE`, and
    is stored at the path returned by `get_blt_file(seats)`.
    """
    # One line per ballot: weight 1, one-based candidate numbers, the 0
    # terminator, then the candidate names as a trailing comment.
    ballot_lines = []
    for ballot in ballots_with_candidates_sorted_by_preference:
        ranked_numbers = ' '.join(str(c + 1) for c in ballot)
        ranked_names = ", ".join(candidates[c] for c in ballot)
        ballot_lines.append(f"1 {ranked_numbers} 0 #{ranked_names}")
    blt_ballots = "\n".join(ballot_lines)

    # Candidate names are suffixed with their one-based number.
    candidate_lines = [f'"{candidate} #{i}"' for i, candidate in enumerate(candidates, 1)]
    blt_candidates = "\n".join(candidate_lines)

    blt_file = get_blt_file(seats)
    with open(blt_file, 'w') as file:
        # The leading empty line and the trailing run of spaces are part of the emitted text.
        blt_content = "\n".join([
            '',
            f'{len(candidates)} {seats}',
            blt_ballots,
            '0 # end marker',
            blt_candidates,
            f'"{ELECTION_TITLE}" #Titel',
            ' ' * 8,
        ])
        file.write(blt_content)
        print(blt_content)

In [None]:
# Write one BLT file for every seat total in eval_seats.
for seats in eval_seats:
    write_blt(seats)

## Evaluate Ballots

### Run Droop for all seats

In [None]:
# Test call directly
# !python deps/droop/Droop.py meek vote.blt

def run_droop(seats: int) -> list:
    """Run Droop on the BLT file for `seats` seats and collect who was elected.

    Droop.py is invoked with the "meek" argument on the file returned by
    `get_blt_file(seats)`. Its stdout is saved to
    `logs/election_seats_{seats}.log`, and a summary is written to
    `logs/elected_seats_{seats}.txt` and printed.

    Returns:
        A list of `(position, round, candidate)` tuples in the order the
        "Action: Elect:" lines appear in Droop's output; `position` counts
        from 1 and `round` is the number of the "Round " section containing
        the line.
    """
    blt_file = get_blt_file(seats)
    log_file = f'logs/election_seats_{seats}.log'
    elected_file = f'logs/elected_seats_{seats}.txt'

    command = [PYTHON_COMMAND, PATH_TO_DROOP+'/Droop.py', "meek", blt_file]
    print(' '.join(command))
    result = subprocess.run(command, capture_output=True, text=True)
    with open(log_file, 'w') as file:
        file.write(result.stdout)
    # Surface a failed run: its stderr is otherwise discarded and the run
    # would just look like an election without winners.
    if result.returncode != 0:
        print(f'Warning: Droop exited with code {result.returncode} for {seats} seats:\n{result.stderr}')

    number_of_votes = len(ballots[0])  # ballots holds one row per candidate, one column per ballot
    candidates_str = f'Candidates: {", ".join(candidates)}'
    stats_str = f'Ballots: {number_of_votes}, Candidates: {len(candidates)}, Winners: {seats}'
    ignored_str = f'Ignored candidates: {", ".join(IGNORE_CANDIDATES)}'

    # Everything before the first "Round " marker is preamble, hence rounds[1:].
    rounds = result.stdout.split('Round ')
    election_pattern = r'Action: Elect: (?P<candidate>.*)'
    elected_by_round = [(round_no, match.group('candidate'))
                        for round_no, round_text in enumerate(rounds[1:], 1)
                        for match in re.finditer(election_pattern, round_text)]

    # Prefix each entry with its one-based position in election order.
    electees_in_rounds = [(position, round_no, person)
                          for position, (round_no, person) in enumerate(elected_by_round, 1)]

    electees_in_round = "\n".join(f"{position:>2}{round_no:>5} {person}"
                                  for position, round_no, person in electees_in_rounds)
    elected_text = f"""
{candidates_str}
{stats_str}
{ignored_str}
ID ROUND NAME
{electees_in_round}
    """
    with open(elected_file, 'w') as file:
        file.write(elected_text)
        print(elected_text)

    return electees_in_rounds

In [None]:
# Run Droop once per seat total; results are kept in eval_seats order.
electees_info_in_rounds_per_seat = list(map(run_droop, eval_seats))

### Evaluate Ranking

In [None]:
# Keep only the candidate name (third field) of every (id, round, name) entry
electees_in_rounds_per_seat = [
    [name for _, _, name in electees]
    for electees in electees_info_in_rounds_per_seat
]
for num_seats, electees in zip(eval_seats, electees_in_rounds_per_seat):
    print(f'{num_seats} seats: {electees}')


In [None]:
# Get changes: for each counted seat total, who is newly elected and who
# dropped out compared with the previous (smaller) election.
new_elected_per_seat = []
not_elected_anymore_per_seat = []
prev_elected = []
# Iterate the results by position. They are stored in eval_seats order, and
# eval_seats is just [NUM_SEATS] when IS_PREFERENCES_LIST is off, so indexing
# with num_seats-1 would run past the end of the list.
for elected in electees_in_rounds_per_seat:
    new_elected = [e for e in elected if e not in prev_elected]
    not_elected_anymore = [e for e in prev_elected if e not in elected]
    prev_elected = elected
    new_elected_per_seat.append(new_elected)
    not_elected_anymore_per_seat.append(not_elected_anymore)

In [None]:
# Print Changes
def print_changes(new_elected_per_seat, not_elected_anymore_per_seat, print_to_file=None):
    """Print the per-seat changes, one line per entry of `eval_seats`.

    Args:
        new_elected_per_seat: newly elected candidates for each seat total.
        not_elected_anymore_per_seat: dropped candidates for each seat total.
        print_to_file: optional path; when truthy the same text is also
            written to that file.
    """
    rows = zip(eval_seats, new_elected_per_seat, not_elected_anymore_per_seat)
    print_str = ''.join(
        f'\n{num_seats} seats: +{new_elected}, -{not_elected_anymore}'
        for num_seats, new_elected, not_elected_anymore in rows
    )
    print(print_str)
    if not print_to_file:
        return
    with open(print_to_file, 'w') as file:
        file.write(print_str)

print_changes(new_elected_per_seat, not_elected_anymore_per_seat)

In [None]:
def get_preferences_for_candidate(candidate: str) -> tuple:
    """Find the ballots that rank `candidate` and the position it holds on each.

    Args:
        candidate: candidate name ending in " #<n>", where <n> is the
            one-based candidate number (the suffix `write_blt` appends).

    Returns:
        A pair `(ballots_including_candidate, ballot_preferences_ids)`: the
        one-based numbers of the ballots listing the candidate, and the
        candidate's one-based position on each of those ballots, in the same
        order.

    Raises:
        ValueError: if the text after the last " #" is not an integer.
    """
    candidate_id = int(candidate.split(' #')[-1]) - 1
    ballots_including_candidate = []
    ballot_preferences_ids = []
    # Single pass, so the two result lists cannot get out of step.
    for ballot_no, ballot in enumerate(ballots_with_candidates_sorted_by_preference, 1):
        if candidate_id in ballot:
            ballots_including_candidate.append(ballot_no)
            ballot_preferences_ids.append(ballot.index(candidate_id) + 1)
    return ballots_including_candidate, ballot_preferences_ids

def compare_preferences(prefs1: list, prefs2: list) -> int:
    """Compare two lists of ballot positions and report which one is better.

    Both lists are sorted ascending and compared pairwise; at the first
    difference the list with the smaller value wins. If one list runs out
    before a difference is found, the longer list wins.

    Returns:
        1 if `prefs1` is better, -1 if `prefs2` is better, 0 if they are equal.
    """
    ranked1, ranked2 = sorted(prefs1), sorted(prefs2)
    zip_list = list(itertools.zip_longest(ranked1, ranked2))
    print(f'Comparing preferences: {zip_list}')

    for p1, p2 in zip_list:
        # A None filler means that side has fewer entries and loses.
        if p1 is None:
            return -1
        if p2 is None:
            return 1
        if p1 != p2:
            return 1 if p1 < p2 else -1

    # No difference found at any position.
    return 0

# Manual smoke checks for the preference helpers; switch the guard to True to run them
if False:
    print(get_preferences_for_candidate('Candidate 1 #1'))
    smoke_cases = [
        ([1, 2, 3], [1, 2, 3]),     # equal -> 0
        ([1, 2, 3], [1, 2, 4]),     # first smaller -> 1
        ([1, 2, 3], [1, 2, 2]),     # first larger -> -1
        ([1, 2, 3], [1, 2]),        # first has more votes -> 1
        ([1, 2, 3], [1, 2, 3, 4]),  # first has less votes -> -1
    ]
    for first, second in smoke_cases:
        print(compare_preferences(first, second))


In [None]:

# Resolve case where an electee drops out for two others and is re-elected in the next round
new_elected_per_seat_resolved = new_elected_per_seat.copy()
not_elected_anymore_per_seat_resolved = not_elected_anymore_per_seat.copy()
# Walk the per-seat lists by position, not by seat count. They are stored in
# eval_seats order, and eval_seats is just [NUM_SEATS] when
# IS_PREFERENCES_LIST is off, so indexing with num_seats-1 would run past the
# end of the lists.
for seat_idx, num_seats in enumerate(eval_seats):
    next_idx = seat_idx + 1
    new_elected = new_elected_per_seat_resolved[seat_idx]
    not_elected_anymore = not_elected_anymore_per_seat_resolved[seat_idx]

    if len(new_elected) == 1 and len(not_elected_anymore) == 0:
        continue  # default case: exactly one additional electee
    if len(new_elected) != 2 or len(not_elected_anymore) != 1:
        print(f'Warning: unexpected changes in {num_seats} seats election: +{new_elected}, -{not_elected_anymore}')
        continue

    # From here on: two newcomers replaced one previous electee.
    if next_idx >= len(new_elected_per_seat_resolved):
        # No larger election exists in which the dropout could be re-elected.
        print(f'Warning: Dropout with max seats, resolve manually: {new_elected} and {not_elected_anymore} in {num_seats} seats election')
        continue
    if new_elected_per_seat_resolved[next_idx] != not_elected_anymore:  # not elected anymore needs to be re-elected with one more seat
        print(f'Error: {not_elected_anymore[0]} drops out with {num_seats} seats, but is not re-elected with {num_seats+1} seats')
        continue

    print(f'Case to resolve: {num_seats} seats: +{new_elected}, -{not_elected_anymore}')

    candidate_a, candidate_b = new_elected
    candidate_a_ballots, candidate_a_prefs = get_preferences_for_candidate(candidate_a)
    candidate_b_ballots, candidate_b_prefs = get_preferences_for_candidate(candidate_b)
    print(f'{candidate_a} ballots: {candidate_a_ballots}')
    print(f'{candidate_b} ballots: {candidate_b_ballots}')
    print(f'{candidate_a} preferences: {candidate_a_prefs}')
    print(f'{candidate_b} preferences: {candidate_b_prefs}')
    comp_res = compare_preferences(candidate_a_prefs, candidate_b_prefs)
    if comp_res == 0:
        print(f'Error: {candidate_a} and {candidate_b} have equal preferences. Resolve manually.')
        continue
    if comp_res == 1:
        winner, runner_up = candidate_a, candidate_b
    elif comp_res == -1:
        winner, runner_up = candidate_b, candidate_a
    else:
        print(f'Error: unexpected comparison result {comp_res}')
        continue

    # The better newcomer takes this seat and the other one the next seat,
    # so the previous electee no longer drops out.
    print(f'{winner} has better/more votes than {runner_up}')
    new_elected_per_seat_resolved[seat_idx] = [winner]
    new_elected_per_seat_resolved[next_idx] = [runner_up]
    not_elected_anymore_per_seat_resolved[seat_idx] = []

print('\nAfter resolving:')
print_changes(new_elected_per_seat_resolved, not_elected_anymore_per_seat_resolved, print_to_file='logs/00_election_result.txt')