In [3]:
import pandas as pd
import xarray as xr
import numpy as np
import copy
import math
import random
import warnings

import calliope
from calliope.exceptions import ModelWarning

# Suppress the specific ModelWarning from Calliope
warnings.filterwarnings("ignore", category=ModelWarning)

from deap import base
from deap import creator
from deap import tools

In [5]:
model = calliope.Model('C:/Users/Jacob/Desktop/PythonProjects/GAMGA-Calliope v3.9/GAMGA_model/model_14D.yaml')

model.run()

df_total_cost = model.results.cost.to_series().dropna()
total_cost_optimal = df_total_cost.loc[~df_total_cost.index.map(str).str.contains('co2_emissions')].sum()

print(total_cost_optimal)

energy_cap_df = model.results.energy_cap.to_pandas()
filtered_energy_cap_df = energy_cap_df[~energy_cap_df.index.str.contains("demand|transmission")]

print(filtered_energy_cap_df)

#find Max capacity to use for inf values
initial_capacities = filtered_energy_cap_df.values 
max_cap_value = max(filtered_energy_cap_df)

print(max_cap_value)

updates = [
    {'tech': tech, 'loc': loc}
    for loc_tech in filtered_energy_cap_df.index
    for loc, tech in [loc_tech.split("::")]  # Split index by '::' to separate loc and tech
]

print(updates)

29759791.727324422
loc_techs
region2::solarPV2    10321.864197
region2::battery2     2355.664275
region1::battery1        0.000276
region1::ccgt         5000.000000
region1::solarPV1        0.000027
dtype: float64
10321.864196983837
[{'tech': 'solarPV2', 'loc': 'region2'}, {'tech': 'battery2', 'loc': 'region2'}, {'tech': 'battery1', 'loc': 'region1'}, {'tech': 'ccgt', 'loc': 'region1'}, {'tech': 'solarPV1', 'loc': 'region1'}]


In [6]:
#input variables
NUM_SUBPOPS = 3
SUBPOP_SIZE = 10
GENERATIONS = 100


max_slack = 0.2
optimal_value = total_cost_optimal

In [7]:
def update_energy_cap_max_for_individual(model, updates, individual_values):

    # Ensure the length of updates matches the individual's values
    if len(updates) != len(individual_values):
        raise ValueError("Length of updates and individual values must match.")
    
    # Update the model with the individual's capacity values
    for update, new_cap in zip(updates, individual_values):
        tech = update['tech']
        loc = update['loc']
        
        # Construct the location::technology key and update the model
        loc_tech_key = f"{loc}::{tech}"
        model.backend.update_param('energy_cap_max', {loc_tech_key: new_cap})
        model.backend.update_param('energy_cap_min', {loc_tech_key: new_cap})
    
    # Run the model for this individual
    try:
        rerun_model = model.backend.rerun()  # Rerun to capture updated backend parameters

        # Calculate the total cost, excluding emission costs
        cost_op = rerun_model.results.cost.to_series().dropna()
        initial_cost = round(cost_op.loc[~cost_op.index.map(str).str.contains('co2_emissions')].sum(), 2)

        unmet = rerun_model.results.unmet_demand.to_series().dropna()
        unmet_demand = round(unmet.sum() * 30000, 2) #300 is the penalty for unmet demand

        total_cost = initial_cost + unmet_demand
    
    except Exception as e:
        # If solving fails, set total cost to NaN and print a warning
        total_cost = float('inf')
        print("Warning: Model could not be solved for the individual. Assigning cost as infinite.")
    
    return total_cost

def slack_feasibility(individual):
    cost = update_energy_cap_max_for_individual(model, updates, individual)
    individual.cost = cost  # Attach cost attribute to individual
    slack_distance = (cost - optimal_value) / optimal_value

    # Update feasibility condition based on the new criteria
    feasible = slack_distance <= max_slack #and cost >= optimal_value
    
    return feasible 

def centroidSP(subpop):
    centroids = []

    # Iterate over each subpopulation and calculate the centroid
    for sub in subpop.values():
        if not isinstance(sub, list) or not all(isinstance(individual, list) for individual in sub):
            raise TypeError("Each subpopulation must be a list of lists (individuals).")
        
        num_solutions = len(sub)  # Number of solutions in the current subpopulation
        num_variables = len(sub[0])  # Number of decision variables
        
        # Calculate the centroid for each decision variable
        centroid = [sum(solution[i] for solution in sub) / num_solutions for i in range(num_variables)]
        centroids.append(centroid)  # Append each centroid to the main list in the required format
    
    return centroids

def fitness(subpop, centroids):
    distances = []
    minimal_distances = []
    fitness_SP = {}

    # Step 1: Calculate Distances per Variable for each individual
    for q, (subpop_index, subpopulation) in enumerate(subpop.items()):
        subpopulation_distances = []
        
        for individual in subpopulation:
            individual_variable_distances = []
            
            for p, centroid in enumerate(centroids):
                if p != q:  # Skip the centroid of the same subpopulation
                    variable_distances = [abs(individual[i] - centroid[i]) for i in range(len(individual))]
                    individual_variable_distances.append(variable_distances)
            
            subpopulation_distances.append(individual_variable_distances)
        
        distances.append(subpopulation_distances)

    # Step 2: Calculate Minimal Distances per Variable
    for subpopulation_distances in distances:
        subpopulation_minimal = []
        
        for individual_distances in subpopulation_distances:
            min_distance_per_variable = [min(distance[i] for distance in individual_distances) for i in range(len(individual_distances[0]))]
            subpopulation_minimal.append(min_distance_per_variable)
        
        minimal_distances.append(subpopulation_minimal)

    # Step 3: Calculate Fitness SP for each individual
    for sp_index, subpopulation in enumerate(minimal_distances, start=1):
        fitness_values = [(min(individual),) for individual in subpopulation]
        fitness_SP[sp_index] = fitness_values

    return fitness_SP

def load_model_with_scenario(scenario_name):

    model = scenario_name 

    # Calculate the optimal cost after running the model
    df_total_cost = model.results.cost.to_series().dropna()
    total_cost_optimal = df_total_cost.loc[~df_total_cost.index.map(str).str.contains('co2_emissions')].sum()
    
    #Update the global optimal value
    global optimal_value
    optimal_value = total_cost_optimal  # Set optimal_value to the new optimal cost
    
    print(f"Optimal cost updated for scenario '{scenario_name}': {optimal_value}")
    
    return model

def custom_tournament(subpopulation, k, tournsize=2):
    selected = []
    zero_fitness_count = 0  # Counter for individuals with fitness (0,)

    while len(selected) < k:
        # Randomly select `tournsize` individuals for the tournament
        tournament = random.sample(subpopulation, tournsize)

        # Check if all individuals in the tournament have a fitness of (0,)
        if all(ind.fitness.values == (0,) for ind in tournament):
            if zero_fitness_count < 2:
                # Select the individual with the lowest cost if all fitness values are (0,)
                best = min(tournament, key=lambda ind: ind.cost)
                selected.append(best)
                zero_fitness_count += 1
            else:
                # Select a random feasible individual if we've reached the max count of (0,) fitness values
                feasible_individuals = [ind for ind in subpopulation if ind.fitness.values != (0,)]
                if feasible_individuals:
                    best = random.choice(feasible_individuals)
                    selected.append(best)
                else:
                    # If no feasible individuals are available, fallback to random selection to avoid empty selection
                    best = random.choice(subpopulation)
                    selected.append(best)
        else:
            # Select based on fitness if there are feasible individuals in the tournament
            best = max(tournament, key=lambda ind: ind.fitness.values[0])
            selected.append(best)

    return selected

In [None]:
creator.create("FitnessMaxDist", base.Fitness, weights=(1.0,))  # Fitness to maximize distinctiveness
creator.create("IndividualSP", list, fitness=creator.FitnessMaxDist, cost=0)  # Individual structure in DEAP

def generate_individual():
    # Adjust each capacity with ±1, ensuring no values are negative
    adjusted_individual = [max(0, cap + random.choice([-1, 0, 1])) for cap in initial_capacities]
    return adjusted_individual

# DEAP toolbox setup
toolbox = base.Toolbox()

# Register the individual and subpopulation initializers
toolbox.register("individualSP", tools.initIterate, creator.IndividualSP, generate_individual)
toolbox.register("subpopulationSP", tools.initRepeat, list, toolbox.individualSP)

#register the operators
toolbox.register("mate", tools.cxUniform)
toolbox.register("elitism", tools.selBest, fit_attr="fitness.values")
toolbox.register("tournament", custom_tournament)
toolbox.register("mutbound", tools.mutPolynomialBounded)
toolbox.register('mutate', tools.mutGaussian, sigma=0.5)

# # Generate subpopulations with multiple individuals
# subpops_unaltered = [toolbox.subpopulationSP(n=SUBPOP_SIZE) for _ in range(NUM_SUBPOPS)]

# subpops_SP = {}

# for p in range(NUM_SUBPOPS):
#     subpops_SP[p+1] = subpops_unaltered[p]


In [None]:
#calculate centroids and fitness
centroids = centroidSP(subpops_SP)
fitness_populations = fitness(subpops_SP, centroids)

# Combine the fitness values with each individual
for i, subpopulation in subpops_SP.items():
    for individual, fit in zip(subpopulation, fitness_populations[i]):
        individual.fitness.values = fit 

for subpop_index, subpopulation in subpops_SP.items():      
    # Calculate slack feasibility and set fitness accordingly. This is also where the cost gets assigned as an attribute to the individual
    for idx, individual in enumerate(subpopulation):  # Use enumerate to get the index
        slack_validity = slack_feasibility(individual)
        if slack_validity:
            individual.fitness.values = individual.fitness.values
        else:
            individual.fitness.values = (0,)
                
        # Print the required details in one line
        print(f"Feasibility: {slack_validity}, Fitness: {individual.fitness.values}, Cost: {individual.cost}, Values: {individual[:]}, Subpop: {subpop_index}, Ind: {idx + 1}")

In [None]:
# Initialize containers to store the fitness statistics

g = 0
while g < GENERATIONS: 
    g += 1
    print(f"-- Generation {g} --")

    offspring = {}
    for subpop_index, subpopulation in subpops_SP.items():

        # Select the next generation individuals
        # Preserve the top ~% as elites and select the rest through tournament selection
        elite_count = int(0.1 * len(subpopulation))
        elite_selection = toolbox.elitism(subpopulation, elite_count)
        offspring[subpop_index] = (elite_selection + toolbox.tournament(subpopulation, (len(subpopulation) - elite_count)))

        # Clone the selected individuals
        offspring[subpop_index] = list(map(toolbox.clone, offspring[subpop_index]))




        # Apply crossover
        for child1, child2 in zip(offspring[subpop_index][::2], offspring[subpop_index][1::2]): 
            if random.random() < 0.5:  # Use updated crossover probability
                toolbox.mate(child1, child2, indpb=0.5) 
                del child1.fitness.values 
                del child2.fitness.values
                del child1.cost
                del child2.cost 



        # Apply mutation
        for mutant in offspring[subpop_index]:
            if random.random() <= 1:
                # Apply mutPolynomialBounded with shared bounds
                mutant, = toolbox.mutate(mutant, mu=300, indpb=0.1)
                mutant[:] = [max(0, val) for val in mutant]  # Ensure values are non-negative
                
                # Delete fitness to ensure re-evaluation
                if hasattr(mutant.fitness, 'values'):
                    del mutant.fitness.values
                
                if hasattr(mutant.cost, 'values'):
                    del mutant.cost
                



    
    # Calculate slack feasibility and set fitness accordingly
    feasible_individuals = {subpop_index: [] for subpop_index in offspring.keys()}  # Dictionary format
    infeasible_individuals = {subpop_index: [] for subpop_index in offspring.keys()}

    for subpop_index, subpopulation in offspring.items():
        # Step 1: Calculate slack feasibility
        for idx, individual in enumerate(subpopulation):
            slack_validity = slack_feasibility(individual)

            if slack_validity:
                feasible_individuals[subpop_index].append(individual)
                print(f"Feasible - Subpop: {subpop_index}, Ind: {idx + 1}, Values: {individual}, Fitness: {individual.fitness.values}")
            else:
                # Replace infeasible individuals with elites
                if elite_selection:  # Ensure there are elites to replace with
                    replacement = elite_selection.pop(0)  # Take one elite
                    subpopulation[idx] = toolbox.clone(replacement)  # Replace with a clone of the elite
                    feasible_individuals[subpop_index].append(subpopulation[idx])  # Add to feasible
                    print(f"Replaced with Elite - Subpop: {subpop_index}, Ind: {idx + 1}, Values: {subpopulation[idx]}, Fitness: {subpopulation[idx].fitness.values}")
                else:
                    # If no elites are left, assign zero fitness
                    individual.fitness.values = (0,)
                    infeasible_individuals[subpop_index].append(individual)
                    print(f"Infeasible - Subpop: {subpop_index}, Ind: {idx + 1}, Values: {individual}, Fitness: {individual.fitness.values}")


        
    # Step 2: Calculate centroids and fitness for feasible individuals
    if feasible_individuals:  # Ensure there are feasible individuals
        centroids_offspring = copy.deepcopy(centroidSP(feasible_individuals))
        fitness_SP_offspring = fitness(feasible_individuals, centroids_offspring)

        # Debug print: Check centroids and fitness before assignment
        print("Centroids for Offspring:", centroids_offspring)
        print("Fitness Calculated for Feasible Individuals:", fitness_SP_offspring)

        # Assign calculated fitness to feasible individuals
        for subpop_index, individuals in feasible_individuals.items():
            print(f"Assigning fitness to Subpopulation {subpop_index}:")  # Debug print
            if individuals:  # Ensure there are individuals to process
                for idx, individual in enumerate(individuals):
                    print(f"Before Assignment - Ind: {idx + 1}, Fitness: {individual.fitness.values}")
                    individual.fitness.values = fitness_SP_offspring[subpop_index][idx]
                    print(f"After Assignment - Ind: {idx + 1}, Fitness: {individual.fitness.values}")
            else:
                print(f"Warning: No feasible individuals in Subpopulation {subpop_index}")


    # Combine feasible and infeasible individuals to form the new offspring
    for subpop_index in offspring.keys():
        print(f"--- Debugging Subpopulation {subpop_index} ---")
        
        # Print counts of feasible and infeasible individuals
        feasible_count = len(feasible_individuals[subpop_index])
        infeasible_count = len(infeasible_individuals[subpop_index])
        print(f"Feasible Count: {feasible_count}, Infeasible Count: {infeasible_count}")

        # Validate total population size
        original_size = len(offspring[subpop_index])
        combined_size = feasible_count + infeasible_count
        print(f"Original Population Size: {original_size}, Combined Size After Merge: {combined_size}")
        assert combined_size == original_size, (
            f"Mismatch in population size for Subpopulation {subpop_index}: "
            f"Original: {original_size}, Combined: {combined_size}"
        )
        
        # Print a few individuals for validation
        print("Feasible Individuals:")
        for idx, ind in enumerate(feasible_individuals[subpop_index][:5], start=1):  # Print up to 5 feasible individuals
            print(f"  Ind {idx}: Fitness: {ind.fitness.values}, Values: {ind}")
        
        print("Infeasible Individuals:")
        for idx, ind in enumerate(infeasible_individuals[subpop_index][:5], start=1):  # Print up to 5 infeasible individuals
            print(f"  Ind {idx}: Fitness: {ind.fitness.values}, Values: {ind}")
        
        # Combine and update offspring
        offspring[subpop_index] = feasible_individuals[subpop_index] + infeasible_individuals[subpop_index]
        print(f"New Offspring Size: {len(offspring[subpop_index])}")
        print("-" * 40)




    # Step 3: Print generation summary for all subpopulations
    for subpop_index, subpopulation in offspring.items():
        for idx, individual in enumerate(subpopulation):
            cost = getattr(individual, 'cost', 'N/A')  # Safeguard for missing cost attribute
            print(f"Fitness: {individual.fitness.values}, Cost: {cost}, Values: {individual}, Subpop: {subpop_index}, Ind: {idx + 1}")
    print("-" * 40)


    # Save offspring as the current generation
    subpops_SP = offspring


In [9]:
input_params = model.backend.access_model_inputs()

# Access energy_cap_max and energy_cap_min
energy_cap_max = input_params['energy_cap_max']
energy_cap_min = input_params['energy_cap_min']

# Convert to DataFrame for filtering
energy_cap_max_df = energy_cap_max.to_dataframe()
energy_cap_min_df = energy_cap_min.to_dataframe()

# Filter out rows with 'demand' or 'free' in the index
energy_cap_max_filtered = energy_cap_max_df[~energy_cap_max_df.index.get_level_values('loc_techs').str.contains("demand|transmission")]
energy_cap_min_filtered = energy_cap_min_df[~energy_cap_min_df.index.get_level_values('loc_techs').str.contains("demand|transmission")]

lowup = [
    [energy_cap_min_filtered.loc[loc_tech, 'energy_cap_min'], energy_cap_max_filtered.loc[loc_tech, 'energy_cap_max']]
    for loc_tech in energy_cap_max_filtered.index
]

low_up_bound = copy.deepcopy(lowup)

for i, (low, up) in enumerate(low_up_bound):
    if up == float('inf'):
        print(f"technology {i} has 'inf' as the upper bound.")
        low_up_bound[i][1] = max_cap_value
    
    else:
        print(f"technology {i} has a finite upper bound: {up}")

print("Updated low_up_bound:", low_up_bound)

technology 0 has a finite upper bound: 15000.0
technology 1 has a finite upper bound: 10000.0
technology 2 has a finite upper bound: 10000.0
technology 3 has a finite upper bound: 5000.0
technology 4 has a finite upper bound: 15000.0
Updated low_up_bound: [[0.0, 15000.0], [0.0, 10000.0], [0.0, 10000.0], [0.0, 5000.0], [0.0, 15000.0]]


In [22]:
creator.create("FitnessMaxDist", base.Fitness, weights=(1.0,))  # Fitness to maximize distinctiveness
creator.create("IndividualSP", list, fitness=creator.FitnessMaxDist, cost=0)  # Individual structure in DEAP

def generate_individual():
    # Generate a new individual with capacities within the defined bounds
    adjusted_individual = [
        max(low, min(up, cap + random.choice([-10000, 0, 1000])))
        for cap, (low, up) in zip(initial_capacities, low_up_bound)]
    return adjusted_individual

# DEAP toolbox setup
toolbox = base.Toolbox()

# Register the individual and subpopulation initializers
toolbox.register("individualSP", tools.initIterate, creator.IndividualSP, generate_individual)
toolbox.register("subpopulationSP", tools.initRepeat, list, toolbox.individualSP)


A class named 'FitnessMaxDist' has already been created and it will be overwritten. Consider deleting previous creation of that class or rename it.


A class named 'IndividualSP' has already been created and it will be overwritten. Consider deleting previous creation of that class or rename it.



In [23]:
# Generate subpopulations with multiple individuals
subpops_unaltered = [toolbox.subpopulationSP(n=SUBPOP_SIZE) for _ in range(NUM_SUBPOPS)]

subpops_SP = {}

for p in range(NUM_SUBPOPS):
    subpops_SP[p+1] = subpops_unaltered[p]

In [24]:
subpops_SP

{1: [[11321.864196983837, 2355.6642747130004, 0.0002756470546050674, 0.0, 0.0],
  [321.86419698383725, 0.0, 0.0, 0.0, 2.7097687678931826e-05],
  [10321.864196983837,
   0.0,
   0.0002756470546050674,
   4999.999999992175,
   2.7097687678931826e-05],
  [11321.864196983837, 0.0, 1000.0002756470547, 0.0, 0.0],
  [321.86419698383725, 0.0, 0.0002756470546050674, 0.0, 0.0],
  [11321.864196983837, 0.0, 1000.0002756470547, 4999.999999992175, 0.0],
  [11321.864196983837,
   2355.6642747130004,
   0.0002756470546050674,
   4999.999999992175,
   1000.0000270976877],
  [11321.864196983837,
   2355.6642747130004,
   0.0002756470546050674,
   4999.999999992175,
   1000.0000270976877],
  [10321.864196983837, 0.0, 0.0, 0.0, 2.7097687678931826e-05],
  [10321.864196983837, 0.0, 1000.0002756470547, 5000.0, 0.0]],
 2: [[11321.864196983837, 2355.6642747130004, 0.0, 0.0, 1000.0000270976877],
  [321.86419698383725, 0.0, 0.0, 4999.999999992175, 2.7097687678931826e-05],
  [321.86419698383725,
   2355.664274713