# Basic framework
A notebook prototype, kept here before converting it to a Python file for easier use.

#### Defining class for election process

In an **Election** with a given **number of candidates** and **number of seats**:

- It will have an **Election setting**, which will be generated from the election.

- You will have a **Schema** that takes the **Ballot** type; with the schema, the **winner can be calculated** given a set of ballots.

- For **Ballots** (Use all ballots here to take advantage of pandas/numpy speed), each **ballot** inside will have a unique **ballot type** 
    - (TODO: ballot type defined manually or autogenerated)

- As the Election **begins**, **Voters** start to **vote**; they generate either fixed-proportion votes or random votes based on some distribution.

- A **Counter** will be used to count the **Ballots** based on a given **bias**.

- You will also have one or more  **Auditor(s)** that will **audit** the result generated

In [None]:
def print_variable(*args):
    """
    Print every positional argument on its own line.

    NOTE(review): the original cell was left unfinished (no ``def`` keyword
    and an empty loop body); printing each argument is the assumed intent.

    Args:
        *args: Values to print, in order.
    """
    for arg in args:
        print(arg)

In [158]:
# --- Sample ballot sets (two candidates, nine ballots each) ---------------
# Every inner list is a separate object so a set can be edited row by row.

# All nine ballots mark candidate 0 only.
sample_ballots0 = [[1, 0] for _ in range(9)]

# All nine ballots mark candidate 1 only.
sample_ballots1 = [[0, 1] for _ in range(9)]

# Three ballots for candidate 1, then six ballots marking both candidates.
sample_ballots2 = [[0, 1] for _ in range(3)] + [[1, 1] for _ in range(6)]

# --- Sample vote-generation parameters (one probability per candidate) ----
sample_parameter0 = [0.5, 0.5]
sample_parameter1 = [0.6, 0.4]
sample_parameter2 = [0.7, 0.3]

# --- Sample candidate-level error matrices --------------------------------
# Mostly-diagonal matrix.
sample_error_matrix0 = [[0.9, 0.1], [0.1, 0.9]]

# Mostly-off-diagonal matrix.
sample_error_matrix1 = [[0.1, 0.9], [0.9, 0.1]]

# Uniform matrix.
sample_error_matrix2 = [[0.5, 0.5], [0.5, 0.5]]

In [283]:
import abc
import numpy as np
from numpy.random import choice
import scipy as sp
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
import mpmath as mm
from sklearn.preprocessing import normalize

def random_choice_prob_index(a, axis=1):
    """
    Vectorised categorical sampling from a matrix of probabilities.

    Adapted from
    https://stackoverflow.com/questions/47722005/vectorizing-numpy-random-choice-
    for-given-2d-array-of-probabilities-along-an-a

    One uniform random number is drawn per probability vector and the index
    of the first cumulative probability that exceeds it is returned.

    Args:
        a (np.array): 2-D array of probabilities
        axis (int): Axis along which the probabilities are accumulated.
            With 1 (default) one index is drawn per row, with 0 one index
            is drawn per column.

    Returns:
        np.array: 1-D array of drawn indices, one per probability vector
    """
    num_draws = a.shape[1 - axis]
    thresholds = np.expand_dims(np.random.rand(num_draws), axis=axis)
    exceeded = a.cumsum(axis=axis) > thresholds
    # argmax over booleans gives the position of the first True
    return exceeded.argmax(axis=axis)

def index_to_mask(array):
    """
    Turn rows of indices into a mask of the same shape as the input.

    For every row, the positions named by that row's values are set to
    True (stored as 1.0 in the float mask); all other positions stay 0.

    Args:
        array (np.array or pd.DataFrame): 2-D collection of indices

    Returns:
        np.array: float array with the shape of ``array``
    """
    indices = array.values if isinstance(array, pd.DataFrame) else array
    mask = np.zeros(shape=indices.shape)
    # Each mask row is a view, so assigning through it updates ``mask``
    for mask_row, positions in zip(mask, indices):
        mask_row[positions] = True
    return mask
        

class Schema(abc.ABC):
    """
    Abstract description of an election contest.

    A schema stores the size of the contest, creates the ballot poll that
    belongs to it and decides who wins given such a poll.
    """
    def __init__(self, num_candidates=2, num_votes=1, num_seats=1):
        """
        Record the basic properties of the election.

        Args:
            num_candidates (int): The number of candidates joining the election
            num_votes (int): The maximum number of votes one ballot can cast.
                Pass -1 to allow as many votes as there are candidates
                (for preferential voting)
            num_seats (int): Number of candidates elected after the election
        """
        # -1 is the "unlimited" sentinel and resolves to one vote per candidate
        votes_allowed = num_candidates if num_votes == -1 else num_votes
        self._num_candidates = num_candidates
        self._num_votes = votes_allowed
        self._num_seats = num_seats

        # TODO generate ballot table for generating error!
        self._ballot_table = None

    def create_ballot_list(self):
        """
        Create the ballot poll for this election.

        Returns:
            Ballots: An empty ballot poll bound to this schema
        """
        return Ballots(self)

    @abc.abstractmethod
    def get_winners(self, ballot_list, ranking=False):
        """
        Return the winner(s) of the contest for a poll created by this
        schema instance.

        Args:
            ballot_list (Ballots): a Ballots object that must be directly
                created by this schema instance
            ranking (bool): whether the ranking of the candidates should be
                returned as well
        Returns:
            np.array: Array indicating the winner(s) of the election (Top k args)
            pd.DataFrame: DataFrame detailing the candidates and their votes (sorted).
        """

    @property
    def num_votes(self):
        """Maximum number of votes per ballot."""
        return self._num_votes

    @property
    def num_candidates(self):
        """Number of candidates in the contest."""
        return self._num_candidates

    @property
    def num_seats(self):
        """Number of seats to fill."""
        return self._num_seats
    
class BulletSchema(Schema):
    """
    Schema for contests whose ballots only mark candidates as True or False,
    without any preference numbers.
    """
    def create_ballot_list(self):
        """
        Create an empty poll of bullet ballots.

        Returns:
            BulletBallots: An empty ballot poll bound to this schema
        """
        return BulletBallots(self)

    def get_winners(self, ballot_list, ranking=False):
        """
        Determine the winners by plurality, whether the poll holds the real
        ballots or the counted ones.

        Args:
            ballot_list (BulletBallots): poll created by this schema instance
            ranking (bool): also return the per-candidate vote table

        Returns:
            list: ``[elected]`` or ``[elected, table]`` where ``elected`` is a
                boolean pd.Series indexed by candidate and ``table`` is a
                pd.DataFrame with columns "candidate" and "votes", ordered
                from most to fewest votes
        """
        assert ballot_list.schema is self, "Ballot poll must match schema"
        # Only valid ballots take part in the tally
        valid_rows = ballot_list.validate_ballots()
        tally = ballot_list.ballots[valid_rows].sum(axis=0)
        descending = tally.argsort()[::-1]

        elected = pd.Series(False, index=np.arange(self._num_candidates))
        elected[descending[:self._num_seats]] = True
        if not ranking:
            return [elected]

        rows = [(candidate, tally.loc[candidate]) for candidate in descending.values]
        table = pd.DataFrame(rows, columns=["candidate", "votes"])
        return [elected, table]
    
class PluralitySchema(BulletSchema):
    """
    Bullet schema that leaves the number of votes per ballot at the parent's
    default.
    """
    def __init__(self, num_candidates=2, num_seats=1):
        """
        Args:
            num_candidates (int): The number of candidates joining the election
            num_seats (int): Number of candidates elected after the election
        """
        super().__init__(num_candidates, num_seats=num_seats)


class AbstractSchemaEntity(abc.ABC):
    """
    Base class for every entity that is tied to one election schema.
    """
    def __init__(self, schema):
        """
        Args:
            schema: The schema this entity belongs to
        """
        self._schema = schema

    @property
    def schema(self):
        """The schema given at construction time."""
        return self._schema

#     def check_wrapper_function(check_fn, assert_message):
#         def check_wrapper(function):
#             def checked_function(*args, **kwargs):
#                 assert check_fn, assert_message
#             return checked_function
#         return check_wrapper

    
class Ballots(AbstractSchemaEntity, abc.ABC):
    """
    Defines the type of Ballots as well as storing ballots in the class
    """
    def __init__(self, *args, **kwargs):
        """
        Instantiate an empty ballot poll for the given schema.

        Args:
            schema (Schema): forwarded to AbstractSchemaEntity; supplies the
                number of candidates and the number of votes per ballot
        """
        super().__init__(*args, **kwargs)
        schema = self.schema
        self._num_candidates = schema.num_candidates
        self._num_votes = schema.num_votes
        # One column per candidate, one row per ballot
        self._ballots = pd.DataFrame(columns=np.arange(self._num_candidates))

    def _check_ballots(self, ballots):
        """
        Check if the ballots to be added have the same number of entries
        as the specified number of candidates

        Args:
            ballots (2-D Matrix): list of ballots to check if they meet requirements

        Returns:
            bool: if they meet requirement or not
        """
        shape = np.asarray(ballots).shape
        return len(shape) == 2 and shape[1] == self._num_candidates

    def add_ballots(self, ballots):
        """
        Check if the ballots meet requirements, and then add them to this poll

        Args:
            ballots (2-D Matrix): The array of ballots to add to poll

        Raises:
            AssertionError: if ``ballots`` is not 2-D with one column per candidate
        """
        assert self._check_ballots(ballots), "Dimensions Must agree to add ballots"
        new_rows = pd.DataFrame(ballots, columns=np.arange(self._num_candidates))
        if len(self._ballots) == 0:
            # Nothing stored yet: adopt the new rows directly so the empty
            # placeholder frame does not influence the resulting dtypes
            self._ballots = new_rows.reset_index(drop=True)
        else:
            # DataFrame.append was removed in pandas 2.0; concat is the replacement
            self._ballots = pd.concat([self._ballots, new_rows], ignore_index=True)

    @abc.abstractmethod
    def validate_ballots(self, ballots=None):
        """
        Takes an array of ballots (the ballots in own poll if not specified),
        and return an array of validity of the votes

        Args:
            ballots (2-D Matrix): Array of ballots to examine (Default to None)

        Returns:
            np.array: Array of boolean values indicating if the row is valid
        """
        pass

    @property
    def ballots(self):
        # Return a copy of ballots to make sure no inplace change can be done
        return self._ballots.copy()

        
class BulletBallots(Ballots):
    """
    Concrete bullet type voting ballot poll
    """
    def add_ballots(self, ballots):
        """
        Add ballots to the poll and store the whole poll as booleans.

        Args:
            ballots (2-D Matrix): The array of ballots to add to poll
        """
        super().add_ballots(ballots)
        # First add ballots then turn to a boolean dataframe
        self._ballots = self._ballots.astype(bool)

    def validate_ballots(self, ballots=None):
        """
        Flag each ballot that marks no more candidates than the schema allows.

        Args:
            ballots (2-D Matrix): Ballots to examine; the poll's own ballots
                are examined when None (default)

        Returns:
            pd.Series: boolean value per ballot, True when the ballot is valid
        """
        # ``is None`` rather than truthiness: the truth value of a DataFrame
        # or a multi-element array is ambiguous and raises ValueError
        if ballots is None:
            return self._ballots.sum(axis=1) <= self._num_votes
        marked = pd.DataFrame(ballots).astype(bool)
        return marked.sum(axis=1) <= self._num_votes
    
    
class PluralityBallots(BulletBallots):
    """
    Ballot poll for plurality voting; adds nothing to BulletBallots.
    """
        

        
class Voter(AbstractSchemaEntity, abc.ABC):
    """
    A Voter produces the votes of an election: it generates the ballots
    internally, and the result can then be passed on to a Ballots poll.
    """

    def __init__(self, total, **kwargs):
        """
        Set up the voter for a schema with an indicative population size.

        Args:
            total (int): Total number of voters in the voting system
                (indicative only, fewer or more votes may be generated)
            schema (Schema): The schema the voter has to obey, passed as
                a keyword argument
        """
        super().__init__(**kwargs)
        self._total = total

    @abc.abstractmethod
    def vote(self, n=None, *args, **kwargs):
        """
        Generate votes; keyword arguments select the generation style.

        Args:
            n (int): The number of votes to generate (Defaults to None,
                meaning the voter number specified in init)
            kwargs: Concrete voter class specific arguments to use for
                generating votes
        Returns:
            np.array: array of votes
        """
    
    
class BulletVoter(Voter):
    """
    Bullet Voter class votes in Multinominal Distribution, and the votes don't have preference with them
    """
    vote_options = [
        "ratio",         # Specifies ratio of votes to each candidate (not implemented)
        "parameter",     # Specifies the parameter to Multinominal Distribution
        "vote",          # Specifies the exact type of vote the vote will vote (eg. [True, False])
    ]

    def vote(self, n=None, *args, **kwargs):
        """
        Vote based on Bullet Vote schema

        Args:
            n (int): Number of ballots to generate (defaults to the total
                given at construction)
            ratio (list-like): Ratio of votes each candidates should get
                (not implemented)
            parameter (list-like): Probability parameter for categorial distribution to generate votes
            vote (list-like): Specific type of votes to cast

        Returns:
            np.array: ``n`` ballots, one row per ballot and one column per
                candidate

        Raises:
            NotImplementedError: if neither ``vote`` nor ``parameter`` is given
        """
        if n is None:
            n = self._total
        if "vote" in kwargs:
            # Every voter casts exactly the same ballot
            return np.array([kwargs["vote"]] * n)
        if "parameter" in kwargs:
            num_candidates = self.schema.num_candidates
            num_votes = self.schema.num_votes
            # num_votes independent draws per ballot; consecutive draws belong
            # to the same ballot. Draws are made with replacement, so a ballot
            # may mark fewer than num_votes distinct candidates.
            draws = np.asarray(choice(num_candidates,
                                      n * num_votes,
                                      p=kwargs["parameter"]))
            ballot_rows = np.repeat(np.arange(n), num_votes)
            output = np.zeros(shape=[n, num_candidates])
            output[ballot_rows, draws] = True
            return output
        raise NotImplementedError()
            
class PluralityVoter(BulletVoter):
    """
    Voter for plurality voting; adds nothing to BulletVoter.
    """
            
            
class Counter(AbstractSchemaEntity, abc.ABC):
    """
    Counter is responsible for counting the ballots given the 'real Ballots'. It will have
    a prespecified error type and distribution
    """

    # Types of error_matrix accepted
    error_matrix_types = [
        "ballots",       # as defined in [A Bayesian Method for Auditing]
        "candidates"     # Generalisation of most simple Error type! for each
                         # vote casted random allocate based on the candidate level error matrix
    ]

    def __init__(self, error_matrix, error_matrix_type, *args, **kwargs):
        """
        A counter is responsible for interpreting the real ballot list (poll), based on its bias.
        The bias is given by error_matrix which must be either "ballots" or "candidates" types
        of error

        Args:
            error_matrix (2-D Matrix): the content of error matrix; rows are
                L1-normalised when they do not already sum to 1
            error_matrix_type: The type of error matrix given, one of
                ``error_matrix_types``
            schema (Schema): The schema used for the election (forwarded to
                AbstractSchemaEntity)

        Raises:
            AssertionError: if ``error_matrix_type`` is not a known type
        """
        super().__init__(*args, **kwargs)
        assert error_matrix_type in self.error_matrix_types, \
                f"Type must be in list of error matrix types {self.error_matrix_types}"
        if not isinstance(error_matrix, np.ndarray):
            error_matrix = np.array(error_matrix)
        # Normalise error matrix if the error is not normalised
        if np.any(error_matrix.sum(axis=1) != 1):
            # Scale every row so that it sums to one
            error_matrix = normalize(error_matrix, axis=1, norm="l1")
        self._error_matrix = error_matrix
        self._error_matrix_type = error_matrix_type

    @abc.abstractmethod
    def _check_ballots_error_matrix(self):
        """
        Sanity check for the error matrix based on ballot level errors
        This should be implemented for different schema

        Returns:
            bool: if the matrix is a valid ballots error matrix
        """
        pass

    def _check_candidates_error_matrix(self):
        """
        Sanity check for the error matrix based on candidate level errors

        Returns:
            bool: True when the matrix is square with one row per candidate
        """
        height = len(self._error_matrix)
        width = len(self._error_matrix[0]) if height else 0
        # The schema lives on the instance; a bare ``schema`` is not defined here
        return height == width and height == self.schema.num_candidates

    @abc.abstractmethod
    def count(self, ballot_list, filter_invalid=True):
        """
        count should take a ballot poll, and return a counted ballot poll, based on the given error matrix

        Args:
            ballot_list (Ballots): A ballot object to count with
            filter_invalid (bool): whether invalid votes in ballot_list should be filtered before counting.
        """
        pass


class BulletCounter(Counter):
    """
    Counter for bullet ballots. Only the "candidates" error type is
    implemented; the "ballots" type raises NotImplementedError.
    """
    def __init__(self, *args, **kwargs):
        """
        Build the counter and validate the error matrix for its declared type.

        Raises:
            AssertionError: if the error matrix does not fit its type
            NotImplementedError: for the "ballots" error type
        """
        super().__init__(*args, **kwargs)
        if self._error_matrix_type == "ballots":
            assert self._check_ballots_error_matrix(), "Ballot error matrix is invalid"
        elif self._error_matrix_type == "candidates":
            assert self._check_candidates_error_matrix(), "Candidate error matrix is invalid"

    def count(self, ballot_list, filter_invalid=True):
        """
        Count a poll with this counter's error model.

        Currently only applicable to plurality voting (one vote per voter).

        Args:
            ballot_list (Ballots): The poll to count
            filter_invalid (bool): whether invalid ballots are blanked before counting

        Returns:
            Ballots: a new poll holding the counted ballots

        Raises:
            NotImplementedError: for the "ballots" error type
        """
        if self._error_matrix_type == "ballots":
            raise NotImplementedError("Ballot error not yet implemented")
        return self.candidate_level_error(ballot_list, filter_invalid)

    def _check_ballots_error_matrix(self):
        raise NotImplementedError("_check_ballots_error_matrix is not implemented")

    def ballot_level_error(self, ballot_list, filter_invalid=True):
        raise NotImplementedError("ballot_level_error is not implemented")

    def candidate_level_error(self, ballot_list, filter_invalid=True):
        """
        Create the counted ballot list using the candidate level error matrix.

        For every ballot and every candidate one replacement candidate is
        drawn from the matching row of the error matrix; each vote on the
        real ballot is then moved to the candidate drawn for it.

        Args:
            ballot_list (Ballots): The real poll
            filter_invalid (bool): when True, invalid ballots are counted as
                blank rows instead of being misread

        Returns:
            Ballots: new poll created from this counter's schema
        """
        ballots = ballot_list.ballots
        shape = ballots.shape
        num_ballots = shape[0]
        num_candidates = shape[1]

        # Per-ballot validity repeated across all candidate columns
        validity = ballot_list.validate_ballots().values
        validity = np.tile(validity.reshape(-1, 1), num_candidates)
        counted_ballots = pd.DataFrame(False, columns=np.arange(num_candidates),
                                       index=np.arange(num_ballots))
        new_ballot_list = self.schema.create_ballot_list()
        # random_values[b, c]: candidate that a vote for c on ballot b is read as
        random_values = np.array([random_choice_prob_index(self._error_matrix)
                                  for i in range(num_ballots)])
        for i in range(num_candidates):
            mask = random_values == i
            # print(mask, ballots, validity)
            if filter_invalid:
                column_value = np.any(mask & ballots & validity, axis=1)
            else:
                column_value = np.any(mask & ballots, axis=1)
            counted_ballots.iloc[:, i] = column_value
        new_ballot_list.add_ballots(counted_ballots)
        return new_ballot_list
    

class PluralityCounter(BulletCounter):
    """
    Counter for plurality voting; BulletCounter is sufficient as it is.
    """
    
    
class Auditor(AbstractSchemaEntity, abc.ABC):
    """
    Auditor audits the election by sampling from a given poll and perform
    a statistical test on the polled ballots
    """

    def __init__(self, *args, **kwargs):
        """
        Create an auditor with an empty sample poll.

        Args:
            schema (Schema): The schema of the audited election (forwarded
                to AbstractSchemaEntity)
        """
        super().__init__(*args, **kwargs)
        self._samples = self.schema.create_ballot_list()
        # Which sample are we up to?
        self._sample_index = 0
        self._ready = False

    def setup(self, real_ballot_list, counted_ballot_list):
        """
        Setup the audit process by storing ballots

        Args:
            real_ballot_list (Ballots): The real ballot list to sample
            counted_ballot_list (Ballots): The counter counted ballot list to audit

        Raises:
            AssertionError: if the real ballot list belongs to another schema
        """
        assert real_ballot_list.schema is self.schema
        # The actual list of ballots
        self._real_ballot_list = real_ballot_list
        # Records if the real ballot has been sampled
        self._real_ballot_sampled = np.zeros([self.real_ballot_list.ballots.shape[0]])
        # The reported list of ballots
        self._counted_ballot_list = counted_ballot_list
        # Who is elected?
        self._real_seats = self.schema.get_winners(self.real_ballot_list)[0]
        self._elected_seats = self.schema.get_winners(self.counted_ballot_list)[0]
        # Only flag the audit as ready once everything above succeeded
        self._ready = True

    def _check_ready(self):
        """
        Tell whether setup has completed.

        Returns:
            bool: True once the audit is ready
        """
        return self._ready

    def sample(self, n=1, *args, **kwargs):
        """
        Take more sample randomly from the real ballots

        Args:
            n (int): number of ballots to sample
            *args, **kwargs: forwarded to numpy.random.choice

        Raises:
            AssertionError: if setup has not been called yet
        """
        assert self._check_ready(), "The audit is not yet ready (Need to call setup)"
        sample_index = choice(self.real_ballot_list.ballots.index, n, *args, **kwargs)
        # Add the sampled ballots to the pool
        self._samples.add_ballots(self.real_ballot_list.ballots.loc[sample_index, :])
        # Records which index has been sampled
        self._real_ballot_sampled[sample_index] = True

    @abc.abstractmethod
    def fit_samples(self):
        """
        Fit the newly sampled samples to possibly the auditing parameters
        """
        pass

    @abc.abstractmethod
    def audit(self):
        """
        Start auditing if the audit is already set up.

        Raises:
            AssertionError: if setup has not been called yet
        """
        # Call the check: asserting the bound method itself is always truthy
        assert self._check_ready(), "The audit is not yet ready (Need to call setup)"

    @property
    def real_ballot_list(self):
        return self._real_ballot_list

    @property
    def counted_ballot_list(self):
        return self._counted_ballot_list

    @property
    def samples(self):
        return self._samples

    @property
    def real_seats(self):
        return self._real_seats

    @property
    def elected_seats(self):
        return self._elected_seats
               
class BayesianAuditor(Auditor):
    """
    Auditor that carries out a Bayesian audit of the given election output.
    No behaviour is added on top of Auditor yet.
    """

class PluralityBayesianAuditor(BayesianAuditor):
    """
    Bayesian Auditor specialised for the plurality voting case.
    No behaviour is added on top of BayesianAuditor yet.
    """

class PluralityBallotPollingBayesianUrnModelVoter(PluralityVoter):
    """
    This class is delegated the job of simulating the unseen ballots based on
    the sampled ballots, using an urn that grows with every draw.
    """
    def vote(self, n=None, samples=None, prior=None, **kwargs):
        """
        Simulate ``n`` further ballots from the sampled ballots.

        The urn starts with the sampled ballots plus the prior ballots. Each
        draw picks one ballot uniformly from the urn and puts a copy of it
        back, so later draws depend on earlier ones.

        Args:
            n (int): Number of ballots to simulate (required)
            samples (Ballots or pd.DataFrame): The ballots sampled so far (required)
            prior (2-D Matrix): Ballots added to the urn before drawing;
                defaults to one ballot per candidate (identity matrix)

        Returns:
            pd.DataFrame: the ``n`` simulated ballots, one column per
                candidate, stored as floats (1.0 for a marked candidate)

        Raises:
            AssertionError: if ``n`` or ``samples`` is missing
        """
        assert n is not None and samples is not None, "Need to provide all parameters"
        if isinstance(samples, Ballots):
            samples = samples.ballots
        samples = pd.DataFrame(samples)
        # ``is None`` rather than truthiness: an array prior has no single truth value
        if prior is None:
            prior = np.identity(self.schema.num_candidates)
        prior_rows = pd.DataFrame(data=prior, columns=samples.columns)
        # ignore_index keeps row labels unique, so one label is always one ballot
        self._urn = pd.concat([samples, prior_rows], ignore_index=True)
        self._additional = pd.DataFrame(columns=self._urn.columns)
        self._fill_urn(n=n)
        return self._additional

    def _fill_urn(self, n):
        """
        Draw ``n`` ballots from the urn, returning a copy of each to the urn.

        Works on one preallocated array instead of growing a DataFrame per
        draw. Updates ``self._urn`` and extends ``self._additional``.

        Args:
            n (int): Number of draws
        """
        columns = self._urn.columns
        start = len(self._urn)
        balls = np.empty((start + n, len(columns)), dtype=float)
        balls[:start] = self._urn.to_numpy(dtype=float)
        for size in range(start, start + n):
            # Uniform draw among the ``size`` ballots currently in the urn
            balls[size] = balls[choice(size)]
        self._urn = pd.DataFrame(balls, columns=columns)
        drawn = pd.DataFrame(balls[start:], columns=columns)
        if len(self._additional):
            # Keep ballots drawn by earlier calls in front of the new ones
            drawn = pd.concat([self._additional, drawn], ignore_index=True)
        self._additional = drawn

In [282]:
# End-to-end demo: generate a plurality election, count it with a biased
# counter, then simulate the unseen ballots from a sample of the real ones.
total_voter = 2000
total_sample = 100

# bullet_schema = BulletSchema()
plurality_schema = PluralitySchema()
# bullet_ballot_list = bullet_schema.create_ballot_list()
plurality_ballot_list = plurality_schema.create_ballot_list()
# bullet_voter = BulletVoter(schema=bullet_schema, total=50)
plurality_voter = PluralityVoter(schema=plurality_schema, total=200)
# sample_ballots = bullet_voter.vote(parameter=sample_parameter1)
plurality_real_ballots = plurality_voter.vote(parameter=sample_parameter1)
# bullet_ballot_list.add_ballots(sample_ballots)
plurality_ballot_list.add_ballots(plurality_real_ballots)

# bullet_counter = BulletCounter(sample_error_matrix1, error_matrix_type="candidates", schema=bullet_schema)
# The counter must share the schema of the poll it counts; bullet_schema is
# only defined in the commented-out lines above.
plurality_counter = PluralityCounter(sample_error_matrix1, error_matrix_type="candidates", schema=plurality_schema)
# counted_ballots = bullet_counter.count(bullet_ballot_list)
counted_ballots = plurality_counter.count(plurality_ballot_list)
plurality_sample_voter = PluralityBallotPollingBayesianUrnModelVoter(total=total_voter, schema=plurality_schema)

# Treat the first total_sample real ballots as the audit sample
plurality_sample_ballots = plurality_schema.create_ballot_list()
plurality_sample_ballots.add_ballots(plurality_real_ballots[:total_sample])

# Simulate the remaining ballots and compare the simulated and real winners
simulated_votes = plurality_sample_voter.vote(n=total_voter-total_sample, samples=plurality_sample_ballots)
simulated_list = plurality_schema.create_ballot_list()
simulated_list.add_ballots(simulated_votes)
print(plurality_schema.get_winners(simulated_list))
print(plurality_schema.get_winners(plurality_ballot_list))

[0     True
1    False
dtype: bool]
[0     True
1    False
dtype: bool]


In [4]:
class Election():
    """
    The election a simulation is run on.

    An election needs a set of pre-specified values to start. When no
    arguments are given, the defaults describe a simple two candidate
    contest with one elected candidate.

    The constructor only stores its arguments on the instance.
    """
    def __init__(self, schema=None, num_candidate=2, num_elected=1, social=None):
        """
        Args:
            schema: stored as ``_schema`` (defaults to None)
            num_candidate (int): stored as ``_num_candidate``
            num_elected (int): stored as ``_num_elected``
            social: stored as ``_social`` (defaults to None)
        """
        self._schema, self._social = schema, social
        self._num_candidate, self._num_elected = num_candidate, num_elected
        
        
        
        
        