In [1]:
# Standard library imports
import sqlite3
import json
import datetime
import random

# Third-party library imports
import pandas as pd
import numpy as np
from collections import defaultdict

# Local application/library imports
from models import clustering
from utils.sql import get_table
from utils.utilities import generate_f1_calendar

In [2]:
def get_circuits_for_population(n=None, seed=None, season=None):
    """
    Build the circuit table (circuit info + cluster assignment) used to seed the GA.

    Exactly one of ``seed`` or ``season`` selects the data source:

    * ``seed``   -> ``clustering.get_random_sample(n, seed=seed, ...)`` provides the
      circuits, which are clustered with ``clustering.clusterize_circuits(df=...)``.
    * ``season`` -> ``clustering.get_historical_cities(season, info=True)`` provides
      the circuits, which are clustered with ``clustering.clusterize_circuits(year=season)``.

    In both cases the circuit info is left-joined with ['city', 'cluster_id'] on 'city'.

    Args:
        n (int, optional): Number of circuits to sample. Only used together with ``seed``.
        seed (int, optional): Seed forwarded to ``clustering.get_random_sample``.
        season (int, optional): Season year forwarded to the historical lookups.

    Returns:
        pd.DataFrame: In the ``seed`` branch the columns are explicitly renamed to
        ['geo_id', 'code', 'circuit', 'city', 'country', 'latitude', 'longitude',
        'first_gp_probability', 'last_gp_probability', 'cluster_id'].
        The ``season`` branch returns the merged frame with its columns untouched
        (presumably the same layout -- TODO confirm against ``clustering``).

    Raises:
        ValueError: If both or neither of ``seed`` and ``season`` are provided.
    """
    # Both None or both set -> ambiguous request.
    if (seed is None) == (season is None):
        raise ValueError("Exactly one of 'seed' or 'season' must be provided.")

    if seed is not None:
        # Same sample drawn twice: once with descriptive info, once in the form
        # clusterize_circuits expects (both calls share n and seed).
        circuit_names_random = clustering.get_random_sample(n, seed=seed, info=True)
        circuits_random = clustering.get_random_sample(n, seed=seed, info=False)
        clustersized_circuits_random = clustering.clusterize_circuits(df=circuits_random, verbose=False)
        prereq_random = pd.merge(
            circuit_names_random,
            clustersized_circuits_random[['city', 'cluster_id']],
            on='city',
            how='left',
        )
        prereq_random.columns = ['geo_id', 'code', 'circuit', 'city', 'country', 'latitude', 'longitude',
                                 'first_gp_probability', 'last_gp_probability', 'cluster_id']
        return prereq_random

    # season branch: clusterize_circuits(year=...) needs only the year, so the extra
    # get_historical_cities(season, info=False) fetch (whose result was never used)
    # has been dropped.
    circuit_names = clustering.get_historical_cities(season, info=True)
    clustersized_circuits = clustering.clusterize_circuits(year=season)
    return pd.merge(circuit_names, clustersized_circuits[['city', 'cluster_id']], on='city', how='left')

In [3]:
# Keep only the four columns the GA needs and rename them to the column names
# that generate_initial_population() checks for.
initial_circuits = (
    get_circuits_for_population(season=2025)
    .loc[:, ['code', 'cluster_id', 'first_gp_probability', 'last_gp_probability']]
    .rename(columns={
        'code': 'circuit_name',
        'first_gp_probability': 'start_freq_prob',
        'last_gp_probability': 'end_freq_prob',
    })
)

In [4]:
def shuffle_respecting_clusters(circuits_to_shuffle, cluster_assignments, verbose=False):
    """
    Randomly reorder circuits while keeping members of the same cluster adjacent.

    The circuits are bucketed by cluster, each bucket is shuffled in place, the
    bucket order is shuffled, and the buckets are concatenated. Circuits missing
    from ``cluster_assignments`` go into an 'unknown' bucket that is always placed
    last. All randomness comes from the global ``random`` module.

    Args:
        circuits_to_shuffle (list): Circuit names to reorder.
        cluster_assignments (dict): Mapping {circuit_name: cluster_id}.
        verbose (bool): If True, prints each intermediate step.

    Returns:
        list: The circuit names in their new, cluster-contiguous order
        (an empty list when there is nothing to shuffle).
    """
    if not circuits_to_shuffle:
        if verbose:
            print("No circuits to shuffle. Returning an empty list.")
        return []

    # Bucket circuits by cluster; insertion order of the buckets follows the input.
    grouped = defaultdict(list)
    for name in circuits_to_shuffle:
        grouped[cluster_assignments.get(name, 'unknown')].append(name)

    if verbose:
        print(f"Clusters before shuffling: {dict(grouped)}")

    # Shuffle inside each bucket first ...
    for key, members in grouped.items():
        random.shuffle(members)
        if verbose:
            print(f"Shuffled cluster {key}: {members}")

    # ... then shuffle the order of the known buckets; 'unknown' is pinned to the end.
    cluster_order = [key for key in grouped if key != 'unknown']
    random.shuffle(cluster_order)
    if 'unknown' in grouped:
        cluster_order.append('unknown')

    if verbose:
        print(f"Shuffled cluster order: {cluster_order}")

    final_sequence = [name for key in cluster_order for name in grouped[key]]

    if verbose:
        print(f"Final shuffled sequence: {final_sequence}")

    return final_sequence


def _weighted_candidates(circuits_df, prob_column):
    """Return (circuit names, normalized probabilities) for rows whose ``prob_column`` is > 0."""
    positive = circuits_df[circuits_df[prob_column] > 0]
    if positive.empty:
        return [], []
    weights = positive[prob_column]
    return positive['circuit_name'].tolist(), (weights / weights.sum()).tolist()


def generate_initial_population(circuits_df, population_size, seed=None, verbose=False):
    """
    Generates an initial population for the GA using a mix of seeding strategies.

    The population is split as follows (the last share absorbs rounding):
        * 60%  Method B          - cluster-respecting random shuffle.
        * 10%  Method C-Start    - opener drawn from 'start_freq_prob', rest shuffled.
        * 20%  Method C-StartEnd - opener and finale drawn from the frequency columns.
        * rest Method C-End      - finale drawn from 'end_freq_prob', rest shuffled.
    Whenever a strategy has no usable candidates it falls back to Method B.
    The combined list is shuffled before being returned.

    Note: when ``seed`` is given, the *global* ``random`` and ``numpy.random``
    generators are re-seeded, which affects any later code using them.

    Args:
        circuits_df (pd.DataFrame): DataFrame containing circuit information.
                                     Must include columns: 'circuit_name', 'cluster_id',
                                     'start_freq_prob', 'end_freq_prob'.
        population_size (int): The total number of individuals (calendars) to generate.
        seed (int, optional): A random seed for reproducibility. If None, results
                              will vary on each run. Defaults to None.
        verbose (bool): If True, prints detailed information about the process.

    Returns:
        list: A list of lists, where each inner list is a chromosome (calendar sequence)
        containing every circuit of ``circuits_df`` exactly once.

    Raises:
        ValueError: If ``circuits_df`` lacks one of the required columns.
    """
    if seed is not None:
        random.seed(seed)       # Seed for the 'random' module
        np.random.seed(seed)    # Seed for the 'numpy.random' module
        if verbose:
            print(f"Random seeds set to: {seed}")
    else:
        if verbose:
            print("No random seed provided, results will vary.")

    population = []

    required_cols = {'circuit_name', 'cluster_id', 'start_freq_prob', 'end_freq_prob'}
    if not required_cols.issubset(circuits_df.columns):
        raise ValueError(f"Input DataFrame missing required columns: {required_cols - set(circuits_df.columns)}")

    circuit_list = circuits_df['circuit_name'].tolist()
    cluster_assignments = pd.Series(circuits_df.cluster_id.values,
                                    index=circuits_df.circuit_name).to_dict()

    # Candidate openers / finales with their probabilities renormalized to sum to 1.
    start_circuits, start_probs = _weighted_candidates(circuits_df, 'start_freq_prob')
    end_circuits, end_probs = _weighted_candidates(circuits_df, 'end_freq_prob')

    num_b = int(population_size * 0.60)  # Cluster-Respecting Random
    num_c_start = int(population_size * 0.10)  # Historical Opener
    num_c_start_end = int(population_size * 0.20)  # Historical Opener & Finale
    num_c_end = population_size - num_b - num_c_start - num_c_start_end  # Historical Finale

    if verbose:
        print(f"Population size: {population_size}")
        print(f"Method B (Cluster-Respecting Random): {num_b}")
        print(f"Method C-Start (Historical Opener): {num_c_start}")
        print(f"Method C-StartEnd (Historical Opener & Finale): {num_c_start_end}")
        print(f"Method C-End (Historical Finale): {num_c_end}")

    # Method B: Cluster-Respecting Random (60%)
    if verbose:
        print(f"Generating {num_b} individuals using Method B...")
    for _ in range(num_b):
        population.append(shuffle_respecting_clusters(circuit_list, cluster_assignments, verbose=verbose))

    # Method C-Start: Historical Opener (10%)
    if verbose:
        print(f"Generating {num_c_start} individuals using Method C-Start...")
    if start_circuits:
        for _ in range(num_c_start):
            start_circuit = np.random.choice(start_circuits, p=start_probs)  # Uses numpy.random
            remaining = [c for c in circuit_list if c != start_circuit]
            middle = shuffle_respecting_clusters(remaining, cluster_assignments, verbose=verbose)
            population.append([start_circuit] + middle)
    else:
        if verbose:
            print("Warning: No start frequencies > 0 provided, using Method B instead for C-Start.")
        for _ in range(num_c_start):
            population.append(shuffle_respecting_clusters(circuit_list, cluster_assignments, verbose=verbose))

    # Method C-StartEnd: Historical Opener & Finale (20%)
    if verbose:
        print(f"Generating {num_c_start_end} individuals using Method C-StartEnd...")
    if start_circuits and end_circuits and len(circuit_list) > 1:
        for _ in range(num_c_start_end):
            start_circuit = np.random.choice(start_circuits, p=start_probs)  # Uses numpy.random
            end_circuit = np.random.choice(end_circuits, p=end_probs)        # Uses numpy.random

            if end_circuit == start_circuit:
                # The same circuit cannot open and close the calendar. Redraw among the
                # *other* finale candidates (probabilities renormalized) rather than
                # retrying blindly, which could still end on the same circuit and
                # yield a chromosome with a duplicated entry.
                alternatives = [(c, p) for c, p in zip(end_circuits, end_probs) if c != start_circuit]
                if alternatives:
                    alt_names = [c for c, _p in alternatives]
                    alt_total = sum(p for _c, p in alternatives)
                    alt_probs = [p / alt_total for _c, p in alternatives]
                    end_circuit = np.random.choice(alt_names, p=alt_probs)
                else:
                    end_circuit = None

            if end_circuit is None:
                # No distinct finale exists: keep the historical opener only.
                if verbose:
                    print("Warning: No distinct finale available, keeping only the historical opener.")
                remaining = [c for c in circuit_list if c != start_circuit]
                middle = shuffle_respecting_clusters(remaining, cluster_assignments, verbose=verbose)
                population.append([start_circuit] + middle)
                continue

            remaining = [c for c in circuit_list if c != start_circuit and c != end_circuit]
            middle = shuffle_respecting_clusters(remaining, cluster_assignments, verbose=verbose)
            population.append([start_circuit] + middle + [end_circuit])
    else:
        if verbose:
            print("Warning: No start/end frequencies or not enough circuits, using Method B instead for C-StartEnd.")
        for _ in range(num_c_start_end):
            population.append(shuffle_respecting_clusters(circuit_list, cluster_assignments, verbose=verbose))

    # Method C-End: Historical Finale (10%)
    if verbose:
        print(f"Generating {num_c_end} individuals using Method C-End...")
    if end_circuits:
        for _ in range(num_c_end):
            end_circuit = np.random.choice(end_circuits, p=end_probs)  # Uses numpy.random
            remaining = [c for c in circuit_list if c != end_circuit]
            middle = shuffle_respecting_clusters(remaining, cluster_assignments, verbose=verbose)
            population.append(middle + [end_circuit])
    else:
        if verbose:
            print("Warning: No end frequencies > 0 provided, using Method B instead for C-End.")
        for _ in range(num_c_end):
            population.append(shuffle_respecting_clusters(circuit_list, cluster_assignments, verbose=verbose))

    # Mix the strategies so downstream code does not see them in contiguous blocks.
    random.shuffle(population)

    final_population = population[:population_size]
    if verbose:
        print(f"Generated total population size: {len(final_population)}")

    return final_population

In [5]:
# Build the initial GA population from `initial_circuits`. The fixed seed makes the
# run reproducible; verbose=True prints every intermediate shuffle.
pop = generate_initial_population(
    initial_circuits,
    population_size=200,
    seed=42,
    verbose=True,
)

Random seeds set to: 42
Population size: 200
Method B (Cluster-Respecting Random): 120
Method C-Start (Historical Opener): 20
Method C-StartEnd (Historical Opener & Finale): 40
Method C-End (Historical Finale): 20
Generating 120 individuals using Method B...
Clusters before shuffling: {'2': ['BAHSAK', 'SAUJED', 'QATLUS', 'UAEYAS'], '1': ['AUSMEL'], '5': ['CHISHA', 'JAPSUZ'], '3': ['USAMIA', 'CANMON', 'USAAUS', 'MEXMEX', 'USALAS'], '0': ['ITAIMO', 'MONMON', 'SPACAT', 'AUSSPI', 'UKGSIL', 'BELSPA', 'HUNBUD', 'NETZAN', 'ITAMON', 'AZEBAK'], '6': ['SINMAR'], '4': ['BRASAO']}
Shuffled cluster 2: ['QATLUS', 'SAUJED', 'UAEYAS', 'BAHSAK']
Shuffled cluster 1: ['AUSMEL']
Shuffled cluster 5: ['JAPSUZ', 'CHISHA']
Shuffled cluster 3: ['MEXMEX', 'USAMIA', 'USAAUS', 'USALAS', 'CANMON']
Shuffled cluster 0: ['AUSSPI', 'SPACAT', 'UKGSIL', 'AZEBAK', 'BELSPA', 'NETZAN', 'ITAIMO', 'HUNBUD', 'MONMON', 'ITAMON']
Shuffled cluster 6: ['SINMAR']
Shuffled cluster 4: ['BRASAO']
Shuffled cluster order: ['5', '6', '2

In [6]:
# Tabular view of the population: one row per individual, one column per calendar
# slot, with the integer column labels turned into "Position_1", "Position_2", ...
pop_df = pd.DataFrame(pop).rename(columns=lambda slot: f"Position_{slot + 1}")

# Show the first individual as a sanity check.
pop[0]

['SAUJED',
 'BAHSAK',
 'QATLUS',
 'UAEYAS',
 'CHISHA',
 'JAPSUZ',
 'AUSSPI',
 'BELSPA',
 'UKGSIL',
 'ITAMON',
 'AZEBAK',
 'SPACAT',
 'HUNBUD',
 'MONMON',
 'NETZAN',
 'ITAIMO',
 'AUSMEL',
 'BRASAO',
 'CANMON',
 'USAMIA',
 'USAAUS',
 'MEXMEX',
 'USALAS',
 'SINMAR']

In [7]:
def fragmentation_score(lst: list, non_linear_power: float = 2.0) -> float:
    """
    Measure how scattered equal labels are within a sequence.

    For every distinct label the number of contiguous runs is counted. A label whose
    occurrences form a single run scores 0; a label whose occurrences are all
    separated scores 1 (``(runs - 1) / (occurrences - 1)``). Labels that occur once
    score 0. The per-label scores are averaged over all distinct labels and the
    average is raised to ``1 / non_linear_power``.

    Args:
        lst (list): Sequence of hashable labels (e.g. cluster ids as ints or strings).
        non_linear_power (float): Root applied to the averaged score. Must be > 0.
            1.0 leaves the score linear; larger values push small non-zero scores
            towards 1, so low scores become harder to reach.

    Returns:
        float: Score in [0, 1]; 0.0 means every label is perfectly grouped
        (also returned for an empty list).

    Raises:
        ValueError: If ``non_linear_power`` is not strictly positive (it would
            otherwise divide by zero or yield scores outside [0, 1]).
    """
    if non_linear_power <= 0:
        raise ValueError("non_linear_power must be a positive number.")

    if not lst:
        return 0.0  # empty list = perfect grouping

    # Single pass: count occurrences and contiguous runs per label.
    occurrences = {}
    runs = {}
    for position, label in enumerate(lst):
        occurrences[label] = occurrences.get(label, 0) + 1
        if position == 0 or label != lst[position - 1]:
            runs[label] = runs.get(label, 0) + 1

    # Raw grouping score per label: 0 (one run) to 1 (every occurrence isolated).
    total_score = sum(
        (runs[label] - 1) / (count - 1)
        for label, count in occurrences.items()
        if count > 1
    )
    normalized_score = total_score / len(occurrences)

    return normalized_score ** (1 / non_linear_power)

In [12]:
def calculate_fitness(circuits_df, circuits_seq: list, season=2026, regression=False, clusters=False, verbose=False):
    """
    Calculate the fitness (lower is better) of a calendar, i.e. an ordered circuit list.

    The score is the sum of:
        * emissions: the summed 'effort_score' of the "travel_logistic" rows whose
          'codes' value matches a consecutive "<from>-<to>" leg of the sequence.
          Only computed when ``regression`` is False; with ``regression=True`` the
          emission total stays 0.0 (no regression model is applied here).
        * cluster penalty (only if ``clusters``): fragmentation of the cluster ids
          along the sequence times the mean 'effort_score' of the travel table.
        * conflict penalty: if at least one circuit is scheduled in one of its
          'months_to_avoid' (table "fone_geography"), a flat penalty equal to the
          emission total is added, regardless of how many conflicts there are.

    Args:
        circuits_df (pd.DataFrame): Circuit information with at least
            1. 'circuit_name': Name of the circuit.
            2. 'cluster_id': Cluster ID of the circuit.
        circuits_seq (list): Sequence of circuit codes.
        season (int): Season year to simulate calendar dates.
        regression (bool): If True, skips the table-based emission calculation.
        clusters (bool): If True, adds the cluster fragmentation penalty.
        verbose (bool): If True, prints detailed information about the process.

    Returns:
        float: ``float('inf')`` (a bare float, not a tuple) when the sequence is
            empty or has fewer than 15 circuits.
        tuple: ``(score,)`` one-element tuple otherwise.
    """
    if not circuits_seq or len(circuits_seq) < 15:
        if verbose:
            print("Circuit sequence is empty or too short. Returning fitness score of inf.")
        return float('inf')

    total_emissions = 0.0

    # The travel table feeds both the emission total and the cluster-penalty weight.
    # It is loaded lazily so regression=True together with clusters=True no longer
    # hits an unbound variable.
    travel_logistics_df = None

    if not regression:
        if verbose:
            print('Regression is set to False. Using synthetic data for fitness calculation.')
            print('Getting travel logistics...')
        # One "<from>-<to>" key per consecutive pair of circuits.
        travel_logistic_keys = [
            f"{origin}-{destination}"
            for origin, destination in zip(circuits_seq, circuits_seq[1:])
        ]
        if verbose:
            print('Travel logistics keys:', travel_logistic_keys)
        # Fetch travel logistics data from the database
        travel_logistics_df = get_table("travel_logistic")

        # Keep only the rows whose 'codes' value is one of the legs.
        # NOTE(review): legs missing from the table are silently skipped, which
        # lowers the total -- confirm the table covers every pair.
        filtered_logistics = travel_logistics_df[travel_logistics_df['codes'].isin(travel_logistic_keys)]

        # Extract the effort scores
        effort_scores = filtered_logistics['effort_score'].tolist()
        total_emissions = round(sum(effort_scores), 2)

        if verbose:
            print("Effort scores:", effort_scores)
            print("Total emissions:", total_emissions)

    total_cluster_penalties = 0.0

    if clusters:
        if travel_logistics_df is None:
            travel_logistics_df = get_table("travel_logistic")

        cluster_dict = circuits_df.groupby('cluster_id')['circuit_name'].apply(list).to_dict()
        if verbose:
            print("Cluster dictionary has been created with the following keys:", cluster_dict.keys())
        # Cluster id of each circuit in calendar order; circuits absent from circuits_df are skipped.
        cluster_ids = [key for circuit in circuits_seq for key, value in cluster_dict.items() if circuit in value]
        if verbose:
            print("Cluster IDs for the given circuit sequence:", cluster_ids)
        fragmentation_score_value = fragmentation_score(cluster_ids, non_linear_power=1.0)
        if verbose:
            print("Fragmentation score:", fragmentation_score_value)
        # Scale the [0, 1] fragmentation score to the magnitude of a typical leg.
        weight = travel_logistics_df['effort_score'].mean() if not travel_logistics_df.empty else 0
        if verbose:
            print("Weight:", weight)
        total_cluster_penalties = round(fragmentation_score_value * weight, 2)

    total_conflict_penalties = 0.0
    calendar = generate_f1_calendar(year=season, n=len(circuits_seq), verbose=False)
    if calendar:
        if verbose:
            print("Generated calendar:", calendar)
        # Assign each circuit in circuits_seq to a date in the calendar
        circuit_date_mapping = {circuit: calendar[i] for i, circuit in enumerate(circuits_seq)}

        # Fetch the fone_geography table
        fone_geography_df = get_table("fone_geography")

        # Check for month conflicts
        total_conflicts = 0
        for circuit, date in circuit_date_mapping.items():
            # NOTE(review): assumes each calendar entry is a string whose last two
            # characters are the month -- confirm against generate_f1_calendar.
            month_assigned = int(date[-2:])

            matches = fone_geography_df.loc[fone_geography_df['code_6'] == circuit, 'months_to_avoid'].values
            # Take the first matching row explicitly: truth-testing a NumPy array is
            # an error for empty arrays on recent NumPy and ambiguous for several rows.
            raw_months = matches[0] if len(matches) > 0 else None

            if isinstance(raw_months, str) and raw_months.strip():
                # Stored as text such as "[5, 6]"; non-numeric fragments are ignored.
                months_to_avoid = [int(x) for x in raw_months.strip('[]').split(',') if x.strip().isdigit()]
            else:
                months_to_avoid = []

            if month_assigned in months_to_avoid:
                total_conflicts += 1
                if verbose:
                    print(f"Conflict for circuit {circuit}: assigned month {month_assigned} is in months to avoid {months_to_avoid}.")

        if total_conflicts > 0:
            # Flat penalty: any conflict doubles the emission part of the score.
            total_conflict_penalties = total_emissions

    total_penalties = total_cluster_penalties + total_conflict_penalties

    return (total_emissions + total_penalties,)

# Spot-check the fitness function on one individual of the population
# (table-based emissions plus the cluster penalty, with verbose tracing).
calculate_fitness(
    initial_circuits,
    pop[163],
    season=2026,
    regression=False,
    clusters=True,
    verbose=True,
)

Regression is set to False. Using synthetic data for fitness calculation.
Getting travel logistics...
Travel logistics keys: ['BAHSAK-AUSMEL', 'AUSMEL-SINMAR', 'SINMAR-CANMON', 'CANMON-USALAS', 'USALAS-MEXMEX', 'MEXMEX-USAAUS', 'USAAUS-USAMIA', 'USAMIA-CHISHA', 'CHISHA-NETZAN', 'NETZAN-AZEBAK', 'AZEBAK-UKGSIL', 'UKGSIL-AUSSPI', 'AUSSPI-BELSPA', 'BELSPA-ITAIMO', 'ITAIMO-SPACAT', 'SPACAT-ITAMON', 'ITAMON-MONMON', 'MONMON-HUNBUD', 'HUNBUD-SAUJED', 'SAUJED-UAEYAS', 'UAEYAS-QATLUS', 'QATLUS-BRASAO', 'BRASAO-JAPSUZ']
Effort scores: [142.82999999999998, 3387.787340273505, 30.1455, 18737.387598574023, 12112.617739660875, 6057.645989098569, 8867.643456932945, 89.91359999999999, 85.1373, 374.60249999999996, 14805.756414053345, 192.30119999999997, 103.68539999999999, 135.5985, 4030.5833650412123, 94.43789999999998, 167.17499999999998, 3652.580425070676, 11883.64602224478, 13248.545775544215, 117.1638, 265.16880000000003, 56.1123]
Total emissions: 98638.47
Cluster dictionary has been created with 

(198692.01,)

In [9]:
import random

# --- Selection Operator ---

def tournament_selection(population, fitnesses, k, num_parents, verbose=False):
    """
    Selects parents using Tournament Selection.

    For each parent, ``k`` distinct individuals are drawn at random (via the global
    ``random`` module) and the one with the lowest fitness wins.

    Args:
        population (list): The current population of chromosomes (lists).
        fitnesses (list): Fitness scores corresponding to the population.
                          Lower scores are assumed to be better. Each entry may be a
                          plain number or a tuple/list whose first element is the
                          score (the shape returned by ``calculate_fitness``).
        k (int): The size of the tournament (e.g., 3, 5). Must satisfy
                 1 <= k <= len(population).
        num_parents (int): The number of parents to select.
        verbose (bool): If True, prints detailed information about the selection process.

    Returns:
        list: A list containing the selected parent chromosomes
        (empty if the population is empty).

    Raises:
        ValueError: If ``population`` and ``fitnesses`` differ in length, or if ``k``
            is smaller than 1 or larger than the population.
    """
    population_size = len(population)

    if population_size == 0:
        if verbose:
            print("Population is empty. Returning an empty list.")
        return []

    if len(fitnesses) != population_size:
        raise ValueError("Population and fitnesses list must have the same size.")

    # Validate up front: k == 0 used to end in an IndexError on the fallback path,
    # and an oversized k in an unspecific error from random.sample.
    if not 1 <= k <= population_size:
        raise ValueError(
            f"Tournament size k must be between 1 and the population size ({population_size}); got {k}."
        )

    # Unwrap tuple/list fitnesses so they can be compared with plain floats.
    scores = [
        fitness[0] if isinstance(fitness, (tuple, list)) else fitness
        for fitness in fitnesses
    ]

    selected_parents = []
    for parent_idx in range(num_parents):
        tournament_indices = random.sample(range(population_size), k)
        if verbose:
            print(f"Tournament {parent_idx + 1}: Selected indices {tournament_indices}")

        best_index = None
        best_score = float('inf')
        for index in tournament_indices:
            # Strict comparison: ties keep the first entrant and NaN never wins.
            if scores[index] < best_score:
                best_score = scores[index]
                best_index = index

        if best_index is None:
            # No entrant beat +inf (all scores are inf/NaN): keep the first one drawn.
            best_index = tournament_indices[0]
            best_score = scores[best_index]

        if verbose:
            print(f"Tournament {parent_idx + 1}: Winner index {best_index} with fitness {best_score}")

        selected_parents.append(population[best_index])

    return selected_parents

def order_crossover(parent1, parent2, verbose=False):
    """
    Performs Order Crossover (OX1) on two parent permutations.

    A random slice ``[cut1, cut2]`` (inclusive) is copied from ``parent1`` into the
    same positions of the offspring. The remaining positions are filled, starting
    right after ``cut2`` and wrapping around, with the genes of ``parent2`` that are
    not in the copied slice, taken in the order they appear in ``parent2`` starting
    from the same point. Cut points come from the global ``random`` module.

    Args:
        parent1 (list): The first parent chromosome (list of circuits).
        parent2 (list): The second parent chromosome (list of circuits).
        verbose (bool): If True, prints detailed information about the crossover process.

    Returns:
        list: The resulting offspring chromosome. For parents shorter than 2 genes
        no crossover is possible and a copy of ``parent1`` is returned.

    Raises:
        ValueError: If the parents differ in length, or if ``parent2`` does not
            provide enough genes outside the copied slice (i.e. the parents are not
            permutations of the same circuits).
    """
    size = len(parent1)
    if len(parent2) != size:
        raise ValueError("Parents must have the same length.")

    if size < 2:
        if verbose:
            print("Chromosome size is less than 2. No crossover performed.")
        return list(parent1)

    cut1, cut2 = sorted(random.sample(range(size), 2))
    if verbose:
        print(f"Crossover points: {cut1}, {cut2}")

    segment = parent1[cut1:cut2 + 1]
    if verbose:
        print(f"Copied segment from parent1: {segment}")

    # Set lookup avoids re-slicing and re-scanning the segment for every gene;
    # unhashable genes fall back to the plain list scan.
    try:
        segment_members = set(segment)
    except TypeError:
        segment_members = segment

    # Positions/genes are visited circularly, starting just after the copied slice.
    visit_order = [(cut2 + 1 + offset) % size for offset in range(size)]
    parent2_elements = [parent2[idx] for idx in visit_order if parent2[idx] not in segment_members]
    open_slots = [idx for idx in visit_order if not cut1 <= idx <= cut2]

    if verbose:
        print(f"Remaining elements from parent2: {parent2_elements}")

    if len(parent2_elements) < len(open_slots):
        raise ValueError("Parents must be permutations of the same elements.")

    offspring = [None] * size
    offspring[cut1:cut2 + 1] = segment
    for slot, element in zip(open_slots, parent2_elements):
        offspring[slot] = element

    if verbose:
        print(f"Final offspring: {offspring}")

    return offspring

def swap_mutation(chromosome, verbose=False):
    """
    Swap Mutation: exchange the genes at two distinct random positions.

    The input chromosome is never modified; the mutation is applied to a copy.
    Positions are drawn with the global ``random`` module.

    Args:
        chromosome (list): The chromosome (list of circuits) to mutate.
        verbose (bool): If True, prints which positions are swapped and the result.

    Returns:
        list: The mutated copy. For chromosomes with fewer than 2 genes the
              unmodified copy is returned.
    """
    mutant = chromosome[:]
    gene_count = len(mutant)

    # Nothing to swap in an empty or single-gene chromosome.
    if gene_count < 2:
        if verbose:
            print("Chromosome size is less than 2. No mutation performed.")
        return mutant

    first, second = random.sample(range(gene_count), 2)
    if verbose:
        print(f"Swapping indices {first} and {second}: {mutant[first]} <-> {mutant[second]}")

    held = mutant[first]
    mutant[first] = mutant[second]
    mutant[second] = held

    if verbose:
        print(f"Mutated chromosome: {mutant}")

    return mutant


In [10]:
# NOTE(review): installs `deap` into the kernel's environment; none of the visible
# cells import it yet. Consider pinning a version and moving this to the top of the notebook.
%pip install deap





[notice] A new release of pip is available: 25.0.1 -> 25.1
[notice] To update, run: python.exe -m pip install --upgrade pip
