In [68]:
import random

import numpy as np
import pandas as pd
from numba import boolean, njit

In [69]:
# Load data
data = pd.read_csv('data/family_data.csv')
submission = pd.read_csv('data/sample_submission.csv')

matrix = data[[f'choice_{i}' for i in range(10)]].to_numpy()
family_size = data['n_people'].to_numpy()

N_FAMILIES = len(family_size)
N_DAYS = 100
MAX_OCCUPANCY = 300
MIN_OCCUPANCY = 125

In [70]:
# Precompute penalties as a NumPy array
penalties_array = np.zeros((family_size.max() + 1, 11), dtype=np.float64)
for n in range(family_size.max() + 1):
    penalties_array[n] = [
        0,
        50,
        50 + 9 * n,
        100 + 9 * n,
        200 + 9 * n,
        200 + 18 * n,
        300 + 18 * n,
        300 + 36 * n,
        400 + 36 * n,
        500 + 36 * n + 199 * n,
        500 + 36 * n + 398 * n
    ]

In [71]:
# Convert matrix into choice ranking lookup:
choice_rank = -np.ones((N_FAMILIES, N_DAYS), dtype=np.int8)
for i in range(matrix.shape[0]):
    for rank, day in enumerate(matrix[i]):
        choice_rank[i, day - 1] = rank  # days are 1-based

In [72]:
@njit
def cost_function(prediction, family_size, choice_rank, penalties_array):
    daily_occupancy = np.zeros(N_DAYS, dtype=np.int32)
    penalty = 0.0

    for i in range(prediction.shape[0]):
        d = prediction[i] - 1  # adjust for 0-based index
        n = family_size[i]
        daily_occupancy[d] += n

        rank = choice_rank[i, d]
        if rank == -1:
            penalty += penalties_array[n, 10]
        else:
            penalty += penalties_array[n, rank]

    # Soft constraints
    for v in daily_occupancy:
        if v < MIN_OCCUPANCY or v > MAX_OCCUPANCY:
            penalty += 1e8

    # Accounting cost
    acc = max(0, ((daily_occupancy[N_DAYS-1] - 125.0) / 400.0) * (daily_occupancy[N_DAYS-1]**0.5))
    yesterday = daily_occupancy[N_DAYS-1]

    for i in range(N_DAYS-2, -1, -1):
        today = daily_occupancy[i]
        diff = abs(today - yesterday)
        acc += max(0, ((today - 125.0) / 400.0) * (today ** (0.5 + diff / 50.0)))
        yesterday = today

    penalty += acc
    return penalty

In [73]:
class Chromosome:
    def __init__(self, num_days, num_families):
        self.num_days = num_days
        self.num_families = num_families
        self.assigned_days = np.zeros(num_families, dtype=np.int32)
        self.daily_attendance = np.zeros(num_days, dtype=np.int32)

    def copy(self):
        new_chromo = Chromosome(self.num_days, self.num_families)
        new_chromo.assigned_days = self.assigned_days.copy()
        new_chromo.daily_attendance = self.daily_attendance.copy()
        return new_chromo

    def is_swap_valid(self, family_idx, new_day, family_size_arr) -> boolean:
        current_day = self.assigned_days[family_idx]
        family_size = family_size_arr[family_idx]

        reduced = self.daily_attendance[current_day - 1] - family_size
        increased = self.daily_attendance[new_day - 1] + family_size

        return (reduced >= MIN_OCCUPANCY and reduced <= MAX_OCCUPANCY and
                increased >= MIN_OCCUPANCY and increased <= MAX_OCCUPANCY)

    def update_attendance(self, family_idx, old_day, new_day, family_size_arr):
        self.daily_attendance[old_day - 1] -= family_size_arr[family_idx]
        self.daily_attendance[new_day - 1] += family_size_arr[family_idx]

## Initialization of chromosome
In this section we initialize chromosomes probabilistically assigning some day to each family

In [74]:
# Create list of choice columns ('choice_0' to 'choice_9')
cols = [f'choice_{i}' for i in range(10)]
# Convert choices to dictionary format: {day: choice_rank}}
choice_dict = data[cols].T.to_dict()

# Contains array of dictionaries with format: {family_id: {day: choice_rank}}
# This helps quickly lookup a day's preference rank for each family
choice_dict_num = [{vv:i for i, vv in enumerate(di.values())} for di in choice_dict.values()]

# dictionary with format family_id: family_size
family_size_dict = data[['n_people']].to_dict()['n_people']
# An array with the size of families
family_size_ls = list(family_size_dict.values())


def initialize_chromosome() -> Chromosome:
    """
    Initializes a valid chromosome (solution) for the genetic algorithm.
    
    Creates a random assignment of families to days that:
    - Respects daily capacity constraints (≤300 people)
    - Prefers better-ranked choices with higher probability
    - Penalizes assignments that would make days nearly full
    
    Returns:
        Chromosome: A new valid solution chromosome
    """
    people_in_day = {day:0 for day in range(1, N_DAYS + 1)}
    chromosome = np.zeros(len(family_size_dict), dtype=np.int32)
    for i in range(len(chromosome)):
        # family's preferred days (from their 10 choices)
        items = list(choice_dict_num[i].keys()).copy()
        
        # Add one random non-preferred day to consider
        while True:
            new_number = random.randint(1, N_DAYS)
            if new_number not in items:
                items += [new_number]
                break
        
        # Get penalty weights for this family size. The smaller the size and the preferred day, the smaller the penalty
        weights = penalties_array[family_size_dict[i]]

        inverse_weights = []
        for j in range(len(weights)):
            c = 50
            # Additional penalty based on current day occupancy in order to decrease choosing the most occupated days
            if people_in_day[items[j]] in range(MIN_OCCUPANCY, MAX_OCCUPANCY + 1):
                c += (people_in_day[items[j]] - MIN_OCCUPANCY) * 1000
            # Final weight = inverse of (base penalty + occupancy penalty) < 1
            inverse_weights.append(1 / (weights[j] + c))
        
        # Filter to only feasible day options (wouldn't violate restrictions)
        new_items = []
        new_inverse_weights = []
        for j in range(len(items)):
            if people_in_day[items[j]] + family_size_ls[i] <= MAX_OCCUPANCY:
                new_items.append(items[j])
                new_inverse_weights.append(inverse_weights[j])
                
        # Select a day probabilistically based on weights
        chromosome[i] = random.choices(new_items, new_inverse_weights, k=1)[0]
        # Update the day's occupancy count
        people_in_day[chromosome[i]] += family_size_ls[i]

    # Create Chromosome object and populate its data
    chromosome_cls = Chromosome(N_DAYS, N_FAMILIES)
    chromosome_cls.assigned_days = chromosome

    # Transfer occupancy counts to Chromosome object
    for key, value in people_in_day.items():
        chromosome_cls.daily_attendance[key - 1] = value

    return chromosome_cls

## Tournament selection.
Each selected individual won in tounament with n=tournament_size cometitors based on fitness value

In [75]:
def selection(population, selection_size, tournament_size):
    """
    Performs tournament selection to choose individuals for reproduction.
    
    Tournament selection works by:
    1. Randomly selecting a subset (tournament) of individuals
    2. Choosing the best one from each tournament
    3. Repeating until we've selected the desired number
    
    Args:
        population (list): List of Chromosome objects in current generation
        selection_size (int): Number of individuals to select
        tournament_size (int): Number of participants in each tournament
        
    Returns:
        list: Selected Chromosome objects for reproduction
    """
    pop_size = len(population)
    costs = np.array([cost_function(ind.assigned_days, family_size, choice_rank, penalties_array) for ind in population])
    selected = []

    for _ in range(selection_size):
        # Randomly select n=tournament_size individuals
        indices = np.random.choice(pop_size, tournament_size, replace=False)
        # Choose the best from this tournament
        best_idx = indices[np.argmin(costs[indices])]
        selected.append(population[best_idx])

    return selected

In [76]:
def perform_swap(child1, child2, family_idx, day1, day2):
    """
    Executes a reciprocal day swap between two child chromosomes for a specific family.
    
    Swaps the assigned days between two children for the given family index,
    while maintaining consistent daily attendance counts in both chromosomes.

    Args:
        child1 (Chromosome): First child chromosome to modify
        child2 (Chromosome): Second child chromosome to modify
        family_idx (int): Index of the family (gene) being swapped
        day1 (int): Current day assignment in child1 (will be moved to child2)
        day2 (int): Current day assignment in child2 (will be moved to child1)
        
    Effects:
        - Updates both children's assigned_days arrays
        - Modifies both children's daily_attendance counts
        - No return value (modifies chromosomes in-place)
    """
    child1.update_attendance(family_idx, day1, day2, family_size)
    child2.update_attendance(family_idx, day2, day1, family_size)
    child1.assigned_days[family_idx] = day2
    child2.assigned_days[family_idx] = day1

## Crossover
Each gene (day in chromosome) in offspring is obtained probabilistically based on swapping of genes between two parent chromosomes.

In [77]:
def crossover(parent1, parent2, p=1.0, allow_single_swap=False, random_order=False):
    """
    Performs crossover between two parent chromosomes to produce two offspring.
    
    This is a uniform crossover that swaps family-day assignments between parents,
    with validity checks to maintain feasible solutions.
    
    Args:
        parent1, parent2 (Chromosome): Parent solutions to recombine
        p (float): Probability [0-1] of performing a swap for each family (default: 1.0)
        allow_single_swap (bool): If True, allows one-way swaps when two-way isn't possible
        random_order (bool): If True, processes families in random order to avoid bias
    
    Returns:
        tuple: Two new Chromosome objects (child1, child2)
    """
    #  Creating copies of the parents as the initial descendants
    child1 = parent1.copy()
    child2 = parent2.copy()
    
    # 2. Preparing family indexes for processing
    indices = np.arange(len(parent1.assigned_days))
    if random_order:
        np.random.shuffle(indices)

    for family_idx in indices:
        day1 = parent1.assigned_days[family_idx]
        day2 = parent2.assigned_days[family_idx]
        
        # We check whether the exchange will be acceptable for both descendants.
        valid1 = child1.is_swap_valid(family_idx, day2, family_size)
        valid2 = child2.is_swap_valid(family_idx, day1, family_size)
        
        # if exchange is acceptable then swap two genes
        if valid1 and valid2 and np.random.rand() < p:
            perform_swap(child1, child2, family_idx, day1, day2)
        # Partial swap
        elif allow_single_swap:
            if valid1 and np.random.rand() < p: # One-way swap into child1 (if valid)
                child1.update_attendance(family_idx, day1, day2, family_size)
                child1.assigned_days[family_idx] = day2
            elif valid2 and np.random.rand() < p: # One-way swap into child2 (if valid)
                child2.update_attendance(family_idx, day2, day1, family_size)
                child2.assigned_days[family_idx] = day1

    return child1, child2

## Two iplementations of mutation function
Provides an improvement in a family's preferred day (gene)

In [78]:
def mutation(chromosome, mutation_rate=0.1):
    """
    Performs a random valid mutation on a chromosome with given probability.
    
    With probability mutation_rate, selects a random family and moves it to a 
    different valid day that satisfies capacity constraints. The new day is 
    selected randomly from all possible valid alternatives. May not improve solution

    Args:
        chromosome (Chromosome): The solution chromosome to potentially mutate
        mutation_rate (float): Probability [0-1] of performing a mutation
        
    Modifies:
        The input chromosome in-place if mutation occurs
    """
    if np.random.rand() >= mutation_rate:
        return

    family_idx = np.random.randint(len(chromosome.assigned_days))
    current_day = chromosome.assigned_days[family_idx]

    # Random order of days (excluding current)
    possible_days = np.delete(np.arange(1, N_DAYS + 1), current_day - 1)
    np.random.shuffle(possible_days)

    for new_day in possible_days:
        if chromosome.is_swap_valid(family_idx, new_day, family_size):
            chromosome.update_attendance(family_idx, current_day, new_day, family_size)
            chromosome.assigned_days[family_idx] = new_day
            break

In [79]:
def mutation1(chromosome, mutation_rate=0.1):
    """
    Performs a directed mutation that attempts to improve a family's day assignment.
    
    With probability mutation_rate, selects a random family and tries to move it to:
    - A better preferred day (lower rank, i.e. improves solution)
    - That satisfies capacity constraints
    
    Args:
        chromosome (Chromosome): The solution chromosome to mutate
        mutation_rate (float): Probability [0-1] of attempting mutation
        
    Modifies:
        The input chromosome in-place if mutation occurs
    """
    if np.random.rand() >= mutation_rate:
        return

    family_idx = np.random.randint(len(chromosome.assigned_days))
    current_day = chromosome.assigned_days[family_idx]

    # Get the family's preference mapping {day: rank}
    preference_ranks = choice_dict_num[family_idx]

    # Rank of the current assigned day (default to worst if not in prefs)
    current_rank = preference_ranks.get(current_day, 10)

    # Shuffle possible days excluding current
    possible_days = np.delete(np.arange(1, N_DAYS + 1), current_day - 1)
    np.random.shuffle(possible_days)

    for new_day in possible_days:
        new_rank = preference_ranks.get(new_day, 10)
        # Only mutate if it's valid AND strictly better in preference
        if new_rank < current_rank and chromosome.is_swap_valid(family_idx, new_day, family_size):
            chromosome.update_attendance(family_idx, current_day, new_day, family_size)
            chromosome.assigned_days[family_idx] = new_day
            break

In [80]:
def reproduction(
        mutation_func,
        parents,
        crossover_proba=1.0,
        allow_single_swap=False,
        random_order=False,
        mutation_rate=0.01
):
    """
    Generates offspring population through crossover and mutation.
    
    Args:
        mutation_func (function): Mutation operator to apply
        parents (list): Selected parent chromosomes
        crossover_proba (float): Probability of performing crossover [0-1]
        allow_single_swap (bool): If True, allows one-way swaps when two-way isn't possible
        random_order (bool): Whether to process families in random order during crossover
        mutation_rate (float): Probability of mutation for each child
        
    Returns:
        list: New generation of chromosomes
    """
    next_generation = []
    for i in range(0, len(parents), 2):
        parent1, parent2 = parents[i], parents[i + 1]
        # Create children through crossover
        child1, child2 = crossover(parent1, parent2, p=crossover_proba, allow_single_swap=allow_single_swap, random_order=random_order)
        # Apply mutation to both children (independently
        mutation_func(child1, mutation_rate)
        mutation_func(child2, mutation_rate)
        next_generation.extend([child1, child2])
    return next_generation

In [81]:
def epoch_optimal(population):
    """
    Finds the best individual in popultation and its fitness value
    
    Args:
        population: set of chomosomes representing possible solution (array of days assigned to each family)
    
    Returns:
        tuple: best individual, its fitness value
    """
    costs = [cost_function(ind.assigned_days, family_size, choice_rank, penalties_array) for ind in population]
    best_idx = np.argmin(costs)
    return population[best_idx], costs[best_idx]

## Genetic algorihm core
Repeats all the steps in the genetic algorithms until the maximum number of generations is reached.

In [82]:
def genetic_algorithm(
        mutation_func,
        pop_size=100,
        num_generations=200,
        tournament_size=5,
        crossover_proba=1.0,
        allow_single_swap=False,
        random_order=False,
        mutation_rate=0.1,
        elitism_ratio=0.1
):
    """
    Executes the complete genetic algorithm optimization process.
    
    Implements a steady-state genetic algorithm with:
    - Tournament selection
    - Customizable crossover and mutation
    - Elitism preservation
    - Generational replacement
    
    Args:
        mutation_func (function): Mutation operator function
        pop_size (int): Number of individuals in population
        num_generations (int): Maximum generations to run
        tournament_size (int): Size of selection tournaments
        crossover_proba (float): Probability of performing crossover [0-1]
        allow_single_swap (bool): If True, allows one-way swaps when two-way isn't possible
        random_order (bool): Randomize family processing order in crossover
        mutation_rate (float): Probability of mutation per individual
        elitism_ratio (float): Proportion of elites to preserve [0-1]
        
    Returns:
        tuple: (best_chromosome, best_cost) found during evolution
    """
    
    # Create starting population of random valid solutions
    population = [initialize_chromosome() for _ in range(pop_size)]
    # Evaluate initial fitness (cost) for all individuals
    costs = [cost_function(ind.assigned_days, family_size, choice_rank, penalties_array) for ind in population]

    # Sort population by fitness (ascending - lower cost is better)
    sorted_indices = np.argsort(costs)
    population = [population[i] for i in sorted_indices]
    costs = [costs[i] for i in sorted_indices]
    
    # Initialize global best tracking
    best_chromosome = population[0].copy()
    best_cost = costs[0]

    for generation in range(num_generations):
        # Elitism: preserve top individuals
        elite_size = max(1, int(elitism_ratio * pop_size))
        elites = [population[i].copy() for i in range(elite_size)]

        # Selection
        parents = selection(population, pop_size - elite_size, tournament_size)

        # Reproduction
        offspring = reproduction(mutation_func, parents, crossover_proba, allow_single_swap, random_order, mutation_rate)

        # Combine elites and offspring
        population = elites + offspring

        # Find the best in the current generation
        current_best, current_cost = epoch_optimal(population)

        # Update the best solution
        if current_cost < best_cost:
            best_chromosome, best_cost = current_best.copy(), current_cost

        print(f"Generation {generation + 1}: Best Cost = {best_cost}")
    return best_chromosome, best_cost

In [83]:
# launching the genetic algorithm
best_ch, best_c = genetic_algorithm(
        pop_size=100,
        num_generations=4000,
        tournament_size=5,
        crossover_proba=0.5,
        allow_single_swap=True,
        random_order=True,
        mutation_func=mutation1,
        mutation_rate=0.3,
        elitism_ratio=0.1
    )

print("Best Chromosome:", best_ch)
print("Best Cost:", best_c)

Generation 1: Best Cost = 1873972.2528206974
Generation 2: Best Cost = 1868166.3210432974
Generation 3: Best Cost = 1868166.3210432974
Generation 4: Best Cost = 1859596.2188656176
Generation 5: Best Cost = 1855132.6328094492
Generation 6: Best Cost = 1849227.4019925247
Generation 7: Best Cost = 1831967.0279586224


KeyboardInterrupt: 

In [None]:
# saving the best solution
submission['assigned_day'] = best_ch.assigned_days
submission.to_csv('data/submission.csv', index=False)