In [None]:
import itertools
import pandas as pd
import numpy as np

import pulp

# Reading constraints and preferences

In [None]:
def read_voorkeuren(filename='voorkeuren.xlsx') -> pd.DataFrame:
    """Read the file containing the voorkeuren

    This handles the MultiIndex that must be read
    TO DO: validate the input

    Parameters
    ----------
    filename : str, optional
        path to the file, by default 'voorkeuren.xlsx'

    Returns
    -------
    pandas DataFrame
        DataFrame with the correct index and columns
    """
    df = pd.read_excel(filename, header=None, index_col=0)
    df.index.name = 'Leerling'

    df.iloc[0] = df.iloc[0].ffill()
    df.iloc[1] = df.iloc[1].ffill()
    df.columns = pd.MultiIndex.from_arrays([df.iloc[0], df.iloc[1], df.iloc[2]], names=['TypeWens', 'Nr', 'TypeWaarde'])
    df = df.iloc[3:]
    return df

def restructure_voorkeuren(df: pd.DataFrame) -> pd.DataFrame:
    """
    Restructure the input from wide to long format and do basic cleaning
    """

    df = df.stack(['TypeWens', 'Nr']).fillna({'Gewicht': 1})
    return df

leerlingen = leerlingen = read_voorkeuren('voorkeuren.xlsx')
voorkeuren = leerlingen.pipe(restructure_voorkeuren)
#TODO: validate all Gewicht > 0
#TODO: validate all Waardes are either a leerling or a group
#TODO: validate sum of gewichten < M
#TODO: validate eps < gewicht precision
#TODO: validate "liever niet" is not filled

# Setting up the problem

In [None]:
M = 1_000_000 # A very big number, so that constraints are never larger than 1
EPS = 0.001 # A small number to correct for numerical inaccuracies


MBGROEPEN = ['Blauw', 'Groen', 'Oranje', 'Geel']
leerling_per_obgroep = leerlingen['Groep'].squeeze().rename('Groep').reset_index().groupby('Groep')['Leerling'].agg(list).to_dict()
obgroepen = list(leerling_per_obgroep.keys())

In [None]:
def powerset(iterable):
    "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"
    s = list(iterable)
    return itertools.chain.from_iterable(itertools.combinations(s, r) for r in range(len(s)+1))

def all_unique_sums(iterable):
    """Calculate all possible sums from sublists from the iterable"""
    return {sum(l) for l in powerset(iterable)}

def get_possible_weighted_wishes(voorkeuren: pd.DataFrame) -> set:
    """
    Get all the possible number of weighted wishes

    This will be used to know for which values a satisfaction score must be calculated
    and which dictionary values must be calculated per leerling. By minimizing this number,
    we make the problem calculation as fast as possible, while allowing for arbitrary precision

    Parameters
    ----------
    voorkeuren: pd.DataFrame
        The DataFrame containing the preferences of the leerlingen, must have a MultiIndex
        with levels ("Leerling", "TypeWens") with columns ("Waarde" & "Gewicht")  
    """
    unique_weighted_wishes_per_ll = (voorkeuren.xs('Graag met', level='TypeWens')
                                   .groupby('Leerling')
                                   ['Gewicht'].apply(all_unique_sums)
                                   )
    
    unique_weighted_wishes = set()
    for ww in unique_weighted_wishes_per_ll:
        unique_weighted_wishes.update(ww)
    return unique_weighted_wishes

unique_weighted_wishes = get_possible_weighted_wishes(voorkeuren)

In [None]:
def get_satisfaction_integral(x_a: float, x_b: float) -> float:
    """
    Calculate the extra satisfaction from granting x_b wishes instead of x_a

    This is the (scaled) integral of 0.5**x. This satisfaction function ensures everybody
    first gets their first wish, then everybody their second wish, etc.
    In principle, we should probably only specify the satisfaction function and then 
    have this just be a numerical integration for optimal flexibility, but since this
    flexibility isn't required yet, we're using a analytical integration.

    Parameters
    ----------
    x_a: float
        The number of (weighted) wishes as the basic satisfaction of the leerling
    x_b: float 
        The number of (weighted) wishes as the goal satisfaction of the leerling

    Returns
    -------
        The added satisfaction score of the leerling
    """
    return (-0.5 **x_b) - (-.5 ** x_a)

def get_preference_value(unique_weighted_wishes: set = unique_weighted_wishes) -> dict:
    """
    Calculate the score of getting all possible weighted_wishes values accounted for
    """
    # Sorting is important since we're going to difference!
    uww = sorted(list(unique_weighted_wishes))
    preference_value = {}
    # The 0 value is deliberately not taken into account! This would lead to ZeroDivisionErrors
    for last_ww, ww in zip(uww[:-1], uww[1:]):
        preference_value[ww] = get_satisfaction_integral(last_ww, ww)
    return preference_value

n_wishes_max = voorkeuren.groupby(['Leerling', "TypeWens"]).size().max()  # Size of dictionary per leerling
preference_value = get_preference_value(unique_weighted_wishes)

# Creating and solving the model

In [None]:
def solve_problem(voorkeuren: pd.DataFrame, leerlingen=leerlingen.index, groepen=MBGROEPEN, max_kliekje=5, optimize='llsatisfaction', preference_value=preference_value):
    """
    Create a problem to distribute leerlingen over groepen

    Parameters
    ----------
    voorkeuren: pd.DataFrame
        A DataFrame with as MultiIndex with (Leerling, Type, Nr) and a value, where
        Leerling is the Name, Type is either "Graag met", "Niet in" or "Liever niet"
        Waarde is then a column with either a Leerling or Group name. In combination with
        Niet In only a Group name is allowed
    leerlingen: Iterable
        An Iterable containing all names of the leerlingen
    groepen: Iterable
        An interable containing all names of the groepen to which leerlingen can be sent
    max_kliekje, int (default = 5)
        The number of leerlingen that can go to the same group
    optimize, str (default = "llsatisfaction")
        What to optimize for: "llsatisfaction" (basically, the least happy student is the most happy),
        "n_wishes" or "weighted_wishes" 
    preference_value, dct
        Keys: number of weighted_wishes granted
        Values: the extra satisfaction of getting key granted wrt getting key - 1 granted
    
    Returns
    -------
        pulp.LpProblem - the solved problem
    
    Raises
    ------
    ValueError
        If the value of optimize is invalid
    RuntimeError
        If problem is unsolvable

    """
    prob = pulp.LpProblem('leerlingindeling', pulp.LpMaximize)

    # Binary representations make more sense than a single integer per student which is basically an index
    in_group = pulp.LpVariable.dicts('group', itertools.product(leerlingen, groepen), cat='Binary')

    # Every student must be in exactly one group
    for ll in leerlingen:
        prob += pulp.lpSum([in_group[(ll, gr)] for gr in MBGROEPEN]) == 1

    # Every group can have a max number of students from an earlier group (no kliekjes)
    from_group_to_group = pulp.LpVariable.dicts('from_group_to_group', itertools.product(obgroepen, groepen), cat='Integer')
    for mbgroep in groepen:
        for obgroep in obgroepen:
            prob += from_group_to_group[(obgroep, mbgroep)] == pulp.lpSum([in_group[(ll, mbgroep)] for ll in leerling_per_obgroep[obgroep]])
            prob += from_group_to_group[(obgroep, mbgroep)] <= max_kliekje

    # Every group should have approximately equal number of new leerlingen
    min_in_group = int(len(leerlingen) / 4 - 1.5)
    max_in_group = int(len(leerlingen) / 4 + 1.5)
    new_students_in_group = pulp.LpVariable.dict('new_students_in_group', groepen, cat='Integer')
    for mbgroep in groepen:
        prob += new_students_in_group[mbgroep] == pulp.lpSum([in_group[(ll, mbgroep)] for ll in leerlingen])
        prob += new_students_in_group[mbgroep] <= max_in_group  #TODO: make this customizable or even add this as something to optimize
        prob += new_students_in_group[mbgroep] >= min_in_group

    # Some students can not move int other groups (e.g. a brother/sister is already there)
    for i, row in voorkeuren.query('TypeWens == "Niet in"').iterrows():
        ll, type_, nr = i
        gr = row['Waarde']
        prob += in_group[(ll, gr)] == 0

    # Now it's really starting: who prefers to be with whom - this we want to optimize
    graag_met = voorkeuren.xs('Graag met', level='TypeWens')
    weights = graag_met['Gewicht'].to_dict()
    satisfied = pulp.LpVariable.dicts("Satisfied", graag_met.index.to_list(), cat="Binary")
    weighted_satisfied = pulp.LpVariable.dicts("WeightedSatisfied", graag_met.index.to_list(), cat="Continuous", lowBound=0)

    # checks whether the preference is for a single groep (e.g. Blauw)
    pref_per_group = list(itertools.chain(*[[(ll, nr, gr) for gr in MBGROEPEN] for ll, nr in graag_met.index]))
    satisfied_per_group = pulp.LpVariable.dicts("Satisfied_per_group", pref_per_group, cat="Binary")
    for i, row in graag_met.iterrows():
        ll, nr = i
        if row['Waarde'] not in groepen:
            other_ll = row['Waarde']
            for gr in groepen:
                # Matching preferences are an XNOR problem, see https://yetanothermathprogrammingconsultant.blogspot.com/2022/06/xnor-as-linear-inequalities.html
                prob += satisfied_per_group[(ll, nr, gr)] >= 1 - in_group[(ll, gr)] - in_group[(other_ll, gr)]  # Allebei niet in deze groep ==> satisfied = 1
                prob += satisfied_per_group[(ll, nr, gr)] <= 1 + in_group[(ll, gr)] - in_group[(other_ll, gr)]  # ll niet in groep, ander wel ==> satisfied = 0
                prob += satisfied_per_group[(ll, nr, gr)] <= 1 - in_group[(ll, gr)] + in_group[(other_ll, gr)]  # ll in groep, ander niet ==> satisfied = 0
                prob += satisfied_per_group[(ll, nr, gr)] >= in_group[(ll, gr)] + in_group[(other_ll, gr)] - 1  # allebei in deze groep ==> satisfied = 1

                # The total preference is only satisfied if it is at least correct for this group
                # AND definition: see https://yetanothermathprogrammingconsultant.blogspot.com/2022/06/xnor-as-linear-inequalities.html
                prob += satisfied[i] <= satisfied_per_group[(ll, nr, gr)]  
            # The preference is satisfied if it is correct for every group
            prob += satisfied[i] >= pulp.lpSum([satisfied_per_group[(ll, nr, gr)] for gr in groepen]) - len(groepen) + 1  
        else:
            gr = row['Waarde']
            prob += (in_group[(ll, gr)] >= satisfied[i])
            prob += (in_group[(ll, gr)] <= satisfied[i])
        prob += weighted_satisfied[i] == (satisfied[i] * weights[i])

    # We do not want to optimize the number of matches: at least 1 match for a student is more valuable than the third
    satisfaction_per_ll = pulp.LpVariable.dict("LLSatisfaction", leerlingen, cat='Continuous')
    # Per ll whether at least i preferences are satisfied
    n_satisfied_per_ll = pulp.LpVariable.dicts("llassignedprefs", itertools.product(leerlingen, (i for i in range(1, n_wishes_max + 1))), cat='Binary')
    ww_satisfied_per_ll = pulp.LpVariable.dicts("llassignedweights", itertools.product(leerlingen, preference_value.keys()), cat='Binary')
    for ll in leerlingen:
        ll_prefs = []
        ll_weighted = []
        for i in range(1, n_wishes_max + 1):
            try:
                ll_prefs.append(satisfied[(ll, i)])
                ll_weighted.append(weighted_satisfied[(ll, i)])
            except KeyError:
                break
        n_satisfied = pulp.lpSum(ll_prefs)
        ww_satisfied = pulp.lpSum(ll_weighted)

        for i in range(1, n_wishes_max + 1):
            # n_satisfied(i) for each leerling is 0 if less than `i` preferences are satisfied
            prob += n_satisfied_per_ll[(ll, i)] <= n_satisfied / i # The division works because n_true_per_ll is binary, so can never be larger than 1
            # n_satisfied(i) for each leerling is 1 if at least i preferences are satisfied
            prob += n_satisfied_per_ll[(ll, i)] >= (n_satisfied - (i - 1) - EPS) / M  # M ensures the constraint is never larger than 1

        for n_ww in preference_value.keys():
            # ww_satisfied_per_ll(i) for each leerling is 0 if less than `weights` are satisfied
            prob += ww_satisfied_per_ll[(ll, n_ww)] <= ww_satisfied / n_ww + EPS # The division works because ww_satisfied_per_ll is binary, so can never be larger than 1
            # ww_satisfied_per_ll(i) for each leerling is 1 if at least n_ww preferences are satisfied
            prob += ww_satisfied_per_ll[(ll, n_ww)] >= (ww_satisfied - (n_ww - EPS)) / M  # M ensures the constraint is never larger than 1

        satisfaction_per_ll[ll] = sum(val * ww_satisfied_per_ll[(ll, n_ww)] for n_ww, val in preference_value.items())
    # TODO: implement "liever niet"
    if optimize == 'llsatisfaction':
        prob += pulp.lpSum(satisfaction_per_ll)
    elif optimize == 'n_wishes':
        prob += pulp.lpSum(satisfied)
    elif optimize == 'weighted_wishes':
        prob += pulp.lpSum(weighted_satisfied)
    else:
        raise ValueError(f'optimize must be in ["satisfaction", "n_wishes", "weighted_wishes"], not {optimize!r}')

    # TODO: add zorgkinderen as optimization
    prob.solve()
    if pulp.LpStatus[prob.status] != 'Optimal':
        raise RuntimeError(f'Could not solve LP-problem, status {pulp.LpStatus[prob.status]!r}')
    return prob

In [None]:
prob = solve_problem(voorkeuren)

# Get the outcome

In [None]:
def get_outcome(vars_) -> pd.DataFrame:
    """
    Restructure the Problem Variables in a nice DataFrame

    Parameters
    ----------
    vars: list of pulp.LpVariables
        The result of prob.variables()
    """
    chosen_groups = [v.name for v in vars_ if v.value() == 1 and v.name.startswith('group')]
    df = pd.DataFrame(chosen_groups)
    df[['Naam', 'Group']] = df[0].str.extract("group_\('(.*)',_'(.*)'\)")
    return df.drop(columns=[0])

def display_outcome_nicely(df):
    """
    Transform DataFrame so that leerlingen are grouped by the group in which they are placed
    """
    df = (outcome.assign(nr = lambda df: df.groupby('Group').cumcount().add(1))
                 .set_index(['Group', 'nr'])
                 ['Naam']
                 .unstack('Group', fill_value='')
        )
    return df

outcome = get_outcome(prob.variables())
outcome.pipe(display_outcome_nicely)

# Check solution

In [None]:
def probvars_to_series(prob: pulp.LpProblem, name: str, not_in_name: str) -> pd.Series:
    """
    Extract (accounted) preferences from problem to a series

    Will extract leerling name and preference number as index, and whether accounted for as value
    Parameters
    ----------
    prob: pulp.LpProblem
        The problem containing the variables
    name: str
        The beginning of the variable name, will also be the name of the Series
    not_in_name: str

    """
    constraints = {v.name: v.value() for v in prob.variables() if v.name.startswith(name) and not not_in_name in v.name}
    series = pd.Series(constraints, name=name)
    ix = (series.index.to_series()
          .str.extract(f"{name}_\('(?P<ll>.*)',_(?P<Nr>.*)\)")
            .set_index(['ll', 'Nr'])
            .index
            )
    
    series.index = ix
    return series

def calculate_satisfied_constraints(prob: pulp.LpProblem) -> pd.DataFrame:
    """
    Calculate which constraints and for whom are accommodated

    Parameters
    ----------
    prob: pulp.LpProblem
        The result of prob.variables()
    
    Returns
    -------
        pd.DataFrame with Satisfied and WeightedSatisfied preferences
    """
    satisfied = probvars_to_series(prob, 'Satisfied', 'per_group').astype('boolean')
    weightedsatisfied = probvars_to_series(prob, 'WeightedSatisfied', 'per_group')
    return pd.concat([satisfied, weightedsatisfied], axis='columns')


def calculate_performance_per_leerling(satisfied_constraints):
    """
    Calculate basic performance metrics per leerling

    Performance is better when more preferences are more accommodated

    Parameters
    ----------
    satisfied_constraints: pd.DataFrame
        The output of calculate_satisfied_constraints
    """
    df = (satisfied_constraints.groupby('ll').agg(NrPreferences = ('Satisfied', 'count'),
                                                 AccountedPreferences = ('Satisfied', 'sum'),
                                                 PctAccounted = ('Satisfied', 'mean'),
                                                 AccountedWeightedPreferences = ('WeightedSatisfied', 'sum'),
                                                 )
        .assign(NrWeightedPreferences = voorkeuren.xs('Graag met', level='TypeWens').groupby('Leerling')['Gewicht'].sum(),
                PctWeightedAccounted = lambda df: df['AccountedWeightedPreferences'] / df['NrWeightedPreferences']
                
                )                                   
        )

    possible_satisfaction = np.array([get_satisfaction_integral(0, nprefs) for nprefs in df['NrPreferences']])
    actual_satisfaction = np.array([get_satisfaction_integral(0, nprefs) for nprefs in df['AccountedPreferences']])
    df = df.assign(PossibleSatisfaction = lambda df: df['NrWeightedPreferences'].map(lambda x: get_satisfaction_integral(0, x)),
                    ActualSatisfaction = lambda df: df['AccountedWeightedPreferences'].map(lambda x: get_satisfaction_integral(0, x)),
                    RelativeSatisfaction =  lambda df: df['ActualSatisfaction']/df['PossibleSatisfaction'])
    
    return df

def calculate_solution_performance(ll_performance):
    """
    Calculate the performance of the general model

    Parameters
    ----------
    ll_performance: pd.DataFrame
        The output of calculate_performance_per_leerling
    """
    cols = ['NrPreferences', 'AccountedPreferences', 'NrWeightedPreferences', 'AccountedWeightedPreferences', 'PossibleSatisfaction', 'ActualSatisfaction']
    solution_performance = (ll_performance[cols]
                                          .sum()
                                          .to_frame().transpose()
                                          .assign(PctAccountedPreferences = lambda df: df['AccountedPreferences'] / df['NrPreferences'],
                                                  PctAccountedWeightedPreferences = lambda df: df['AccountedWeightedPreferences'] / df['NrWeightedPreferences'],
                                                  RelativeSatisfaction = lambda df: df['ActualSatisfaction'] / df['PossibleSatisfaction']
                                                  )                
                        ).to_dict('records')[0]
    return solution_performance

satisfied_constraints = calculate_satisfied_constraints(prob)
ll_performance = calculate_performance_per_leerling(satisfied_constraints)
solution_performance = calculate_solution_performance(ll_performance)  
print(solution_performance)
display(ll_performance)

# Analysis

In [None]:
def get_solution_performance(voorkeuren: pd.DataFrame, n_students_max: int = 5, optimize: str = 'llsatisfaction'):
    """
    Convenience function which combines problem solving to get all metrics
    """
    prob = solve_problem(voorkeuren, max_kliekje=n_students_max, optimize=optimize)
    satisfied_constraints = calculate_satisfied_constraints(prob)
    ll_performance = calculate_performance_per_leerling(satisfied_constraints)
    return calculate_solution_performance(ll_performance)  

solution_performance_overview = dict()
for optimize in ['llsatisfaction', 'n_wishes', 'weighted_wishes']:
    for n_students_max in range(4, 7):
        solution_performance_overview[(optimize, n_students_max)] = get_solution_performance(voorkeuren, n_students_max, optimize)

In [None]:
pd.DataFrame.from_dict(solution_performance_overview, orient='index')