In [None]:
import logging
from pathlib import Path

logging.basicConfig(filename=snakemake.log[0], encoding="utf-8", level=logging.INFO)
logger = logging.getLogger(__name__)

import hvplot.pandas
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pypsa
import seaborn as sns

sns.set_style("whitegrid")

In [None]:
fps = [Path(s) for s in snakemake.input["networks"]]

# Whether to disaggregate nuclear technology into
# [disaggregate_nuclear = False]
# nuclear heat + generator (partial capacity) and generator (excess capacity) + TES
# or [disaggregate_nuclear = False]
# all nuclear parts into the same category
# or [disaggregate_nuclear = "automatic"]
# determines if nuclear heat + generator + TES are part of the networks and if yes set's itself to true
disaggregate_nuclear = "automatic"

# Open all network files (fps) and extract cost information on technologies
dfs = list()
for fp in fps:
    logging.info(f"Reading: {fp} ...")
    n = pypsa.Network(str(fp))

    if disaggregate_nuclear == "automatic":
        if "nuclear heat source" in n.generators.index:
            disaggregate_nuclear = True
        else:
            disaggregate_nuclear = False

        logger.info(
            f"disaggregating nuclear cost (nuclear, TES+gen): {disaggregate_nuclear}"
        )

    # Calculate cost contribution per category based on capital and marginal cost
    data = [
        (
            n.generators["p_nom_opt"] * n.generators["capital_cost"]
            + n.generators["marginal_cost"] * n.generators_t["p"].sum()
        ),
        (
            n.stores["capital_cost"] * n.stores["e_nom_opt"]
            + n.stores["marginal_cost"] * n.stores_t["p"].where(lambda x: x > 0).sum()
        ),
        (
            n.links["capital_cost"] * n.links["p_nom_opt"]
            + n.links["marginal_cost"] * n.links_t["p0"].sum()
        ),
    ]

    if disaggregate_nuclear:
        # Disaggregated nuclear generator (baseload and extra capacity) for optional analysis
        data.append(
            pd.Series(
                n.links["capital_cost"]["nuclear generator"]
                * n.generators["p_nom_opt"]["nuclear heat source"],
                index=["nuclear generator baseload"],
            )
        )

        data.append(
            pd.Series(
                (
                    (
                        n.links["p_nom_opt"]["nuclear generator"]
                        - n.generators["p_nom_opt"]["nuclear heat source"]
                    )
                    * n.links["capital_cost"]["nuclear generator"]
                    + (n.links_t["p0"].sum() * n.links["marginal_cost"])[
                        "nuclear generator"
                    ]
                ),
                index=["nuclear generator extra"],
            )
        )

    # Convert to pandas series
    ds = pd.concat(data)

    # Cost relative to total generation
    ds /= n.loads_t["p"].sum().sum()

    # add scenario identifier
    ds["emission reduction constraint [%]"] = (
        1 - float(fp.parts[-2].replace("co2limit_", ""))
    ) * 100
    ds["total system cost"] = n.objective / n.loads_t["p"].sum().sum()

    ds.index.name = ""

    dfs.append(ds)

df = pd.DataFrame(dfs)
df = df.set_index("emission reduction constraint [%]").sort_index()
logging.info(f"All networks successfully read.")

In [None]:
## Aggregation to categories (+ custom category colours), sort order

sort_order = [
    "Natural gas",
    "Natural gas with CCS",
    "Solar",
    "Wind",
]

# Default aggregation dictionary for df.columns; adjusted case-wise
agg_dict = {
    "natural gas": "Natural gas",
    "natural gas with CCS": "Natural gas with CCS",
    "solar": "Solar",
    "wind": "Wind",
    "battery storage": "Battery storage",
    "battery charge": "Battery storage",
    "battery discharge": "Battery storage",
    "load shedding": "Unmet demand",
    "H2 electrolysis": "PGP",
    "H2 cavern storage": "PGP",
    "H2 fuel cell": "PGP",
}

if disaggregate_nuclear:
    df = df.drop(columns=["nuclear generator"])

    agg_dict.update(
        {
            "nuclear heat storage": "TES plus extra generator",
            "nuclear generator extra": "TES plus extra generator",
            "nuclear heat source": "Advanced nuclear",
            "nuclear generator baseload": "Advanced nuclear",
        }
    )

    # Check if all technologies are handled via agg_dict
    t = set(df.columns).difference(agg_dict).difference({"total system cost"})
    if len(t) > 0:
        print(f"Unhandled technology(ies) in aggregation: {t}")
        assert False

    # Construct custom sort order
    sort_order.append("Advanced nuclear")
    sort_order.append("TES plus extra generator")

else:

    df = df.drop(
        columns=["nuclear generator baseload", "nuclear generator extra"],
        errors="ignore",
    )
    agg_dict.update(
        {
            "nuclear heat storage": "Nuclear",
            "nuclear heat source": "Nuclear",
            "nuclear generator": "Nuclear",
            "nuclear": "Nuclear",
        }
    )

    # Check if all technologies are handled via agg_dict
    t = set(df.columns).difference(agg_dict).difference({"total system cost"})
    if len(t) > 0:
        print(f"Unhandled technology(ies) in aggregation: {t}")
        assert False

    # Construct custom sort order
    sort_order.append("Nuclear")


# Aggregate individual technologies into technology groups using constructed agg_dict
df = df.groupby(
    agg_dict,
    axis="columns",
).sum()

# Add Power-to-Gas-to-Power if in the scenario
if "PGP" in df.columns:
    sort_order.append("PGP")

# Unmet demand and battery are last in plotting order (cf. figure 2)
sort_order.append("Battery storage")
sort_order.append("Unmet demand")

# Apply the custom sort order as used by Duan et al
df = df[sort_order]

# Create list of colours in appropriate order
colour_candidates = {
    "Natural gas": "black",
    "Natural gas with CCS": "#808080",
    "Solar": "#f5deb3",
    "Wind": "#87ceeb",
    "Battery storage": "#f296e7",
    "Unmet demand": "#5f9ea0",
    "Advanced nuclear": "#ff6347",
    "TES plus extra generator": "#4b0082",
    "Nuclear": "#ff6347",
    "PGP": "#007f00",
}

duan_colors = [colour_candidates[k] for k in sort_order]

In [None]:
## Finally: Plotting

df.plot.area(color=duan_colors, figsize=(8, 5), lw=0)

plt.ylabel("System cost [USD/kWh]")
plt.title(
    f"{snakemake.wildcards['template']} /  "
    f"{snakemake.wildcards['technologydata']} scenario for "
    f"{snakemake.wildcards['country']} ({snakemake.wildcards['years']})"
)

# Select x-axis scaling methods
if snakemake.config["plotting"]["system_cost"]["x_axis_scaling"] == "duan_et_al":
    # Functions for custom scaling of x axis with matplotlib
    # The scaling is taken from Duan et al, see "Postprocess_fig_automake.py" script
    # and adjusted to match the reverse emission reduction scheme values
    def x_scale_forward(x):
        return 1 - 0.5 ** (np.log((100 - x) / 100) / np.log(0.2))

    def x_scale_inverse(a):
        return (100 * ((a - 1) ** (np.log(0.2) / np.log(0.5))) - 100) * (-1)

    # Custom scaling of xaxis to match Duan et al paper figures
    plt.xscale("function", functions=(x_scale_forward, x_scale_inverse))

    # Custom xtick locations to match Duan et al paper figures
    plt.xticks([0, 50, 80, 90, 96, 99, 100])
elif snakemake.config["plotting"]["system_cost"]["x_axis_scaling"] == "log":
    
    # Log scaling to emphasize on the 90-100% range
    # rather than on the 0-10% range
    # Use 0.9999 factor to avoid "0" values in np.log
    def x_scale_forward(x):
        return (-1)*np.log(100-x*0.9999) + 100
    def x_scale_inverse(a):
        return (np.exp((a-100)*(-1))-100)*(-1)/0.9999
    
    # Custom scaling of xaxis to match Duan et al paper figures
    plt.xscale("function", functions=(x_scale_forward, x_scale_inverse))
    
else:
    # Use automatic scaling function and automaticly determined xticks
    plt.xscale(snakemake.config["plotting"]["system_cost"]["x_axis_scaling"])

# Mirror xaxis by specifying limits in reverse
plt.xlim(0, 100)

plt.ylim(0, 0.1)

# Reverse legend sort order and place to right of plot
ax = plt.gca()
handles, labels = ax.get_legend_handles_labels()
ax.legend(handles[::-1], labels[::-1], bbox_to_anchor=(1, 0.5))

for fn in snakemake.output["figure"]:
    plt.savefig(fn, dpi=300, bbox_inches="tight")