In [None]:
import random
import numpy as np
from src.utils import noise_bal, project_FPV
from src.intervals import Wald_CI

CVR = [np.array([1,2,-1], dtype=np.int8)]*700 + [np.array([2,-1,-1], dtype=np.int8)] * 770 + [np.array([3,-1,-1], dtype=np.int8)] * 880 + [np.array([4,1,-1], dtype=np.int8)] * 350 + [np.array([5,1,-1], dtype=np.int8)] * 300 + [np.array([-1,-1,-1], dtype=np.int8)] * 100
# make one large np array where each row is a cvr
CVR = np.array(CVR)

def noise_cvr_array(CVR, candidates, exhaust_sentinel=-1, noise_level=0.05):
    """
    Create a noised version of the CVR array following the same rules as noise_bal.
    
    Args:
        CVR: numpy array where each row is a ballot
        candidates: set of valid candidate IDs
        exhaust_sentinel: value representing exhausted/non-candidate (-1)
        noise_level: fraction of ballots to noise
    
    Returns:
        noised_BAL: numpy array with same shape as CVR but with noised ballots
    """
    noised_BAL = CVR.copy()
    n_ballots = len(CVR)
    n_to_noise = int(noise_level * n_ballots)
    
    # Randomly select indices to noise
    noised_indices = random.sample(range(n_ballots), k=n_to_noise)
    
    for idx in noised_indices:
        ballot = CVR[idx].copy()
        noised_BAL[idx] = noise_single_cvr(ballot, candidates, exhaust_sentinel)
    
    return noised_BAL

def noise_single_cvr(ballot, candidates, exhaust_sentinel=-1):
    """
    Apply noise to a single CVR ballot following the original noise_bal logic.
    """
    # Convert ballot to list of non-exhaust candidates (equivalent to the old tuple format)
    active_positions = ballot != exhaust_sentinel
    if not np.any(active_positions):
        # All exhausted ballot - treat as empty
        bal_list = []
    else:
        bal_list = ballot[active_positions].tolist()
    
    # Check if this is an "exhausted" ballot (equivalent to old (6,) case)
    if len(bal_list) == 0 or (len(bal_list) == 1 and bal_list[0] not in candidates):
        return random.choice([
            np.array([1, 2, exhaust_sentinel], dtype=ballot.dtype),
            np.array([2, exhaust_sentinel, exhaust_sentinel], dtype=ballot.dtype),
            np.array([3, exhaust_sentinel, exhaust_sentinel], dtype=ballot.dtype),
            np.array([4, 1, exhaust_sentinel], dtype=ballot.dtype),
            np.array([5, 1, exhaust_sentinel], dtype=ballot.dtype)
        ])
    
    noise_type = random.choice([1, 2, 3, 4, 5])
    
    if noise_type == 2:  # delete ranking
        if len(bal_list) == 1:
            noise_type = 1
        else:
            del_pos = random.randint(0, len(bal_list) - 1)
            del bal_list[del_pos]
    
    if noise_type == 1:  # insert ranking
        available_cands = [c for c in candidates if c not in bal_list]
        if available_cands:  # only insert if there are candidates available
            cand_to_insert = random.choice(available_cands)
            insert_pos = random.randint(0, len(bal_list))
            bal_list.insert(insert_pos, cand_to_insert)
    
    if noise_type == 3:  # permute two rankings
        if len(bal_list) == 1:
            noise_type = 4
        else:
            indices = random.sample(range(len(bal_list)), 2)
            bal_list[indices[0]], bal_list[indices[1]] = bal_list[indices[1]], bal_list[indices[0]]
    
    if noise_type == 4:  # switch one ranking with another non-ranked candidate
        available_cands = [c for c in candidates if c not in bal_list]
        if available_cands and len(bal_list) > 0:
            cand_to_switch_in = random.choice(available_cands)
            switch_pos = random.randint(0, len(bal_list) - 1)
            bal_list[switch_pos] = cand_to_switch_in
    
    if noise_type == 5:  # become "exhausted" (equivalent to old (6,))
        # Create a ballot with one invalid candidate
        result = np.full(ballot.shape, exhaust_sentinel, dtype=ballot.dtype)
        result[0] = max(candidates) + 1  # Use a candidate ID not in the race
        return result
    
    # Convert back to array format
    result = np.full(ballot.shape, exhaust_sentinel, dtype=ballot.dtype)
    for i, cand in enumerate(bal_list[:len(ballot)]):
        result[i] = cand
    
    return result


In [275]:
from statistics import NormalDist
import math

BAL = noise_cvr_array(CVR, candidates={1,2,3,4,5}, exhaust_sentinel=-1, noise_level=0.05)

N = len(BAL)
n = N//10
# randomly sample n ballots from BAL and CVR, make them into a new array
sample_indices = random.sample(range(N), k=n)
BAL_sample = BAL[sample_indices]
CVR_sample = CVR[sample_indices]

def project_sample(sample_array, elected, hopeful, exhaust_sentinel=-1):
    """
    Project the BAL and CVR samples onto the space defined by the elected and hopeful candidates.
    Also compacts non-sentinel entries to the front of each row.
    """
    # mask all entries in the samples that are not in elected or hopeful with a -1
    valid_candidates = elected.union(hopeful)
    valid_candidates = np.array(list(valid_candidates), dtype=np.int8)
    projected_sample = np.where(np.isin(sample_array, valid_candidates), sample_array, exhaust_sentinel)
    
    # Compact non-sentinel entries to the front of each row
    compacted_sample = np.full_like(projected_sample, exhaust_sentinel)
    
    for i in range(projected_sample.shape[0]):
        row = projected_sample[i]
        non_sentinel_mask = row != exhaust_sentinel
        non_sentinel_values = row[non_sentinel_mask]
        compacted_sample[i, :len(non_sentinel_values)] = non_sentinel_values
    
    return compacted_sample

def deg0_sampler(_BAL, _CVR, hopeful, exhaust_sentinel=-1):
    hopeful_with_sentinel = hopeful.union({exhaust_sentinel})
    projected_BAL = project_sample(_BAL, elected=set(), hopeful=hopeful, exhaust_sentinel=exhaust_sentinel)
    projected_CVR = project_sample(_CVR, elected=set(), hopeful=hopeful, exhaust_sentinel=exhaust_sentinel)
    BAL_FPV = projected_BAL[:, 0]
    CVR_FPV = projected_CVR[:, 0]
    discrepant_mask = BAL_FPV != CVR_FPV
    discrepant_idx = np.where(discrepant_mask)[0]
    
    pi_samples = {i: np.zeros(len(BAL_FPV), dtype=np.int8) for i in hopeful_with_sentinel}
    
    for candidate in hopeful_with_sentinel:
        # Add 1 where BAL has this candidate and there's a discrepancy
        pi_samples[candidate][discrepant_idx] += (BAL_FPV[discrepant_idx] == candidate).astype(np.int8)
        # Subtract 1 where CVR has this candidate and there's a discrepancy
        pi_samples[candidate][discrepant_idx] -= (CVR_FPV[discrepant_idx] == candidate).astype(np.int8)
    
    return pi_samples

def deg1_sampler(_BAL, _CVR, hopeful, w, exhaust_sentinel=-1):
    hopeful_with_sentinel = hopeful.union({exhaust_sentinel})
    projected_BAL = project_sample(_BAL, elected={w}, hopeful=hopeful, exhaust_sentinel=exhaust_sentinel)
    projected_CVR = project_sample(_CVR, elected={w}, hopeful=hopeful, exhaust_sentinel=exhaust_sentinel)
    BAL_FPV = projected_BAL[:, 0]
    CVR_FPV = projected_CVR[:, 0]
    BAL_SPV = projected_BAL[:, 1]
    CVR_SPV = projected_CVR[:, 1]
    winner_fpv_mask = (BAL_FPV == w) & (CVR_FPV == w)
    discrepant_mask = (BAL_SPV != CVR_SPV) & winner_fpv_mask
    discrepant_idx = np.where(discrepant_mask)[0]

    pi_samples = {i: np.zeros(len(BAL_FPV), dtype=np.int8) for i in hopeful_with_sentinel}

    for candidate in hopeful_with_sentinel:
        # Add 1 where BAL has this candidate and there's a discrepancy
        pi_samples[candidate][discrepant_idx] += (BAL_SPV[discrepant_idx] == candidate).astype(np.int8)
        # Subtract 1 where CVR has this candidate and there's a discrepancy
        pi_samples[candidate][discrepant_idx] -= (CVR_SPV[discrepant_idx] == candidate).astype(np.int8)

    return pi_samples

def cand_to_quota_deg0(deg0_samples, i, sentinel, Ti, Tg, N, m, n, epsilon=1e-6, alpha = 0.05, noise_level=0.05):
    SRSWOR_adjustment = (1 - n/N)/n
    C = Ti- epsilon - (N-Tg)/(m+1)
    M_iq_samples = deg0_samples[i] - deg0_samples[sentinel]/(m+1)
    var_iq = np.var(M_iq_samples, ddof=1)
    # if the variance is zero, use the known upper bound on variance
    if var_iq == 0:
        mu_iq_upper = mu_upper_from_nonzero_indicator(
            M_iq_samples, N, alpha, prior_q_max=1.0 - noise_level
        )
        print("Using nonzero indicator mu upper:", mu_iq_upper)
        return C + N*mu_iq_upper
    var_iq *= SRSWOR_adjustment
    mean_iq = np.mean(M_iq_samples)
    print("Mean iq:", mean_iq)
    z = NormalDist().inv_cdf(1 - alpha/2)
    mu_iq_upper = mean_iq + z * np.sqrt(var_iq)
    mu_iq_lower = mean_iq - z * np.sqrt(var_iq)
    upper_bound = C + N*mu_iq_upper
    lower_bound = C + N*mu_iq_lower
    return lower_bound, upper_bound

def cand_to_cand_deg0(deg0_samples, i, j, Ti, Tj, N, n, alpha = 0.05, noise_level=0.05):
    """
    Audit the upper bound on M_ij = T_i - T_j at degree 0.
    """
    SRSWOR_adjustment = (1 - n/N)/n
    M_ij_samples = deg0_samples[i] - deg0_samples[j]
    print(sum(M_ij_samples))
    var_ij = np.var(M_ij_samples, ddof=1)
    # if the variance is zero, use the known upper bound on variance
    if var_ij == 0:
        mu_ij_upper = mu_upper_from_nonzero_indicator(
            M_ij_samples, N, alpha, prior_q_max=1.0 - noise_level
        )
        print("Using nonzero indicator mu upper:", mu_ij_upper)
        return (Ti - Tj) + N*mu_ij_upper
    var_ij *= SRSWOR_adjustment
    mean_ij = np.mean(M_ij_samples)
    z = NormalDist().inv_cdf(1 - alpha)
    mu_ij_upper = mean_ij + z * np.sqrt(var_ij)
    upper_bound = (Ti - Tj) + N*mu_ij_upper
    return upper_bound

def mu_upper_from_nonzero_indicator(x_sample, N: int, alpha: float = 0.05,
                                    prior_q_max: float | None = None) -> float:
    """
    X in {-2,-1,0,1,2}. Let q = Pr(|X|>0). For any distribution, E[X] <= 2 q.
    Uses Wilson+FPC UB for q (or min with prior_q_max if provided).
    """
    xs = list(x_sample); n = len(xs)
    qhat = sum(1 for x in xs if x != 0) / n
    q_ub = _wilson_upper(qhat, n, N, alpha)
    if prior_q_max is not None:
        q_ub = min(q_ub, float(prior_q_max))
    return min(1.0, max(-1.0, 2.0 * q_ub))  # mu in [-2,2] so mu<=2*q<=2, divide by 2? No: mu<=2*q; then clip to [0,2]; but as a mean of X it's in [-2,2]. Here we return mu upper, so clip to 2 via min(2.0, 2*q_ub). For safety, clamp to [-2,2].

def _wilson_upper(phat: float, n: int, N: int, alpha: float) -> float:
    """
    One-sided Wilson UPPER bound for a binomial proportion under SRSWOR via
    FPC-adjusted score test. Returns UB at level 1-alpha.
    """
    phat = min(1.0, max(0.0, phat))
    z = NormalDist().inv_cdf(1 - alpha)
    a = (z*z) * (1.0 - n / N) / n  # FPC-adjusted z^2/n
    center = phat + 0.5 * a
    rad = math.sqrt(a * phat * (1.0 - phat) + 0.25 * a * a)
    denom = 1.0 + a
    ub = (center + rad) / denom
    return min(1.0, max(0.0, ub))

def serfling_upper_for_mean(xbar: float, n: int, N: int, alpha: float,
                            a: float = -2.0, b: float = 2.0) -> float:
    """
    One-sided Serfling-Hoeffding bound:
      P( xbar - mu >= t ) <= exp( - 2 n t^2 / R^2 * (N - n)/(N - 1) )
    Invert to get mu <= xbar + t.
    """
    R = b - a  # here 4.0
    # guard N>n>=1; if N==n, t=0 (full count, no uncertainty)
    if n >= N: 
        return xbar
    c = (N - n) / (N - 1)  # finite-population tightening
    t = R * math.sqrt( math.log(1.0/alpha) / (2.0 * n / c) )  # same as R*sqrt(c*ln(1/Î±)/(2n))
    return xbar + t

deg0_samples = deg0_sampler(BAL_sample, CVR_sample, hopeful={1,2,3,4,5}, exhaust_sentinel=-1)
SRSWOR_adjustment = (1 - n/len(CVR))/n
cand_to_cand_deg0(deg0_samples, 5, 4, Ti=300, Tj=350, N=len(BAL), n=n, alpha=0.05)

1


np.float64(-12.943156552614973)

In [276]:
def cand_to_cand_deg1(deg0_samples, deg1_samples, w, i, j, Ti, Tj, Twi, Twj, Tg, Tw, Twg, N, m, n,epsilon=1e-6, alpha = 0.05, noise_level=0.05,sentinel=-1):
    SRSWOR_adjustment = (1 - n/N)/n
    C0 = Ti - Tj
    C1 = Twi - Twj
    Cu=(n-(Twg+Tg)+(m+1)*epsilon)/N
    Cv = ((m+1)*Tw-Tg)/N

    mu_u_sample = deg1_samples[sentinel] + deg0_samples[sentinel]
    mu_v_sample = (m+1)*deg0_samples[w] - deg1_samples[sentinel]
    mu0_sample = deg0_samples[i] - deg0_samples[j]
    mu1_sample = deg1_samples[i] - deg1_samples[j]

    mu_u = np.mean(mu_u_sample)
    mu_v = np.mean(mu_v_sample)
    mu0 = np.mean(mu0_sample)
    mu1 = np.mean(mu1_sample)

    s_uu = np.var(mu_u_sample, ddof=1)
    s_vv = np.var(mu_v_sample, ddof=1)
    s_uv = np.cov(mu_u_sample, mu_v_sample, ddof=1)[0][1]
    s_0u = np.cov(mu0_sample, mu_u_sample, ddof=1)[0][1]
    s_0v = np.cov(mu0_sample, mu_v_sample, ddof=1)[0][1]
    s_1u = np.cov(mu1_sample, mu_u_sample, ddof=1)[0][1]
    s_1v = np.cov(mu1_sample, mu_v_sample, ddof=1)[0][1]
    s_00 = np.var(mu0_sample, ddof=1)
    s_11 = np.var(mu1_sample, ddof=1)
    s_01 = np.cov(mu0_sample, mu1_sample, ddof=1)[0][1]

    S_theta = np.array([[s_uu, s_uv, s_0u, s_0v],
                        [s_uv, s_vv, s_0v, s_1v],
                        [s_0u, s_0v, s_00, s_01],
                        [s_1u, s_1v, s_01, s_11]])
    S_theta *= SRSWOR_adjustment

    k_hat = (Cu-mu_u)/(Cv + mu_v)

    grad_T = np.array([N, (1-k_hat)*N, (C1+N * mu1)/(Cv + mu_v), k_hat*(C1+ N * mu1)/(Cv + mu_v)])
    var_T = grad_T @ S_theta @ grad_T.T
    SE_T = math.sqrt(var_T)

    if var_T == 0:
        mu0_upper = mu_upper_from_nonzero_indicator(
            mu0_sample, N, alpha, prior_q_max=1.0 - noise_level
        )
        mu1_upper = mu_upper_from_nonzero_indicator(
            mu1_sample, N, alpha, prior_q_max=1.0 - noise_level
        )
        print("Using nonzero indicator mu upper:", mu0_upper, mu1_upper)
        upper_bound = C0 + (1 - k_hat) * C1 + N * mu0_upper + (1 - k_hat) * N * mu1_upper
        return upper_bound

    z = NormalDist().inv_cdf(1 - alpha)
    upper_bound = C0 + (1 - k_hat) * C1 + N * mu0 + (1 - k_hat) * N * mu1 + z * SE_T
    return upper_bound

For deg 1:

$M_{ij}= T_i - T_j + N\mu_0 + (1-k)(T_{wi} - T_{wj} + N\mu_1)$ where $k =\displaystyle\frac{c_u -\mu_u}{c_v+\mu_v}$.

$\vec{\theta}= (\mu_0,\mu_1, \mu_u,\mu_v)$.

In [277]:
# in step 3a, cand 1 is elected, and 5 is eliminated

deg0_samples = deg0_sampler(BAL_sample, CVR_sample, hopeful={1,2,3,4}, exhaust_sentinel=-1)
deg1_samples = deg1_sampler(BAL_sample, CVR_sample, hopeful={2,3,4}, w=1, exhaust_sentinel=-1)

cand_to_cand_deg1(
    deg0_samples,
    deg1_samples,
    w=1,
    i=4,
    j=3,
    Ti=350,
    Tj=850,
    Twi=0,
    Twj=0,
    Tg=100,
    Tw=1000,
    Twg=300,
    N=len(BAL),
    m=2,
    n=n,
    alpha=0.05,
    noise_level=0.05,
    sentinel=-1
)

np.float64(-400.5218361538305)

In [234]:
cand_to_quota_deg0(deg0_samples, 1, sentinel=-1, Ti=700, Tg=100, N=len(BAL), m=2, n=n, alpha=0.05)

Mean iq: 0.00860215053763441


(np.float64(-306.06150238150957), np.float64(-240.6051662851571))

In [267]:
deg0_samples = deg0_sampler(BAL_sample, CVR_sample, hopeful={1,2,3,4,5}, exhaust_sentinel=-1)
deg1_samples = deg1_sampler(BAL_sample, CVR_sample, hopeful={2,3,4,5}, w=1, exhaust_sentinel=-1)
cand_to_cand_deg0(deg0_samples, 5, 4, Ti=300, Tj=350, N=len(BAL), n=n, alpha=0.05)

Discrepant mask: [False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False False False False False False False False
 False False False False False Fal

np.float64(-11.715256310076406)

In [None]:
test_arr = np.array([[3,1,-1],[4,2,-1]], dtype=np.int8)
test_arr1 = np.array([[3,4,-1],[5,2,-1]], dtype=np.int8)
deg0_sampler(test_arr, test_arr1, hopeful={1,2}, exhaust_sentinel=-1)

{1: array([1, 0], dtype=int8),
 2: array([0, 0], dtype=int8),
 -1: array([-1,  0], dtype=int8)}

In [261]:
FPV_BAL = BAL_sample[:,0]
FPV_CVR = CVR_sample[:,0]
same_fpv = FPV_BAL == FPV_CVR
winner_fpv = same_fpv & (FPV_BAL == 1)
SPV_BAL = BAL_sample[:,1]
SPV_CVR = CVR_sample[:,1]
discrepant_spv = (SPV_BAL != SPV_CVR) & winner_fpv
discrepant_idx = np.where(discrepant_spv)[0]
discrepant_idx

array([124])

In [271]:
deg1_samples[3]

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,

In [268]:
#print any row of CVR_sample not equal to the corresponding row of BAL_sample
for i in range(len(CVR_sample)):
    if not np.array_equal(CVR_sample[i], BAL_sample[i]):
        print("Row", i, "differs:")
        print("BAL:", BAL_sample[i])
        print("CVR:", CVR_sample[i])

Row 10 differs:
BAL: [ 6 -1 -1]
CVR: [ 2 -1 -1]
Row 12 differs:
BAL: [ 1 -1 -1]
CVR: [ 2 -1 -1]
Row 65 differs:
BAL: [ 5  3 -1]
CVR: [ 3 -1 -1]
Row 78 differs:
BAL: [ 6 -1 -1]
CVR: [ 4  1 -1]
Row 79 differs:
BAL: [ 6 -1 -1]
CVR: [ 3 -1 -1]
Row 82 differs:
BAL: [4 3 1]
CVR: [ 4  1 -1]
Row 83 differs:
BAL: [ 4  1 -1]
CVR: [-1 -1 -1]
Row 90 differs:
BAL: [ 6 -1 -1]
CVR: [ 4  1 -1]
Row 125 differs:
BAL: [ 2  1 -1]
CVR: [ 1  2 -1]
Row 129 differs:
BAL: [ 2  1 -1]
CVR: [ 1  2 -1]
Row 136 differs:
BAL: [ 4 -1 -1]
CVR: [ 2 -1 -1]
Row 174 differs:
BAL: [ 2  1 -1]
CVR: [ 1  2 -1]
Row 183 differs:
BAL: [ 6 -1 -1]
CVR: [ 5  1 -1]
Row 195 differs:
BAL: [ 1  3 -1]
CVR: [ 1  2 -1]
Row 235 differs:
BAL: [ 6 -1 -1]
CVR: [ 2 -1 -1]
Row 242 differs:
BAL: [ 3  2 -1]
CVR: [ 1  2 -1]
Row 248 differs:
BAL: [ 2  1 -1]
CVR: [ 2 -1 -1]
Row 283 differs:
BAL: [ 1 -1 -1]
CVR: [ 3 -1 -1]
