## The Functions of the Discrete Voter Model

This notebook implements the following $6$ subroutines for the **discrete voter model**:
1. `make_grid`: create a probabilistic grid: an $\mathbf{R}^n$ array of values, $\omega$, that all sum to $1$
2. `shift_weight`: shift weight in a grid in a reversible way
3. `expec_votes`: given a grid, a candidate, and a district description, output the expectation of votes that candidate received
4. `prob_votes`: given a grid, a candidate, a district description, and the observed vote outcomes, output the probability of seeing that outcome
5. `mcmc`: run a Markov Chain Monte Carlo method on a state space of grids
6. `hill_climb`: optimize the expectation or probability with gradient descent

### `make_grid`
Create a probabilistic grid: an $\mathbf{R}^n$ array of values, $\omega$, that all sum to $1$.

In [224]:
import numpy as np
from operator import mul
import functools
import random

In [14]:
def make_grid(num_groups, matrix_size, random=True):
    """
    Create a probabilistic grid.
    
    num_groups (int): the number of groups to be represented 
    matrix_size (int): the dimensions of the matrix
    random (bool): whether the grid should be initialized uniformly
    or randomly
    
    return: a probabilistic grid
    """
    shape = tuple([matrix_size] * num_groups)
    if random:
        matrix = np.random.rand(*shape)
    else:
        matrix = np.ones(shape)
    return matrix / matrix.sum()

In [62]:
test_grid = make_grid(3, 10)

### `shift_weight`
Shift weight in a grid in a reversible way.

In [232]:
def shift_weight(grid, shift_type="uniform", epsilon=0.01):
    """
    Shift the weight in a probabilistic grid.
    
    grid (NumPy array): the probabilistic grid to be perturbed
    shift_type (string): the type of shift to be done. One of:
        1. uniform (default): add a uniform random variable to each cell then re-normalize
        2. shuffle: shuffle the matrix (O(n))
        3. right: shift epsilon of weight to the right
        4. left: shift epsilon of weight to the left
    epsilon (float): the value to laterally shift the grid by
    
    return: a probabilistic grid
    """
    if shift_type == "shuffle":
        np.random.shuffle(grid)
        return grid
    elif shift_type == "right":
        rolled = np.roll(grid, 1, axis=1)
        return grid + (rolled * epsilon) - (grid * epsilon)
    elif shift_type == "left":
        rolled = np.roll(grid, -1, axis=1)
        return grid + (rolled * epsilon) - (grid * epsilon)
    else:
        new_grid = grid + np.random.uniform()
        return new_grid / new_grid.sum()

### `expec_votes`
Given a grid, a candidate, and a district description, output the expectation of votes that candidate received.

In [203]:
def get_vote_outcome(flat_index, grid, demo, print_stats=False):
    """
    Find the vote outcome of a given election
    for a candidate, with a given probabilistic
    grid.
    
    flat_index (int): the flat index of the selected cell
    grid (NumPy array): the probabilistic grid for the precinct
    and candidate
    demo (dict): the demographics of the district
    print_stats (boolean): whether to print the statistics
    
    return: the expectation of the vote outcome for that cell
    """
    # Find the corresponding index
    index = np.unravel_index(flat_index, grid.shape)
    matrix_dim = grid.shape[0]
    
    if print_stats:
        print(f"The index is {index}.")
    
    # Calculate the vote outcomes given the cell selected
    
    vote_outcome = np.zeros(len(demo))
    
    for num, group in enumerate(demo):
        # Find the probabilities the cell represents for each group
        pct = index[num] / matrix_dim
        
        if print_stats:
            print(f"{int(pct * 100)}% of the {group} population voted for this candidate.")
            
        vote_outcome[num] += demo[group] * pct
        
    return np.sum(vote_outcome)

In [204]:
def expec_votes(grid, demo, print_all_stats=False):
    """
    Find the expectation of the vote outcome
    for a candidate, with a given probabilistic
    grid.
    
    grid (NumPy array): the probabilistic grid for the precinct
    and candidate
    demo (dict): the demographics of the district
    
    return: the expectation for the vote outcomes over the grid
    """
    probs = grid.flatten()
    outcomes = []
    for flat_index, prob in enumerate(probs):
        # Set up printing
        print_stats = False
        if flat_index % 10 == 0 and print_all_stats:
            print_stats = True
        outcomes.append(get_vote_outcome(flat_index, grid, demo, print_stats))
        
    return np.average(outcomes, weights=probs)

In [109]:
test_demo = {"Black": 10, "white": 8, "Latinx": 5}

In [146]:
test_flat_index = np.random.choice(range(test_grid.size), p=test_grid.flatten())

In [147]:
get_vote_outcome(test_flat_index, test_grid, test_demo)

15.0

In [148]:
expec_votes(test_grid, test_demo)

10.447431757252199

### `prob_votes`
Given a grid, a candidate, a district description, and the observed vote outcomes, output the probability of seeing that outcome.

In [205]:
def integer_partition(n, k, min_size=1):
    """
    Partition an integer.
    
    n (int): the integer to partition
    k (int): the number of elements in a partition
    min_size (int): the minimum size of an element
    in the partition
    
    return: a generator of partitions as tuples
    """
    if k < 1:
        return
    if k == 1:
        if n >= min_size:
            yield (n,)
        return
    for i in range(min_size, n // k + 1):
        for result in integer_partition(n - i, k - 1, i):
            yield (i,) + result

In [133]:
for partition in integer_partition(5, 3):
    print(partition)

(1, 1, 3)
(1, 2, 2)


In [189]:
def get_vote_probability(flat_index, grid, demo, observed):
    """
    Find the probability of a grid's cell producing a
    vote outcome of a given election for a candidate, 
    with a given probabilistic grid.
    
    flat_index (int): the flat index of the selected cell
    grid (NumPy array): the probabilistic grid for the precinct
    and candidate
    demo (dict): the demographics of the district
    observed (int): the number of votes the candidate
    received in the election
    
    return: the probability that a cell produced the observed outcome
    """
    # Find the corresponding index
    index = np.unravel_index(flat_index, grid.shape)
    matrix_dim = grid.shape[0]
    
    # Find the vote percentages for each demographic group
    vote_pcts = [index[num] / matrix_dim for num, group in enumerate(demo)]
    
    # Find the probability of the outcome
    total_prob = 0
    observed_factorial = np.math.factorial(observed)
    
    # Go through the possible partitions of the vote outcome, by race
    for partition in integer_partition(observed, len(demo)):
        prob = 1
        # Find the probability of seeing that outcome
        for num, group in enumerate(demo):
            prob *= (vote_pcts[num] ** partition[num]) * ((1 - vote_pcts[num]) ** (demo[group] - partition[num]))
        
        factorial_list = np.asarray([np.math.factorial(votes) for votes in partition])
        coefficient = observed_factorial / np.prod(factorial_list)

        total_prob += prob * coefficient
        
    return total_prob

In [171]:
test_observed = 10

In [190]:
get_vote_probability(test_flat_index, test_grid, test_demo, test_observed)

0.0007289680406400011

In [191]:
def prob_votes(grid, demo, observed):
    """
    Find the probability that a grid produced
    the observed number of votes that a candidate
    received in a given election, with a given
    probabilistic grid.
    
    grid (NumPy array): the probabilistic grid for the precinct
    and candidate
    demo (dict): the demographics of the district
    observed (int): the observed number of votes the candidate received
    
    return: the probability that a grid produced the observed outcomes
    """
    probs = grid.flatten()
    grid_prob = 0
    for flat_index, prob in enumerate(probs):
        vote_prob = get_vote_probability(flat_index, grid, demo, observed)
        grid_prob += vote_prob * prob
        
    return grid_prob

In [200]:
prob_votes(test_grid, test_demo, test_observed)

0.5336203195517829

### `mcmc`
Run a Markov Chain Monte Carlo method on a state space of grids.

In [240]:
def metropolis_hastings(n_iter, initial_grid, observed_votes, demo, 
                        update_type='uniform', scoring_type='prob', 
                        burn=0.1, epsilon=0.01):
    """
    Run the Metropolis-Hastings MCMC algorithm to sample the space
    of probabilistic demographic grids in the discrete
    voter model.
    
    n_iter (int): the number of iterations to run
    initial_grid (NumPy array): the probabilistic grid to start with
    observed_votes (int): the number of votes a candidate got in an election
    demo (dict): the demographics of the district
    update_type (string): the type of update to apply to the grid. One of:
        1. uniform (default): add a uniform random variable to each cell then re-normalize
        2. shuffle: shuffle the matrix (O(n))
        3. right: shift epsilon of weight to the right
        4. left: shift epsilon of weight to the left
    scoring_type (string): the type of scoring to use. One of:
        1. prob (default): score by the probability of a grid to produce the outcome
        2. expec: score by the difference in the outcome and the expectation of a grid
    burn (float): the fraction of the first samples to discard
    epsilon (float): the value to laterally shift the grid by
    
    return: a dictionary of the best scoring grid, the highest score it received,
    and a list of all the grids explored and their scores
    """
    grid = initial_grid
    current_score = 0
    results = {'best_grid': grid, 
               'best_score': current_score, 
               'all_grids': [grid], 
               'all_scores': [current_score]}
    
    # Iterate
    for _ in range(n_iter):
        # Generate a candidate
        candidate = shift_weight(grid, shift_type=update_type, epsilon=epsilon)
        
        # Score the candidate and accept or reject
        if scoring_type == 'prob':
            score = prob_votes(candidate, demo, observed_votes)
            
        else:
            expectation = expec_votes(candidate, demo)
            score = abs(observed_votes - expectation)
        
        # Accept if higher than the current score, or with that probability
        # if lower, and implicitly reject
        if score >= current_score or (score / current_score) > random.uniform(0, 1):
            grid = candidate
            results['all_grids'].append(grid)
            results['all_scores'].append(score)
            # Check the best score
            if score >= results['best_score']:
                results['best_score'] = score
                results['best_grid'] = grid
    
    return results

In [258]:
test_results = metropolis_hastings(100, test_grid, test_observed, test_demo, scoring_type='expec')

In [259]:
test_results['best_score']

0.35010762879240964