In [8]:
# -*- coding: utf-8 -*-
"""
BDS 타임시리즈 + 지도(애니메이션) — 메타 호출 없이 Param/getList만 사용
- 최근 10년 단위로 시도별 BDS 산출(PCA/동일/사용자 가중)
- Plotly 코로플레스(연도 슬라이더) HTML 출력
- GeoJSON의 시도명 필드가 '한글/영문' 어느 쪽이든 자동 매칭 (한글이면 매핑 금지)

준비:
1) pip install requests pandas numpy scikit-learn python-dotenv plotly
2) .env 또는 OS 환경변수에 KOSIS_API_KEY 등록
3) 시도 경계 GeoJSON 경로 지정(아래 __main__의 GEOJSON_PATH 수정)
"""

import os, json, time
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
import plotly.express as px

# ---------------- 설정 ----------------
load_dotenv()
API_KEY = os.getenv("KOSIS_API_KEY")
if not API_KEY:
    raise RuntimeError("환경변수 KOSIS_API_KEY가 필요합니다. (.env에 추가하세요)")

OUTDIR = Path("./outputs_timeseries")
OUTDIR.mkdir(parents=True, exist_ok=True)

BASE_PARAM = "https://kosis.kr/openapi/Param/statisticsParameterData.do"

# 표 정의(공식 표ID, ITM_NM 키워드, 연간: prdSe='Y')
TABLES = {
    "pc_grdp":      {"org":"101","tbl":"DT_1C86",     "keyword":"1인당 지역내총생산","prdSe":"Y"},
    "pop_growth":   {"org":"101","tbl":"DT_1YL20621","keyword":"인구증가율","prdSe":"Y"},
    "elderly_rate": {"org":"101","tbl":"DT_1YL20631","keyword":"고령인구비율","prdSe":"Y"},
    "fiscal_indep": {"org":"101","tbl":"DT_1YL20921","keyword":"재정자립도","prdSe":"Y"},
}

# 이름 정규화(GeoJSON/표 간 표기차 흡수)
def normalize_region_kor(s: str) -> str:
    s = str(s).strip()
    rep = {
        "세종시":"세종특별자치시",
        "제주도":"제주특별자치도",
        "전북특별자치도":"전라북도",   # 오래된 GeoJSON 호환용
        "강원특별자치도":"강원도",
        "전남":"전라남도","전북":"전라북도",
        "경남":"경상남도","경북":"경상북도",
        "충남":"충청남도","충북":"충청북도",
    }
    return rep.get(s, s)

KOR2ENG = {
    "서울특별시":"Seoul","부산광역시":"Busan","대구광역시":"Daegu","인천광역시":"Incheon",
    "광주광역시":"Gwangju","대전광역시":"Daejeon","울산광역시":"Ulsan",
    "세종특별자치시":"Sejong","경기도":"Gyeonggi-do","강원도":"Gangwon-do",
    "충청북도":"Chungcheongbuk-do","충청남도":"Chungcheongnam-do",
    "전라북도":"Jeollabuk-do","전라남도":"Jeollanam-do",
    "경상북도":"Gyeongsangbuk-do","경상남도":"Gyeongsangnam-do","제주특별자치도":"Jeju-do"
}

# ----------- KOSIS 호출(Param/getList 전용, JSON 방어 포함) -----------
def _get_json(url: str, params: Dict, retry=3, sleep=0.6):
    """
    일부 상황에서 서버가 HTML/문자열을 200으로 반환할 수 있으므로
    .json() 실패 시 진단 정보를 포함해 예외를 올림.
    """
    last = None
    headers = {"Accept":"application/json"}
    for _ in range(retry):
        r = requests.get(url, params=params, headers=headers, timeout=30)
        try:
            data = r.json()
            # KOSIS는 오류도 200으로 내려줄 수 있음
            if isinstance(data, dict) and data.get("errMsg"):
                last = r
            else:
                return data
        except Exception:
            last = r
        time.sleep(sleep)
    if last is None:
        raise RuntimeError("KOSIS 응답 실패(연결 불가).")
    ct = last.headers.get("Content-Type")
    preview = last.text[:200].replace("\n"," ")
    raise RuntimeError(f"KOSIS JSON 파싱 실패: status={last.status_code}, "
                       f"Content-Type={ct}, preview={preview}, params={params}")

def get_range_all_items(org:str, tbl:str, prdSe:str, y0:int, y1:int) -> pd.DataFrame:
    """
    연도 구간 × 전지역(ALL) × 전항목(ALL) 조회 → 표준 컬럼만 반환
    필드: region_name, period, value, (있으면 ITM_NM/ITM_ID)
    """
    params = {
        "method":"getList","apiKey":API_KEY,"format":"json","jsonVD":"Y",
        "orgId":org,"tblId":tbl,"prdSe":prdSe,
        "startPrdDe":str(y0),"endPrdDe":str(y1),
        "objL1":"ALL","itmId":"ALL",
        # outputFields 생략(표마다 상이; 전체 받아 안전 파싱)
    }
    data = _get_json(BASE_PARAM, params)
    df = pd.DataFrame(data)
    if df.empty:
        return df

    # 필수 컬럼 존재 확인
    if "DT" not in df.columns or "PRD_DE" not in df.columns:
        raise RuntimeError("KOSIS 응답에 DT 또는 PRD_DE 필드가 없습니다.")
    # 지역명 컬럼(C1_NM..)
    region_col = next((c for c in ["C1_NM","C2_NM","C3_NM","C4_NM","C5_NM","C6_NM","C7_NM","C8_NM"] if c in df.columns), None)
    if not region_col:
        raise RuntimeError("지역명 컬럼(C1_NM..C8_NM)을 찾지 못했습니다.")

    # 수치/연도형 변환 및 정리
    df["DT"] = pd.to_numeric(df["DT"], errors="coerce")
    df["PRD_DE"] = pd.to_numeric(df["PRD_DE"], errors="coerce")
    df = df.rename(columns={region_col:"region_name","PRD_DE":"period","DT":"value"})
    df = df.dropna(subset=["region_name","period","value"])
    # 합계행 제거
    df = df[~df["region_name"].isin({"전국","합계","계","전체"})]

    keep = ["region_name","period","value"]
    if "ITM_NM" in df.columns: keep.append("ITM_NM")
    if "ITM_ID" in df.columns: keep.append("ITM_ID")
    return df[keep]

def series_by_keyword(org:str, tbl:str, keyword:str, prdSe:str, y0:int, y1:int) -> pd.DataFrame:
    """
    ITM_NM이 있으면 contains(keyword)로 필터 → 지역×연도 평균
    ITM_NM이 없으면 ITM_ID 평균(있으면 1차) → 지역×연도 평균 (폴백)
    반환: region_name, period(int), value
    """
    raw = get_range_all_items(org, tbl, prdSe, y0, y1)
    if raw.empty:
        return pd.DataFrame(columns=["region_name","period","value"])

    # 지역명 표준화
    raw["region_name"] = raw["region_name"].astype(str).apply(normalize_region_kor)

    if "ITM_NM" in raw.columns and raw["ITM_NM"].notna().any():
        sub = raw[raw["ITM_NM"].astype(str).str.contains(keyword, na=False)]
        if sub.empty:
            cand = ", ".join(sorted(map(str, raw["ITM_NM"].dropna().unique()))[:10])
            raise RuntimeError(f"'{keyword}' 항목을 찾지 못했습니다(표 {tbl}). 가능한 예: {cand}")
        g = sub.groupby(["region_name","period"], as_index=False)["value"].mean()
    else:
        grp = ["region_name","period"] + (["ITM_ID"] if "ITM_ID" in raw.columns else [])
        tmp = raw.groupby(grp, as_index=False)["value"].mean()
        if "ITM_ID" in tmp.columns:
            tmp = tmp.groupby(["region_name","period"], as_index=False)["value"].mean()
        g = tmp

    g["period"] = g["period"].astype(int)
    return g[["region_name","period","value"]]

# ---------- BDS 계산 ----------
def compute_bds_one_year(df_year: pd.DataFrame,
                         cols: List[str],
                         weight_mode: str = "pca",
                         custom: Optional[Dict[str,float]] = None) -> Tuple[pd.DataFrame, Dict[str,float]]:
    """
    df_year: columns=[region_name, period] + cols
    - elderly_rate는 부담방향이므로 부호 반전 후 표준화
    - weight_mode: "pca" | "equal" | "custom"
    """
    X = df_year[cols].astype(float).copy()

    X_adj = X.copy()
    if "elderly_rate" in X_adj.columns:
        X_adj["elderly_rate"] = -X_adj["elderly_rate"]

    Z = StandardScaler().fit_transform(X_adj.values)

    if weight_mode == "equal":
        w = np.full(len(cols), 1/len(cols), dtype=float)
    elif weight_mode == "custom":
        if not custom:
            raise ValueError("custom 가중치 dict가 필요합니다.")
        w = np.array([custom[c] for c in cols], dtype=float)
        s = w.sum()
        if s == 0:
            raise ValueError("custom 가중치의 합이 0입니다.")
        w = w / s
    else:  # "pca"
        p = PCA(n_components=1, random_state=42).fit(Z)
        load = np.abs(p.components_[0])
        w = load / load.sum()

    df_out = df_year.copy()
    df_out["BDS"] = (Z * w).sum(axis=1)
    weights = {c: float(w[i]) for i, c in enumerate(cols)}
    return df_out, weights

# ---------- 메인: 최근 10년 시계열 + 지도 ----------
def build_timeseries_and_map(weight_mode="pca",
                             custom_weights: Optional[Dict[str,float]] = None,
                             min_year: int = 2000,
                             geojson_path: str = "./skorea-provinces-2018-geo.json",
                             geojson_name_field: Optional[str] = None,
                             out_csv: str = "bds_timeseries.csv",
                             out_html: str = "bds_choropleth.html"):
    """
    1) 각 표에서 넓은 구간(min_year~2100)을 한 번 호출하여 이용 가능한 연도 목록 확보
    2) 4지표 연도 교집합의 최댓값을 end_year로 채택 → 최근 10년(end_year-9 ~ end_year)
    3) 연도별로 4표 inner join → BDS 산출
    4) CSV/HTML 저장
    """
    # 1) 넓은 구간에서 지표별 연도 확보
    avail = {}
    wide_y0, wide_y1 = min_year, 2100
    for key, spec in TABLES.items():
        dfw = series_by_keyword(spec["org"], spec["tbl"], spec["keyword"], spec["prdSe"], wide_y0, wide_y1)
        years = sorted(dfw["period"].unique().tolist())
        if not years:
            raise RuntimeError(f"{key} 표에서 사용 가능한 연도가 없습니다. 표ID/항목/주기를 확인하세요.")
        avail[key] = years

    # 2) 4지표 연도 교집합 → end_year 및 최근 10년
    inter = set(avail["pc_grdp"])
    for k in ["pop_growth","elderly_rate","fiscal_indep"]:
        inter &= set(avail[k])
    if not inter:
        raise RuntimeError("4개 지표의 연도 교집합이 없습니다. 표/항목/주기를 확인하세요.")
    end_year = max(inter)
    years = [y for y in range(end_year-9, end_year+1) if y in inter]
    if not years:
        raise RuntimeError("최근 10년을 구성할 연도가 부족합니다.")
    print(f"[INFO] 사용 연도: {years[0]}–{years[-1]} (end_year={end_year})")

    # 3) 연도별 수집·병합
    frames = []
    for y in years:
        parts = []
        for col, spec in TABLES.items():
            df = series_by_keyword(spec["org"], spec["tbl"], spec["keyword"], spec["prdSe"], y, y)
            df = df.rename(columns={"value": col})
            parts.append(df[["region_name","period",col]])
        one = parts[0]
        for p in parts[1:]:
            one = one.merge(p, on=["region_name","period"], how="inner")
        if not one.empty:
            frames.append(one)

    if not frames:
        raise RuntimeError("연도별 병합 결과가 비었습니다.")
    base_ts = pd.concat(frames, ignore_index=True)
    base_ts["region_name"] = base_ts["region_name"].apply(normalize_region_kor)

    # 4) 연도별 BDS
    cols = ["pc_grdp","pop_growth","elderly_rate","fiscal_indep"]
    bds_frames, weights_by_year = [], []
    for y, dfy in base_ts.groupby("period"):
        scored, w = compute_bds_one_year(dfy, cols, weight_mode, custom_weights)
        bds_frames.append(scored)
        w["period"] = int(y)
        weights_by_year.append(w)

    bds_ts = pd.concat(bds_frames, ignore_index=True)
    bds_ts = bds_ts.sort_values(["period","BDS"], ascending=[True, False]).reset_index(drop=True)

    # 5) 저장
    OUTDIR.mkdir(parents=True, exist_ok=True)
    bds_ts.to_csv(OUTDIR/out_csv, index=False, encoding="utf-8-sig")
    with open(OUTDIR/"weights_by_year.json","w",encoding="utf-8") as f:
        json.dump({"weight_mode":weight_mode,
                   "weights_by_year":weights_by_year,
                   "years":years,
                   "end_year":int(end_year)}, f, ensure_ascii=False, indent=2)

    # 6) 지도(연도 슬라이더 코로플레스) — 한글/영문 자동 매칭 패치
    with open(geojson_path,"r",encoding="utf-8") as f:
        gj = json.load(f)

    # 시도명 필드 자동판별: 'CTP_KOR_NM'(한글) 우선, 없으면 'name'
    if geojson_name_field is None:
        sample_props = gj["features"][0]["properties"]
        if "CTP_KOR_NM" in sample_props:
            geojson_name_field = "CTP_KOR_NM"
        elif "name" in sample_props:
            geojson_name_field = "name"
        else:
            raise KeyError(f"GeoJSON에서 시도명 필드를 찾지 못했습니다. keys={list(sample_props.keys())}")

    # 'name'이 한글인지/영문인지 감지
    def _has_hangul(s: str) -> bool:
        s = str(s)
        return any("\uac00" <= ch <= "\ud7a3" for ch in s)

    name_samples = [feat["properties"][geojson_name_field] for feat in gj["features"]]
    uses_korean_names = any(_has_hangul(v) for v in name_samples)

    df_map = bds_ts.copy()

    if geojson_name_field == "CTP_KOR_NM":
        # 한글 시도명
        df_map["loc_key"] = df_map["region_name"].astype(str)
        featureidkey = "properties.CTP_KOR_NM"

    elif geojson_name_field == "name":
        if uses_korean_names:
            # name이 한글 → 영어 매핑 금지, 그대로 매칭
            df_map["loc_key"] = df_map["region_name"].astype(str)
        else:
            # name이 영문 → 한→영 매핑
            df_map["loc_key"] = df_map["region_name"].map(lambda x: KOR2ENG.get(x, x))
        featureidkey = "properties.name"

    else:
        # 기타 필드: region_name 그대로 매칭
        df_map["loc_key"] = df_map["region_name"].astype(str)
        featureidkey = f"properties.{geojson_name_field}"

    # 매칭 검증: GeoJSON에 없는 지역명 출력
    gj_name_set = {feat["properties"][geojson_name_field] for feat in gj["features"]}
    missing = sorted(set(df_map["loc_key"]) - gj_name_set)
    if missing:
        print("⚠ 매칭 실패 지역명(GeoJSON에 없음):", missing)

    fig = px.choropleth(
        df_map,
        geojson=gj,
        featureidkey=featureidkey,
        locations="loc_key",
        color="BDS",
        animation_frame="period",
        color_continuous_scale="Viridis",
        hover_name="region_name",
        hover_data={"pc_grdp":":.0f","pop_growth":":.2f","elderly_rate":":.2f",
                    "fiscal_indep":":.1f","loc_key":False}
    )
    fig.update_geos(fitbounds="locations", visible=False)
    fig.update_layout(title=f"BDS 최근 10년(가중치: {weight_mode})",
                      margin=dict(l=0, r=0, t=45, b=0))
    out_html_path = OUTDIR/out_html
    fig.write_html(str(out_html_path), include_plotlyjs="cdn")

    print(f"[완료] 시계열 CSV: {OUTDIR/out_csv}")
    print(f"[완료] 지도 HTML: {out_html_path}")

# ---- 실행 예시 ----
if __name__ == "__main__":
    # 준비한 시도 경계 GeoJSON 경로 지정
    # (예: 업로드한 파일명)
    GEOJSON_PATH = "./skorea-provinces-2018-geo.json"

    build_timeseries_and_map(
        weight_mode="pca",          # "equal" 또는 "custom"도 가능
        custom_weights=None,        # 예: {"pc_grdp":0.35,"pop_growth":0.25,"elderly_rate":0.15,"fiscal_indep":0.25}
        min_year=2000,
        geojson_path=GEOJSON_PATH,
        geojson_name_field=None,    # 수동지정하려면 "name" 또는 "CTP_KOR_NM"
        out_csv="bds_timeseries.csv",
        out_html="bds_choropleth.html"
    )


[INFO] 사용 연도: 2013–2022 (end_year=2022)
[완료] 시계열 CSV: outputs_timeseries/bds_timeseries.csv
[완료] 지도 HTML: outputs_timeseries/bds_choropleth.html
