In [2]:
import math
import pandas as pd
import numpy as np
from sklearn.mixture import GaussianMixture
from pathlib import Path
import sys

BASE_PATH = Path.cwd().parent
sys.path.append(str(BASE_PATH))

from model.sample_ballots import get_data, get_positions, fit_gmm, get_samples
from model.social_choice_functions import getPairMajMat

Winners: [1]


In [3]:
# Get Pairwise Majority Matrix - imported from model\social_choice_functions.py

In [4]:
def getTournAdjList(maj_mat: list[list[int]], n_alts: int):
    """
    Input: n*n Pairwise Majority matrix, where entry i,j represents the number of voters who prefer alternative i over j,
        number of alternatives
    Output: Adjacency list for the tournament
    """
    adj_mat = []
    for i in range(n_alts):
        connections = []
        for j in range(n_alts):
            if maj_mat[i][j] > maj_mat[j][i]: # connection only if strictly more voters prefer i over j
                connections.append(j)
        adj_mat.append(connections)  
    return adj_mat


In [5]:
def hasCycle(adj_list: list[list[int]], n_alts: int):
    """
    Input: n*n adjacency matrix for tournament,
        number of alternatives
    Output: Boolean, true is tournament has a cycle
    Idea: Carry out depth-first search (DFS) for each node in graph, if encounter same node twice in any search a cycle has been found
    """
    def DFSBreakOnCycle(adj_list: list[list[int]], visited: list[bool], in_stack: list[bool], start_node: int):
        visited[start_node] = True
        in_stack[start_node] = True # On path that is currently being explored

        for node in adj_list[start_node]:
            if not visited[node]:
                found_cycle = DFSBreakOnCycle(adj_list, visited, in_stack, node)
                if found_cycle:
                    return True
            elif in_stack[node]:
                return True # found cycle (node already visited and on current path)
        in_stack[start_node] = False # finished exploring start_node's paths
        return False


    for i in range(n_alts):
        found_cycle = DFSBreakOnCycle(adj_list, [False for j in range(n_alts)], [False for j in range(n_alts)], i)
        if found_cycle == True:
            return True # found cycle
    return False # not found cycle



**Run Analysis with 2 dimensions**

In [None]:
N_VOTERS = 3 #100000 # Rough size of Bristol constituency
N_SIMS = 100000 # Number of voting simulations

PARTIES = ["Cons", "Lab", "LibDem", "SNP", "Plaid", "Green", "Reform"]
EMISSION_DIST = [0.16, 0.18, 0.12, 0.03, 0.03, 0.15, 0.33] # from ipsos voting intention opinion poll 06/01/26 (other 6% was split evenly between SNP & Plaid)
PARTY_IDS = [1101, 1102, 1104, 1105, 1106, 1107, 1110]
ELECTION_YEAR = 2024
ATTRIBUTES = ["lrecon", "galtan"]

df = get_data(BASE_PATH, PARTY_IDS)
party_positions = get_positions(df, ATTRIBUTES)
gmm = fit_gmm(party_positions, PARTIES, EMISSION_DIST)

n_cycles = 0
for i in range(N_SIMS):
    voter_positions, _, sample_ballots = get_samples(gmm, party_positions, N_VOTERS)
    maj_mat = getPairMajMat(sample_ballots, len(PARTIES))
    adj_list = getTournAdjList(maj_mat, len(PARTIES))
    if hasCycle(adj_list, len(adj_list)):
        n_cycles += 1
        print("Cycle found")
print("Number of cycles =", n_cycles)
print("Probalility of a Condorcet Cycle ≈", n_cycles/N_SIMS)

    



Number of cycles = 0
Probalility of a Condorcet Cycle ≈ 0.0


**Check for single-peakedness**

In [None]:
# Define linear ordering, order alternitives by distance from origin.

linear_ordering = sorted(list(range(len(PARTIES))), key=lambda x: math.sqrt((party_positions[x][0]**2) + (party_positions[x][1]**2)))
for i in linear_ordering:
    print(i, PARTIES[i])

import itertools

def isBallotSinglePeaked(ballot, linear_ordering):
    top_ranked = ballot[0]
    for i in range(len(PARTIES)):
        for j in range(len(PARTIES)):
            if i == j or top_ranked == i or top_ranked == j:
                continue
            increasing_codition = (linear_ordering.index(top_ranked) < linear_ordering.index(i) < linear_ordering.index(j)) == (ballot.index(i) < ballot.index(j))
            decreasing_codition = (linear_ordering.index(top_ranked) > linear_ordering.index(i) > linear_ordering.index(j)) == (ballot.index(i) < ballot.index(j))
            
            if not increasing_codition and not decreasing_codition:
                #print(ballot)
                #print(i,j)
                return False # falsified
    return True

def numBallotsSinglePeaked(profile, linear_ordering):
    n_single_peaked = 0
    for ballot in profile:
        if isBallotSinglePeaked(ballot, linear_ordering):
            n_single_peaked += 1
    return n_single_peaked

#Find linear ordering which has highest proporton of single-peaked ballots
max_n_single_peaked = 0
for linear_ordering in itertools.permutations(list(range(len(PARTIES)))):
    n_single_peaked = numBallotsSinglePeaked(sample_ballots, linear_ordering)
    if n_single_peaked == len(sample_ballots):
        print("valid linear ordering:", linear_ordering)
    if n_single_peaked >= max_n_single_peaked:
        max_n_single_peaked = n_single_peaked

print("Proportion of ballots that are single-peaked under optimal linear ordering:", max_n_single_peaked/len(sample_ballots))

# PCA Analysis

from sklearn.decomposition import PCA
import numpy as np

X = np.array(voter_positions)

pca = PCA()
pca.fit(X)

print("Eigenvalues:", pca.explained_variance_)
print("Explained variance ratio:", pca.explained_variance_ratio_)

1 Lab
5 Green
4 Plaid
3 SNP
2 LibDem
0 Cons
6 Reform
Proportion of ballots that are single-peaked under optimal linear ordering: 0.132


**Alternative method**

In [22]:
from itertools import combinations

def detect_condorcet_cycle(profile):
    """
    Detects if there is a Condorcet cycle in the given profile.

    Args:
        profile (tuple of tuples): Each inner tuple is a ballot (permutation of candidates).

    Returns:
        bool: True if there is a Condorcet cycle, False otherwise.
    """
    if not profile:
        return False

    n = len(profile[0])  # number of candidates
    pairwise_wins = [[0]*n for _ in range(n)]

    # Count pairwise victories
    for ballot in profile:
        for i, candidate_i in enumerate(ballot):
            for candidate_j in ballot[i+1:]:
                pairwise_wins[candidate_i][candidate_j] += 1

    num_voters = len(profile)

    # Determine if there is a Condorcet winner
    condorcet_winner = None
    for i in range(n):
        if all(pairwise_wins[i][j] > num_voters // 2 for j in range(n) if j != i):
            condorcet_winner = i
            break

    if condorcet_winner is not None:
        return False  # No cycle if Condorcet winner exists

    # If no Condorcet winner, there must be a cycle
    # Optionally, we could try to explicitly find the cycle, but existence is enough
    return True


In [24]:
n_cycles = 0
for i in range(N_SIMS):
    _, _, sample_ballots = get_samples(gmm, party_positions, N_VOTERS)
    if detect_condorcet_cycle(sample_ballots):
        n_cycles += 1
        print("Cycle found")
print("Number of cycles =", n_cycles)
print("Probalility of a Condorcet Cycle ≈", n_cycles/N_SIMS)

Number of cycles = 0
Probalility of a Condorcet Cycle ≈ 0.0


**Sample with more Dimensions**

In [8]:
# Only update ATTRIBUTES
ATTRIBUTES = ["lrecon", "eu_position", "immigrate_policy"]

df = get_data(BASE_PATH, PARTY_IDS)
party_positions = get_positions(df, ATTRIBUTES)
gmm = fit_gmm(party_positions, PARTIES, EMISSION_DIST)

n_cycles = 0
for i in range(N_SIMS):
    _, _, sample_ballots = get_samples(gmm, party_positions, N_VOTERS)
    maj_mat = getPairMajMat(sample_ballots, len(PARTIES))
    adj_list = getTournAdjList(maj_mat, len(PARTIES))
    if hasCycle(adj_list, len(adj_list)):
        n_cycles += 1
        print("Cycle found")
print("Number of cycles =", n_cycles)
print("Probalility of a Condorcet Cycle ≈", n_cycles/N_SIMS)

Number of cycles = 0
Probalility of a Condorcet Cycle ≈ 0.0


In [9]:
print("Is profile single-peaked:", isProfileSinglePeaked(sample_ballots, linear_ordering))

[0, 6, 1, 2, 3, 4, 5]
3 4
Is profile single-peaked: False


**Sample voter weights for different attributes as well**

In [10]:
def sample_voter_weights(n_voters: int, n_dims: int, alpha: float):
    """
    Sample per-voter salience weights from a high-variance Dirichlet distribution.
    """
    return np.random.dirichlet([alpha] * n_dims, size=n_voters)

In [11]:
def compute_weighted_utilities(
    voter_positions: np.ndarray,    # shape: (N, d)
    party_positions: np.ndarray,    # shape: (M, d)
    voter_weights: np.ndarray       # shape: (N, d)
):
    """
    Returns utility matrix of shape (N, M)
    """
    N, d = voter_positions.shape
    M = party_positions.shape[0]

    utilities = np.zeros((N, M))

    for i in range(N):
        diff = party_positions - voter_positions[i]          # (M, d)
        utilities[i] = -np.sum(voter_weights[i] * diff**2, axis=1)

    return utilities


In [12]:
def utilities_to_ballots(utilities: np.ndarray):
    """
    Convert utilities to strict preference orderings.
    """
    ballots = []
    for i in range(utilities.shape[0]):
        ranking = np.argsort(-utilities[i])
        ballots.append(ranking.tolist())
    return ballots

In [13]:
ATTRIBUTES = ["lrecon", "eu_position", "immigrate_policy"]
ALPHA = 0.4 # Alpha of the Dirichlet ditribution to sample voter weights, variance scales roughly with 1/APLHA
n_dims = len(ATTRIBUTES)
df = get_data(BASE_PATH, PARTY_IDS)
party_positions = get_positions(df, ATTRIBUTES)
gmm = fit_gmm(party_positions, PARTIES, EMISSION_DIST)

n_cycles = 0

for sim in range(N_SIMS):

    # 1. Sample voters (your existing code)
    voter_positions, _, _ = get_samples(gmm, party_positions, N_VOTERS)

    # 2. Sample high-variance salience weights
    voter_weights = sample_voter_weights(
        n_voters=N_VOTERS,
        n_dims=n_dims,
        alpha=ALPHA   # <-- TUNE THIS
    )

    # 3. Compute weighted utilities
    utilities = compute_weighted_utilities(
        voter_positions,
        party_positions,
        voter_weights
    )

    # 4. Convert to ballots
    sample_ballots = utilities_to_ballots(utilities)

    # 5. Your existing pipeline
    maj_mat = getPairMajMat(sample_ballots, len(PARTIES))
    adj_list = getTournAdjList(maj_mat, len(PARTIES))

    if hasCycle(adj_list, len(PARTIES)):
        n_cycles += 1
        print("Cycle found")

print("Number of cycles =", n_cycles)
print("Probability ≈", n_cycles / N_SIMS)


Cycle found
Cycle found
Number of cycles = 2
Probability ≈ 0.002


In [14]:
"""Testing"""

profile = [
    [0,1,2],
    [1,2,0],
    [2,0,1]
]
maj_mat = getPairMajMat(profile,len(profile))
for row in maj_mat:
    print(row)
print("\n")
adj_list = getTournAdjList(maj_mat, len(maj_mat))
for row in adj_list:
    print(row)
    
print("\n")
print(hasCycle(adj_list, len(adj_list)))

[0, 2, 1]
[1, 0, 2]
[2, 1, 0]


[1]
[2]
[0]


True
