In [1]:
# =========================
# 0) Imports
# =========================
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time

try:
    import gurobipy as gp
    from gurobipy import GRB
except Exception as e:
    raise RuntimeError(
        "This notebook cell requires gurobipy + a working Gurobi license.\n"
        f"Import error: {repr(e)}"
    )

# -------------------------
# Helpers
# -------------------------
def pw_factor(r: float, H: int) -> float:
    """Present-worth factor P/A(r,H)."""
    if H <= 0:
        return 0.0
    if abs(r) < 1e-12:
        return float(H)
    return (1.0 - (1.0 + r) ** (-H)) / r

def as_2d(a):
    a = np.asarray(a)
    if a.ndim == 1:
        return a[None, :]
    return a

# -------------------------
# Cashflow helpers (construction delays, drift, CAPEX schedules)
# -------------------------
def pw_factor_shifted(r: float, H: int, start_delay_years: int = 0) -> float:
    """Present-worth factor for an annual cashflow received at end of each year,
    starting after `start_delay_years` full years (i.e., first cashflow at end of year start_delay_years+1)."""
    start_delay_years = int(max(0, start_delay_years))
    if H <= start_delay_years:
        return 0.0
    return (1.0 + r) ** (-start_delay_years) * pw_factor(r, H - start_delay_years)

def pw_factor_growth_shifted(r: float, g: float, H: int, start_delay_years: int = 0) -> float:
    """Present-worth factor for an annual cashflow received at end of each year that grows by (1+g) per year.
    Growth applies to the *cashflow level* in year k relative to year 0 (k=0..H-1).
    Cashflows start after `start_delay_years` full years (first at end of year start_delay_years+1)."""
    start_delay_years = int(max(0, start_delay_years))
    if H <= start_delay_years:
        return 0.0

    # PV = sum_{k=start_delay..H-1} (1+g)^k / (1+r)^{k+1} = (1/(1+r)) * sum_{k=start_delay..H-1} q^k
    if abs(r) < 1e-12:
        # r ~ 0: PV = sum_{k=start_delay..H-1} (1+g)^k
        if abs(g) < 1e-12:
            return float(H - start_delay_years)
        return ((1.0 + g) ** start_delay_years) * ((1.0 + g) ** (H - start_delay_years) - 1.0) / g

    q = (1.0 + g) / (1.0 + r)
    if abs(q - 1.0) < 1e-12:
        # q ~ 1 => each term is ~1/(1+r); start_delay adds a q^start_delay factor ~1
        return (H - start_delay_years) / (1.0 + r)

    geom = (q ** start_delay_years) * (1.0 - q ** (H - start_delay_years)) / (1.0 - q)
    return geom / (1.0 + r)

def build_capex_multiplier_series(
    H: int,
    lifetime_years: int | None,
    construction_years: int = 0,
    capex_payment_profile: list[float] | None = None,
    replacement_required: bool = True,
    capex_escalation_annual: float = 0.0,
) -> np.ndarray:
    """Return an array mult[y] such that PV(CAPEX) = CAPEX_base * sum_y mult[y] / (1+r)^y.

    - CAPEX_base is the (discounted/grant-adjusted) CAPEX associated with the chosen design (E,Pch,Pdis).
    - construction_years spreads each build over multiple years (payments at the *start* of each year y).
    - lifetime_years triggers reinvestment every `lifetime_years` (if replacement_required=True).
    - capex_escalation_annual scales replacement CAPEX at install year y0 by (1+capex_escalation_annual)^y0.
    """
    H = int(H)
    if H <= 0:
        return np.zeros(0, dtype=float)

    construction_years = int(max(0, construction_years))
    if construction_years == 0:
        construction_years = 1

    if capex_payment_profile is None:
        capex_payment_profile = [1.0 / construction_years] * construction_years
    if len(capex_payment_profile) != construction_years:
        raise ValueError("capex_payment_profile length must equal construction_years (after zero->one adjustment).")
    if abs(sum(capex_payment_profile) - 1.0) > 1e-6:
        # Normalise safely
        s = float(sum(capex_payment_profile))
        capex_payment_profile = [float(x) / s for x in capex_payment_profile]

    if lifetime_years is None or lifetime_years <= 0 or (not replacement_required):
        install_years = [0]
    else:
        L = int(lifetime_years)
        install_years = list(range(0, H, L))

    mult = np.zeros(H, dtype=float)
    for y0 in install_years:
        growth = (1.0 + capex_escalation_annual) ** y0
        for j, frac in enumerate(capex_payment_profile):
            y = y0 + j
            if y >= H:
                break
            mult[y] += float(frac) * growth
    return mult


In [2]:
def build_hourly_profile_from_month_hour_csv(
    csv_path: str = "gemini_avg_power_month_hour_2017_2019_long.csv",
    month_col: str = "month",
    hour_col: str = "hour",
    value_col: str = "avg_power",
    base_year: int = 2021,     # non-leap year => 8760 hours
    tz: str = "UTC",
) -> np.ndarray:
    """
    Converts a (month, hour)->value table (288 rows) into a chronological 8760-hour profile.

    Returns
    -------
    profile : np.ndarray shape (8760,)
        Hourly values for a typical year.
    """
    df = pd.read_csv(csv_path)

    required = {month_col, hour_col, value_col}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns in {csv_path}: {missing}. Found: {list(df.columns)}")

    df = df[[month_col, hour_col, value_col]].copy()
    df[month_col] = df[month_col].astype(int)
    df[hour_col] = df[hour_col].astype(int)
    df[value_col] = df[value_col].astype(float)

    # Ensure full 12x24 coverage
    all_pairs = pd.MultiIndex.from_product(
        [range(1, 13), range(0, 24)],
        names=[month_col, hour_col]
    )
    df = df.set_index([month_col, hour_col]).reindex(all_pairs)

    if df[value_col].isna().any():
        missing_pairs = df[df[value_col].isna()].index.tolist()[:10]
        raise ValueError(
            f"Incomplete month-hour table in {csv_path}. Missing examples: {missing_pairs}. "
            "Need all 12*24=288 combinations."
        )

    # Create hourly timestamps for a non-leap year
    idx = pd.date_range(
        start=f"{base_year}-01-01",
        end=f"{base_year+1}-01-01",
        freq="h",
        inclusive="left",
        tz=tz
    )
    if len(idx) != 8760:
        raise RuntimeError(f"Expected 8760 hours, got {len(idx)} for base_year={base_year}.")

    tmp = pd.DataFrame({"ts": idx})
    tmp["month"] = tmp["ts"].dt.month
    tmp["hour"] = tmp["ts"].dt.hour

    lut = df.reset_index().rename(columns={value_col: "val"})
    tmp = tmp.merge(lut, on=["month", "hour"], how="left")

    if tmp["val"].isna().any():
        raise RuntimeError("Mapping month-hour -> hourly series failed (NaNs after merge).")

    profile = tmp["val"].to_numpy(dtype=float)
    profile = np.clip(profile, 0.0, None)
    return profile

In [3]:
# =========================
# 1) Price scenarios + surplus
#
# Goal:
# - Keep the optimization model at "one-year resolution" (T=8760) but allow long horizons (H_years up to 100)
# - Provide: (i) scenario year-profiles (best/normal/bad) and (ii) a drift parameter for long-run price level change
# - Provide optional explicit multi-year price paths for reporting/plotting (not used directly in the MILP)
# =========================
import numpy as np
import pandas as pd
from pathlib import Path

dt = 1.0
hours_per_year = 8760

# -------------------------
# Paths (work both in notebook folder and in /mnt/data)
# -------------------------
def _resolve_path(p: str) -> str:
    pth = Path(p)
    if pth.exists():
        return str(pth)
    alt = Path("/mnt/data") / p
    if alt.exists():
        return str(alt)
    return str(pth)  # let pandas raise a clear error

price_path = _resolve_path("day_ahead_prices_2020_2025_combined.csv")
ts_col = "timestamp"
price_col = "price"

# Gemini profile (month, hour) -> 8760 hourly profile
gemini_power_path = _resolve_path(r"C:\Users\User\OneDrive\Univerity of Twente\IEM\Module 6\CM&E\Project\CM-E-Project\gemini_avg_power_month_hour_2017_2019_long.csv")

# -------------------------
# Test mode (optional)
# -------------------------
TEST_MODE = False          # True => use a short window
TEST_START_UTC = "2022-01-03 00:00:00"  # inclusive, UTC
TEST_DAYS = 7              # 7 days = 168 hours
TEST_TILE_TO_YEAR = False  # True => repeat the short window to 8760 hours

# -------------------------
# Scenario mode
# -------------------------
# "historical_years"   => each full calendar year in the dataset becomes a scenario
# "synthetic_3cases"   => build best/normal/bad year-profiles + estimate drift for long horizons
SCENARIO_MODE = "synthetic_3cases"

# Manual override if you want (example: -0.03 for -3%/yr)
PRICE_DRIFT_ANNUAL_OVERRIDE = None

# Volatility scaling inside each scenario profile (keeps seasonality, changes sd)
BEST_VOL_MULT = 0.85
NORMAL_VOL_MULT = 1.00
BAD_VOL_MULT = 1.35

# Optional level shifts (€/MWh) applied after volatility scaling
BEST_LEVEL_SHIFT = 0.0
NORMAL_LEVEL_SHIFT = 0.0
BAD_LEVEL_SHIFT = 0.0

# Scenario probabilities (must sum to 1)
SCENARIO_PROB = {"BEST": 1/3, "NORMAL": 1/3, "BAD": 1/3}

# -------------------------
# Load and clean prices (hourly grid, interpolate gaps, drop Feb-29)
# -------------------------
dfp = pd.read_csv(r"C:\Users\User\OneDrive\Univerity of Twente\IEM\Module 6\CM&E\Project\CM-E-Project\day_ahead_prices_2020_2025_combined.csv")
dfp[ts_col] = pd.to_datetime(dfp[ts_col], utc=True, errors="coerce")
dfp = dfp.dropna(subset=[ts_col, price_col]).sort_values(ts_col).set_index(ts_col)

# If testing, cut down early (faster)
if TEST_MODE:
    start_ts = pd.Timestamp(TEST_START_UTC, tz="UTC")
    end_ts = start_ts + pd.Timedelta(days=TEST_DAYS)
    dfp = dfp[(dfp.index >= start_ts) & (dfp.index < end_ts)]

price_series = (
    dfp[price_col].astype(float)
    .resample("h").mean()
    .interpolate("time")
)

# Drop Feb 29 so any "calendar year" is exactly 8760 hours
is_feb29 = (price_series.index.month == 2) & (price_series.index.day == 29)
price_series = price_series[~is_feb29]

# -------------------------
# Build scenario year-profiles
# -------------------------
def _extract_full_year_arrays(ps: pd.Series) -> dict[int, np.ndarray]:
    years = sorted(ps.index.year.unique())
    out = {}
    for y in years:
        ys = ps[ps.index.year == y]
        if len(ys) == hours_per_year:
            out[int(y)] = ys.to_numpy(dtype=float)
    return out

def _robust_drift_estimate(year_means: pd.Series) -> float:
    '''
    Estimate annual drift as a geometric growth rate from annual average prices.
    Uses a robust outlier filter (MAD) so a single spike-year (e.g., 2022) doesn't dominate.
    Returns g such that expected level scales by (1+g)^year.
    '''
    if len(year_means) < 2:
        return 0.0

    med = float(year_means.median())
    mad = float(np.median(np.abs(year_means.values - med)))
    if mad <= 1e-12:
        keep = year_means
    else:
        sigma = 1.4826 * mad
        keep = year_means[np.abs(year_means - med) <= 3.0 * sigma]
        if len(keep) < 2:
            keep = year_means

    x = (keep.index.to_numpy(dtype=float) - float(keep.index.min()))
    y = np.log(np.clip(keep.to_numpy(dtype=float), 1e-6, None))
    slope = float(np.polyfit(x, y, 1)[0])
    return float(np.exp(slope) - 1.0)

def _vol_level_transform(p: np.ndarray, vol_mult: float, level_shift: float) -> np.ndarray:
    mu = float(np.mean(p))
    return (mu + vol_mult * (p - mu) + level_shift).astype(float)

full_years = _extract_full_year_arrays(price_series)

if TEST_MODE and not TEST_TILE_TO_YEAR:
    # One scenario = the week (T=168)
    prices = price_series.to_numpy(dtype=float)[None, :]
    kept_years = [f"TEST_WEEK_FROM_{TEST_START_UTC}"]
    scenario_names = kept_years
    price_drift_annual = 0.0
else:
    if TEST_MODE and TEST_TILE_TO_YEAR:
        # Repeat test window to 8760 (index no longer needed for optimization)
        reps = hours_per_year // len(price_series)
        rem = hours_per_year - reps * len(price_series)
        tiled = np.concatenate([
            np.tile(price_series.to_numpy(float), reps),
            price_series.to_numpy(float)[:rem],
        ])
        prices = tiled[None, :]
        kept_years = [f"TEST_TILED_FROM_{TEST_START_UTC}"]
        scenario_names = kept_years
        price_drift_annual = 0.0

    else:
        if len(full_years) == 0:
            raise ValueError("No full 8760-hour years found in the price dataset after cleaning.")

        year_means = pd.Series({y: float(np.mean(arr)) for y, arr in full_years.items()}).sort_index()

        # Identify best and bad years from available full years
        bad_year = 2022 if 2022 in full_years else int(year_means.idxmax())
        best_year = int(year_means.idxmin())

        if SCENARIO_MODE == "historical_years":
            kept_years = sorted(full_years.keys())
            prices = np.vstack([full_years[y] for y in kept_years])
            scenario_names = [str(y) for y in kept_years]

        elif SCENARIO_MODE == "synthetic_3cases":
            # Build NORMAL from a mean seasonal profile (hour-of-year) + a block-bootstrap residual.
            years_for_normal = [y for y in full_years.keys() if y != bad_year] or list(full_years.keys())
            mat = np.vstack([full_years[y] for y in years_for_normal])  # (Ny,8760)
            seasonal_mean = mat.mean(axis=0)
            resid = mat - seasonal_mean  # (Ny,8760)

            rng = np.random.default_rng(42)
            block = 24  # daily blocks
            n_blocks = hours_per_year // block  # 365
            resid_blocks = resid.reshape(resid.shape[0], n_blocks, block)
            pick_y = rng.integers(0, resid.shape[0], size=n_blocks)
            pick_b = rng.integers(0, n_blocks, size=n_blocks)
            boot = resid_blocks[pick_y, pick_b].reshape(-1)

            normal_raw = seasonal_mean + boot
            best_raw = full_years[best_year]
            bad_raw = full_years[bad_year]

            best = _vol_level_transform(best_raw, BEST_VOL_MULT, BEST_LEVEL_SHIFT)
            normal = _vol_level_transform(normal_raw, NORMAL_VOL_MULT, NORMAL_LEVEL_SHIFT)
            bad = _vol_level_transform(bad_raw, BAD_VOL_MULT, BAD_LEVEL_SHIFT)

            prices = np.vstack([best, normal, bad])
            kept_years = [f"BEST_{best_year}", "NORMAL_SYNTH", f"BAD_{bad_year}"]
            scenario_names = kept_years
        else:
            raise ValueError(f"Unknown SCENARIO_MODE={SCENARIO_MODE}")

        # Drift estimate (unless overridden)
        drift_est = _robust_drift_estimate(year_means)
        price_drift_annual = float(drift_est if PRICE_DRIFT_ANNUAL_OVERRIDE is None else PRICE_DRIFT_ANNUAL_OVERRIDE)

# Scenario probabilities vector aligned with 'scenario_names'
if (not TEST_MODE) and (SCENARIO_MODE == "synthetic_3cases"):
    scenario_prob = np.array([SCENARIO_PROB["BEST"], SCENARIO_PROB["NORMAL"], SCENARIO_PROB["BAD"]], dtype=float)
    scenario_prob = scenario_prob / scenario_prob.sum()
else:
    S_tmp = int(np.asarray(prices).shape[0])
    scenario_prob = np.ones(S_tmp, dtype=float) / S_tmp

prices = np.asarray(prices, dtype=float)
S, T = prices.shape
year_scale = float(hours_per_year / (T * dt))  # =1.0 for T=8760; >1.0 for test windows

print(f"Scenario mode: {SCENARIO_MODE} | scenarios={scenario_names}")
print(f"prices shape: (S,T)=({S},{T}), year_scale={year_scale:.6g}")
print(f"price stats: min={prices.min():.2f}, mean={prices.mean():.2f}, max={prices.max():.2f}")
print(f"Estimated price drift (annual): {price_drift_annual:+.4%}  (override via PRICE_DRIFT_ANNUAL_OVERRIDE)")

# -------------------------
# Surplus (choose proxy from low prices OR Gemini profile)
# -------------------------
def make_surplus_proxy_from_prices(prices_ST: np.ndarray, surplus_max_mw: float = 50.0, low_quantile: float = 0.20) -> np.ndarray:
    thresh = np.quantile(prices_ST, low_quantile, axis=1, keepdims=True)
    return np.where(prices_ST <= thresh, surplus_max_mw, 0.0).astype(float)

# Build Gemini 8760-hour profile (avg power by month-hour)
gemini_profile = build_hourly_profile_from_month_hour_csv(
    gemini_power_path,
    month_col="month",
    hour_col="hour",
    value_col="avg_power",
    base_year=2021,
    tz="UTC",
)
gemini_profile = np.asarray(gemini_profile, dtype=float).ravel()

if len(gemini_profile) != hours_per_year:
    raise ValueError(f"Gemini profile length {len(gemini_profile)} must be {hours_per_year} after build.")

if T == hours_per_year:
    gemini_aligned = gemini_profile
else:
    gemini_aligned = gemini_profile[:T]

curtailment_fraction = 1.0  # set e.g. 0.3 if only 30% is realistically available
surplus_gemini_1y = np.clip(curtailment_fraction * gemini_aligned, 0.0, None)

# Per-scenario surplus (optionally vary year-to-year)
rng = np.random.default_rng(42)
year_sigma = 0.10  # 10% multiplicative variation across scenarios
scale_factors = rng.normal(1.0, year_sigma, size=(S, 1))

surplus_gemini = np.clip(np.tile(surplus_gemini_1y[None, :], (S, 1)) * scale_factors, 0.0, None)
surplus_proxy = make_surplus_proxy_from_prices(prices, surplus_max_mw=50.0, low_quantile=0.20)

SURPLUS_MODE = "gemini"  # "gemini" or "proxy"
surplus = surplus_gemini if SURPLUS_MODE.lower() == "gemini" else surplus_proxy

print(f"Surplus mode: {SURPLUS_MODE} | surplus shape: {surplus.shape}")
print(f"surplus stats: min={surplus.min():.2f}, mean={surplus.mean():.2f}, max={surplus.max():.2f}")

# -------------------------
# Optional helper: explicit multi-year hourly price paths (for reporting/plots).
# Not used directly by the MILP (keeps the optimization size manageable for H_years up to 100).
# -------------------------
def generate_multi_year_price_cube(
    base_prices_ST: np.ndarray,
    H_years: int,
    price_drift_annual: float = 0.0,
    vol_growth_annual: float = 0.0,
) -> np.ndarray:
    '''
    Returns price_cube[S,H,T]. Year y applies:
      level_y = (1+price_drift_annual)^y
      vol_y   = (1+vol_growth_annual)^y
      p_y,t = level_y * (mu + vol_y*(p0_t - mu))
    where mu is the within-year mean of p0 in each scenario.
    '''
    base_prices_ST = np.asarray(base_prices_ST, dtype=float)
    S0, T0 = base_prices_ST.shape
    H_years = int(H_years)
    mu = base_prices_ST.mean(axis=1, keepdims=True)  # (S,1)
    centered = base_prices_ST - mu
    cube = np.zeros((S0, H_years, T0), dtype=float)
    for y in range(H_years):
        level = (1.0 + price_drift_annual) ** y
        vol = (1.0 + vol_growth_annual) ** y
        cube[:, y, :] = level * (mu + vol * centered)
    return cube


Scenario mode: synthetic_3cases | scenarios=['BEST_2020', 'NORMAL_SYNTH', 'BAD_2022']
prices shape: (S,T)=(3,8760), year_scale=1
price stats: min=-384.93, mean=118.27, max=1091.10
Estimated price drift (annual): +18.2091%  (override via PRICE_DRIFT_ANNUAL_OVERRIDE)
Surplus mode: gemini | surplus shape: (3, 8760)
surplus stats: min=266.16, mean=417.05, max=558.43


In [4]:
# =========================
# 2) Technology data (PLACEHOLDERS: fill with your estimates)
#    Subsidy structures reflect what you already collected.
# =========================

# Convert doc values:
# Hydropower subsidy example: 0.1693 €/kWh in 2024 => 169.3 €/MWh (15 years)
hydro_sub_eur_per_MWh = 0.1693 * 1000.0

# Hydrogen electrolysis subsidy example: 0.3796 €/kWh_HHV (15 years)
# We'll apply it to produced H2_HHV = eta_ch * charged_electricity
h2_sub_eur_per_MWh_HHV = 0.3796 * 1000.0

TECH = {
    "PHS": {
        "eta_ch": 0.90,
        "eta_dis": 0.90,
        "self_dis": 0.0,             # per step fraction
        "Emax": 5000.0,              # MWh_store
        "Pch_max": 500.0,            # MW
        "Pdis_max": 500.0,           # MW
        "capex_E": 150000.0,         # €/MWh_store (placeholder)
        "capex_Pch": 50000.0,        # €/MW (placeholder)
        "capex_Pdis": 50000.0,       # €/MW (placeholder)
        "fom_E": 2000.0,             # €/MWh_store/year (placeholder)
        "fom_P": 5000.0,             # €/MW/year (placeholder)
        "vom_ch": 0.5,               # €/MWh_e (placeholder)
        "vom_dis": 0.5,              # €/MWh_e (placeholder)
        "subsidy_on_discharge_eur_per_MWh_e": hydro_sub_eur_per_MWh,
        "subsidy_on_h2_MWh_HHV": 0.0,
        "capex_grant_per_MWh_store": 0.0,
        "capex_grant_cap_total": 0.0,
        "capex_discount_factor": 0.0,  # e.g., 0.10 for ~10% proxy
        "salvage_frac": 0.10,

        # Multi-year CAPEX / lifetime modelling
        "lifetime_years": 60,
        "replacement_required": True,
        "capex_escalation_annual": 0.0,      # + => more expensive over time; - => cheaper
        "capex_payment_years": 10,           # example: spread CAPEX over 10 years
        "capex_payment_profile": None,       # None => equal split over capex_payment_years
        "commissioning_delay_years": 10,     # example: no operating cashflows before year 10
        "grant_repeats": False,              # grant only on first build by default
    },
    "Flywheel": {
        "eta_ch": 0.92,
        "eta_dis": 0.92,
        "self_dis": 0.001,
        "Emax": 200.0,
        "Pch_max": 200.0,
        "Pdis_max": 200.0,
        "capex_E": 400000.0,         # placeholder
        "capex_Pch": 80000.0,        # placeholder
        "capex_Pdis": 80000.0,       # placeholder
        "fom_E": 6000.0,             # placeholder
        "fom_P": 8000.0,             # placeholder
        "vom_ch": 1.0,
        "vom_dis": 1.0,
        "subsidy_on_discharge_eur_per_MWh_e": 0.0,
        "subsidy_on_h2_MWh_HHV": 0.0,
        "capex_grant_per_MWh_store": 0.0,
        "capex_grant_cap_total": 0.0,
        "capex_discount_factor": 0.10,
        "salvage_frac": 0.10,

        "lifetime_years": 20,
        "replacement_required": True,
        "capex_escalation_annual": 0.0,
        "capex_payment_years": 1,
        "capex_payment_profile": [1.0],
        "commissioning_delay_years": 0,
        "grant_repeats": False,
    },
    "Battery": {
        "eta_ch": 0.95,
        "eta_dis": 0.95,
        "self_dis": 0.0005,
        "Emax": 1000.0,
        "Pch_max": 500.0,
        "Pdis_max": 500.0,
        "capex_E": 250000.0,         # placeholder
        "capex_Pch": 60000.0,        # placeholder
        "capex_Pdis": 60000.0,       # placeholder
        "fom_E": 5000.0,             # placeholder
        "fom_P": 7000.0,             # placeholder
        "vom_ch": 2.0,
        "vom_dis": 2.0,
        "subsidy_on_discharge_eur_per_MWh_e": 0.0,
        "subsidy_on_h2_MWh_HHV": 0.0,
        # SPRILA example (big company): 70 €/kWh => 70,000 €/MWh (cap applies)
        "capex_grant_per_MWh_store": 70000.0,   # €/MWh_store
        "capex_grant_cap_total": 350000.0,      # € cap (from your doc table)
        "capex_discount_factor": 0.10,
        "salvage_frac": 0.10,

        "lifetime_years": 12,
        "replacement_required": True,
        "capex_escalation_annual": 0.0,
        "capex_payment_years": 1,
        "capex_payment_profile": [1.0],
        "commissioning_delay_years": 0,
        "grant_repeats": False,
    },
    "Hydrogen": {
        # Interpret SOC as stored H2 energy (MWh_HHV).
        # eta_ch: electricity -> H2_HHV conversion
        # eta_dis: H2_HHV -> electricity conversion
        "eta_ch": 0.70,
        "eta_dis": 0.55,
        "self_dis": 0.0,
        "Emax": 20000.0,             # MWh_HHV
        "Pch_max": 1000.0,           # MW_e into electrolyser
        "Pdis_max": 500.0,           # MW_e out from fuel cell/turbine
        "capex_E": 20000.0,          # €/MWh_HHV storage (placeholder)
        "capex_Pch": 500000.0,       # €/MW_e electrolysis (placeholder)
        "capex_Pdis": 700000.0,      # €/MW_e power block (placeholder)
        "fom_E": 300.0,              # €/MWh_HHV/year (placeholder)
        "fom_P": 15000.0,            # €/MW/year (placeholder)
        "vom_ch": 1.0,
        "vom_dis": 3.0,
        "subsidy_on_discharge_eur_per_MWh_e": 0.0,
        # Apply subsidy on produced H2_HHV over 15 years (from your doc)
        "subsidy_on_h2_MWh_HHV": h2_sub_eur_per_MWh_HHV,
        "capex_grant_per_MWh_store": 0.0,
        "capex_grant_cap_total": 0.0,
        "capex_discount_factor": 0.0,
        "salvage_frac": 0.05,

        "lifetime_years": 25,
        "replacement_required": True,
        "capex_escalation_annual": 0.0,
        "capex_payment_years": 1,
        "capex_payment_profile": [1.0],
        "commissioning_delay_years": 0,
        "grant_repeats": False,
    },
}

# Economic settings (course-consistent assumptions)
discount_rate = 0.08
H_years = 25
subsidy_years = 15


In [5]:
def solve_one_tech(
    tech_name,
    prices,
    surplus,
    scenario_prob,
    dt,
    year_scale,
    discount_rate,
    H_years,
    subsidy_years,
    output_flag=0,
    time_limit=60,
    prevent_simultaneous=True,
    force_frac_ub=None,
    force_min_E=None,
    force_min_P=None,
    price_drift_annual=None,
):
    """Solve design + dispatch for one technology, with:
    - price scenarios (S,T) for a representative year
    - long-horizon NPV with optional annual price drift (applies to price-dependent cashflows)
    - multi-year CAPEX schedules + replacements (lifetime) + commissioning delays
    """

    td = TECH[tech_name]
    prices = as_2d(prices)
    surplus = as_2d(surplus)
    scenario_prob = np.asarray(scenario_prob, dtype=float)
    scenario_prob = scenario_prob / scenario_prob.sum()

    S, T = prices.shape

    # Default drift: use global variable if present; else 0
    if price_drift_annual is None:
        price_drift_annual = float(globals().get("price_drift_annual", 0.0))
    price_drift_annual = float(price_drift_annual)

    r = float(discount_rate)
    H_years = int(H_years)
    subsidy_years = int(subsidy_years)

    commissioning_delay = int(max(0, td.get("commissioning_delay_years", 0)))

    # PV factors
    PW_ops = pw_factor_shifted(r, H_years, commissioning_delay)                        # constant annual amounts (VOM/FOM)
    PW_price = pw_factor_growth_shifted(r, price_drift_annual, H_years, commissioning_delay)  # price-dependent part

    # Subsidy applies for the first `subsidy_years` years of operation (starting at commissioning)
    H_sub_total = min(H_years, commissioning_delay + max(0, subsidy_years))
    PW_sub = pw_factor_shifted(r, H_sub_total, commissioning_delay)

    # CAPEX schedule / replacements
    lifetime_years = td.get("lifetime_years", None)
    replacement_required = bool(td.get("replacement_required", True))
    capex_escalation_annual = float(td.get("capex_escalation_annual", 0.0))
    capex_payment_years = int(max(0, td.get("capex_payment_years", 0)))
    capex_payment_profile = td.get("capex_payment_profile", None)

    capex_mult = build_capex_multiplier_series(
        H=H_years,
        lifetime_years=lifetime_years,
        construction_years=capex_payment_years,
        capex_payment_profile=capex_payment_profile,
        replacement_required=replacement_required,
        capex_escalation_annual=capex_escalation_annual,
    )
    PV_capex_factor = float(np.sum(capex_mult / (1.0 + r) ** np.arange(len(capex_mult))))

    # Salvage on the last installed asset (prorated by remaining life)
    salvage_frac = float(td.get("salvage_frac", 0.0))
    if lifetime_years is None or (not replacement_required) or lifetime_years <= 0:
        install_years = [0]
        L = H_years
    else:
        L = int(lifetime_years)
        install_years = list(range(0, H_years, L))
    last_install = int(max(install_years))
    used_years_last = H_years - last_install
    remaining_years_last = max(L - used_years_last, 0)
    salvage_effective_frac = 0.0 if L <= 0 else salvage_frac * (remaining_years_last / L)
    salvage_growth = (1.0 + capex_escalation_annual) ** last_install
    salvage_pv_factor = float(salvage_effective_frac * salvage_growth / (1.0 + r) ** H_years)

    # -------------------------
    # Build model
    # -------------------------
    m = gp.Model(f"storage_{tech_name}")
    m.Params.OutputFlag = output_flag
    if time_limit is not None:
        m.Params.TimeLimit = time_limit
    if prevent_simultaneous:
        m.Params.MIPGap = 0.05

    # -------------------------
    # Design vars
    # -------------------------
    E = m.addVar(lb=0.0, ub=td["Emax"], name="E")          # MWh_store (or MWh_HHV for hydrogen)
    Pch = m.addVar(lb=0.0, ub=td["Pch_max"], name="Pch")   # MW
    Pdis = m.addVar(lb=0.0, ub=td["Pdis_max"], name="Pdis")# MW

    # Optional one-time CAPEX grant (linearised min(rate*E, cap))
    grant = m.addVar(lb=0.0, ub=td["capex_grant_cap_total"], name="capex_grant")
    if td["capex_grant_cap_total"] > 0 and td["capex_grant_per_MWh_store"] > 0:
        m.addConstr(grant <= td["capex_grant_per_MWh_store"] * E, name="grant_rate")
        m.addConstr(grant <= td["capex_grant_cap_total"], name="grant_cap")
    else:
        m.addConstr(grant == 0.0, name="no_grant")

    # -------------------------
    # Operational vars
    # -------------------------
    c = m.addVars(S, T, lb=0.0, name="c")         # MW charging
    d = m.addVars(S, T, lb=0.0, name="d")         # MW discharging
    soc = m.addVars(S, T + 1, lb=0.0, name="soc") # MWh state of charge

    if prevent_simultaneous:
        z = m.addVars(S, T, vtype=GRB.BINARY, name="isCharging")
    else:
        z = None

    eta_ch = float(td["eta_ch"])
    eta_dis = float(td["eta_dis"])
    self_dis = float(td["self_dis"])

    M_ch = float(td["Pch_max"])
    M_dis = float(td["Pdis_max"])

    # -------------------------
    # Constraints
    # -------------------------
    for s in range(S):
        m.addConstr(soc[s, 0] <= E, name=f"soc0_ub[{s}]")
        m.addConstr(soc[s, 0] >= 0.0, name=f"soc0_lb[{s}]")
        m.addConstr(soc[s, T] == soc[s, 0], name=f"cyclic[{s}]")  # steady-state year

        for t in range(T):
            m.addConstr(soc[s, t] <= E, name=f"soc_ub[{s},{t}]")
            m.addConstr(c[s, t] <= Pch, name=f"pch[{s},{t}]")
            m.addConstr(d[s, t] <= Pdis, name=f"pdis[{s},{t}]")
            m.addConstr(c[s, t] <= float(surplus[s, t]), name=f"surplus[{s},{t}]")
            if prevent_simultaneous:
                m.addConstr(c[s, t] <= M_ch * z[s, t], name=f"noSim_c[{s},{t}]")
                m.addConstr(d[s, t] <= M_dis * (1 - z[s, t]), name=f"noSim_d[{s},{t}]")
            m.addConstr(
                soc[s, t + 1]
                == (1.0 - self_dis) * soc[s, t]
                + eta_ch * c[s, t] * dt
                - (d[s, t] * dt) / eta_dis,
                name=f"dyn[{s},{t}]"
            )

    # ============================================================
    # Counterfactual: force a non-zero investment to explain "why 0?"
    # ============================================================
    m.update()

    def _safe_ub(var, td, fallback_keys):
        try:
            return float(var.UB)
        except Exception:
            for k in fallback_keys:
                if k in td and td[k] is not None:
                    try:
                        return float(td[k])
                    except Exception:
                        pass
            return float("inf")

    def _finite_pos(x):
        try:
            x = float(x)
            return np.isfinite(x) and x > 0.0
        except Exception:
            return False

    applied_any = False

    if force_frac_ub is not None:
        ub_E = _safe_ub(E, td, ["Emax"])
        ub_Pch = _safe_ub(Pch, td, ["Pch_max"])
        ub_Pdis = _safe_ub(Pdis, td, ["Pdis_max"])
        if not (_finite_pos(ub_E) and _finite_pos(ub_Pch) and _finite_pos(ub_Pdis)):
            raise RuntimeError("Cannot apply force_frac_ub: non-finite UB(s) detected.")

        frac = float(force_frac_ub)
        if frac <= 0.0 or frac >= 1.0:
            raise ValueError("force_frac_ub must be in (0,1).")

        m.addConstr(E >= frac * ub_E, name="force_E_min")
        m.addConstr(Pch >= frac * ub_Pch, name="force_Pch_min")
        m.addConstr(Pdis >= frac * ub_Pdis, name="force_Pdis_min")
        applied_any = True

    if force_min_E is not None:
        m.addConstr(E >= float(force_min_E), name="force_E_abs_min")
        applied_any = True

    if force_min_P is not None:
        m.addConstr(Pch + Pdis >= float(force_min_P), name="force_P_total_min")
        applied_any = True

    # -------------------------
    # Economics (annual cashflow components)
    # -------------------------
    capex_raw = td["capex_E"] * E + td["capex_Pch"] * Pch + td["capex_Pdis"] * Pdis
    capex_net = (1.0 - float(td.get("capex_discount_factor", 0.0))) * capex_raw  # CAPEX after discount factor, before grant

    fom = td["fom_E"] * E + td["fom_P"] * (Pch + Pdis)  # €/year, costs

    sub_on_dis = float(td.get("subsidy_on_discharge_eur_per_MWh_e", 0.0))
    sub_on_h2 = float(td.get("subsidy_on_h2_MWh_HHV", 0.0))

    exp_annual_arbitrage = gp.LinExpr()
    exp_annual_vom = gp.LinExpr()
    exp_annual_subsidy = gp.LinExpr()

    for s in range(S):
        scen_arbitrage = gp.LinExpr()
        scen_vom = gp.LinExpr()
        scen_sub = gp.LinExpr()

        for t in range(T):
            p = float(prices[s, t])
            scen_arbitrage += p * (d[s, t] - c[s, t]) * dt
            scen_vom += -(float(td["vom_ch"]) * c[s, t] + float(td["vom_dis"]) * d[s, t]) * dt
            if sub_on_dis != 0.0:
                scen_sub += sub_on_dis * d[s, t] * dt
            if sub_on_h2 != 0.0:
                # subsidy on produced H2_HHV (MWh_HHV) = eta_ch * charged electricity (MWh_e)
                scen_sub += sub_on_h2 * (eta_ch * c[s, t] * dt)

        w = float(scenario_prob[s])
        exp_annual_arbitrage += w * scen_arbitrage
        exp_annual_vom += w * scen_vom
        exp_annual_subsidy += w * scen_sub

    # Scale short test windows up to annualised values
    exp_annual_arbitrage *= float(year_scale)
    exp_annual_vom *= float(year_scale)
    exp_annual_subsidy *= float(year_scale)

    # PV terms
    pv_capex_cost = -PV_capex_factor * capex_net + grant  # grant is one-time at t=0 (default)
    pv_salvage = salvage_pv_factor * capex_net            # salvage is a fraction of last installed CAPEX
    pv_ops_const = PW_ops * (exp_annual_vom - fom)        # constant annual costs (VOM + FOM)
    pv_price = PW_price * exp_annual_arbitrage            # price-dependent part with drift
    pv_sub = PW_sub * exp_annual_subsidy                  # subsidies only for subsidy_years of operation

    m.setObjective(pv_capex_cost + pv_salvage + pv_ops_const + pv_price + pv_sub, GRB.MAXIMIZE)

    m.optimize()

    if m.Status not in (GRB.OPTIMAL, GRB.TIME_LIMIT, GRB.SUBOPTIMAL):
        return {"tech": tech_name, "status": int(m.Status), "npv": None}

    # -------------------------
    # Collect results
    # -------------------------
    E_val = float(E.X)
    Pch_val = float(Pch.X)
    Pdis_val = float(Pdis.X)
    npv_val = float(m.ObjVal)

    c0 = np.array([c[0, t].X for t in range(T)], dtype=float)
    d0 = np.array([d[0, t].X for t in range(T)], dtype=float)
    soc0 = np.array([soc[0, t].X for t in range(T + 1)], dtype=float)
    soc_pct = (soc0[:-1] / E_val * 100.0) if E_val > 1e-9 else np.zeros(T)

    # Numeric breakdown (recompute with solution values)
    # Annual expectations
    arb_s = []
    vom_s = []
    sub_s = []
    for s in range(S):
        arb = float(np.sum(prices[s, :] * (np.array([d[s, t].X for t in range(T)]) - np.array([c[s, t].X for t in range(T)])) * dt))
        vom = -float(np.sum((float(td["vom_ch"]) * np.array([c[s, t].X for t in range(T)]) + float(td["vom_dis"]) * np.array([d[s, t].X for t in range(T)])) * dt))
        sub = 0.0
        if sub_on_dis != 0.0:
            sub += float(sub_on_dis * np.sum(np.array([d[s, t].X for t in range(T)]) * dt))
        if sub_on_h2 != 0.0:
            sub += float(sub_on_h2 * np.sum(eta_ch * np.array([c[s, t].X for t in range(T)]) * dt))
        arb_s.append(arb * year_scale)
        vom_s.append(vom * year_scale)
        sub_s.append(sub * year_scale)

    exp_arb_num = float(np.dot(scenario_prob, arb_s))
    exp_vom_num = float(np.dot(scenario_prob, vom_s))
    exp_sub_num = float(np.dot(scenario_prob, sub_s))
    fom_num = float(td["fom_E"] * E_val + td["fom_P"] * (Pch_val + Pdis_val))

    capex_raw_num = float(td["capex_E"] * E_val + td["capex_Pch"] * Pch_val + td["capex_Pdis"] * Pdis_val)
    capex_net_num = float((1.0 - float(td.get("capex_discount_factor", 0.0))) * capex_raw_num)
    grant_num = float(grant.X)

    pv_capex_num = -PV_capex_factor * capex_net_num + grant_num
    pv_salvage_num = salvage_pv_factor * capex_net_num
    pv_price_num = PW_price * exp_arb_num
    pv_ops_num = PW_ops * (exp_vom_num - fom_num)
    pv_sub_num = PW_sub * exp_sub_num

    npv_breakdown = {
        "PV_CAPEX": pv_capex_num,
        "PV_SALVAGE": pv_salvage_num,
        "PV_PRICE_ARBITRAGE": pv_price_num,
        "PV_VOM_FOM": pv_ops_num,
        "PV_SUBSIDY": pv_sub_num,
        "NPV_TOTAL": pv_capex_num + pv_salvage_num + pv_price_num + pv_ops_num + pv_sub_num,
    }

    scenario_breakdown = pd.DataFrame({
        "scenario": scenario_names if "scenario_names" in globals() and len(globals().get("scenario_names", [])) == S else [f"s{s}" for s in range(S)],
        "prob": scenario_prob,
        "annual_arbitrage": arb_s,
        "annual_vom": vom_s,
        "annual_subsidy": sub_s,
    })

    out = {
        "tech": tech_name,
        "status": int(m.Status),
        "npv": npv_val,
        "E": E_val,
        "Pch": Pch_val,
        "Pdis": Pdis_val,
        "assumptions": {
            "discount_rate": r,
            "H_years": H_years,
            "subsidy_years": subsidy_years,
            "commissioning_delay_years": commissioning_delay,
            "price_drift_annual": price_drift_annual,
            "PV_capex_factor": PV_capex_factor,
            "PW_ops": PW_ops,
            "PW_price": PW_price,
            "PW_sub": PW_sub,
            "salvage_pv_factor": salvage_pv_factor,
            "lifetime_years": lifetime_years,
            "capex_payment_years": capex_payment_years,
            "capex_escalation_annual": capex_escalation_annual,
        },
        "npv_breakdown": npv_breakdown,
        "scenario_breakdown": scenario_breakdown,
        "capex_mult_by_year": capex_mult,
        "dispatch_s0": pd.DataFrame({
            "t": np.arange(T),
            "price": prices[0],
            "surplus": surplus[0],
            "charge_MW": c0,
            "discharge_MW": d0,
            "soc_MWh": soc0[:-1],
            "soc_pct": soc_pct,
        }),
    }

    if prevent_simultaneous:
        out["simultaneous_hours_s0"] = int(np.sum((c0 > 1e-6) & (d0 > 1e-6)))

    return out


In [6]:
# =========================
# 3) Baseline solves
# =========================
import numpy as np
import pandas as pd

results = []
for tech_name in TECH.keys():
    res = solve_one_tech(
        tech_name=tech_name,
        prices=prices,
        surplus=surplus,
        scenario_prob=scenario_prob,
        dt=dt,
        year_scale=year_scale,
        discount_rate=discount_rate,
        H_years=H_years,
        subsidy_years=subsidy_years,
        output_flag=1,
        time_limit=60.0,
        prevent_simultaneous=True,
        price_drift_annual=price_drift_annual,
    )
    results.append(res)

summary = pd.DataFrame([{
    "tech": r["tech"],
    "status": r["status"],
    "NPV": r.get("npv", np.nan),
    "E (MWh_store)": r.get("E", np.nan),
    "Pch (MW)": r.get("Pch", np.nan),
    "Pdis (MW)": r.get("Pdis", np.nan),
    "simultaneous_hours_s0": r.get("simultaneous_hours_s0", np.nan),
} for r in results]).sort_values("NPV", ascending=False)

display(summary)

# -------------------------
# Counterfactual diagnostics: if a tech optimises to ~0, force a small build to see why
# -------------------------
r0_list = [r for r in results if (r.get("npv") is not None) and (abs(float(r.get("npv", 0.0))) < 1e-6)]

if len(r0_list) > 0:
    print(f"Found {len(r0_list)} technology(ies) with NPV≈0. Running counterfactual forced-build solves...")
    for r0 in r0_list:
        tech_name = r0["tech"]
        print("\n" + "="*80)
        print(f"Counterfactual (force 10% of UB) for: {tech_name}")

        rbest = solve_one_tech(
            tech_name=tech_name,
            prices=prices,
            surplus=surplus,
            scenario_prob=scenario_prob,
            dt=dt,
            year_scale=year_scale,
            discount_rate=discount_rate,
            H_years=H_years,
            subsidy_years=subsidy_years,
            output_flag=1,
            time_limit=60.0,
            prevent_simultaneous=True,
            force_frac_ub=0.10,
            price_drift_annual=price_drift_annual,
        )

        display(pd.Series((rbest or {}).get("npv_breakdown", {}), name=f"{tech_name} npv_breakdown"))
        display((rbest or {}).get("scenario_breakdown", pd.DataFrame()).head())
else:
    print("No technologies with NPV≈0 in baseline solve.")


Set parameter Username
Set parameter LicenseID to value 2661894
Academic license - for non-commercial use only - expires 2026-05-07
Set parameter OutputFlag to value 1
Set parameter TimeLimit to value 60
Set parameter MIPGap to value 0.05
Gurobi Optimizer version 12.0.3 build v12.0.3rc0 (win64 - Windows 11+.0 (26200.2))

CPU model: Intel(R) Core(TM) i7-10750H CPU @ 2.60GHz, instruction set [SSE2|AVX|AVX2]
Thread count: 6 physical cores, 12 logical processors, using up to 12 threads

Non-default parameters:
TimeLimit  60
MIPGap  0.05

Optimize a model with 183970 rows, 105127 columns and 394216 nonzeros
Model fingerprint: 0x0b2afe89
Variable types: 78847 continuous, 26280 integer (26280 binary)
Coefficient statistics:
  Matrix range     [9e-01, 5e+02]
  Objective range  [3e-02, 1e+05]
  Bounds range     [1e+00, 5e+03]
  RHS range        [3e+02, 6e+02]
Found heuristic solution: objective -0.0000000
Presolve removed 26290 rows and 4 columns
Presolve time: 1.10s
Presolved: 157680 rows, 105

Unnamed: 0,tech,status,NPV,E (MWh_store),Pch (MW),Pdis (MW),simultaneous_hours_s0
0,PHS,2,6914418000.0,5000.0,500.0,500.0,0
3,Hydrogen,2,5484055000.0,20000.0,508.719511,500.0,0
2,Battery,2,3934254000.0,1000.0,500.0,500.0,0
1,Flywheel,2,759424200.0,200.0,200.0,183.816,0


No technologies with NPV≈0 in baseline solve.


In [7]:
# =========================
# 5) Best alternative + its dispatch (scenario 0)
# =========================
best = max([r for r in results if r["npv"] is not None], key=lambda x: x["npv"])
best["tech"], best["npv"]


('PHS', 6914418117.602101)

In [8]:
import numpy as np
import ipywidgets as widgets
from IPython.display import display, clear_output
from plotly.subplots import make_subplots
import plotly.graph_objects as go

df = best["dispatch_s0"]

t = df["t"].to_numpy()
price = df["price"].to_numpy()
charge = df["charge_MW"].to_numpy()
discharge = df["discharge_MW"].to_numpy()

soc_mwh = df["soc_MWh"].to_numpy()
soc_pct = df["soc_pct"].to_numpy()

t_min = int(np.nanmin(t))
t_max = int(np.nanmax(t))

default_start = max(t_min, 0)
default_end = min(t_max, default_start + 24)
if default_end <= default_start:
    default_end = min(t_max, default_start + 1)

def build_fig():
    fig = make_subplots(
        rows=3, cols=1,
        shared_xaxes=True,
        vertical_spacing=0.06,
        subplot_titles=(
            f"Scenario 0 price - {best['tech']}",
            f"Charge / discharge - {best['tech']}",
            f"State of charge - {best['tech']}"
        )
    )

    fig.add_trace(go.Scatter(x=t, y=price, mode="lines", name="Price (€/MWh)"), row=1, col=1)
    fig.add_trace(go.Scatter(x=t, y=charge, mode="lines", name="Charge (MW)"), row=2, col=1)
    fig.add_trace(go.Scatter(x=t, y=discharge, mode="lines", name="Discharge (MW)"), row=2, col=1)
    fig.add_trace(go.Scatter(x=t, y=soc_pct, mode="lines", name="SOC (%)"), row=3, col=1)

    fig.update_yaxes(title_text="€/MWh", row=1, col=1)
    fig.update_yaxes(title_text="MW", row=2, col=1)
    fig.update_yaxes(title_text="Energy", row=3, col=1)
    fig.update_yaxes(title_text="SOC (%)", row=3, col=1, range=[0, 100])
    fig.update_xaxes(title_text="t", row=3, col=1)

    fig.update_layout(
        hovermode="x unified",
        height=850,
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0),
    )

    # Optional: range slider on bottom axis (still useful in addition to manual range)
    fig.update_xaxes(rangeslider_visible=True, row=3, col=1)
    return fig

# ---- Controls (start/end inputs + sliders) ----
start_box = widgets.IntText(value=default_start, description="start t")
end_box   = widgets.IntText(value=default_end, description="end t")

start_slider = widgets.IntSlider(
    value=default_start, min=t_min, max=t_max-1, step=1,
    description="start", continuous_update=False, layout=widgets.Layout(width="700px")
)
end_slider = widgets.IntSlider(
    value=default_end, min=t_min+1, max=t_max, step=1,
    description="end", continuous_update=False, layout=widgets.Layout(width="700px")
)

out = widgets.Output()
_lock = {"busy": False}

def clamp_and_sync(start, end):
    start = int(max(t_min, min(start, t_max - 1)))
    end   = int(max(start + 1, min(end, t_max)))

    # keep end slider feasible
    end_slider.min = start + 1
    if end < end_slider.min:
        end = end_slider.min

    return start, end

def redraw(start, end):
    fig = build_fig()
    fig.update_xaxes(range=[start, end])  # applies to all shared x-axes
    with out:
        clear_output(wait=True)
        display(fig)

def on_any_change(_):
    if _lock["busy"]:
        return
    _lock["busy"] = True
    try:
        # take values from boxes (authoritative)
        start, end = clamp_and_sync(start_box.value, end_box.value)

        # sync everything
        start_box.value = start
        end_box.value = end
        start_slider.value = start
        end_slider.value = end

        redraw(start, end)
    finally:
        _lock["busy"] = False

def on_slider_change(_):
    if _lock["busy"]:
        return
    _lock["busy"] = True
    try:
        start, end = clamp_and_sync(start_slider.value, end_slider.value)

        start_box.value = start
        end_box.value = end
        start_slider.value = start
        end_slider.value = end

        redraw(start, end)
    finally:
        _lock["busy"] = False

start_box.observe(on_any_change, names="value")
end_box.observe(on_any_change, names="value")
start_slider.observe(on_slider_change, names="value")
end_slider.observe(on_slider_change, names="value")

# Initial draw
start0, end0 = clamp_and_sync(default_start, default_end)
start_box.value, end_box.value = start0, end0
start_slider.value, end_slider.value = start0, end0
redraw(start0, end0)

display(widgets.VBox([
    widgets.HBox([start_box, end_box]),
    start_slider,
    end_slider,
    out
]))

VBox(children=(HBox(children=(IntText(value=0, description='start t'), IntText(value=24, description='end t'))…

In [9]:
td = TECH["Hydrogen"]
df = best["dispatch_s0"]
year_scale = 8760/(T*dt)

annual_charge_MWh = year_scale * (df["charge_MW"].to_numpy() * dt).sum()
annual_h2_MWh_HHV = td["eta_ch"] * annual_charge_MWh
annual_h2_subsidy = TECH["Hydrogen"]["subsidy_on_h2_MWh_HHV"] * annual_h2_MWh_HHV

annual_charge_MWh, annual_h2_MWh_HHV, annual_h2_subsidy


(np.float64(2074051.6333318641),
 np.float64(1451836.143332305),
 np.float64(551117000.0089428))