In [None]:
#!/usr/bin/env python
# -*- coding: utf8 -*-
"""Takes in a Google Form ranked-choice voting result via a Google spreadsheet and
goes through each round until there are only 2 candidates left.

Original script by rrosasl https://github.com/rrosasl/RankedVoting
and his write up https://rrosasl.medium.com/ranked-choice-voting-with-google-forms-and-python-c471ea568a60
However, that part of the code is now unused.

Modified by hansioux to record e-mail addresses and added a tie breaker.
Alternate implementation with pyrankvote also added.
"""
import pandas as pd
import numpy as np
import re

#To import data from Google
from google.colab import auth
import gspread
from oauth2client.client import GoogleCredentials

#To sort list
from operator import itemgetter
#from itertools import repeat
#from collections import Counter

#To use the pyrankvote library
!pip install pyrankvote
import pyrankvote
from pyrankvote import Candidate, Ballot

#To visualize Sankey Diagram
import plotly
import plotly.graph_objects as go
import matplotlib.pyplot as plt

Collecting pyrankvote
  Downloading pyrankvote-2.0.3-py3-none-any.whl (13 kB)
Installing collected packages: pyrankvote
Successfully installed pyrankvote-2.0.3


In [None]:
def retrieve_google_sheets(
    sheet_url='https://docs.google.com/spreadsheets/d/1y7ID3rjDI-Ih1Sv1ZQO3m3syowdDIzZyObyzSAneIE0/edit?usp=sharing',
    worksheet_name='Form Responses 1',
    email_column='Email Address'):
  """Download the form responses and return them as an integer DataFrame.

  Authenticates the Colab user, opens the spreadsheet at ``sheet_url``, reads
  every cell of the ``worksheet_name`` tab, promotes the first row to column
  headers, drops the first column (the timestamp) and the ``email_column``
  (when present), and converts all remaining cells to ``int``.

  Args:
    sheet_url: URL of the Google spreadsheet holding the responses.
    worksheet_name: Title of the tab to read.
    email_column: Header of the optional e-mail column to discard.

  Returns:
    pandas.DataFrame with a fresh 0..n-1 index, one row per data row of the
    sheet and one int column per remaining header.
    NOTE(review): presumably one column per candidate with the voter's rank
    as the value -- confirm against the form layout.

  Raises:
    IndexError: if the worksheet contains no rows at all.
    ValueError: if a remaining cell cannot be parsed as an integer
      (e.g. a blank answer).
  """
  auth.authenticate_user()

  # Function-local imports retained from the original cell.
  import gspread
  from oauth2client.client import GoogleCredentials

  gc = gspread.authorize(GoogleCredentials.get_application_default())
  sheet = gc.open_by_url(sheet_url).worksheet(worksheet_name)

  # get_all_values gives a list of rows; the first one is the header.
  rows = sheet.get_all_values()
  df = pd.DataFrame.from_records(rows)
  df.columns = df.iloc[0]   # promote the first row to column names
  df = df.iloc[1:, 1:]      # drop the header row and the timestamp column

  # Keyword form is required by pandas >= 2.0 (positional `axis` was removed);
  # errors='ignore' makes the e-mail column genuinely optional.
  df = df.drop(columns=email_column, errors='ignore')

  # Everything left is a vote column: convert in one pass and renumber rows.
  return df.astype(int).reset_index(drop=True)

In [None]:
def genSankey(df,cat_cols=None,value_cols='',title='Sankey Diagram'):
  '''
  Build a plotly-compatible Sankey figure dict from a flat DataFrame.

  Adapted from
  https://gist.github.com/ken333135/09f8793fff5a6df28558b17e516f91ab

  Each consecutive pair of columns in ``cat_cols`` becomes one layer of
  links: the left column supplies the source node, the right column the
  target node, and ``value_cols`` the link weight. Weights of identical
  (source, target) pairs are summed.

  Args:
    df: DataFrame containing every column named in ``cat_cols`` plus
      ``value_cols``.
    cat_cols: Ordered sequence of at least two column names, one per level
      of the diagram.
    value_cols: Name of the single numeric column holding link weights.
    title: Figure title.

  Returns:
    dict with keys ``data`` (a one-element list holding the sankey trace)
    and ``layout``.

  Raises:
    ValueError: if fewer than two category columns are given, since no
      source/target pair can be formed.
  '''
  cat_cols = list(cat_cols) if cat_cols is not None else []
  if len(cat_cols) < 2:
    raise ValueError('genSankey needs at least two category columns to build links')

  # 5 colours, reused cyclically when there are more levels than colours
  colorPalette = ['#4B8BBE','#306998','#FFE873','#FFD43B','#646464']

  # One entry per distinct label, in order of first appearance. A label takes
  # the colour of the first level that introduces it, so the label and colour
  # lists always have the same length and the same order on every run.
  labelColors = {}
  for idx, catCol in enumerate(cat_cols):
    levelColor = colorPalette[idx % len(colorPalette)]
    for label in df[catCol].tolist():
      labelColors.setdefault(label, levelColor)
  labelList = list(labelColors)
  colorList = list(labelColors.values())

  # transform df into source-target pairs, one frame per adjacent column pair
  pairFrames = []
  for sourceCol, targetCol in zip(cat_cols, cat_cols[1:]):
    pairDf = df[[sourceCol, targetCol, value_cols]].copy()
    pairDf.columns = ['source','target','count']
    pairFrames.append(pairDf)
  sourceTargetDf = pd.concat(pairFrames)
  sourceTargetDf = sourceTargetDf.groupby(['source','target']).agg({'count':'sum'}).reset_index()

  # add node index for each end of a link (dict lookup instead of list.index)
  labelIndex = {label: position for position, label in enumerate(labelList)}
  sourceTargetDf['sourceID'] = sourceTargetDf['source'].map(labelIndex)
  sourceTargetDf['targetID'] = sourceTargetDf['target'].map(labelIndex)

  # creating the sankey diagram
  data = dict(
           type='sankey',
           node = dict(
                    pad = 15,
                    thickness = 20,
                    line = dict(
                             color = "black",
                             width = 0.5
                           ),
                    label = labelList,
                    color = colorList
                 ),
          link = dict(
                   source = sourceTargetDf['sourceID'],
                   target = sourceTargetDf['targetID'],
                   value = sourceTargetDf['count']
                 )
        )

  layout =  dict(
              title = title,
              font = dict(
                       size = 10
                     )
             )

  fig = dict(data=[data], layout=layout)
  return fig

In [None]:
def plot_sankey(vote_rounds):
  """Render a Sankey diagram from a per-round vote table.

  Args:
    vote_rounds: DataFrame with one column per round plus a 'value' column.
      Rows are grouped by their combination of round values and the 'value'
      entries in each group are counted to give the link weights.

  The figure is displayed with ``fig.show()``; nothing is returned.
  """
  round_cols = vote_rounds.columns.tolist()
  round_cols.remove('value')

  # One row per distinct path through the rounds, 'value' = number of rows on it.
  flows = vote_rounds.groupby(round_cols).count().reset_index()

  # Append the column name to every label so that equal names appearing in
  # different round columns become different node labels.
  for round_col in round_cols:
    suffix = str(round_col)
    flows[round_col] = flows[round_col].map(str) + suffix

  diagram = genSankey(
      flows,
      cat_cols=round_cols,
      value_cols='value',
      title='Vote by Ranking')

  fig = go.Figure(diagram)
  fig.update_layout(width=1200)

  # Captions placed at the left and right ends, slightly above the plot area.
  for x_pos, caption in ((0, "First round"), (1, "Final round")):
    fig.add_annotation(
                x=x_pos,
                y=1.1,
                showarrow=False,
                text=caption)

  fig.show()

In [None]:
def org_sankey_data(election_result):
  """Expand an election result into one row per vote and plot it.

  For every round in ``election_result.rounds`` a list is built in which each
  candidate's name is repeated ``int(number_of_votes)`` times, in the order
  of that round's ``candidate_results``. The lists become columns 0, 1, 2, ...
  of a DataFrame, a constant 'value' column of 1 is added, and the frame is
  handed to ``plot_sankey``.

  NOTE(review): rows are aligned purely by position -- row i of one round and
  row i of the next are not tied to the same ballot, because only per-round
  totals are read here. Flows drawn between rounds may therefore not match
  the real vote transfers; tracking them would need the ballots themselves.

  Args:
    election_result: Object exposing ``rounds``; each round exposes
      ``candidate_results`` whose items have ``candidate.name`` and
      ``number_of_votes``.

  Raises:
    ValueError: if the rounds do not all expand to the same number of rows
      (for example when vote totals change between rounds), since the
      columns could not be placed side by side.
  """
  # Plain list repetition avoids concatenating string arrays with the empty
  # (float-typed) arrays that zero-vote candidates would otherwise produce.
  round_names = []
  for r in election_result.rounds:
    names = []
    for c in r.candidate_results:
      names.extend([c.candidate.name] * int(c.number_of_votes))
    round_names.append(names)

  row_counts = [len(names) for names in round_names]
  if len(set(row_counts)) > 1:
    raise ValueError(
        'Rounds have different vote totals %s; cannot align them as columns'
        % row_counts)

  # Build the frame in one go: column k holds the expanded names of round k.
  vote_rounds = pd.DataFrame(dict(enumerate(round_names)))
  vote_rounds['value'] = 1

  plot_sankey(vote_rounds)

In [None]:
# Fetch the response table; its column headers are used as candidate names.
df = retrieve_google_sheets()

# Wrap every column header in a pyrankvote Candidate.
candidates = [Candidate(name) for name in df.columns.to_list()]
#print(candidates)

# Array form so that a matrix of column positions can pick Candidates in bulk.
carr = np.array(candidates)

# argsort along each row lists the column positions from smallest to largest
# cell value, i.e. the candidate given the lowest number comes first.
# NOTE(review): presumably 1 means first choice -- confirm against the form.
preference_order = np.argsort(df.values, axis=1)
ballot_list = carr[preference_order].tolist()
ballots = [Ballot(ranked_candidates=b) for b in ballot_list]

# Run instant-runoff voting and report each round.
election_result = pyrankvote.instant_runoff_voting(candidates, ballots)
winners = election_result.get_winners()
print("Ranked-choice / IRV Results:")
print(election_result)

# Visualise the rounds as a Sankey diagram.
org_sankey_data(election_result)

Ranked-choice / IRV Results:
ROUND 1
Candidate                         Votes  Status
------------------------------  -------  --------
[D. Swinhoe's pheasant]               3  Hopeful
[B. Taiwan blue magpie]               1  Hopeful
[C. Mikado pheasant]                  1  Hopeful
[A. Black-faced spoonbill]            1  Hopeful
[E. Formosan Whistling Thrush]        1  Rejected

ROUND 2
Candidate                         Votes  Status
------------------------------  -------  --------
[D. Swinhoe's pheasant]               3  Hopeful
[C. Mikado pheasant]                  2  Hopeful
[B. Taiwan blue magpie]               1  Rejected
[A. Black-faced spoonbill]            1  Rejected
[E. Formosan Whistling Thrush]        0  Rejected

FINAL RESULT
Candidate                         Votes  Status
------------------------------  -------  --------
[C. Mikado pheasant]                  4  Elected
[D. Swinhoe's pheasant]               3  Rejected
[B. Taiwan blue magpie]               0  Rejected
[A.

In [None]:
# Elections for more than one elected seats/options
# Set numbers of seats to win
#ns = 1
#ns = 2

#election_result = pyrankvote.single_transferable_vote(candidates, ballots, number_of_seats=ns)
#winners = election_result.get_winners()
#print("STV Results with %d spots:"%ns)
#print(election_result)

#election_result = pyrankvote.preferential_block_voting(candidates, ballots, number_of_seats=ns)
#winners = election_result.get_winners()
#print("PBV Results with %d spots:"%ns)
#print(election_result)