In [49]:
from __future__ import annotations

from pathlib import Path
import numpy as np
import pandas as pd
import yaml


In [50]:

# ---------------------------
# I/O config
# ---------------------------
# All settings below are read from this YAML file; the path is relative to the
# working directory of the kernel.
CONFIG_PATH = Path("naphta_config.yaml")

with CONFIG_PATH.open("r", encoding="utf-8") as f:
    CFG = yaml.safe_load(f)

# Modelled horizon: YEARS contains every year from YEAR_START to YEAR_END inclusive.
YEAR_START, YEAR_END = (int(CFG["years"][bound]) for bound in ("start", "end"))
YEARS = np.arange(YEAR_START, YEAR_END + 1)

# Input data location and the year used as reference by the structural decline.
INPUT_CSV = Path(CFG["input"]["csv_path"])
BASE_YEAR = int(CFG["input"]["base_year"])

# One output directory per scenario, all located under OUT_BASE.
OUT_BASE = Path(CFG["output"]["base_dir"])
SCENARIO_DIRS = {
    name: OUT_BASE / subdir
    for name, subdir in CFG["output"]["scenario_dirs"].items()
}

# Conversion coefficients taken from the `conversion` section of the config.
CONV = CFG["conversion"]
E_PER_N, CH4_PER_N, H2_PER_N = (
    float(CONV[field])
    for field in ("electricity_per_naphta", "ch4_per_naphta", "h2_per_naphta")
)

# Keep only real scenarios, i.e. those that have an output directory
# (avoids accidental keys like 'structural_decline').
SC_PARAMS = {
    name: params
    for name, params in CFG["scenarios"].items()
    if name in SCENARIO_DIRS
}


In [51]:
# ---------------------------
# Helpers (robust & simple)
# ---------------------------
def read_naphta_raw(path: Path) -> pd.DataFrame:
    """Load the raw naphta demand table and normalise it.

    The file is first read with its first line as header. If the columns
    ``country`` and ``demand_naphta`` are not both present, the file is read
    again as a header-less two-column CSV using those names.

    Normalisation:
    - rows with a missing country are dropped (they would otherwise be turned
      into the literal string ``"NAN"`` and treated as a country downstream);
    - ``country`` is converted to text, stripped and upper-cased; rows whose
      country is empty after stripping are dropped;
    - ``demand_naphta`` is converted to numbers, and rows that cannot be
      converted are dropped.

    Parameters
    ----------
    path : Path
        Location of the CSV file.

    Returns
    -------
    pd.DataFrame
        The cleaned table. The original row index is kept (not reset).
    """
    df = pd.read_csv(path)

    # Tolerate a file without a header row.
    if not {"country", "demand_naphta"}.issubset(df.columns):
        df = pd.read_csv(path, header=None, names=["country", "demand_naphta"])

    # Filter missing countries BEFORE astype(str): NaN would become "nan"/"NAN".
    df = df.loc[df["country"].notna()].copy()

    df["country"] = df["country"].astype(str).str.strip().str.upper()
    df["demand_naphta"] = pd.to_numeric(df["demand_naphta"], errors="coerce")

    usable = df["country"].ne("") & df["demand_naphta"].notna()
    return df.loc[usable]


def assert_non_negative(df: pd.DataFrame, cols: list[str]) -> None:
    """Raise if any of the requested columns holds a value below -1e-9.

    Column names that are absent from ``df`` are silently skipped; when none
    of them is present the function does nothing.

    Raises
    ------
    ValueError
        If at least one checked value is below the -1e-9 tolerance. The message
        lists the minimum of every checked column.
    """
    present = [name for name in cols if name in df.columns]
    if present:
        checked = df[present]
        # axis=None reduces over rows and columns at once.
        if (checked < -1e-9).any(axis=None):
            raise ValueError("Valeurs négatives détectées:\n" + str(checked.min()))


def check_coverage(df: pd.DataFrame, y0: int, y1: int) -> None:
    """Verify that every (country, scenario) group spans the years y0..y1.

    For each group the distinct values of ``year`` must start at ``y0``, end
    at ``y1`` and number exactly ``y1 - y0 + 1``.

    Raises
    ------
    ValueError
        For the first group that fails the check.
    """
    expected_n = y1 - y0 + 1
    for group_key, rows in df.groupby(["country", "scenario"]):
        seen = np.sort(rows["year"].unique())
        first, last, count = seen[0], seen[-1], len(seen)
        complete = first == y0 and last == y1 and count == expected_n
        if not complete:
            raise ValueError(
                f"Couverture années KO pour {group_key}: {first}..{last} n={count} (attendu {expected_n})."
            )


def yoy_factor_series(years: np.ndarray, yoy_blocks: list[dict]) -> np.ndarray:
    """Build a cumulative multiplicative trajectory F over ``years``.

    Each block ``{"from", "to", "factor"}`` sets the year-on-year factor for
    the years in the inclusive range ``from..to``; years not covered by any
    block use 1.0, and later blocks override earlier ones where they overlap.

    The trajectory is anchored on the first entry of ``years``: F is 1 there
    (any factor configured for that year is ignored), and for every following
    position F[i] = F[i-1] * factor[i].

    Parameters
    ----------
    years : np.ndarray
        Years of the trajectory, in the order they should be compounded.
    yoy_blocks : list[dict]
        Factor blocks; ``None`` or an empty list yields a constant 1.0 series.

    Returns
    -------
    np.ndarray
        Float array with the same shape as ``years``.
    """
    step = np.ones_like(years, dtype=float)
    for block in (yoy_blocks or []):
        lo, hi = int(block["from"]), int(block["to"])
        rate = float(block["factor"])
        step[(years >= lo) & (years <= hi)] = rate

    # The first year is the anchor: neutralise its step so that F starts at 1.
    # Slicing (rather than indexing) keeps this a no-op for an empty array.
    step[:1] = 1.0
    return np.cumprod(step, axis=0)


def plateau_level_series(years: np.ndarray, plateaus: list[dict], key: str, default: float) -> np.ndarray:
    """Build a piecewise-constant series over ``years``.

    The series starts at ``default`` everywhere. Each block
    ``{"from", "to", <key>: value}`` then writes ``value`` on the years in the
    inclusive range ``from..to``; where blocks overlap, the later one wins.

    Returns a float array with the same shape as ``years``.
    """
    levels = np.full_like(years, float(default), dtype=float)
    for block in (plateaus or []):
        first_year, last_year = int(block["from"]), int(block["to"])
        value = float(block[key])
        inside = (years >= first_year) & (years <= last_year)
        levels[inside] = value
    return levels

def structural_factor_series(years: np.ndarray, base_year: int, sd: dict | None) -> np.ndarray:
    """Return the compounded structural-decline multiplier for each year.

    With ``factor = sd["factor"]`` (default 1.0) and ``start = sd["start"]``
    (default ``base_year``), the multiplier for year t is
    ``factor ** max(t - start, 0)``: it is 1 for every t <= start and is
    multiplied by ``factor`` once per year after ``start``.

    When ``sd`` is ``None`` or empty, a series of ones is returned.

    Raises
    ------
    ValueError
        If ``factor`` is <= 0 or > 1.
    """
    if not sd:
        return np.ones_like(years, dtype=float)

    fac = float(sd.get("factor", 1.0))
    start = int(sd.get("start", base_year))

    if fac > 1.0 or fac <= 0.0:
        raise ValueError("structural_decline.factor doit être dans (0,1].")

    # Number of yearly decline steps already applied at each year (never negative).
    elapsed = np.clip(years - start, 0, None).astype(int)
    return np.power(fac, elapsed)

def build_scenario(tech_scenario: str, base_2019_by_country: dict[str, float], p: dict) -> pd.DataFrame:
    """Build the yearly naphta demand table of one scenario for all countries.

    Scenario parameters read from ``p`` (all optional):
    - ``yoy_factors``: blocks passed to ``yoy_factor_series``;
    - ``demand_plateaus``: blocks with key ``level``, multiplied onto the
      year-on-year trajectory;
    - ``import_plateaus``: blocks with key ``import_share``, clipped to [0, 1];
    - ``structural_decline``: settings passed to ``structural_factor_series``.
      When present, the decline is applied to the domestic part and the
      imported part is recomputed as total minus domestic.

    Parameters
    ----------
    tech_scenario : str
        Scenario name written in the ``scenario`` column.
    base_2019_by_country : dict[str, float]
        Base demand per country, scaled by the demand factor for every year.
    p : dict
        Scenario parameters (see above).

    Returns
    -------
    pd.DataFrame
        One row per country and year of the module-level ``YEARS``, with total,
        domestic and imported naphta, the electricity / CH4 / H2 demand derived
        from the domestic part, and two audit columns (``demand_factor``,
        ``import_share``).

    Raises
    ------
    ValueError
        If a checked column is negative, if total differs from domestic plus
        imported by more than 1e-6, or if a country does not cover all years.
    """
    # --- demand trajectory: year-on-year path times optional plateau level ---
    yoy_path = yoy_factor_series(YEARS, p.get("yoy_factors", []))

    plateau_blocks = p.get("demand_plateaus", [])
    if plateau_blocks:
        plateau_path = plateau_level_series(YEARS, plateau_blocks, key="level", default=1.0)
    else:
        plateau_path = np.ones_like(YEARS, float)

    demand_factor = yoy_path * plateau_path

    # --- structural decline multiplier (all ones when not configured) ---
    sd = p.get("structural_decline", None)
    struct_factor = structural_factor_series(YEARS, base_year=BASE_YEAR, sd=sd)
    decline_configured = sd is not None

    # --- configured import share (zero unless import plateaus are given) ---
    share_blocks = p.get("import_plateaus", [])
    if not share_blocks:
        import_share = np.zeros_like(YEARS, dtype=float)
    else:
        import_share = np.clip(
            plateau_level_series(YEARS, share_blocks, key="import_share", default=0.0),
            0.0,
            1.0,
        )

    frames = []
    for country, base0 in base_2019_by_country.items():
        total = np.clip(base0 * demand_factor, 0.0, None)

        # Split before structural decline; both parts are kept within [0, total].
        imported_base = np.clip(total * import_share, 0.0, total)
        domestic_base = np.clip(total - imported_base, 0.0, total)

        if decline_configured:
            # The decline hits the domestic part only; imports absorb the
            # difference so that total = domestic + imported still holds.
            domestic = np.clip(domestic_base * struct_factor, 0.0, total)
            imported = np.clip(total - domestic, 0.0, total)
        else:
            domestic, imported = domestic_base, imported_base

        # Effective import share (includes the structural compensation);
        # left at 0 wherever the total is not strictly positive.
        share_eff = np.divide(imported, total, out=np.zeros_like(total), where=total > 0)

        frames.append(pd.DataFrame({
            "country": country,
            "scenario": tech_scenario,
            "year": YEARS,

            "naphta_total": total,
            "naphta_domestic": domestic,
            "naphta_imported": imported,

            # Conversion to inputs is applied to the domestic part only.
            "electricity_demand_from_naphta": domestic * E_PER_N,
            "ch4_demand_from_naphta": domestic * CH4_PER_N,
            "h2_demand_from_naphta": domestic * H2_PER_N,

            # audit
            "demand_factor": demand_factor,
            "import_share": share_eff,
        }))

    result = pd.concat(frames, ignore_index=True)

    # --- sanity checks ---
    checked_columns = [
        "naphta_total", "naphta_domestic", "naphta_imported",
        "electricity_demand_from_naphta", "ch4_demand_from_naphta", "h2_demand_from_naphta",
        "demand_factor", "import_share",
    ]
    assert_non_negative(result, checked_columns)

    parts_sum = result["naphta_domestic"] + result["naphta_imported"]
    err = (result["naphta_total"] - parts_sum).abs().max()
    if err > 1e-6:
        raise ValueError(f"Incohérence: total != domestic + import (max err={err}).")

    check_coverage(result, YEAR_START, YEAR_END)
    return result


def write_outputs(tech_scenario: str, df: pd.DataFrame) -> None:
    """Write a scenario table to ``demand_naphta.csv`` in its scenario directory.

    The directory is looked up in the module-level ``SCENARIO_DIRS`` and is
    created (with parents) when it does not exist yet. The CSV is written
    without the index column.
    """
    target_dir = SCENARIO_DIRS[tech_scenario]
    target_dir.mkdir(parents=True, exist_ok=True)
    csv_path = target_dir / "demand_naphta.csv"
    df.to_csv(csv_path, index=False)



In [52]:

# ---------------------------
# Main
# ---------------------------
df_raw = read_naphta_raw(INPUT_CSV)

# Base-year anchor: one row per country, summing all rows of that country.
# NOTE(review): no year filter is applied here, so this assumes the input CSV
# only holds base-year consumption (the original note mentions 2019) — confirm.
# (No DE/GA split here; the original note says this is intentional.)
base = df_raw.groupby("country", as_index=False)["demand_naphta"].sum()

base_2019_by_country = dict(zip(base["country"], base["demand_naphta"]))

OUT_BASE.mkdir(parents=True, exist_ok=True)

# Build each configured scenario, write its CSV, and keep the table in memory.
all_outputs: dict[str, pd.DataFrame] = {}
for tech_scenario, params in SC_PARAMS.items():
    df_scn = build_scenario(tech_scenario, base_2019_by_country, params)
    write_outputs(tech_scenario, df_scn)
    all_outputs[tech_scenario] = df_scn

# Store the configuration that produced these outputs next to them.
with (OUT_BASE / "config_effective.yaml").open("w", encoding="utf-8") as f:
    yaml.safe_dump(CFG, f, sort_keys=False, allow_unicode=True)

print("OK: scénarios naphta générés dans", OUT_BASE)

OK: scénarios naphta générés dans /Users/simonbrigode/Desktop/tp_pommes_kraft/data_country/industry/pommes-h2-network/data-pommes/scenarios-naphta
