In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from solve_network import palette
import yaml

### Load results

In [None]:
def load_configuration(config_path):
    """
    Load configuration settings from a YAML file.
    """
    with open(config_path, "r") as f:
        config = yaml.safe_load(f)
    return config


config = load_configuration("../config.yaml")

run = "247cfe"  # run name

In [None]:
scaling = int(config["time_sampling"][0])  # temporal scaling -- 3/1 for 3H/1H

# Wildcards & Settings
policy = config["scenario"]["policy"][0]
penetration = float(policy[3:]) / 100 if policy != "ref" else 0
year = config["scenario"]["year"][0]

datacenters = config["ci"]["datacenters"]
locations = list(datacenters.keys())
names = list(datacenters.values())

In [None]:
# techs for CFE hourly matching, extracted from palette
palette_techs = palette("p4")

(
    clean_techs,
    storage_techs,
    storage_charge_techs,
    storage_discharge_techs,
) = palette_techs

# renaming technologies for plotting
clean_chargers = [tech.replace(" ", "_") for tech in storage_charge_techs]
clean_dischargers = [tech.replace(" ", "_") for tech in storage_discharge_techs]


def tech_names(base_names, year):
    return [f"{name.replace(' ', '_')}-{year}" for name in base_names]


# expected technology names with year
exp_generators = tech_names(["offwind-ac", "offwind-dc", "onwind", "solar"], year)
exp_links = tech_names(["OCGT"], year)
exp_chargers = tech_names(["battery charger", "H2 Electrolysis"], year)
exp_dischargers = tech_names(["battery discharger", "H2 Fuel Cell"], year)

# Assign colors
tech_colors = config["tech_colors"]

# Rename mappings
rename_ci_cost = pd.Series(
    {
        "onwind": "onshore wind",
        "solar": "solar",
        "grid": "grid imports",
        "revenue": "revenue",
        "battery_storage": "battery",
        "battery_inverter": "battery",
        "battery_charger": "battery",
        "ironair_storage": "iron-air",
        "ironair_inverter": "iron-air",
        "ironair_charger": "iron-air",
        "hydrogen_storage": "hydrogen storage",
        "hydrogen_electrolysis": "hydrogen storage",
        "hydrogen_fuel_cell": "hydrogen storage",
        "adv_geothermal": "advanced dispatchable",
        "allam_ccs": "NG-Allam",
    }
)

rename_ci_capacity = pd.Series(
    {
        "onwind": "onshore wind",
        "solar": "solar",
        "battery_discharger": "battery",
        "battery_charger": "battery",
        "ironair_discharger": "iron-air",
        "ironair_charger": "iron-air",
        "H2_Fuel_Cell": "hydrogen fuel cell",
        "H2_Electrolysis": "hydrogen electrolysis",
        "adv_geothermal": "advanced dispatchable",
        "allam_ccs": "NG-Allam",
    }
)

preferred_order = pd.Index(
    [
        "advanced dispatchable",
        "NG-Allam",
        "Gas OC",
        "offshore wind",
        "onshore wind",
        "solar",
        "battery",
        "iron-air",
        "hydrogen storage",
        "hydrogen electrolysis",
        "hydrogen fuel cell",
    ]
)

In [None]:
def rename_scen(obj, level=0):
    """
    Automatically append a percent sign to numeric string column names if not already present and rename DataFrame columns.

    Returns:
    - None: Modifies the DataFrame in place.
    """
    if isinstance(obj, pd.DataFrame):
        # Handle DataFrame: Modify column names
        modified_dict = {
            key: f"{key}%" if key.isdigit() and "%" not in key else key
            for key in obj.columns.get_level_values(level)
        }
        obj.rename(columns=modified_dict, level=level, inplace=True)

    elif isinstance(obj, pd.Series):
        # Handle Series: Modify index labels
        modified_dict = {
            key: f"{key}%" if "%" not in key else key
            for key in obj.index.get_level_values(level)
        }
        obj.rename(index=modified_dict, level=level, inplace=True)

### Collect results

In [None]:
import glob


def find_summary_files(run):
    path_pattern = f"../results/{run}/**/summary.csv"
    summary_files = glob.glob(path_pattern, recursive=True)
    return summary_files


summary_files = find_summary_files(run)

In [None]:
df = pd.concat(
    [pd.read_csv(f, index_col=0, header=[0, 1]) for f in summary_files],
    axis=1,
    keys=["-".join(f.split("/")[-3:-1]) for f in summary_files],
)

In [None]:
# select single column (scenario)
df = df.loc[:, "p4-cfe100"]

In [None]:
# Data Preparation
ldf = pd.concat(
    [
        df.loc[["ci_cap_" + t.replace(" ", "_") for t in clean_techs]].rename(
            {"ci_cap_" + t: t for t in clean_techs}
        ),
        df.loc[["ci_cap_" + t.replace(" ", "_") for t in clean_dischargers]].rename(
            {"ci_cap_" + t: t for t in clean_dischargers}
        ),
        df.loc[["ci_cap_" + t.replace(" ", "_") for t in clean_chargers]].rename(
            {"ci_cap_" + t: t for t in clean_chargers}
        ),
    ]
)

# Drop chargers for storage technologies with fixed P/E ratio
to_drop = ["battery_charger", "ironair_charger"]
ldf = ldf.drop(index=[row for row in to_drop if row in ldf.index])

# Drop rows with all values less than 0.1
to_drop = ldf.index[(ldf < 0.1).all(axis=1)]
ldf.drop(to_drop, inplace=True)

# Rename columns and indices
rename_scen(ldf, level=0)
ldf.rename(index=rename_ci_capacity, level=0, inplace=True)

# Reorder and sort the final DataFrame
new_index = preferred_order.intersection(ldf.index).append(
    ldf.index.difference(preferred_order)
)
ldf = ldf.loc[new_index]

In [None]:
# E = ldf.xs("NG-Allam").xs("DE1 0", level=1)
# E

### Plotting

In [None]:
FONTSIZE = 14

In [None]:
def experience_curve(E, C0, E0, LR, LR_error):
    """
    Calculate the investment cost c as a function of experience E.

    Parameters (all float):
    E: Invested capacity (updated experience) in MW.
    C0: The initial investment costs when experience E is E0 in EUR/kW.
    E0: Initial capacity (initial experience) in MW.
    LR: The learning rate, i.e., the percentage cost reduction for each doubling of cumulative experience.
    LR_error: The error in the experience exponent. (removed)
    LR_error: The error in the learning rate.

    Returns (also float):
    alpha: The experience exponent
    c(E): The investment cost c for the given capacity (E + E0).
    """

    # Calculate alpha for central, lower, and upper learning rates
    alpha_ct = np.log2(1 / (1 - LR))
    alpha_lo = np.log2(1 / (1 - (LR + LR_error)))
    alpha_up = np.log2(1 / (1 - (LR - LR_error)))

    # Define the function to calculate costs, to avoid repetition
    def calculate_costs(alpha):
        return C0 * ((E + E0) / E0) ** (-alpha)

    # Calculate the central, lower, and upper costs
    c_ct = calculate_costs(alpha_ct)
    c_lo = calculate_costs(alpha_lo)
    c_up = calculate_costs(alpha_up)

    df = pd.DataFrame({"E": E, "C-central": c_ct, "C-upper": c_up, "C-lower": c_lo})

    # df.set_index('E', inplace=True)

    return df

In [None]:
# experience_curve(E=E_allam, C0=1000, E0=1499, LR=0.1, LR_error=0.05)

In [None]:
def monte_carlo_experience_curve(
    E, C0_mean, C0_std, E0_min, E0_max, LR, LR_error, num_simulations=100
):
    """
    Perform Monte Carlo simulation on the experience curve calculation.

    Parameters:
    - E: Invested capacity (updated experience) in MW.
    - C0_mean: The mean of the initial investment costs distribution.
    - C0_std: The standard deviation of the initial investment costs distribution.
    - E0: Initial capacity (initial experience) in MW.
    - LR: The learning rate.
    - LR_error: The error in the learning rate.
    - num_simulations: Number of Monte Carlo simulations to run.

    Returns:
    - Aggregated results of the simulations.
    """

    C0_samples = np.random.normal(loc=C0_mean, scale=C0_std, size=num_simulations)
    E0_samples = np.random.uniform(low=E0_min, high=E0_max, size=num_simulations)

    all_results = []
    for i in range(num_simulations):
        df = experience_curve(E, C0_samples[i], E0_samples[i], LR, LR_error)
        df_melted = df.reset_index().melt(
            id_vars=["E"],
            value_vars=["C-central", "C-upper", "C-lower"],
            var_name="Scenario",
            value_name="Costs",
        )
        all_results.append(df_melted)

    aggregate_df = pd.concat(all_results)

    return aggregate_df

In [None]:
# aggregate_df = monte_carlo_experience_curve(E_allam, **learning_params_allam)

In [None]:
def plot_experience_curve_mc(aggregate_df, E, **kwargs):
    sns.set(style="darkgrid")

    # Create a figure with two subplots
    fig, axs = plt.subplots(
        1,
        2,
        figsize=(15, 6),
        gridspec_kw={"width_ratios": [2, 1], "wspace": 0.05},
        sharey=True,
    )

    C0_mean = kwargs.get("C0_mean")

    # Extract values and corresponding percentage indices
    values = E.values
    percentage_indices = E.index

    # Extract and plot data for the central cost scenario
    central_costs = (
        aggregate_df[aggregate_df["Scenario"] == "C-central"]
        .groupby("E")["Costs"]
        .mean()
    )
    axs[0].plot(values, central_costs, label="Central cost scenario", color="blue")

    # Extract and plot data for the lower and upper bounds
    lower_costs = (
        aggregate_df[aggregate_df["Scenario"] == "C-lower"].groupby("E")["Costs"].mean()
    )
    upper_costs = (
        aggregate_df[aggregate_df["Scenario"] == "C-upper"].groupby("E")["Costs"].mean()
    )
    axs[0].fill_between(
        values,
        lower_costs,
        upper_costs,
        color="gray",
        alpha=0.3,
        label="Range of costs driven by LR assumption",
    )

    axs[0].set_xlim(left=0)
    axs[0].set_ylim(bottom=0)
    axs[0].set_xlabel("C&I participation (%) and resulting experience (MW)")
    axs[0].set_ylabel("Specific investment cost (EUR/kW)")
    axs[0].set_title(f"Experience Curve for {E.name} technology", fontsize=FONTSIZE)
    axs[0].legend(loc="lower right", fontsize=11)

    # Secondary X-axis for percentage indices
    secax = axs[0].secondary_xaxis("top")
    secax.set_xticks(values)
    secax.set_xticklabels(percentage_indices, verticalalignment="bottom")
    # Draw vertical lines for each secondary axis tick
    for val in values:
        axs[0].axvline(x=val, color="grey", linestyle="--", linewidth=1, alpha=0.5)

    # Cost reduction summary box

    cost_reductions = (C0_mean - central_costs) / C0_mean
    cost_reduction_ranges_lower = (C0_mean - lower_costs) / C0_mean
    cost_reduction_ranges_upper = (C0_mean - upper_costs) / C0_mean

    text_lines = []
    for x, reduction, lower, upper in zip(
        percentage_indices[1:],
        cost_reductions[1:],
        cost_reduction_ranges_lower[1:],
        cost_reduction_ranges_upper[1:],
    ):
        line = f"C&I {x}: learning {round(reduction * 100)}% [{round(upper * 100)}%..{round(lower * 100)}%]"
        text_lines.append(line)

    # Join all lines into a single string
    info_text = "\n".join(text_lines)

    # Place the text box on the plot
    plt.figtext(
        0.619,
        0.27,  # Adjust these values to place the text box appropriately on your plot
        info_text,
        ha="right",
        va="center",
        fontsize=11,
        family="monospace",
        bbox=dict(boxstyle="round,pad=0.5", edgecolor="black", facecolor="white"),
    )

    # Monte Carlo plot
    max_E = aggregate_df["E"].max()
    filtered_df = aggregate_df[aggregate_df["E"] == max_E]

    violin_palette = {
        "C-central": "blue",  # Central cost in blue
        "C-upper": "grey",  # Upper cost in grey
        "C-lower": "grey",  # Lower cost in grey
    }

    sns.violinplot(
        x="Scenario",
        y="Costs",
        data=filtered_df,
        ax=axs[1],
        inner="quartile",
        palette=violin_palette,
    )
    axs[1].set_title(
        "Monte Carlo simulation \n (initial technology experience & costs)",
        fontsize=FONTSIZE,
    )
    axs[1].set_xlabel("Cost scenario")
    axs[1].set_ylabel("")

    # Construct the params_text string from kwargs and include E with units
    params_text = "\n".join(
        [
            f"E (MW):   up to {int(round(E.max()))} MW",
            f"C0 (EUR/kW):     {kwargs.get('C0_mean', 'N/A'):>6}",
            f"C0_std (EUR/kW): {kwargs.get('C0_std', 'N/A'):>6}",
            f"E0_min (MW):     {kwargs.get('E0_min', 'N/A'):>6}",
            f"E0_max (MW):     {kwargs.get('E0_max', 'N/A'):>6}",
            f"LR (% points):   {kwargs.get('LR', 'N/A') * 100:>6}",
            f"±LR (% points):  {kwargs.get('LR_error', 'N/A') * 100:>6}",
            f"num_samples:     {kwargs.get('num_simulations', 'N/A'):>6}",
        ]
    )

    # Place the text box above the plots, adjusting the y-coordinate as needed
    plt.figtext(
        0.23,
        0.4,
        params_text,
        ha="center",
        va="top",
        fontsize=11,
        family="monospace",
        bbox=dict(boxstyle="round,pad=0.5", edgecolor="black", facecolor="white"),
    )

    # plt.show()
    plt.savefig(f"../manuscript/images/e_curve_{E.name}.pdf", bbox_inches="tight")

In [None]:
E_allam = ldf.xs("NG-Allam").xs("DE1 0", level=1)
E_allam.name = "NG-Allam cycle"
E_allam

In [None]:
C0 = config["costs"][f"allam_ccs_overnight_{year}"]

learning_params_allam = {
    "C0_mean": C0,  # EUR/kW
    "C0_std": 100,  # EUR/kW (own assumption)
    "E0_min": 300,  # 1 project from 3 is realized
    "E0_max": 580,  # 2 projects from 3 are realized
    "LR": 0.15,  # assume range from 10% (like wind) to 20% (battery)
    "LR_error": 0.05,
    "num_simulations": 1000,
}

plot_experience_curve_mc(
    aggregate_df=monte_carlo_experience_curve(E_allam, **learning_params_allam),
    E=E_allam,
    **learning_params_allam,
)

In [None]:
E_ironair = ldf.xs("iron-air").xs("DE1 0", level=1)
E_ironair.name = "iron-air storage"
E_ironair

In [None]:
C0 = (
    config["costs"][f"ironair_energy_{year}"]
    * config["costs"]["USD2023_to_EUR2023"]
    * config["costs"]["ironair_duration"]
    + config["costs"][f"ironair_capacity_{year}"]
    * config["costs"]["USD2023_to_EUR2023"]
)

learning_params_ironair = {
    "C0_mean": int(C0),  # EUR/kW
    "C0_std": 100,  # EUR/kW (own assumption)
    "E0_min": 28,  # half of announced capacity
    "E0_max": 56,  # announced capacity
    "LR": 0.15,
    "LR_error": 0.05,
    "num_simulations": 1000,
}

plot_experience_curve_mc(
    aggregate_df=monte_carlo_experience_curve(E_ironair, **learning_params_ironair),
    E=E_ironair,
    **learning_params_ironair,
)

In [None]:
def plot_experience_curve_mc(
    aggregate_df, E, ax_main, ax_violin, FONTSIZE=12, **kwargs
):
    sns.set(style="darkgrid")

    # Extract parameters from kwargs
    C0_mean = kwargs.pop("C0_mean", None)
    C0_std = kwargs.get("C0_std", "N/A")
    E0_min = kwargs.get("E0_min", "N/A")
    E0_max = kwargs.get("E0_max", "N/A")
    LR = kwargs.get("LR", "N/A")
    LR_error = kwargs.get("LR_error", "N/A")
    num_simulations = kwargs.get("num_simulations", "N/A")

    # Extract values and corresponding percentage indices
    values = E.values
    percentage_indices = E.index

    # Extract and plot data for the central cost scenario
    central_costs = (
        aggregate_df[aggregate_df["Scenario"] == "C-central"]
        .groupby("E")["Costs"]
        .mean()
    )
    ax_main.plot(values, central_costs, label="Central cost scenario", color="blue")

    # Extract and plot data for the lower and upper bounds
    lower_costs = (
        aggregate_df[aggregate_df["Scenario"] == "C-lower"].groupby("E")["Costs"].mean()
    )
    upper_costs = (
        aggregate_df[aggregate_df["Scenario"] == "C-upper"].groupby("E")["Costs"].mean()
    )
    ax_main.fill_between(
        values,
        lower_costs,
        upper_costs,
        color="gray",
        alpha=0.3,
        label="Range of costs driven by LR assumption",
    )

    ax_main.set_xlim(left=0)
    ax_main.set_ylim(bottom=0)
    # Remove individual x-labels to use shared x-labels in combined plot
    ax_main.set_xlabel("")  # Empty string to avoid duplicate labels
    ax_main.set_ylabel("Specific investment cost (EUR/kW)")
    ax_main.set_title(f"Experience curve for {E.name} technology", fontsize=FONTSIZE)
    ax_main.legend(loc="lower right", fontsize=11)

    # Secondary X-axis for percentage indices
    secax = ax_main.secondary_xaxis("top")
    secax.set_xticks(values)
    secax.set_xticklabels(percentage_indices, verticalalignment="bottom")
    # Draw vertical lines for each secondary axis tick
    for val in values:
        ax_main.axvline(x=val, color="grey", linestyle="--", linewidth=1, alpha=0.5)

    # Cost reduction summary box
    cost_reductions = (C0_mean - central_costs) / C0_mean
    cost_reduction_ranges_lower = (C0_mean - lower_costs) / C0_mean
    cost_reduction_ranges_upper = (C0_mean - upper_costs) / C0_mean

    text_lines = []
    for x, reduction, lower, upper in zip(
        percentage_indices[1:],
        cost_reductions[1:],
        cost_reduction_ranges_lower[1:],
        cost_reduction_ranges_upper[1:],
    ):
        line = f"C&I {x}: learning {round(reduction * 100)}% [{round(upper * 100)}%..{round(lower * 100)}%]"
        text_lines.append(line)

    # Join all lines into a single string
    info_text = "\n".join(text_lines)

    # Place the cost reduction text box within the main plot axes
    ax_main.text(
        0.59,
        0.25,  # Position relative to Axes (x=1.05 is slightly to the right)
        info_text,
        transform=ax_main.transAxes,
        ha="left",
        va="center",
        fontsize=11,
        family="monospace",
        bbox=dict(boxstyle="round,pad=0.5", edgecolor="black", facecolor="white"),
    )

    # Monte Carlo plot
    max_E = aggregate_df["E"].max()
    filtered_df = aggregate_df[aggregate_df["E"] == max_E]

    violin_palette = {
        "C-central": "blue",  # Central cost in blue
        "C-upper": "grey",  # Upper cost in grey
        "C-lower": "grey",  # Lower cost in grey
    }

    sns.violinplot(
        x="Scenario",
        y="Costs",
        data=filtered_df,
        ax=ax_violin,
        inner="quartile",
        palette=violin_palette,
    )
    ax_violin.set_title(
        "Monte Carlo simulation \n (initial technology experience & costs)",
        fontsize=FONTSIZE,
    )
    ax_violin.set_xlabel("Cost scenario")
    ax_violin.set_ylabel("")  # Remove y-label for violin plot to avoid duplication

    # Construct the params_text string from kwargs and include E with units
    params_text = "\n".join(
        [
            f"E (MW):   up to {int(round(E.max()))} MW",
            f"C0 (EUR/kW):     {C0_mean if C0_mean is not None else 'N/A':>6}",
            f"C0_std (EUR/kW): {C0_std:>6}",
            f"E0_min (MW):     {E0_min:>6}",
            f"E0_max (MW):     {E0_max:>6}",
            f"LR (% points):   {LR * 100 if isinstance(LR, (int, float)) else 'N/A':>6}",
            f"±LR (% points):  {LR_error * 100 if isinstance(LR_error, (int, float)) else 'N/A':>6}",
            f"num_samples:     {num_simulations:>6}",
        ]
    )

    # Place the params_text box within the main plot axes
    ax_main.text(
        0.2,
        0.1,  # Position relative to Axes (y=1.05 is slightly above)
        params_text,
        transform=ax_main.transAxes,
        ha="center",
        va="bottom",
        fontsize=11,
        family="monospace",
        bbox=dict(boxstyle="round,pad=0.5", edgecolor="black", facecolor="white"),
    )

In [None]:
def plot_combined_experience_curves(
    aggregate_df_allam,
    E_allam,
    learning_params_allam,
    aggregate_df_ironair,
    E_ironair,
    learning_params_ironair,
    filename="combined_experience_curves.pdf",
    FONTSIZE=12,
):
    sns.set(style="darkgrid")

    # Create a figure with two rows and two columns
    fig, axs = plt.subplots(
        2,
        2,  # Two rows (panels), two columns (main plot and violin plot)
        figsize=(15, 12),
        gridspec_kw={"width_ratios": [2, 1], "wspace": 0.05, "hspace": 0.3},
        sharey="row",  # Share y-axis within each row (panel)
    )

    # Plot for Allam CCS (Top Panel)
    plot_experience_curve_mc(
        aggregate_df=aggregate_df_allam,
        E=E_allam,
        ax_main=axs[0, 0],
        ax_violin=axs[0, 1],
        FONTSIZE=FONTSIZE,
        **learning_params_allam,
    )

    # Plot for IronAir Energy (Bottom Panel)
    plot_experience_curve_mc(
        aggregate_df=aggregate_df_ironair,
        E=E_ironair,
        ax_main=axs[1, 0],
        ax_violin=axs[1, 1],
        FONTSIZE=FONTSIZE,
        **learning_params_ironair,
    )

    # Set a shared x-label for both main plots
    fig.text(
        0.38,
        0.07,  # x, y coordinates in figure space
        "C&I participation (%) and resulting experience (MW)",
        ha="center",
        fontsize=FONTSIZE,
    )

    # plt.show()
    plt.savefig(filename, bbox_inches="tight")
    plt.close(fig)

In [None]:
# Top Panel Parameters for Allam CCS
C0_allam = config["costs"][f"allam_ccs_overnight_{year}"]

learning_params_allam = {
    "C0_mean": C0_allam,  # EUR/kW
    "C0_std": 100,  # EUR/kW (own assumption)
    "E0_min": 300,  # 1 project from 3 is realized
    "E0_max": 580,  # 2 projects from 3 are realized
    "LR": 0.15,  # assume range from 10% (like wind) to 20% (battery)
    "LR_error": 0.05,
    "num_simulations": 1000,
}

aggregate_df_allam = monte_carlo_experience_curve(E_allam, **learning_params_allam)

# Bottom Panel Parameters for IronAir
C0_ironair = (
    config["costs"][f"ironair_energy_{year}"]
    * config["costs"]["USD2023_to_EUR2023"]
    * config["costs"]["ironair_duration"]
    + config["costs"][f"ironair_capacity_{year}"]
    * config["costs"]["USD2023_to_EUR2023"]
)

learning_params_ironair = {
    "C0_mean": int(C0_ironair),  # EUR/kW
    "C0_std": 100,  # EUR/kW (own assumption)
    "E0_min": 28,  # half of announced capacity
    "E0_max": 56,  # announced capacity
    "LR": 0.15,
    "LR_error": 0.05,
    "num_simulations": 1000,
}

aggregate_df_ironair = monte_carlo_experience_curve(
    E_ironair, **learning_params_ironair
)

plot_combined_experience_curves(
    aggregate_df_allam=aggregate_df_allam,
    E_allam=E_allam,
    learning_params_allam=learning_params_allam,
    aggregate_df_ironair=aggregate_df_ironair,
    E_ironair=E_ironair,
    learning_params_ironair=learning_params_ironair,
    filename="../manuscript/images/figure_2.pdf",
    FONTSIZE=12,
)