In [None]:
import pandas as pd, numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.ticker import FormatStrFormatter
from solve_network import palette

In [None]:
if __name__ == "__main__":
    # Notebook setup: resolve the snakemake object, read wildcards and config,
    # and define the lookup tables (renames, ordering) used by later cells.

    # Detect running outside of snakemake and mock snakemake for testing
    if "snakemake" not in globals():
        from _helpers import mock_snakemake

        snakemake = mock_snakemake(
            "plot_summary", year="2030", zone="IE", palette="p3", policy="cfe100"
        )

    config = snakemake.config
    # Only the first character of the "time_sampling" entry is parsed, so this
    # assumes a single-digit resolution -- TODO confirm.
    scaling = int(config["time_sampling"][0])  # temporal scaling -- 3/1 for 3H/1H

    # Wildcards & Settings
    # The policy wildcard is split into a 3-character tag and a numeric suffix.
    policy = snakemake.wildcards.policy[:3]
    # Suffix is read as a percentage and converted to a fraction; "ref" has no
    # suffix and is given 0.
    penetration = float(snakemake.wildcards.policy[3:]) / 100 if policy != "ref" else 0
    tech_palette = snakemake.wildcards.palette
    zone = snakemake.wildcards.zone
    year = snakemake.wildcards.year

    # "datacenters" is used as a mapping: its keys become `locations`, its
    # values become `names`.
    datacenters = snakemake.config["ci"]["datacenters"]
    locations = list(datacenters.keys())
    names = list(datacenters.values())
    flexibilities = snakemake.config["ci"]["flexibility"]

    # techs for CFE hourly matching, extracted from palette
    palette_techs = palette(tech_palette)

    # palette() is expected to return exactly four collections, unpacked here.
    (
        clean_techs,
        storage_techs,
        storage_charge_techs,
        storage_discharge_techs,
    ) = palette_techs

    # renaming technologies for plotting
    # (spaces are replaced by underscores in charger/discharger names)
    clean_chargers = [tech.replace(" ", "_") for tech in storage_charge_techs]
    clean_dischargers = [tech.replace(" ", "_") for tech in storage_discharge_techs]

    def tech_names(base_names, year):
        """Return each base name with spaces replaced by "_" and "-<year>" appended."""
        return [f"{name.replace(' ', '_')}-{year}" for name in base_names]

    # expected technology names with year
    exp_generators = tech_names(["offwind-ac", "offwind-dc", "onwind", "solar"], year)
    exp_links = tech_names(["OCGT"], year)
    exp_chargers = tech_names(["battery charger", "H2 Electrolysis"], year)
    exp_dischargers = tech_names(["battery discharger", "H2 Fuel Cell"], year)

    # Assign colors
    tech_colors = snakemake.config["tech_colors"]

    # Rename mappings
    # Cost components -> display labels; several components share one label
    # (e.g. all battery_* entries map to "battery").
    rename_ci_cost = pd.Series(
        {
            "onwind": "onshore wind",
            "solar": "solar",
            "grid": "grid imports",
            "revenue": "revenue",
            "battery_storage": "battery",
            "battery_inverter": "battery",
            "battery_discharger": "battery",
            "hydrogen_storage": "hydrogen storage",
            "hydrogen_electrolysis": "hydrogen storage",
            "hydrogen_fuel_cell": "hydrogen storage",
            "adv_geothermal": "advanced dispatchable",
            "allam_ccs": "NG-Allam",
        }
    )

    # Capacity rows -> display labels (keys use underscores instead of spaces).
    rename_ci_capacity = pd.Series(
        {
            "onwind": "onshore wind",
            "solar": "solar",
            "battery_discharger": "battery",
            "H2_Fuel_Cell": "hydrogen fuel cell",
            "H2_Electrolysis": "hydrogen electrolysis",
            "adv_geothermal": "advanced dispatchable",
            "allam_ccs": "NG-Allam",
        }
    )

    # Scenario keys -> percentage labels. "5" becomes "05%" -- presumably
    # zero-padded so the labels sort lexicographically; TODO confirm.
    rename_scen = {
        "0": "0%",
        "5": "05%",
        "10": "10%",
        "15": "15%",
        "20": "20%",
        "25": "25%",
    }

    # Display order for technology rows; labels not listed here are appended
    # after these by the cell that builds the capacity table.
    preferred_order = pd.Index(
        [
            "advanced dispatchable",
            "NG-Allam",
            "Gas OC",
            "offshore wind",
            "onshore wind",
            "solar",
            "battery",
            "hydrogen storage",
            "hydrogen electrolysis",
            "hydrogen fuel cell",
        ]
    )

## Data Preparation

In [None]:
# Load the summary table: the first column becomes the row index and the first
# two header rows form a two-level column index.
df = pd.read_csv(snakemake.input.summary, index_col=0, header=[0, 1])

In [None]:
def _ci_capacity_rows(summary, techs):
    """
    Return the ``ci_cap_<tech>`` rows of ``summary`` with the prefix stripped.

    Spaces in technology names are replaced by underscores for both the row
    lookup and the resulting labels, so the lookup and the rename always agree.
    """
    labels = [tech.replace(" ", "_") for tech in techs]
    rows = summary.loc[["ci_cap_" + label for label in labels]]
    return rows.rename(index={"ci_cap_" + label: label for label in labels})


# Stack the capacity rows of generators, storage dischargers and storage chargers.
ldf = pd.concat(
    [
        _ci_capacity_rows(df, clean_techs),
        _ci_capacity_rows(df, clean_dischargers),
        # The battery charger row is not reported; errors="ignore" keeps the
        # cell working for palettes that contain no battery charger.
        _ci_capacity_rows(df, clean_chargers).drop(
            ["battery_charger"], errors="ignore"
        ),
    ]
)

# Drop rows with all values less than 0.1
to_drop = ldf.index[(ldf < 0.1).all(axis=1)]
ldf = ldf.drop(to_drop)

# Rename columns and indices (reassigned rather than mutated in place)
ldf = ldf.rename(columns=rename_scen, level=0).rename(
    index=rename_ci_capacity, level=0
)

# Reorder and sort the final DataFrame: preferred technologies first, in their
# preferred order, followed by any remaining rows.
new_index = preferred_order.intersection(ldf.index).append(
    ldf.index.difference(preferred_order)
)
ldf = ldf.loc[new_index].sort_index(
    axis="columns", level=[1, 0], ascending=[False, True]
)

In [None]:
# Display the prepared capacity table.
ldf

In [None]:
# Take the "NG-Allam" row and keep only the entries whose second column level
# equals "IE5 0"; the result is indexed by the first column level.
E = ldf.xs("NG-Allam").xs("IE5 0", level=1)
E

# Learning curve

In [None]:
def experience_curve(E, C0, E0, LR):
    """
    Evaluate a single-factor learning curve.

    Parameters (all float):
    E: Newly invested capacity, added on top of the initial experience, in MW.
    C0: Investment cost in EUR/kW at the initial experience E0.
    E0: Initial capacity (initial experience) in MW.
    LR: Learning rate, i.e. the fractional cost reduction per doubling of
        cumulative experience.

    Returns:
    A pair ``(alpha, c)`` where ``alpha`` is the experience exponent rounded
    to three decimals and ``c`` is the investment cost at cumulative
    experience ``E + E0`` (computed with the unrounded exponent).
    """
    exponent = np.log2(1 / (1 - LR))

    # Cumulative experience relative to the starting point
    cumulative_ratio = (E + E0) / E0
    cost = C0 * cumulative_ratio ** (-exponent)

    return round(exponent, 3), cost

In [None]:
# Baseline learning-curve parameters.
# NOTE(review): the calls in the following cells pass literal values rather
# than these names.
C0 = 2500  # EUR/kW
E0 = 500  # MW
LR = 0.2  # 20%

### (sketch) Results for CFE score = 100% and C&I consumers in Ireland 

In [None]:
# Capacities passed as experience E to the calls below.
E

In [None]:
# Base case: E0 = 500 MW, LR = 20 %.
experience_curve(E, C0=2500, E0=500, LR=0.2)

In [None]:
# Sensitivity: initial experience E0 raised to 1000 MW, LR kept at 20 %.
experience_curve(E, C0=2500, E0=1000, LR=0.2)

In [None]:
# Sensitivity: E0 = 1000 MW with the learning rate lowered to 10 %.
experience_curve(E, C0=2500, E0=1000, LR=0.1)

### (sketch) Results for CFE score = 100% and C&I consumers in Germany 

In [None]:
# Scale the capacities by 38410 / 2200.
# NOTE(review): the factor is not explained in the notebook -- presumably a
# Germany-to-Ireland demand ratio; confirm the source of both numbers.
E_germany = E * 38410 / 2200
E_germany

In [None]:
# Base case for the scaled capacities: E0 = 500 MW, LR = 20 %.
experience_curve(E=E_germany, C0=2500, E0=500, LR=0.2)

In [None]:
# Sensitivity: initial experience E0 raised to 1000 MW, LR kept at 20 %.
experience_curve(E=E_germany, C0=2500, E0=1000, LR=0.2)

In [None]:
# Sensitivity: E0 = 1000 MW with the learning rate lowered to 10 %.
experience_curve(E=E_germany, C0=2500, E0=1000, LR=0.1)

In [None]:
# to get CCGT parity
# (evaluated at a single scalar experience of 2e4 MW instead of the series E)
experience_curve(E=2e4, C0=2500, E0=500, LR=0.2)

### Add experience exponent standard error


In [None]:
def experience_curve(E, C0, E0, LR, exp_error):
    """
    Calculate the investment cost c as a function of experience E, together
    with a band obtained by shifting the experience exponent.

    Parameters:
    E: Invested capacity (updated experience) in MW. May be a pandas Series
       (its index is kept), a scalar, or any 1-D sequence.
    C0: The initial investment costs when experience E is E0 in EUR/kW.
    E0: Initial capacity (initial experience) in MW; must be positive.
    LR: The learning rate, i.e., the percentage cost reduction for each
        doubling of cumulative experience; must be below 1.
    exp_error: The error in the experience exponent.

    Returns:
    DataFrame with one row per entry of E and the columns
    "E": the experience values,
    "C": cost for the central exponent alpha,
    "C-upper": cost for alpha - exp_error,
    "C-lower": cost for alpha + exp_error.

    Raises:
    ValueError: if LR is not below 1 or E0 is not positive.
    """
    if not LR < 1:
        raise ValueError(f"LR must be below 1, got {LR}")
    if not E0 > 0:
        raise ValueError(f"E0 must be positive, got {E0}")

    alpha = np.log2(1 / (1 - LR))

    # Scalars and plain sequences are promoted to a 1-D array so that the
    # arithmetic below broadcasts and the DataFrame constructor gets an
    # index; a Series is left untouched to preserve its index.
    if not isinstance(E, pd.Series):
        E = np.atleast_1d(E)

    def _cost(exponent):
        return C0 * ((E + E0) / E0) ** (-exponent)

    return pd.DataFrame(
        {
            "E": E,
            "C": _cost(alpha),
            "C-upper": _cost(alpha - exp_error),
            "C-lower": _cost(alpha + exp_error),
        }
    )

In [None]:
# Base case with an exponent error of 0.1, shown as a plain dict.
experience_curve(E, C0=2500, E0=500, LR=0.2, exp_error=0.1).to_dict()

In [None]:
# Same base case with a much smaller exponent error (0.01).
experience_curve(E, C0=2500, E0=500, LR=0.2, exp_error=0.01)

In [None]:
def plot_experience_curve(df):
    """
    Draw the central cost against experience and shade the area between the
    "C-lower" and "C-upper" columns, then show the figure.

    df: DataFrame with the columns "E", "C", "C-lower" and "C-upper".
    """
    sns.set(style="darkgrid")

    _, ax = plt.subplots(figsize=(10, 6))
    experience = df["E"]

    ax.plot(experience, df["C"], label="Central Cost", color="blue")
    ax.fill_between(
        experience,
        df["C-lower"],
        df["C-upper"],
        color="gray",
        alpha=0.3,
        label="Uncertainty Range",
    )

    # Anchor both axes at zero
    ax.set_ylim(bottom=0)
    ax.set_xlim(left=0)

    ax.set_xlabel("Experience (MW)")
    ax.set_ylabel("Cost (EUR/kW)")
    ax.set_title("Experience Curve")
    ax.legend()

    plt.show()

In [None]:
# Capacities passed as experience E to the calls below.
E

In [None]:
# Base case with a wider exponent error of 0.2.
experience_curve(E, C0=2500, E0=500, LR=0.2, exp_error=0.2)

In [None]:
# Plot the curve and its band; note this call uses E0 = 200 MW, unlike the
# tables above which use E0 = 500 MW.
plot_experience_curve(df=experience_curve(E, C0=2500, E0=200, LR=0.2, exp_error=0.2))

## Add Monte Carlo simulation

In [None]:
def monte_carlo_experience_curve(
    E,
    C0_mean,
    C0_std,
    E0_left,
    E0_right,
    LR,
    exp_error,
    num_simulations=100,
    seed=None,
):
    """
    Perform Monte Carlo simulation on the experience curve calculation.

    For every simulation one initial cost C0 is drawn from a normal
    distribution and one initial experience E0 from a uniform distribution;
    the experience curve is then evaluated for all entries of E.

    Parameters:
    - E: Invested capacity (updated experience) in MW.
    - C0_mean: The mean of the initial investment costs distribution.
    - C0_std: The standard deviation of the initial investment costs distribution.
    - E0_left: Lower bound of the uniform distribution of the initial
      capacity (initial experience) in MW.
    - E0_right: Upper bound of that uniform distribution in MW.
    - LR: The learning rate.
    - exp_error: The error in the experience exponent.
    - num_simulations: Number of Monte Carlo simulations to run (at least 1).
    - seed: Optional seed. When given, the draws come from a dedicated
      ``np.random.default_rng(seed)`` and are reproducible; when None, NumPy's
      global random state is used.

    Returns:
    - Long-format DataFrame with the columns "E", "Scenario" (one of "C",
      "C-upper", "C-lower") and "Costs", holding the rows of all simulations.

    Raises:
    - ValueError: if num_simulations is smaller than 1.
    """
    if num_simulations < 1:
        raise ValueError("num_simulations must be at least 1")

    # The numpy.random module and a Generator both expose normal() / uniform()
    # with these keyword arguments.
    rng = np.random if seed is None else np.random.default_rng(seed)

    C0_samples = rng.normal(loc=C0_mean, scale=C0_std, size=num_simulations)
    E0_samples = rng.uniform(low=E0_left, high=E0_right, size=num_simulations)

    all_results = [
        experience_curve(E, C0_draw, E0_draw, LR, exp_error).melt(
            id_vars=["E"],
            value_vars=["C", "C-upper", "C-lower"],
            var_name="Scenario",
            value_name="Costs",
        )
        for C0_draw, E0_draw in zip(C0_samples, E0_samples)
    ]

    return pd.concat(all_results)


# df = monte_carlo_experience_curve(E, C0_mean=2500, C0_std=1, E0=200, LR=0.2, exp_error=0.2, num_simulations=100)
# df.loc[(df["Scenario"] == "C") & (df["E"] == 0)].Costs.mean()
# df

In [None]:
def plot_experience_curve_mc(aggregate_df, E, **kwargs):
    """
    Plot the outcome of a Monte Carlo experience-curve simulation.

    Left panel: mean cost per experience value for the central scenario, with
    the area between the mean "C-lower" and mean "C-upper" costs shaded.
    Right panel: violin plot of the simulated costs per scenario at the
    largest experience value. A text box lists the simulation parameters.

    Parameters:
    - aggregate_df: Long-format DataFrame with the columns "E", "Scenario"
      and "Costs".
    - E: The experience values used for the simulation; only its minimum and
      maximum are shown in the parameter box.
    - **kwargs: Simulation parameters to display (C0_mean, C0_std, E0_left,
      E0_right, LR, exp_error, num_simulations). Missing entries are shown
      as "N/A".
    """
    sns.set(style="darkgrid")

    # Create a figure with two subplots
    fig, axs = plt.subplots(
        1,
        2,
        figsize=(15, 6),
        gridspec_kw={"width_ratios": [2, 1], "wspace": 0.05},
        sharey=True,
    )

    def _mean_costs(scenario):
        # Mean cost per experience value; groupby sorts the result by "E".
        subset = aggregate_df[aggregate_df["Scenario"] == scenario]
        return subset.groupby("E")["Costs"].mean()

    central_costs = _mean_costs("C")

    # Use the index of the grouped result as x values so that every mean is
    # paired with its own experience value even when the input is unsorted.
    experience = central_costs.index
    lower_costs = _mean_costs("C-lower").reindex(experience)
    upper_costs = _mean_costs("C-upper").reindex(experience)

    axs[0].plot(experience, central_costs.values, label="Central Cost", color="blue")
    axs[0].fill_between(
        experience,
        lower_costs.values,
        upper_costs.values,
        color="gray",
        alpha=0.3,
        label="Uncertainty Range",
    )

    axs[0].set_xlim(left=0)
    axs[0].set_ylim(bottom=0)
    axs[0].set_xlabel("Experience (MW)")
    axs[0].set_ylabel("Cost (EUR/kW)")
    axs[0].set_title("Experience Curve")
    axs[0].legend()

    # Distribution of simulated costs at the maximum 'E' value
    max_E = aggregate_df["E"].max()
    filtered_df = aggregate_df[aggregate_df["E"] == max_E]

    sns.violinplot(
        x="Scenario", y="Costs", data=filtered_df, ax=axs[1], inner="quartile"
    )
    axs[1].set_title("Monte Carlo simulation")
    axs[1].set_xlabel("Cost scenario")
    axs[1].set_ylabel("")

    # The learning rate is shown in percent; format it only when it was
    # supplied, otherwise fall back to the same placeholder as the other rows.
    learning_rate = kwargs.get("LR")
    if learning_rate is None:
        learning_rate_text = f"{'N/A':>6}"
    else:
        learning_rate_text = f"{learning_rate * 100:>6.2f}"

    # Construct the params_text string from kwargs and include E with units
    params_text = "\n".join(
        [
            f"E (MW):    [{int(E.min())}, .., {int(E.max())}]",
            f"C0 (EUR/kW):     {kwargs.get('C0_mean', 'N/A'):>6}",
            f"C0_std (EUR/kW): {kwargs.get('C0_std', 'N/A'):>6}",
            f"E0_left (MW):    {kwargs.get('E0_left', 'N/A'):>6}",
            f"E0_right (MW):   {kwargs.get('E0_right', 'N/A'):>6}",
            f"LR (% points):   {learning_rate_text}",
            f"exp_error:       ±{kwargs.get('exp_error', 'N/A'):>5}",
            f"num_samples:     {kwargs.get('num_simulations', 'N/A'):>6}",
        ]
    )

    # Place the text box inside the figure (figure coordinates, lower-left
    # region of the left panel); adjust the coordinates as needed
    plt.figtext(
        0.23,
        0.4,
        params_text,
        ha="center",
        va="top",
        fontsize=11,
        family="monospace",
        bbox=dict(boxstyle="round,pad=0.5", edgecolor="black", facecolor="white"),
    )

    plt.show()

In [None]:
# Parameters of the Monte Carlo run; the same dict is passed to the simulation
# and to the plot so that the parameter box matches the data.
learning_params = {
    "C0_mean": 2500,
    "C0_std": 100,
    "E0_left": 200,
    "E0_right": 400,
    "LR": 0.2,
    "exp_error": 0.15,
    "num_simulations": 1000,
}

# Seed NumPy's global random state so that the sampled parameters, and
# therefore the figure, are reproducible on Restart & Run All.
np.random.seed(42)

# NOTE(review): the capacities are scaled by 10 here without an explanation
# in the notebook -- confirm the intent.
E_scaled = E * 10

plot_experience_curve_mc(
    aggregate_df=monte_carlo_experience_curve(E_scaled, **learning_params),
    E=E_scaled,
    **learning_params
)