In [15]:
### Function to generate random rankings with low dispersion between one another, to simulate real rankings, where there are always "top" universities

In [12]:
import random
from typing import List, Optional

def generate_rankings_correlated(
    n: int,
    m: int,
    *,
    phi: float = 0.3,
    top_k: int = 0,
    seed: Optional[int] = None,
) -> List[List[str]]:
    """
    Generate `n` rankings of `m` items that are similar to each other.

    Rankings are sampled from a Mallows model (Kendall distance) centered at the
    base order C1, C2, ..., Cm, using the Repeated Insertion Model (RIM).
    Lower `phi` => rankings stay close to the center; higher `phi` => more noise.

    Args:
        n: Number of rankings to generate.
        m: Number of items per ranking (labels are "C1".."Cm").
        phi: Dispersion in [0, 1]. 0 -> every ranking equals the center;
             1 -> every insertion position is equally likely.
        top_k: Must lie in [0, m]. It is only validated: the center order is
               always C1..Cm, so the first `top_k` labels already lead it and
               the value does not change the sampling.
        seed: Optional random seed for reproducibility. A seeded call draws from
              its own private generator, so it produces the same rankings as
              seeding the `random` module would, but leaves the module-level
              random state untouched. Without a seed the module-level generator
              is used.

    Returns:
        List of `n` permutations (each a list of str).

    Raises:
        ValueError: If `phi` is outside [0, 1] or `top_k` is outside [0, m].
    """
    if not (0.0 <= phi <= 1.0):
        raise ValueError("phi must be in [0, 1]")
    if top_k < 0 or top_k > m:
        raise ValueError("top_k must be in [0, m]")

    # Private generator for seeded calls: same stream as random.seed(seed), but
    # no reseeding of the shared module-level state as a side effect.
    rng = random.Random(seed) if seed is not None else random

    center = [f"C{i+1}" for i in range(m)]  # base order C1..Cm

    # phi**k for every possible insertion offset k; phi is fixed for the whole
    # call, so the powers are computed once instead of once per inserted item.
    powers = [phi ** k for k in range(m)]

    def _sample_mallows_kendall() -> List[str]:
        # RIM sampler: items are inserted from the last center item to the first.
        # Inserting at offset j places the item behind j items that the center
        # ranks after it, and that offset is chosen with weight phi**j.
        perm: List[str] = []
        for item in reversed(center):
            size = len(perm)
            if phi == 1.0:
                # All offsets equally likely.
                j = rng.randint(0, size)
            else:
                weights = powers[: size + 1]
                r = rng.random() * sum(weights)
                acc = 0.0
                j = 0
                # Inverse-CDF walk over the cumulative weights.
                for idx, w in enumerate(weights):
                    acc += w
                    if r <= acc:
                        j = idx
                        break
            perm.insert(j, item)
        return perm

    return [_sample_mallows_kendall() for _ in range(n)]

In [13]:
# Smoke test: draw three batches of rankings (no seed is passed, so the values
# differ between runs) and print one batch per line.
A = generate_rankings_correlated(4, 4)
B = generate_rankings_correlated(4, 4)
C = generate_rankings_correlated(5, 6)
print(A, B, C, sep="\n")

[['C1', 'C2', 'C3', 'C4'], ['C2', 'C3', 'C1', 'C4'], ['C1', 'C3', 'C2', 'C4'], ['C1', 'C4', 'C2', 'C3']]
[['C3', 'C1', 'C4', 'C2'], ['C3', 'C1', 'C2', 'C4'], ['C1', 'C2', 'C3', 'C4'], ['C1', 'C2', 'C4', 'C3']]
[['C1', 'C2', 'C6', 'C3', 'C4', 'C5'], ['C1', 'C3', 'C2', 'C4', 'C5', 'C6'], ['C1', 'C2', 'C3', 'C5', 'C4', 'C6'], ['C1', 'C2', 'C4', 'C3', 'C5', 'C6'], ['C1', 'C2', 'C6', 'C4', 'C5', 'C3']]


In [None]:
### Function to build pairwise matrices: this function merges all rankings together into a pairwise comparison table
## to know how many times each university wins against each other university.

In [14]:
from typing import Iterable, Mapping, Sequence, Dict, List, Optional, Tuple, Union, Set
import itertools

# ---- helpers (used only by build_pairwise_matrix) ----

RankLike = Union[Sequence[str], Mapping[str, int]]
MarginMatrix = Dict[str, Dict[str, float]]

#Turns a list (ranking) into a dictionary, wto know the position of each item
def _as_position_map(ranking: RankLike) -> Dict[str, float]:
    """Accepts list/tuple (1 is best) or dict {item: position} (lower is better)."""
    if isinstance(ranking, Mapping):
        return {str(k): float(v) for k, v in ranking.items()}
    elif isinstance(ranking, Sequence):
        return {str(x): float(i + 1) for i, x in enumerate(ranking)}
    raise TypeError("ranking must be a sequence or a mapping")

#Checks, within a single ranking, whether "a" is ranked higher than "b":
def _pair_pref(pos: Mapping[str, float], a: str, b: str,
               *, missing: str, default_pos: float) -> Optional[int]:
    """
    Preference of a over b in one ranking:
      +1 if a<b, -1 if b<a, 0 if tie, None if ignore due to missing.
    """
    a_in, b_in = a in pos, b in pos
    if missing == "ignore" and (not a_in or not b_in):
        return None
    pa = pos.get(a, default_pos)
    pb = pos.get(b, default_pos)
    if pa < pb:  return 1
    if pb < pa:  return -1
    return 0

# ---- main function ----

#It does this for all pairs of items (A vs B, A vs C, B vs C, etc.)
#and sums the results from all rankings.
#If A is above B in most rankings, the entry for (A, B) is a positive number.
#If B beats A more often, the entry is negative.
def build_pairwise_matrix(
    rankings: Iterable[RankLike],
    *,
    weights: Optional[Iterable[float]] = None,
    items: Optional[Iterable[str]] = None,
    missing: str = "bottom",
) -> Tuple[List[str], MarginMatrix]:
    """
    Merge all rankings into the pairwise *margins* matrix M.

    M[i][j] = (weighted # times i preferred over j) - (weighted # times j over i)

    The matrix is antisymmetric (M[j][i] == -M[i][j]) and has no diagonal entries.

    Args:
        rankings: Rankings to merge; each is a sequence of labels (best first)
            or a mapping {label: position} (lower is better).
        weights: Optional weight per ranking, in the same order as `rankings`.
            Every ranking counts 1.0 when omitted.
        items: Optional universe of labels to compare. When omitted, the union
            of all labels found in the rankings is used.
        missing: How a label absent from a ranking is treated in that ranking:
            - "bottom": it is placed one position after that ranking's worst
              position, so every present label beats it and two absent labels tie.
            - "ignore": that ranking contributes nothing to any pair involving it.

    Returns:
        (items_list, M) where items_list is the sorted list of labels.
        ([], {}) when there are no rankings.

    Raises:
        ValueError: If `missing` is not "bottom" or "ignore", or if `weights`
            does not have one entry per ranking.
    """
    # Reject unknown modes up front; otherwise a typo would silently behave
    # like "bottom".
    if missing not in ("bottom", "ignore"):
        raise ValueError("missing must be 'bottom' or 'ignore'")

    ranks = [_as_position_map(r) for r in rankings]
    if not ranks:
        return [], {}

    # Universe of items
    if items is None:
        universe: Set[str] = set().union(*ranks)
    else:
        universe = set(map(str, items))
    items_list = sorted(universe)

    # Weights
    if weights is None:
        w = [1.0] * len(ranks)
    else:
        w = [float(x) for x in weights]
        if len(w) != len(ranks):
            raise ValueError("weights length must match number of rankings")

    # Default positions for missing='bottom': one past each ranking's worst position
    defaults = [(max(r.values()) if r else 0.0) + 1.0 for r in ranks]
    voters = list(zip(ranks, w, defaults))

    # Initialize margins
    M: MarginMatrix = {i: {j: 0.0 for j in items_list if j != i} for i in items_list}

    # Tally each unordered pair once; the reverse entry is its exact negation.
    for a, b in itertools.combinations(items_list, 2):
        margin = 0.0
        for r, wt, defpos in voters:
            pref = _pair_pref(r, a, b, missing=missing, default_pos=defpos)
            if pref is not None:
                margin += wt * pref
        M[a][b] = margin
        # `0.0 - margin` rather than `-margin` so a zero margin stays +0.0.
        M[b][a] = 0.0 - margin

    return items_list, M

In [15]:
# Test: build the pairwise margins matrix from the rankings in A.
# `Matrix` receives the sorted item labels and `M` the margins matrix, where each
# row shows how many more times one item beats another.
Matrix, M = build_pairwise_matrix(A)
print(Matrix, M, sep="\n")

['C1', 'C2', 'C3', 'C4']
{'C1': {'C2': 2.0, 'C3': 2.0, 'C4': 4.0}, 'C2': {'C1': -2.0, 'C3': 2.0, 'C4': 2.0}, 'C3': {'C1': -2.0, 'C2': -2.0, 'C4': 2.0}, 'C4': {'C1': -4.0, 'C2': -2.0, 'C3': -2.0}}


In [None]:
#Kemeny Score
#For every pair (i,j) in your candidate ranking, if i appears before j in your order, it adds M[i][j] to the total.
#It’s the sum of all pairwise agreements with your candidate order.
    #The larger the number, the better this order fits all the rankings.
    #A smaller (or negative) score means your order disagrees more often.

In [16]:
from typing import Sequence, Dict

MarginMatrix = Dict[str, Dict[str, float]]

def calculate_kemeny_score(
    order: Sequence[str],  # proposed ranking, best -> worst
    matrix: MarginMatrix,  # pairwise margins, as returned by build_pairwise_matrix()
    *,
    strict: bool = False
) -> float:
    """
    Kemeny objective: sum M[i][j] for all pairs where i appears before j in `order`.
    Higher score = better agreement with the input rankings.

    Args:
        order: candidate ranking (best -> worst). Labels are converted to str.
        matrix: pairwise *margins* matrix as returned by build_pairwise_matrix().
        strict: if True, require `order` to be a permutation of the matrix items
                (raise on missing, extra or repeated labels). If False, extra
                labels and repeated occurrences are ignored and missing items
                are appended to the end in matrix key order.

    Returns:
        Total Kemeny score (float).

    Raises:
        ValueError: In strict mode, when `order` is not a permutation of the
            matrix items.
    """
    items = list(matrix.keys())
    order = [str(x) for x in order]

    if strict:
        s_order, s_items = set(order), set(items)
        if s_order != s_items:
            missing = sorted(s_items - s_order)
            extra   = sorted(s_order - s_items)
            raise ValueError(
                f"Order must contain exactly these items: {sorted(s_items)}; "
                f"missing={missing}, extra={extra}"
            )
        # Equal sets do not rule out repeated labels; a repeat would silently
        # overwrite the label's position below.
        if len(order) != len(s_order):
            counted = set()
            repeated = set()
            for label in order:
                if label in counted:
                    repeated.add(label)
                counted.add(label)
            raise ValueError(f"Order contains duplicate items: {sorted(repeated)}")
        full_order = order
    else:
        # keep only valid labels, deduplicate while preserving given order
        seen = set()
        full_order = []
        for label in order:
            if label in matrix and label not in seen:
                seen.add(label)
                full_order.append(label)
        # append any items not mentioned (set lookup instead of list scan)
        full_order.extend(label for label in items if label not in seen)

    pos = {x: i for i, x in enumerate(full_order)}

    score = 0.0
    for i in items:
        pos_i = pos[i]
        for j, m_ij in matrix[i].items():
            # A diagonal entry, if present, never satisfies the strict inequality.
            if pos_i < pos[j]:
                score += m_ij
    return float(score)

In [17]:
# Test: score three proposed rankings against the margins matrix M
Score1 = calculate_kemeny_score(['C1', 'C2', 'C3', 'C4'], M)
Score2 = calculate_kemeny_score(['C2', 'C1', 'C3', 'C4'], M)
Score3 = calculate_kemeny_score(['C3', 'C1', 'C4', 'C2'], M)
print(Score1, Score2, Score3, sep="\n")

14.0
10.0
2.0


In [None]:
# Copeland Method: Ranks candidates by number of pairwise wins
# Used as the initial "educated guess" for the Kemeny ranking heuristics
def copeland_method(matrix: MarginMatrix) -> List[str]:
    """
    Rank candidates by their number of pairwise wins (Copeland method).

    A candidate i wins against j when matrix[i][j] is strictly positive; ties
    and losses count for nothing.

    Args:
        matrix: Pairwise margins matrix from build_pairwise_matrix().

    Returns:
        Candidate labels ordered best to worst: most wins first, with equal
        win counts ordered by label.
    """
    labels = list(matrix.keys())

    # Number of rivals each candidate beats outright.
    win_counts = {}
    for label in labels:
        row = matrix[label]
        win_counts[label] = sum(
            1 for rival in labels if rival != label and row[rival] > 0
        )

    # Negated count gives descending wins; the label breaks ties.
    return sorted(labels, key=lambda label: (-win_counts[label], label))


In [None]:
# Borda Count: Ranks candidates by sum of position-based points
# alternative method to copeland for starting point
def borda_count(rankings: Iterable[RankLike]) -> List[str]:
    """
    Compute ranking using Borda count method.

    For each ranking, every listed candidate receives (m - position) points,
    where m is the total number of distinct candidates across *all* rankings
    and position is 1 for the best candidate. A candidate that a ranking does
    not list receives no points from it, so within one ranking a listed
    candidate at positions 1..m never scores below an unlisted one. When every
    ranking lists every candidate, m equals each ranking's length.

    Points are summed across all rankings and candidates are ranked by total
    score (descending).

    Args:
        rankings: Iterable of rankings (each a sequence of candidate labels,
            best first, or a mapping {label: position}).

    Returns:
        Ranking (best to worst) as list of candidate labels; ties in score are
        ordered by label. Empty list when there are no rankings or no candidates.
    """
    # Convert rankings to position maps
    ranks = [_as_position_map(r) for r in rankings]
    if not ranks:
        return []

    # All distinct candidates, in label order
    items = sorted(set().union(*ranks))
    if not items:
        return []

    # Use one common m for every ranking so that points are comparable between
    # rankings of different lengths.
    total_candidates = len(items)

    scores = dict.fromkeys(items, 0.0)
    for r in ranks:
        for item, pos in r.items():
            scores[item] += total_candidates - pos

    # Sort by score (descending), then by label for tie-breaking
    return sorted(items, key=lambda x: (-scores[x], x))


In [None]:
### Helper Functions

# Kendall Tau Distance: Calculate distance between two rankings
def kendall_tau_distance(ranking1: Sequence[str], ranking2: Sequence[str]) -> int:
    """
    Kendall tau distance between two rankings: the number of discordant pairs.

    Only labels present in both rankings are compared. A pair is discordant
    when the two rankings place its members in opposite relative order.

    Args:
        ranking1: First ranking (sequence of candidate labels).
        ranking2: Second ranking (sequence of candidate labels).

    Returns:
        Number of discordant pairs; 0 when fewer than two labels are shared.
    """
    first = {label: idx for idx, label in enumerate(ranking1)}
    second = {label: idx for idx, label in enumerate(ranking2)}

    # Shared labels, visited in the order of ranking1
    shared = [label for label in ranking1 if label in second]

    discordant = 0
    for k, a in enumerate(shared):
        for b in shared[k + 1:]:
            # The index differences have opposite signs exactly when the two
            # rankings disagree on the order of a and b.
            if (first[a] - first[b]) * (second[a] - second[b]) < 0:
                discordant += 1
    return discordant


In [None]:
# Generate Neighbors: Create neighboring rankings for local search
def generate_neighbors(ranking: List[str], neighbor_type: str = "both") -> List[List[str]]:
    """
    Generate neighboring rankings for local search.

    Each distinct neighbor is returned once, in the order it is first produced
    (adjacent swaps before insert moves). Moving an item one place up or down
    produces the same ranking as the adjacent swap, so that ranking is not
    repeated. `ranking` itself is not modified.

    Args:
        ranking: Current ranking (list of candidate labels).
        neighbor_type: Type of neighbors to generate:
            - "swap": Only adjacent swaps
            - "insert": Only insert moves
            - "both": Both types (default)

    Returns:
        List of neighboring rankings (each a new list).

    Raises:
        ValueError: If `neighbor_type` is not "swap", "insert" or "both".
    """
    # Fail loudly on an unknown type instead of returning no neighbors.
    if neighbor_type not in ("swap", "insert", "both"):
        raise ValueError(
            f"Unknown neighbor_type: {neighbor_type!r}. Use 'swap', 'insert', or 'both'."
        )

    base = list(ranking)
    n = len(base)
    neighbors: List[List[str]] = []
    produced = set()  # tuples of the rankings already emitted

    def _emit(candidate: List[str]) -> None:
        # Keep only the first occurrence of each distinct ranking.
        key = tuple(candidate)
        if key not in produced:
            produced.add(key)
            neighbors.append(candidate)

    if neighbor_type in ("swap", "both"):
        # Adjacent swaps: swap positions i and i+1
        for i in range(n - 1):
            candidate = base.copy()
            candidate[i], candidate[i + 1] = candidate[i + 1], candidate[i]
            _emit(candidate)

    if neighbor_type in ("insert", "both"):
        # Insert moves: move item from position i to position j (i != j)
        for i in range(n):
            for j in range(n):
                if i != j:
                    candidate = base.copy()
                    candidate.insert(j, candidate.pop(i))
                    _emit(candidate)

    return neighbors


In [None]:
### Phase 3: Local Search Solver

# Local Search with Hill Climbing
def local_search_kemeny(
    matrix: MarginMatrix,
    initial_ranking: List[str],
    neighbor_type: str = "both",
    max_iterations: int = 1000
) -> Tuple[List[str], float]:
    """
    Improve a ranking's Kemeny score by steepest-ascent hill climbing.

    Each round scores every neighbor of the current ranking and moves to the
    highest-scoring one if it is strictly better than the current ranking (the
    first such neighbor wins ties). The search stops at the first round without
    an improving neighbor, or after `max_iterations` rounds. The result is a
    local optimum and is not guaranteed to be the global one.

    Args:
        matrix: Pairwise margins matrix from build_pairwise_matrix().
        initial_ranking: Starting ranking (e.g., from Copeland or Borda). It is
            scored in strict mode, so it must match the matrix items.
        neighbor_type: Type of neighbors ("swap", "insert", or "both").
        max_iterations: Maximum number of rounds.

    Returns:
        Tuple of (best_ranking, best_score).
    """
    ranking = initial_ranking.copy()
    ranking_score = calculate_kemeny_score(ranking, matrix, strict=True)

    rounds = 0
    while rounds < max_iterations:
        rounds += 1

        # Best strictly-improving neighbor seen in this round, if any.
        challenger = None
        challenger_score = ranking_score
        for option in generate_neighbors(ranking, neighbor_type=neighbor_type):
            try:
                option_score = calculate_kemeny_score(option, matrix, strict=True)
            except ValueError:
                # Skip neighbors rejected by the strict check.
                continue
            if option_score > challenger_score:
                challenger, challenger_score = option, option_score

        if challenger is None:
            # No neighbor beats the current ranking: local optimum reached.
            break
        ranking, ranking_score = challenger, challenger_score

    return ranking, ranking_score


In [None]:
### Phase 4: Exact Solver (for Verification)

# Brute Force Exact Solver
def brute_force_kemeny(
    matrix: MarginMatrix,
    max_candidates: int = 10
) -> Tuple[List[str], float]:
    """
    Exact Kemeny solver: score every permutation of the matrix items and keep
    the best one.

    WARNING: Only use for small problems! All n! permutations are scored.

    Args:
        matrix: Pairwise margins matrix from build_pairwise_matrix().
        max_candidates: Maximum number of candidates allowed (safety check).

    Returns:
        Tuple of (optimal_ranking, optimal_score). Among equally scored
        permutations the first one generated is returned. ([], 0.0) for an
        empty matrix.

    Raises:
        ValueError: If number of candidates exceeds max_candidates.
    """
    from itertools import permutations

    labels = list(matrix.keys())
    count = len(labels)

    # Safety check comes first, before any enumeration.
    if count > max_candidates:
        raise ValueError(
            f"Too many candidates ({count}) for brute force. "
            f"Maximum allowed is {max_candidates}. Use heuristic method instead."
        )

    if count == 0:
        return [], 0.0

    champion = None
    champion_score = float('-inf')
    for candidate in map(list, permutations(labels)):
        candidate_score = calculate_kemeny_score(candidate, matrix, strict=True)
        # Strict comparison keeps the earliest permutation on ties.
        if candidate_score > champion_score:
            champion, champion_score = candidate, candidate_score

    return champion, champion_score


In [None]:
### Phase 5: Main Solver Function

# Main Kemeny Ranking Finder
def find_kemeny_ranking(
    rankings: Iterable[RankLike],
    *,
    method: str = "auto",
    use_exact_for_small: bool = True,
    exact_max_candidates: int = 10,
    neighbor_type: str = "both",
    max_iterations: int = 1000,
    weights: Optional[Iterable[float]] = None,
    items: Optional[Iterable[str]] = None,
    missing: str = "bottom"
) -> Tuple[List[str], float, str]:
    """
    Find a Kemeny ranking that aggregates all input rankings.

    Chooses between the exact solver (brute force) and the heuristic (local
    search) based on problem size. Only the exact solver guarantees an optimal
    ranking; local search returns a local optimum.

    Args:
        rankings: Iterable of rankings to aggregate. It is materialised once,
            so a one-shot iterable (e.g. a generator) is fine.
        method: Starting point method for local search (only checked when the
            heuristic path is taken):
            - "auto": Use Copeland (default)
            - "copeland": Use Copeland method
            - "borda": Use Borda count (computed from `rankings` only;
              `weights` are not applied to this starting point)
        use_exact_for_small: If True, use exact solver for small problems (n ≤ exact_max_candidates).
        exact_max_candidates: Maximum candidates for exact solver (default 10).
        neighbor_type: Type of neighbors for local search ("swap", "insert", or "both").
        max_iterations: Maximum iterations for local search.
        weights: Optional weights for each ranking.
        items: Optional list of all items (if None, inferred from rankings).
        missing: How to handle missing items ("bottom" or "ignore").

    Returns:
        Tuple of (ranking, kemeny_score, method_used).
        method_used is one of: "exact", "local_search_copeland",
        "local_search_borda", or "empty" when there is nothing to rank.

    Raises:
        ValueError: If the heuristic path is taken and `method` is unknown.
    """
    # The rankings are read twice (pairwise matrix, then possibly Borda), so
    # materialise them; a generator would be exhausted after the first pass.
    rankings = list(rankings)

    # Build pairwise matrix
    items_list, matrix = build_pairwise_matrix(
        rankings,
        weights=weights,
        items=items,
        missing=missing
    )

    if not items_list:
        return [], 0.0, "empty"

    n = len(items_list)

    # Use exact solver for small problems
    if use_exact_for_small and n <= exact_max_candidates:
        try:
            ranking, score = brute_force_kemeny(matrix, max_candidates=exact_max_candidates)
            return ranking, score, "exact"
        except ValueError:
            # Defensive: fall through to heuristic if exact fails
            pass

    # Use heuristic (local search)
    # Determine starting point
    if method == "auto" or method == "copeland":
        initial_ranking = copeland_method(matrix)
        method_used = "local_search_copeland"
    elif method == "borda":
        initial_ranking = borda_count(rankings)
        method_used = "local_search_borda"
    else:
        raise ValueError(f"Unknown method: {method}. Use 'auto', 'copeland', or 'borda'.")

    # Align the starting point with the matrix universe. Borda only sees the
    # labels present in `rankings`, which can differ from `items`; local search
    # scores in strict mode and would reject such a start. Unknown labels are
    # dropped and unmentioned matrix items are appended in matrix order.
    known = set(items_list)
    placed = set()
    start: List[str] = []
    for label in initial_ranking:
        if label in known and label not in placed:
            placed.add(label)
            start.append(label)
    start.extend(label for label in items_list if label not in placed)

    # Run local search
    ranking, score = local_search_kemeny(
        matrix,
        start,
        neighbor_type=neighbor_type,
        max_iterations=max_iterations
    )

    return ranking, score, method_used


In [None]:
# Test the complete Kemeny ranking system
print("=== Testing Kemeny Ranking Solver ===\n")

# Test with small problem (should use exact solver)
print("Test 1: Small problem (4 candidates) - should use exact solver")
test_rankings = [
    ['C1', 'C2', 'C3', 'C4'],
    ['C2', 'C1', 'C4', 'C3'],
    ['C1', 'C3', 'C2', 'C4'],
    ['C4', 'C1', 'C2', 'C3']
]

ranking, score, method = find_kemeny_ranking(test_rankings)
# 4 candidates is within the default exact_max_candidates, so the exact path runs.
assert method == "exact"
print(f"Optimal ranking: {ranking}")
print(f"Kemeny score: {score}")
print(f"Method used: {method}\n")

# Test starting point methods
print("Test 2: Compare starting point methods")
items_list, M = build_pairwise_matrix(test_rankings)
copeland_rank = copeland_method(M)
borda_rank = borda_count(test_rankings)
print(f"Copeland ranking: {copeland_rank}")
print(f"Borda ranking: {borda_rank}")
print(f"Copeland score: {calculate_kemeny_score(copeland_rank, M)}")
print(f"Borda score: {calculate_kemeny_score(borda_rank, M)}\n")

# Test with larger problem (should use local search)
print("Test 3: Larger problem (6 candidates) - should use local search")
test_rankings_large = generate_rankings_correlated(10, 6, phi=0.4, seed=42)
# 6 candidates is still within the default exact_max_candidates (10), so the
# exact solver has to be switched off for this test to reach local search.
# Note: the ranking printed below is the best one local search found.
ranking_large, score_large, method_large = find_kemeny_ranking(
    test_rankings_large,
    use_exact_for_small=False
)
assert method_large == "local_search_copeland"
print(f"Optimal ranking: {ranking_large}")
print(f"Kemeny score: {score_large}")
print(f"Method used: {method_large}")
