# 00 – Tech config → PyPSA CSV bundle

This notebook is the **canonical input workflow** for techno‑economic assumptions.

You provide a YAML file (default: `inputs/tech_config_ammonia_plant.yaml`) with **overnight CAPEX**, **lifetimes**, **interest rates**, and **efficiencies** on an **HHV output basis**.

The notebook then:
1. converts overnight CAPEX into PyPSA's annualised `capital_cost` by multiplying it by the annuity factor $\text{Annuity}(r,n)=\frac{r(1+r)^n}{(1+r)^n-1}$ plus the fixed O&M fraction, i.e. `capital_cost = CAPEX × (Annuity(r, n) + fixed_om_fraction)`,
2. converts link CAPEX from the YAML's **MW_out** (bus1) basis to PyPSA’s **MW_in** (bus0) basis by multiplying it by the bus0→bus1 efficiency,
3. writes the updated component tables into `basic_ammonia_plant/` (`generators.csv`, `links.csv`, `stores.csv`).

At runtime the solver reads only the CSV bundle; no YAML is applied by the model code.

In [None]:
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Literal, Tuple

import math

import pandas as pd
import yaml

# Repo paths (robust: walk upwards until we find the project root)
def find_repo_root(start: Path) -> Path:
    """Return the closest directory at or above *start* that looks like the project root.

    A directory qualifies when it contains both ``basic_ammonia_plant`` and
    ``model`` entries. *start* itself is examined first, then each parent in
    turn, so the nearest match wins.

    Raises:
        FileNotFoundError: if no directory on the way up contains both markers.
    """
    markers = ("basic_ammonia_plant", "model")
    search_path = (start, *start.parents)
    match = next(
        (
            directory
            for directory in search_path
            if all((directory / marker).exists() for marker in markers)
        ),
        None,
    )
    if match is None:
        raise FileNotFoundError("Could not locate repo root (expected basic_ammonia_plant/ and model/)")
    return match

# Locate the project root starting from the current working directory (not
# from this notebook's own location), then derive the two paths the rest of
# the notebook uses.
repo_root = find_repo_root(Path().resolve())
# Directory holding the PyPSA CSV bundle that is read and rewritten below.
PLANT_DIR = repo_root / "basic_ammonia_plant"
# YAML file used when the user does not point TECH_YAML somewhere else.
DEFAULT_TECH_YAML = repo_root / "inputs" / "tech_config_ammonia_plant.yaml"

# Echo the resolved locations so a wrong working directory is spotted early.
print("Repo root:", repo_root)
print("Plant CSV dir:", PLANT_DIR)
print("Default tech YAML:", DEFAULT_TECH_YAML)

In [None]:
# ----------------
# USER PARAMETERS
# ----------------
# Point this at your YAML file (same schema as inputs/tech_config_ammonia_plant.yaml).
# It must contain a top-level `techs:` mapping.
TECH_YAML = DEFAULT_TECH_YAML

# When True (the setting below), generators.csv, links.csv and stores.csv in
# basic_ammonia_plant/ are overwritten in place. Set to False for a dry run
# that only computes and displays the results.
WRITE_OUTPUTS = True

In [None]:
# The component tables a tech entry may target.
TechType = Literal["generator", "link", "store"]

@dataclass(frozen=True)
class TechEntry:
    """Immutable techno-economic assumptions for one named technology.

    Instances are built by ``load_tech_yaml`` from one item of the YAML
    ``techs:`` mapping and consumed by ``apply_tech_entries_to_bundle``.
    """

    # Tech name; must match a row name in the component CSV it targets.
    name: str
    # Which component table the entry applies to.
    component_type: TechType
    # Overnight cost per MW; the loader requires it for generators and links.
    # For links it is interpreted per MW of bus1 output.
    overnight_cost_per_mw: float | None = None
    # Overnight cost per MWh; the loader requires it for stores.
    overnight_cost_per_mwh: float | None = None
    # Lifetime in years used as n in the annuity factor.
    lifetime_years: float = 20.0
    # Interest rate as a fraction (0.07 = 7 %), used as r in the annuity factor.
    interest_rate: float = 0.07
    # Yearly fixed O&M as a fraction of overnight cost; added to the annuity factor.
    fixed_om_fraction: float = 0.0
    # For links: optional bus-based recipe per 1 unit of primary output (bus1).
    # Keys are bus names, values are amounts consumed / produced.
    carriers_in: Dict[str, float] | None = None
    carriers_out: Dict[str, float] | None = None
    # Optional declared overall efficiency; only compared against the value
    # derived from the link coefficients to emit a warning.
    overall_efficiency: float | None = None


def annuity_factor(interest_rate: float, lifetime_years: float) -> float:
    """Return the annuity factor r(1+r)^n / ((1+r)^n - 1) for rate r and lifetime n.

    A rate whose magnitude is below 1e-12 is treated as zero, for which the
    factor reduces to 1/n (the closed form would divide by zero).

    Raises:
        ValueError: if ``lifetime_years`` is zero or negative.
    """
    if lifetime_years <= 0:
        raise ValueError("lifetime_years must be positive")

    rate_is_zero = abs(interest_rate) < 1e-12
    if rate_is_zero:
        return 1.0 / lifetime_years

    compounded = (1.0 + interest_rate) ** lifetime_years
    return interest_rate * compounded / (compounded - 1.0)


def annualised_capital_cost(overnight_cost: float, interest_rate: float, lifetime_years: float, fixed_om_fraction: float) -> float:
    """Return the yearly cost of an overnight investment.

    The result is ``overnight_cost * (annuity_factor + fixed_om_fraction)``:
    the annuity payment on the investment plus fixed O&M expressed as a yearly
    fraction of the overnight cost. It carries the unit of ``overnight_cost``
    per year.
    """
    recovery_factor = annuity_factor(interest_rate, lifetime_years)
    principal = float(overnight_cost)
    yearly_share = recovery_factor + float(fixed_om_fraction)
    return principal * yearly_share


def link_cost_mw_out_to_mw_in(cost_per_mw_out: float, efficiency_bus0_to_bus1: float) -> float:
    """Convert a link cost quoted per MW of bus1 output into a cost per MW of bus0 input.

    In PyPSA ``p_bus1 = p_nom * efficiency`` with ``p_nom`` sized on bus0 (MW_in),
    so one MW_in delivers ``efficiency`` MW_out and therefore costs
    ``cost_per_mw_out * efficiency``.

    Args:
        cost_per_mw_out: cost per MW of output on bus1.
        efficiency_bus0_to_bus1: link efficiency from bus0 to bus1.

    Raises:
        ValueError: if the efficiency is zero, negative or NaN.
    """
    efficiency = float(efficiency_bus0_to_bus1)
    # `not (x > 0)` instead of `x <= 0`: NaN fails every comparison, so the
    # plain `<= 0` test would let a blank CSV cell through and produce a NaN cost.
    if not efficiency > 0:
        raise ValueError("Link efficiency (bus0->bus1) must be positive")
    return float(cost_per_mw_out) * efficiency


def _coerce_float_map(value: Any, field: str) -> Dict[str, float] | None:
    """Normalise an optional YAML mapping into ``{str: float}``.

    Entries whose value is ``None`` are dropped, keys are stringified and the
    remaining values converted with ``float``. ``None`` is returned both for a
    missing mapping and for one that ends up empty.

    Raises:
        ValueError: if *value* is neither ``None`` nor a dict; *field* names
            the offending YAML path in the message.
    """
    if value is None:
        return None
    if isinstance(value, dict):
        cleaned = {
            str(key): float(amount)
            for key, amount in value.items()
            if amount is not None
        }
        return cleaned if cleaned else None
    raise ValueError(f"{field} must be a mapping")


def load_tech_yaml(path: Path) -> Dict[str, TechEntry]:
    """Parse the tech-assumption YAML at *path* into ``TechEntry`` objects keyed by tech name.

    The file must contain a top-level ``techs:`` mapping whose values are
    mappings with at least ``component_type`` (generator/link/store) and the
    matching overnight cost (``overnight_cost_per_mw`` for generators and
    links, ``overnight_cost_per_mwh`` for stores).

    Numeric fields that are missing, null or an empty string are treated as
    absent: ``lifetime_years``, ``interest_rate`` and ``fixed_om_fraction``
    then fall back to 20.0, 0.07 and 0.0, the others to ``None``.

    Raises:
        ValueError: if the structure is not as described, a numeric field
            cannot be converted to ``float``, or a required cost is absent.
    """

    def _number(raw: Dict[str, Any], owner: Any, key: str, default: float | None) -> float | None:
        # One place for the "absent -> default, otherwise float" rule, so a bad
        # value is reported with its YAML path instead of a bare float() error.
        value = raw.get(key)
        if value is None or value == "":
            return default
        try:
            return float(value)
        except (TypeError, ValueError) as err:
            raise ValueError(f"techs.{owner}.{key} must be a number, got {value!r}") from err

    # Read as UTF-8 explicitly rather than with the platform's locale encoding.
    data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    # A top-level list or scalar has no .get(); report it as the same schema error.
    techs = data.get("techs") if isinstance(data, dict) else None
    if not isinstance(techs, dict):
        raise ValueError("YAML must have a top-level 'techs:' mapping")

    parsed: Dict[str, TechEntry] = {}
    for name, raw in techs.items():
        if not isinstance(raw, dict):
            raise ValueError(f"techs.{name} must be a mapping")
        component_type = raw.get("component_type")
        if component_type not in ("generator", "link", "store"):
            raise ValueError(f"techs.{name}.component_type must be one of generator/link/store")
        entry = TechEntry(
            name=name,
            component_type=component_type,
            overnight_cost_per_mw=_number(raw, name, "overnight_cost_per_mw", None),
            overnight_cost_per_mwh=_number(raw, name, "overnight_cost_per_mwh", None),
            lifetime_years=_number(raw, name, "lifetime_years", 20.0),
            interest_rate=_number(raw, name, "interest_rate", 0.07),
            fixed_om_fraction=_number(raw, name, "fixed_om_fraction", 0.0),
            carriers_in=_coerce_float_map(raw.get("carriers_in"), f"techs.{name}.carriers_in"),
            carriers_out=_coerce_float_map(raw.get("carriers_out"), f"techs.{name}.carriers_out"),
            overall_efficiency=_number(raw, name, "overall_efficiency", None),
        )
        if entry.component_type in ("generator", "link") and entry.overnight_cost_per_mw is None:
            raise ValueError(f"techs.{name} is {entry.component_type} but has no overnight_cost_per_mw")
        if entry.component_type == "store" and entry.overnight_cost_per_mwh is None:
            raise ValueError(f"techs.{name} is store but has no overnight_cost_per_mwh")
        parsed[name] = entry
    return parsed

In [None]:
def load_csv_bundle(plant_dir: Path) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Read the four component tables from *plant_dir*, each indexed by its ``name`` column.

    Reads ``buses.csv``, ``generators.csv``, ``links.csv`` and ``stores.csv``.
    ``name`` is kept as a regular column as well as the index, so the frames
    can be written back unchanged after ``reset_index(drop=True)``.

    Returns:
        ``(buses, generators, links, stores)``.

    Raises:
        ValueError: if any of the four files lacks a ``name`` column.
    """
    tables = {}
    for stem in ("buses", "generators", "links", "stores"):
        table = pd.read_csv(plant_dir / f"{stem}.csv")
        # Check before indexing so every table, buses included, fails with the
        # same descriptive error rather than a bare KeyError from set_index.
        if "name" not in table.columns:
            raise ValueError(f"{stem}.csv must have a 'name' column")
        tables[stem] = table.set_index("name", drop=False)
    return tables["buses"], tables["generators"], tables["links"], tables["stores"]


def _recipe_from_link_row(link_row: pd.Series) -> Tuple[Dict[str, float], Dict[str, float]]:
    """Express a link row's coefficients as a recipe per 1 unit of bus1 output.

    Returns ``(carriers_in, carriers_out)``, both keyed by bus name. bus0
    always appears as an input of ``1 / efficiency`` and bus1 as an output of
    1.0. When the row has a usable ``bus2`` and ``efficiency2``, that port is
    listed as an output if ``efficiency2 / efficiency`` is non-negative and
    as an input (with the sign flipped) otherwise.

    Raises:
        ValueError: if the bus1 efficiency is zero or negative.
    """
    source_bus = str(link_row["bus0"])
    primary_bus = str(link_row["bus1"])
    primary_eff = float(link_row["efficiency"])
    if primary_eff <= 0:
        raise ValueError(f"Link {link_row['name']} has non-positive bus1 efficiency; cannot normalise")

    # bus0 draw needed to deliver one unit on bus1.
    inputs: Dict[str, float] = {source_bus: 1.0 / primary_eff}
    outputs: Dict[str, float] = {primary_bus: 1.0}

    # A blank CSV cell comes back as NaN, whose string form is "nan".
    third_bus = link_row.get("bus2")
    if third_bus is None or str(third_bus).strip() in ("", "nan", "NaN"):
        return inputs, outputs

    third_eff = link_row.get("efficiency2")
    if third_eff in (None, ""):
        return inputs, outputs
    if isinstance(third_eff, float) and math.isnan(third_eff):
        return inputs, outputs

    # bus2 flow per unit of bus1 output; the sign decides which side it is on.
    per_output = float(third_eff) / primary_eff
    third_name = str(third_bus)
    if per_output >= 0:
        outputs[third_name] = per_output
    else:
        inputs[third_name] = -per_output
    return inputs, outputs


def _overall_efficiency(carriers_in: Dict[str, float], carriers_out: Dict[str, float]) -> float | None:
    """Return total recipe output divided by total recipe input.

    ``None`` amounts are skipped on both sides. The result is ``None`` when
    the inputs sum to zero or less, since no ratio can be formed.
    """
    inputs_total, outputs_total = (
        sum(float(amount) for amount in side.values() if amount is not None)
        for side in (carriers_in, carriers_out)
    )
    return None if inputs_total <= 0 else outputs_total / inputs_total


def _warn_if_recipe_mismatch(
    name: str,
    yaml_in: Dict[str, float] | None,
    yaml_out: Dict[str, float] | None,
    derived_in: Dict[str, float],
    derived_out: Dict[str, float],
    yaml_eff: float | None,
    tolerance: float = 1e-3,
    eff_tolerance: float = 2e-2,
) -> None:
    """Print warnings where the YAML recipe disagrees with the link-derived recipe.

    Nothing is checked unless both YAML sides are non-empty. Otherwise, in
    this order:

    1. buses named in the YAML recipe but absent from the link ports, and the
       reverse (inputs and outputs are pooled for this check);
    2. per-bus amounts, compared side by side for buses present on the same
       side of both recipes; a difference counts when it exceeds *tolerance*
       relative to ``max(1, |derived|)``;
    3. the declared overall efficiency against the derived one, when
       *yaml_eff* is given, using the absolute *eff_tolerance*.

    Findings are reported with ``print`` only; nothing is raised or returned.
    """
    if not (yaml_in and yaml_out):
        return

    declared_buses = set(yaml_in) | set(yaml_out)
    link_buses = set(derived_in) | set(derived_out)
    unexpected = declared_buses - link_buses
    absent = link_buses - declared_buses
    if unexpected:
        print(f"WARNING: {name} YAML recipe has keys not on link ports: {sorted(unexpected)}")
    if absent:
        print(f"WARNING: {name} YAML recipe missing ports present on link: {sorted(absent)}")

    sides = (
        ("carriers_in", yaml_in, derived_in),
        ("carriers_out", yaml_out, derived_out),
    )
    for label, given, implied in sides:
        for bus in sorted(set(given) & set(implied)):
            given_amt = float(given[bus])
            implied_amt = float(implied[bus])
            if implied_amt == 0 and given_amt == 0:
                continue
            # Relative for large amounts, absolute for amounts below 1.
            scale = max(1.0, abs(implied_amt))
            if abs(given_amt - implied_amt) / scale > tolerance:
                print(f"WARNING: {name} {label} mismatch for '{bus}': yaml={given_amt:.6g} derived={implied_amt:.6g}")

    if yaml_eff is None:
        return
    implied_eff = _overall_efficiency(derived_in, derived_out)
    if implied_eff is not None and abs(float(yaml_eff) - implied_eff) > eff_tolerance:
        print(f"WARNING: {name} overall_efficiency differs: yaml={float(yaml_eff):.4f} derived={implied_eff:.4f}")


def _set_float_cell(table: pd.DataFrame, row: Any, column: str, value: float) -> None:
    """Write *value* into ``table.loc[row, column]`` without relying on implicit upcasting.

    ``read_csv`` infers int64 for a column that only holds whole numbers.
    Assigning a fractional float into such a column emits a FutureWarning on
    pandas 2.1+ and is slated to be rejected once PDEP-6 is enforced, so the
    column is widened to float explicitly first. A missing column is created
    by the assignment itself.
    """
    if column in table.columns and pd.api.types.is_integer_dtype(table[column].dtype):
        table[column] = table[column].astype(float)
    table.loc[row, column] = float(value)


def apply_tech_entries_to_bundle(
    techs: Dict[str, TechEntry],
    buses: pd.DataFrame,
    generators: pd.DataFrame,
    links: pd.DataFrame,
    stores: pd.DataFrame,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Apply YAML tech entries to the CSV tables; return (gens, links, stores, summary_df).

    For every entry the annualised ``capital_cost`` is written into the row of
    the same name in the matching table:

    * generators / stores: ``overnight cost * (annuity + fixed O&M fraction)``;
    * links: the same figure, which the YAML quotes per MW of bus1 output,
      converted to PyPSA's per-MW-of-bus0-input basis by multiplying with the
      link's bus0->bus1 efficiency.

    When a link entry carries both ``carriers_in`` and ``carriers_out``, the
    link's ``efficiency`` (and ``efficiency2`` if the row has a ``bus2``) are
    first recomputed from that recipe, and the recipe is cross-checked against
    the resulting coefficients (warnings are printed, nothing is raised).

    Args:
        techs: tech entries keyed by name, as returned by ``load_tech_yaml``.
        buses: bus table; accepted for interface symmetry, not read here.
        generators, links, stores: component tables indexed by ``name``.
            They are modified in place; pass copies to keep the originals.

    Returns:
        The three (mutated) component tables followed by a summary frame with
        one row per tech entry.

    Raises:
        ValueError: if an entry has no matching row, a recipe omits bus0, bus1
            or an existing bus2, a recipe amount for bus0/bus1 is not positive,
            or a link with bus2 has no recipe.
    """
    rows = []
    for name, entry in techs.items():
        if entry.component_type == "generator":
            if name not in generators.index:
                raise ValueError(f"YAML tech '{name}' is generator but not present in generators.csv")
            annual_out = annualised_capital_cost(
                entry.overnight_cost_per_mw, entry.interest_rate, entry.lifetime_years, entry.fixed_om_fraction
            )
            _set_float_cell(generators, name, "capital_cost", annual_out)
            rows.append({"tech": name, "component": "generator", "annual_capital_cost_usd_per_mw": annual_out})

        elif entry.component_type == "store":
            if name not in stores.index:
                raise ValueError(f"YAML tech '{name}' is store but not present in stores.csv")
            annual = annualised_capital_cost(
                entry.overnight_cost_per_mwh, entry.interest_rate, entry.lifetime_years, entry.fixed_om_fraction
            )
            _set_float_cell(stores, name, "capital_cost", annual)
            rows.append({"tech": name, "component": "store", "annual_capital_cost_usd_per_mwh": annual})

        elif entry.component_type == "link":
            if name not in links.index:
                raise ValueError(f"YAML tech '{name}' is link but not present in links.csv")

            link_row = links.loc[name]
            bus0 = str(link_row["bus0"])
            bus1 = str(link_row["bus1"])
            bus2 = link_row.get("bus2")
            # A blank CSV cell is read as NaN, whose string form is "nan".
            bus2_present = bus2 is not None and str(bus2).strip() not in ("", "nan", "NaN")

            # If YAML provides a recipe, derive link coefficients from it (supports bus2).
            if entry.carriers_in is not None and entry.carriers_out is not None:
                if bus1 not in entry.carriers_out:
                    raise ValueError(
                        f"techs.{name}.carriers_out must include primary output bus '{bus1}' (value typically 1.0)"
                    )
                if bus0 not in entry.carriers_in:
                    raise ValueError(
                        f"techs.{name}.carriers_in must include bus0 '{bus0}' (power/input basis)"
                    )
                power_in = float(entry.carriers_in[bus0])
                if power_in <= 0:
                    raise ValueError(f"techs.{name}.carriers_in.{bus0} must be > 0")
                primary_out = float(entry.carriers_out[bus1])
                if primary_out <= 0:
                    raise ValueError(f"techs.{name}.carriers_out.{bus1} must be > 0")

                # PyPSA coefficients are per unit of bus0 input.
                eff1 = primary_out / power_in
                _set_float_cell(links, name, "efficiency", eff1)

                if bus2_present:
                    bus2_name = str(bus2)
                    # Negative efficiency2 means the link withdraws from bus2.
                    if bus2_name in entry.carriers_in:
                        eff2 = -float(entry.carriers_in[bus2_name]) / power_in
                    elif bus2_name in entry.carriers_out:
                        eff2 = float(entry.carriers_out[bus2_name]) / power_in
                    else:
                        raise ValueError(
                            f"techs.{name} link has bus2='{bus2_name}' but YAML recipe omits it from carriers_in/out"
                        )
                    _set_float_cell(links, name, "efficiency2", eff2)
                    links.loc[name, "bus2"] = bus2_name

                # Compare YAML recipe against what the coefficients imply (sanity check)
                derived_in, derived_out = _recipe_from_link_row(links.loc[name])
                _warn_if_recipe_mismatch(
                    name,
                    entry.carriers_in,
                    entry.carriers_out,
                    derived_in,
                    derived_out,
                    entry.overall_efficiency,
                )
                eff_ref = _overall_efficiency(derived_in, derived_out)
            else:
                # No recipe: keep existing efficiencies, but require recipes for multi-input links.
                if bus2_present:
                    raise ValueError(
                        f"techs.{name} is a multi-input/output link (has bus2) so YAML must provide carriers_in/carriers_out"
                    )
                eff_ref = None

            annual_out = annualised_capital_cost(
                entry.overnight_cost_per_mw, entry.interest_rate, entry.lifetime_years, entry.fixed_om_fraction
            )
            # YAML link costs are output-basis (MW_out on bus1); convert to PyPSA input-basis (MW_in on bus0).
            annual_in = link_cost_mw_out_to_mw_in(annual_out, float(links.loc[name, "efficiency"]))

            _set_float_cell(links, name, "capital_cost", annual_in)
            rows.append(
                {
                    "tech": name,
                    "component": "link",
                    "efficiency_bus0_to_bus1": float(links.loc[name, "efficiency"]),
                    "efficiency2": (None if not bus2_present else float(links.loc[name, "efficiency2"])),
                    "annual_cost_usd_per_mw_out": annual_out,
                    "annual_cost_usd_per_mw_in": annual_in,
                    "overall_efficiency_derived": eff_ref,
                }
            )
        else:
            raise ValueError(f"Unsupported component_type: {entry.component_type}")

    summary = pd.DataFrame(rows)
    return generators, links, stores, summary


# Load the YAML and the CSV bundle, then apply the tech entries in memory.
# Fail early with a clear message if either input location is missing.
if not TECH_YAML.exists():
    raise FileNotFoundError(f"Tech YAML not found: {TECH_YAML}")
if not PLANT_DIR.exists():
    raise FileNotFoundError(f"Plant directory not found: {PLANT_DIR}")

techs = load_tech_yaml(TECH_YAML)
buses_df, gens_df, links_df, stores_df = load_csv_bundle(PLANT_DIR)

# apply_tech_entries_to_bundle writes into the frames it is given, so pass
# copies and keep the tables as loaded from disk untouched.
updated_gens, updated_links, updated_stores, summary_df = apply_tech_entries_to_bundle(
    techs, buses_df, gens_df.copy(), links_df.copy(), stores_df.copy()
 )

# Write outputs
if WRITE_OUTPUTS:
    # Hand the path to pandas instead of round-tripping through a string:
    # to_csv() without a path returns text that uses os.linesep, and writing
    # that through Path.write_text translates the newlines a second time on
    # Windows. Passing the path also makes the file UTF-8 regardless of locale.
    # The index is dropped because `name` is still present as a regular column.
    updated_gens.reset_index(drop=True).to_csv(PLANT_DIR / "generators.csv", index=False)
    updated_links.reset_index(drop=True).to_csv(PLANT_DIR / "links.csv", index=False)
    updated_stores.reset_index(drop=True).to_csv(PLANT_DIR / "stores.csv", index=False)
    print("Wrote updated CSV tables to:", PLANT_DIR)
else:
    print("WRITE_OUTPUTS=False: not writing any files")

# Reload the bundle from disk and, for every configured tech, put the stored
# capital cost next to the value computed in this run.
re_buses, re_gens, re_links, re_stores = load_csv_bundle(PLANT_DIR)

checks = []
for name, entry in techs.items():
    if entry.component_type == "generator":
        checks.append((
            name,
            "generator",
            float(re_gens.loc[name, "capital_cost"]),
            float(updated_gens.loc[name, "capital_cost"]),
        ))
    elif entry.component_type == "store":
        checks.append((
            name,
            "store",
            float(re_stores.loc[name, "capital_cost"]),
            float(updated_stores.loc[name, "capital_cost"]),
        ))
    elif entry.component_type == "link":
        checks.append((
            name,
            "link",
            float(re_links.loc[name, "capital_cost"]),
            float(updated_links.loc[name, "capital_cost"]),
        ))

check_df = pd.DataFrame(
    checks, columns=["tech", "component", "capital_cost_written", "capital_cost_computed"]
)
# Row-wise agreement between what is on disk and what this run computed.
check_df["matches_computed"] = [
    math.isclose(written, computed, rel_tol=1e-9, abs_tol=1e-12)
    for written, computed in zip(check_df["capital_cost_written"], check_df["capital_cost_computed"])
]

if not WRITE_OUTPUTS:
    # Nothing was written, so the "written" column is whatever was already on disk.
    print("NOTE: WRITE_OUTPUTS=False, so capital_cost_written shows the values already on disk, not this run's results")
elif not check_df["matches_computed"].all():
    print("WARNING: some capital costs read back from disk differ from the computed values")

display(summary_df)
display(check_df.sort_values(["component", "tech"]))