In [None]:
from __future__ import annotations
import re
from pathlib import Path
from typing import Dict, Optional, List

import numpy as np
import pandas as pd

# ====================== Configuration (Modify as needed) ======================
# Every capacity value is divided by this constant in tidy_capacity_sheet.
CAPACITY_DIVISOR = 1000.0   # Normalize capacity by dividing by 1000
# When not None and a p-value column is found, tidy_coef_sheet drops
# coefficient rows whose p-value exceeds this threshold.
COEF_PVAL_MAX = None        # e.g., set to 0.05 to filter coefficients by significance
# Capacity workbook: each non-empty sheet is loaded with the sheet name as its ISO label.
PATH_CAP_ISO   = Path("./tables/datacenter_sum.xlsx")
# City-level capacity workbook; only one sheet of it is loaded.
PATH_CAP_CITY  = Path("./tables_city/city_dc.xlsx")
# Sheet to read from PATH_CAP_CITY; None means "first non-empty sheet".
CITY_SHEET     = None
# ISO label given to the city capacity rows, and to the NONISO_* sheets below.
PSEUDO_ISO_FOR_CITY = "CITY"

# Coefficient workbook; sheets are labelled by sheet name, except the sheet
# whose name equals NONISO_COEF_SHEET (case-insensitive), which is labelled
# PSEUDO_ISO_FOR_CITY.
PATH_COEF      = Path("./fitting_result/res_dk_results.xlsx")
NONISO_COEF_SHEET = "sheet1"

# Average-price workbook; same sheet-to-ISO labelling rule, keyed on
# NONISO_PRICE_SHEET.
PATH_AVG_PRICE = Path("./fitting_result/zone_price_diff_means.xlsx")
NONISO_PRICE_SHEET = "sheet1"

# Years for which impact columns (inc_<year>*, pct_<year>*) are produced.
TARGET_YEARS = [2025, 2030]
# Output workbook; one sheet is written per ISO label.
OUT_XLSX = Path("./fitting_result/dc_price_impacts_2025_2030_by_iso.xlsx")
# ==============================================================================

# ---------- Utility: Name normalization (strip non-word characters, ignore case) ----------
_norm_rx = re.compile(r"[^\w]+", flags=re.UNICODE)
def norm_name(s: str) -> str:
    """Return a case-insensitive lookup key for a region/zone name.

    Every run of non-word characters (spaces, hyphens, dots, ...) is removed
    and the result is lower-cased. Underscores count as word characters and
    are therefore kept.

    Args:
        s: Name to normalize. Non-string values are converted with ``str``.

    Returns:
        The normalized key, or ``""`` when ``s`` is missing (None, NaN,
        ``pd.NA``, ``pd.NaT``).
    """
    # Blank spreadsheet cells arrive as NaN/NA; stringifying them would create
    # the bogus keys "nan"/"na", so treat every missing scalar like None.
    if s is None or (pd.api.types.is_scalar(s) and pd.isna(s)):
        return ""
    return _norm_rx.sub("", str(s)).lower()

def read_all_sheets(path: Path) -> Dict[str, pd.DataFrame]:
    """Load every worksheet of the workbook at ``path``.

    Returns:
        A mapping from sheet name to that sheet's DataFrame, as produced by
        ``pd.read_excel(..., sheet_name=None)``.
    """
    workbook = pd.read_excel(path, sheet_name=None)
    return workbook

def pick_year_column(df: pd.DataFrame) -> str:
    """Identify which column of ``df`` holds the year.

    A column whose header is "year" (ignoring case and surrounding
    whitespace) wins. Otherwise the first column is accepted if it contains
    at least one numeric value and all of its numeric values lie in
    1900-2100.

    Args:
        df: Sheet to inspect.

    Returns:
        The original label of the year column.

    Raises:
        ValueError: If ``df`` has no columns or no year column can be found.
    """
    if len(df.columns) == 0:
        raise ValueError("Cannot identify year column, please check headers.")

    # Excel headers may be numbers (e.g. 2025); str() avoids an AttributeError
    # on .lower() for such labels.
    by_lower = {str(c).strip().lower(): c for c in df.columns}
    for alias in ("year",):
        if alias in by_lower:
            return by_lower[alias]

    first = df.columns[0]
    numeric = pd.to_numeric(df[first], errors="coerce").dropna()
    # An all-text column leaves nothing after dropna(); .all() on an empty
    # series is True, so require at least one numeric value explicitly.
    if not numeric.empty and numeric.between(1900, 2100).all():
        return first
    raise ValueError("Cannot identify year column, please check headers.")

def tidy_capacity_sheet(df: pd.DataFrame, iso_name: str,
                        is_city_block: bool = False,
                        year_min: int = 2020,
                        year_max: int = 2035) -> pd.DataFrame:
    """Reshape a wide capacity sheet (one column per region) into long form.

    Args:
        df: Sheet with one year column (located by ``pick_year_column``) and
            one column per region.
        iso_name: Label written to the ``iso`` column of every output row.
        is_city_block: Accepted for interface compatibility; not used.
        year_min: First year kept (inclusive).
        year_max: Last year kept (inclusive).

    Returns:
        DataFrame with columns ``iso, region, region_norm, year, capacity``.
        ``year`` is an integer and ``capacity`` is the numeric cell value
        divided by ``CAPACITY_DIVISOR`` (NaN when the cell is not numeric).

    Raises:
        ValueError: Propagated from ``pick_year_column``.
    """
    ycol = pick_year_column(df)

    # Coerce once and reuse: years stored as text ("2025") or floats (2025.0)
    # must end up as integers, otherwise they never compare equal to integer
    # target years further down the pipeline.
    years = pd.to_numeric(df[ycol], errors="coerce")
    keep = years.between(year_min, year_max) & years.mod(1).eq(0)
    kept = df.loc[keep].copy()
    kept[ycol] = years.loc[keep].astype("int64")

    value_cols = [c for c in kept.columns if c != ycol]
    tidy = kept.melt(id_vars=[ycol], value_vars=value_cols,
                     var_name="region", value_name="capacity")
    tidy = tidy.rename(columns={ycol: "year"})
    tidy["iso"] = iso_name
    tidy["region_norm"] = tidy["region"].map(norm_name)
    tidy["capacity"] = pd.to_numeric(tidy["capacity"], errors="coerce") / CAPACITY_DIVISOR
    return tidy[["iso", "region", "region_norm", "year", "capacity"]]

def load_iso_capacity(path: Path) -> pd.DataFrame:
    """Read the capacity workbook and stack all non-empty sheets in long form.

    Each sheet is tidied with ``tidy_capacity_sheet`` using the sheet name as
    the ISO label.

    Returns:
        The concatenated long table, or an empty frame with the columns
        ``iso, region, region_norm, year, capacity`` when no sheet has data.
    """
    tidy_frames = [
        tidy_capacity_sheet(sheet_df, iso_name=sheet_name)
        for sheet_name, sheet_df in read_all_sheets(path).items()
        if sheet_df is not None and not sheet_df.empty
    ]
    if not tidy_frames:
        return pd.DataFrame(
            columns=["iso", "region", "region_norm", "year", "capacity"]
        )
    return pd.concat(tidy_frames, ignore_index=True)

def load_city_capacity(path: Path, sheet_name: Optional[str], pseudo_iso: str) -> pd.DataFrame:
    """Read one sheet of the city capacity workbook in long form.

    Args:
        path: Workbook location.
        sheet_name: Sheet to load; ``None`` selects the first non-empty sheet.
        pseudo_iso: Label written to the ``iso`` column of every row.

    Returns:
        The tidied sheet, or an empty frame with the standard capacity columns
        when no sheet can be selected or the named sheet does not exist.
    """
    book = read_all_sheets(path)
    if sheet_name is None:
        # Fall back to the first sheet that actually holds data.
        sheet_name = next(
            (name for name, sheet_df in book.items()
             if sheet_df is not None and not sheet_df.empty),
            None,
        )
    if sheet_name is None or sheet_name not in book:
        return pd.DataFrame(columns=["iso", "region", "region_norm", "year", "capacity"])
    return tidy_capacity_sheet(book[sheet_name], iso_name=pseudo_iso, is_city_block=True)

# --------- Read Coefficients (Supports "Region-Coef table" and "Regression Result" formats) ----------
def _find_col(cols_map: Dict[str, str], candidates: List[str]) -> Optional[str]:
    for k in candidates:
        if k in cols_map:
            return cols_map[k]
    return None

def tidy_coef_sheet(df: pd.DataFrame, iso_name: str) -> pd.DataFrame:
    """Extract coefficients (and optional confidence bounds) from one sheet.

    Two layouts are recognised:

    * Structure A - a region column and a coefficient column: one output row
      per input row.
    * Structure B - a regression table with variables in rows: the first
      ``dc_local`` row that has a numeric coefficient is returned as a single
      row whose ``region``/``region_norm`` are NaN.

    When ``COEF_PVAL_MAX`` is set and a p-value column is found, rows whose
    p-value is above the threshold (or not numeric) are dropped.

    Args:
        df: Raw sheet.
        iso_name: Label written to the ``iso`` column.

    Returns:
        DataFrame with columns ``iso, region, region_norm, coef, ci_lower,
        ci_upper``; empty when nothing usable is found.
    """
    out_cols = ["iso", "region", "region_norm", "coef", "ci_lower", "ci_upper"]

    def _empty() -> pd.DataFrame:
        return pd.DataFrame(columns=out_cols)

    if df is None or df.empty:
        return _empty()

    # str() first: Excel headers can be numbers, which have no .lower().
    cols = {str(c).lower().strip(): c for c in df.columns}

    # Accepted header aliases (matched lower-cased and stripped)
    zone_col = _find_col(cols, ["zone", "region", "area", "market area", "subzone"])
    coef_col = _find_col(cols, ["coef", "coefficient", "estimate", "beta"])
    pval_col = _find_col(cols, ["pval", "p_value", "p-value", "p"])

    # Confidence interval header aliases
    lo_col = _find_col(cols, ["ci_lower", "ci lower", "lower", "lb", "lcl", "lci", "2.5%", "low", "lower_ci"])
    hi_col = _find_col(cols, ["ci_upper", "ci upper", "upper", "ub", "ucl", "uci", "97.5%", "high", "upper_ci"])

    filter_by_pval = COEF_PVAL_MAX is not None and pval_col is not None

    # -------- Structure A: Coefficients by Region --------
    if zone_col is not None and coef_col is not None:
        out = df[[zone_col, coef_col]].copy()
        out.columns = ["region", "coef"]
        out["ci_lower"] = pd.to_numeric(df[lo_col], errors="coerce") if lo_col is not None else np.nan
        out["ci_upper"] = pd.to_numeric(df[hi_col], errors="coerce") if hi_col is not None else np.nan

        # Optional significance filtering
        if filter_by_pval:
            keep = pd.to_numeric(df[pval_col], errors="coerce") <= COEF_PVAL_MAX
            out = out.loc[keep].copy()

        out["iso"] = iso_name
        out["region_norm"] = out["region"].map(norm_name)
        out["coef"] = pd.to_numeric(out["coef"], errors="coerce")
        return out[out_cols]

    # -------- Structure B: Regression Result (Variables in Rows), extract dc_local --------
    if coef_col is None:
        return _empty()

    # Variable names: prefer the first column; fall back to the index labels
    # when the first column is numeric. The fallback is wrapped in a Series
    # because pd.Index has no .eq() method.
    first_col = df.columns[0]
    if pd.api.types.is_numeric_dtype(df[first_col]):
        var_names = pd.Series(df.index.astype(str), index=df.index)
    else:
        var_names = df[first_col].astype(str)

    def _norm_var(name) -> str:
        # Drop non-word characters AND underscores so "dc_local", "DC local"
        # and "dclocal" all compare equal.
        return re.sub(r"[\W_]+", "", str(name)).lower()

    is_dc_local = var_names.map(_norm_var).eq(_norm_var("dc_local")).to_numpy()
    if not is_dc_local.any():
        return _empty()

    sub = df.loc[is_dc_local]

    # Significance Filtering
    if filter_by_pval:
        pvals = pd.to_numeric(sub[pval_col], errors="coerce")
        sub = sub.loc[(pvals <= COEF_PVAL_MAX).to_numpy()]
        if sub.empty:
            return _empty()

    has_coef = pd.to_numeric(sub[coef_col], errors="coerce").notna().to_numpy()
    if not has_coef.any():
        return _empty()

    # Take the estimate and both bounds from the SAME row, so a coefficient is
    # never paired with the interval of a different dc_local row.
    chosen = sub.loc[has_coef].iloc[[0]]

    def _value(col):
        if col is None:
            return np.nan
        return pd.to_numeric(chosen[col], errors="coerce").iloc[0]

    return pd.DataFrame({
        "iso": [iso_name],
        "region": [np.nan],
        "region_norm": [np.nan],
        "coef": [_value(coef_col)],
        "ci_lower": [_value(lo_col)],
        "ci_upper": [_value(hi_col)],
    })

def load_all_coefs(path: Path, noniso_sheet: str, pseudo_iso: str) -> pd.DataFrame:
    """Collect coefficients from every sheet of the coefficient workbook.

    Args:
        path: Workbook location.
        noniso_sheet: Name (case-insensitive) of the sheet that is labelled
            ``pseudo_iso``; every other sheet is labelled by its own name.
        pseudo_iso: ISO label used for ``noniso_sheet``.

    Returns:
        Concatenation of the non-empty ``tidy_coef_sheet`` results, or an
        empty frame with the same columns when there are none.
    """
    cols = ["iso","region","region_norm","coef","ci_lower","ci_upper"]
    collected = []
    for sheet_name, sheet_df in pd.read_excel(path, sheet_name=None).items():
        if sheet_name.lower() == noniso_sheet.lower():
            label = pseudo_iso
        else:
            label = sheet_name
        tidy = tidy_coef_sheet(sheet_df, label)
        if tidy is not None and not tidy.empty:
            collected.append(tidy)
    if not collected:
        return pd.DataFrame(columns=cols)
    return pd.concat(collected, ignore_index=True)[cols]

# ---------- Read Average Prices ----------
def tidy_price_sheet(df: pd.DataFrame, iso_name: str) -> pd.DataFrame:
    """Extract one average price per region from a price sheet.

    The region column is the first header matching a known alias (zone,
    region, area, market area, subzone), else the first column. The price
    column is the first remaining column with a numeric dtype; if there is
    none, the first remaining column containing at least one value that
    parses as a number.

    Args:
        df: Raw sheet.
        iso_name: Label written to the ``iso`` column.

    Returns:
        DataFrame with columns ``iso, region, region_norm, avg_price``; empty
        when the sheet is empty or has no usable price column.
    """
    out_cols = ["iso", "region", "region_norm", "avg_price"]
    if df is None or df.empty:
        return pd.DataFrame(columns=out_cols)

    # str() first: Excel headers can be numbers, which have no .lower().
    by_lower = {str(c).lower().strip(): c for c in df.columns}
    zone_col = _find_col(by_lower, ["zone", "region", "area", "market area", "subzone"])
    if zone_col is None:
        zone_col = df.columns[0]

    others = [c for c in df.columns if c != zone_col]
    price_candidates = [c for c in others if pd.api.types.is_numeric_dtype(df[c])]
    if not price_candidates:
        # No numeric dtype at all: accept text columns that hold numbers.
        price_candidates = [
            c for c in others
            if pd.to_numeric(df[c], errors="coerce").notna().any()
        ]
    if not price_candidates:
        return pd.DataFrame(columns=out_cols)

    price_col = price_candidates[0]
    out = df[[zone_col, price_col]].copy()
    out.columns = ["region", "avg_price"]
    out["iso"] = iso_name
    out["region_norm"] = out["region"].map(norm_name)
    out["avg_price"] = pd.to_numeric(out["avg_price"], errors="coerce")
    return out[out_cols]

def load_all_avg_prices(path: Path, noniso_sheet: str, pseudo_iso: str) -> pd.DataFrame:
    """Collect average prices from every sheet of the price workbook.

    Args:
        path: Workbook location.
        noniso_sheet: Name (case-insensitive) of the sheet that is labelled
            ``pseudo_iso``; every other sheet is labelled by its own name.
        pseudo_iso: ISO label used for ``noniso_sheet``.

    Returns:
        Concatenation of the non-empty ``tidy_price_sheet`` results, or an
        empty frame with the columns ``iso, region, region_norm, avg_price``.
    """
    collected = []
    for sheet_name, sheet_df in read_all_sheets(path).items():
        if sheet_name.lower() == noniso_sheet.lower():
            label = pseudo_iso
        else:
            label = sheet_name
        tidy = tidy_price_sheet(sheet_df, label)
        if tidy is not None and not tidy.empty:
            collected.append(tidy)
    if not collected:
        return pd.DataFrame(
            columns=["iso","region","region_norm","avg_price"]
        )
    return pd.concat(collected, ignore_index=True)

# ---------- Main Calculation: Include Confidence Intervals ----------
def compute_impacts_by_iso(
    cap_iso_long: pd.DataFrame,
    cap_city_long: pd.DataFrame,
    coefs_long: pd.DataFrame,
    price_long: pd.DataFrame,
    target_years: List[int],
) -> Dict[str, pd.DataFrame]:
    """Compute price impacts of capacity per region, one table per ISO.

    For every target year ``y`` present in the capacity data:

    * ``inc_y`` / ``inc_y_lo`` / ``inc_y_hi`` = capacity x coef / ci_lower /
      ci_upper
    * ``pct_y*`` = the matching ``inc_y*`` / avg_price x 100, NaN when the
      price is missing or not strictly positive

    Args:
        cap_iso_long: Long capacity table (``iso, region, region_norm, year,
            capacity``).
        cap_city_long: Same layout; stacked under ``cap_iso_long``.
        coefs_long: ``iso, region, region_norm, coef, ci_lower, ci_upper``.
            Rows with NaN ``region_norm`` act as the ISO-wide default.
        price_long: ``iso, region, region_norm, avg_price``.
        target_years: Integer years to evaluate.

    Returns:
        Mapping from ISO label to a DataFrame with ``region, coef, ci_lower,
        ci_upper, avg_price`` followed, per year, by the capacity column (if
        present) and the six impact columns. Empty dict when there is no
        capacity data.
    """
    caps = pd.concat([cap_iso_long, cap_city_long], ignore_index=True)
    if caps.empty:
        return {}

    # Years may arrive as text or floats; compare numerically and store them
    # as integers so the pivoted column labels equal the target years.
    years = pd.to_numeric(caps["year"], errors="coerce")
    wanted = years.isin(target_years)
    caps = caps.loc[wanted].copy()
    caps["year"] = years.loc[wanted].astype("int64")

    coef_cols = ["coef", "ci_lower", "ci_upper"]
    result = {}

    for iso, g in caps.groupby("iso", sort=False):
        wide_cap = (g.pivot_table(index=["region","region_norm"], columns="year",
                                  values="capacity", aggfunc="first")
                      .reset_index())

        coef_iso_all = coefs_long.loc[coefs_long["iso"] == iso,
                                      ["region_norm"] + coef_cols]
        price_iso = price_long.loc[price_long["iso"] == iso,
                                   ["region_norm","avg_price"]]

        # Region-specific vs ISO default
        has_region = coef_iso_all["region_norm"].notna()
        coef_specific = coef_iso_all.loc[has_region]
        iso_default = coef_iso_all.loc[~has_region]

        m = (wide_cap
             .merge(coef_specific, on="region_norm", how="left")
             .merge(price_iso, on="region_norm", how="left"))

        for col in coef_cols:
            m[col] = pd.to_numeric(m[col], errors="coerce")

        # Regions without their own coefficient take the ISO default. The
        # estimate and both bounds are copied together from one default row,
        # so a region-specific coefficient is never paired with the ISO-wide
        # interval.
        default_ok = pd.to_numeric(iso_default["coef"], errors="coerce").notna().to_numpy()
        default_rows = iso_default.loc[default_ok]
        if not default_rows.empty:
            fallback = default_rows.iloc[[0]]
            needs_default = m["coef"].isna()
            for col in coef_cols:
                m.loc[needs_default, col] = pd.to_numeric(fallback[col], errors="coerce").iloc[0]

        coef = m["coef"]
        lo = m["ci_lower"]
        hi = m["ci_upper"]
        price = pd.to_numeric(m["avg_price"], errors="coerce")
        # Percentage base: NaN when price <= 0 or missing (a non-positive
        # base would give an undefined or sign-flipped percentage).
        denom = price.where(price > 0)

        # Calculate absolute and percentage increase for each year (with bounds)
        for y in target_years:
            inc_col, inc_lo, inc_hi = f"inc_{y}", f"inc_{y}_lo", f"inc_{y}_hi"
            pct_col, pct_lo, pct_hi = f"pct_{y}", f"pct_{y}_lo", f"pct_{y}_hi"

            if y in m.columns:
                cap_y = pd.to_numeric(m[y], errors="coerce")

                m[inc_col] = cap_y * coef
                m[inc_lo]  = cap_y * lo
                m[inc_hi]  = cap_y * hi

                m[pct_col] = (m[inc_col] / denom) * 100.0
                m[pct_lo]  = (m[inc_lo]  / denom) * 100.0
                m[pct_hi]  = (m[inc_hi]  / denom) * 100.0
            else:
                # No capacity for this year in this ISO: keep the columns so
                # every sheet has the same layout.
                for c in [inc_col, inc_lo, inc_hi, pct_col, pct_lo, pct_hi]:
                    m[c] = np.nan

        # Organize columns
        base_cols = ["region", "coef", "ci_lower", "ci_upper", "avg_price"]
        year_cols = []
        for y in target_years:
            if y in m.columns:
                year_cols.append(y)  # Capacity column for verification
            year_cols += [f"inc_{y}_lo", f"inc_{y}", f"inc_{y}_hi",
                          f"pct_{y}_lo", f"pct_{y}", f"pct_{y}_hi"]

        ordered = [c for c in base_cols + year_cols if c in m.columns]
        result[iso] = m[ordered].copy()

    return result

In [None]:
# ========================= Run and Export =========================
# Load the four inputs in long form.
cap_iso   = load_iso_capacity(PATH_CAP_ISO)
cap_city  = load_city_capacity(PATH_CAP_CITY, CITY_SHEET, PSEUDO_ISO_FOR_CITY)
coefs     = load_all_coefs(PATH_COEF, NONISO_COEF_SHEET, PSEUDO_ISO_FOR_CITY)
avg_price = load_all_avg_prices(PATH_AVG_PRICE, NONISO_PRICE_SHEET, PSEUDO_ISO_FOR_CITY)

by_iso = compute_impacts_by_iso(
    cap_iso_long=cap_iso,
    cap_city_long=cap_city,
    coefs_long=coefs,
    price_long=avg_price,
    target_years=TARGET_YEARS
)

# Write one worksheet per ISO.
OUT_XLSX.parent.mkdir(parents=True, exist_ok=True)
used_sheet_names = set()
with pd.ExcelWriter(OUT_XLSX, engine="xlsxwriter") as w:
    for iso, df in by_iso.items():
        # Excel limits sheet names to 31 characters and treats them
        # case-insensitively; sanitising/truncating can make two ISO labels
        # collide, so append a numeric suffix until the name is unique.
        base = re.sub(r"[^\w\-]", "_", str(iso))[:31] or "sheet"
        sheet, attempt = base, 1
        while sheet.lower() in used_sheet_names:
            suffix = f"_{attempt}"
            sheet = base[:31 - len(suffix)] + suffix
            attempt += 1
        used_sheet_names.add(sheet.lower())
        df.to_excel(w, sheet_name=sheet, index=False)

print(f"✅ Result generated: {OUT_XLSX}")
