In [11]:
import os
import math
import json
import time
import requests
import pandas as pd
import numpy as np

from datetime import datetime, timedelta
from sklearn.ensemble import HistGradientBoostingRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.neural_network import MLPRegressor


# =========================
# 0) 설정
# =========================
CSV_PATH = "C:/taro/1차_프로젝트/통합데이터/SUWON_S_DATA_TABLE.csv"

# (1) 기상청 단기예보 API 서비스키 (공공데이터포털에서 발급)
KMA_SERVICE_KEY = "693f6b4e2b9cfa42ebd3871d65a4fde0319d263007fc2312aeb2e75cecef29aa"

# (2) 공휴일(특일정보) API 서비스키 (공공데이터포털에서 발급)
HOLIDAY_SERVICE_KEY = "693f6b4e2b9cfa42ebd3871d65a4fde0319d263007fc2312aeb2e75cecef29aa"

# 수원시 기본 위경도(원하는 위치로 바꿔도 됨)
DEFAULT_LAT = 37.2636
DEFAULT_LON = 127.0286  # 수원시 위경도 예시 :contentReference[oaicite:1]{index=1}

# API base URLs
KMA_BASE = "https://apis.data.go.kr/1360000/VilageFcstInfoService_2.0"
HOLI_BASE = "https://apis.data.go.kr/B090041/openapi/service/SpcdeInfoService"

# 캐시(같은 API 여러번 호출 방지)
CACHE_DIR = "./_api_cache"
os.makedirs(CACHE_DIR, exist_ok=True)


# =========================
# 1) 위경도 -> 기상청 격자(nx, ny) 변환 (LCC DFS)
#    (기상청 격자 변환 공식) :contentReference[oaicite:2]{index=2}
# =========================
def latlon_to_grid(lat: float, lon: float):
    RE = 6371.00877  # Earth radius (km)
    GRID = 5.0       # Grid spacing (km)
    SLAT1 = 30.0     # Projection latitude 1 (deg)
    SLAT2 = 60.0     # Projection latitude 2 (deg)
    OLON = 126.0     # Reference longitude (deg)
    OLAT = 38.0      # Reference latitude (deg)
    XO = 210 / GRID
    YO = 675 / GRID

    DEGRAD = math.pi / 180.0

    re = RE / GRID
    slat1 = SLAT1 * DEGRAD
    slat2 = SLAT2 * DEGRAD
    olon = OLON * DEGRAD
    olat = OLAT * DEGRAD

    sn = math.tan(math.pi * 0.25 + slat2 * 0.5) / math.tan(math.pi * 0.25 + slat1 * 0.5)
    sn = math.log(math.cos(slat1) / math.cos(slat2)) / math.log(sn)

    sf = math.tan(math.pi * 0.25 + slat1 * 0.5)
    sf = math.pow(sf, sn) * math.cos(slat1) / sn

    ro = math.tan(math.pi * 0.25 + olat * 0.5)
    ro = re * sf / math.pow(ro, sn)

    ra = math.tan(math.pi * 0.25 + (lat) * DEGRAD * 0.5)
    ra = re * sf / math.pow(ra, sn)

    theta = lon * DEGRAD - olon
    if theta > math.pi:
        theta -= 2.0 * math.pi
    if theta < -math.pi:
        theta += 2.0 * math.pi
    theta *= sn

    x = ra * math.sin(theta) + XO
    y = ro - ra * math.cos(theta) + YO

    return int(x + 1.5), int(y + 1.5)  # nx, ny


# =========================
# 2) 공휴일 API: 연/월 단위로 공휴일 가져오기
#    getRestDeInfo 사용 :contentReference[oaicite:3]{index=3}
# =========================
def fetch_holidays_for_year(year: int) -> set:
    holidays = set()

    for month in range(1, 13):
        params = {
            "ServiceKey": HOLIDAY_SERVICE_KEY,
            "solYear": str(year),
            "solMonth": f"{month:02d}",
            "_type": "json",
            "numOfRows": 100,
            "pageNo": 1
        }
        url = f"{HOLI_BASE}/getRestDeInfo"

        try:
            r = requests.get(url, params=params, timeout=20)

            # HTTP 에러는 공휴일=0으로 처리하고 계속
            if r.status_code != 200:
                print(f"[Holiday API] {year}-{month:02d} HTTP {r.status_code} -> 공휴일=0 처리")
                continue

            # 1) 우선 json 시도
            try:
                data = r.json()
            except Exception:
                # json 파싱 실패(보통 XML/문자열 에러 응답)
                print(f"[Holiday API] {year}-{month:02d} JSON 파싱 실패 -> 공휴일=0 처리")
                # 필요시 디버그:
                # print(r.text[:300])
                continue

            # 2) dict가 아니면(=str 등) 스킵
            if not isinstance(data, dict):
                print(f"[Holiday API] {year}-{month:02d} JSON이 dict가 아님({type(data)}) -> 공휴일=0 처리")
                continue

            # 3) 정상 구조가 아니면 스킵
            resp = data.get("response")
            if not isinstance(resp, dict):
                print(f"[Holiday API] {year}-{month:02d} response 구조 이상 -> 공휴일=0 처리")
                continue

            body = resp.get("body", {})
            if not isinstance(body, dict):
                print(f"[Holiday API] {year}-{month:02d} body 구조 이상 -> 공휴일=0 처리")
                continue

            items = body.get("items", {}).get("item", [])
            if isinstance(items, dict):
                items = [items]
            if not isinstance(items, list):
                print(f"[Holiday API] {year}-{month:02d} items 구조 이상 -> 공휴일=0 처리")
                continue

            for it in items:
                if not isinstance(it, dict):
                    continue
                locdate = str(it.get("locdate", ""))  # YYYYMMDD
                isHoliday = it.get("isHoliday", "N")
                if len(locdate) == 8 and isHoliday == "Y":
                    holidays.add(locdate)

            time.sleep(0.12)

        except Exception as e:
            print(f"[Holiday API] {year}-{month:02d} 오류: {e} -> 공휴일=0 처리")
            continue

    return holidays


# =========================
# 3) 기상청 단기예보 API에서 (특정 날짜, 특정 시각) TMP/PCP 추출
#    getVilageFcst :contentReference[oaicite:4]{index=4}
# =========================
def _parse_pcp(value: str) -> float:
    # PCP는 "강수없음", "1mm 미만", "30.0mm", "50.0mm 이상" 같은 문자열이 섞일 수 있음
    if value is None:
        return 0.0
    s = str(value).strip()
    if s in ["강수없음", "0", "0.0", "0mm", "0.0mm"]:
        return 0.0
    if "미만" in s:
        return 0.5
    if "이상" in s:
        num = "".join([c for c in s if (c.isdigit() or c == ".")])
        return float(num) if num else 50.0
    num = "".join([c for c in s if (c.isdigit() or c == ".")])
    return float(num) if num else 0.0


def fetch_kma_hourly_weather(target_date: pd.Timestamp, nx: int, ny: int) -> dict:
    """
    target_date(날짜) 기준으로 해당 날짜의 24시간(00~23)의 TMP/PCP를 dict로 반환
    반환 예: { '0000': {'TMP': 3.1, 'PCP': 0.0}, ... }
    """
    date_str = target_date.strftime("%Y%m%d")
    cache_path = os.path.join(CACHE_DIR, f"kma_{date_str}_{nx}_{ny}.json")
    if os.path.exists(cache_path):
        with open(cache_path, "r", encoding="utf-8") as f:
            return json.load(f)

    # 단기예보 base_time은 정해진 발표시각 체계가 있어 “당일 0500/0800/1100/1400/1700/2000/2300” 등으로 씀.
    # 여기선 단순히 "전날 2300"을 기본으로 시도 (실패 시 여러 base_time fallback)
    base_date = (target_date - pd.Timedelta(days=1)).strftime("%Y%m%d")
    fallback_times = ["2300", "2000", "1700", "1400", "1100", "0800", "0500"]

    url = f"{KMA_BASE}/getVilageFcst"
    out = {f"{h:02d}00": {"TMP": None, "PCP": None} for h in range(24)}

    for bt in fallback_times:
        params = {
            "serviceKey": KMA_SERVICE_KEY,
            "dataType": "JSON",
            "numOfRows": 3000,
            "pageNo": 1,
            "base_date": base_date,
            "base_time": bt,
            "nx": nx,
            "ny": ny
        }
        r = requests.get(url, params=params, timeout=30)
        if r.status_code != 200:
            continue
        data = r.json()
        items = data.get("response", {}).get("body", {}).get("items", {}).get("item", [])
        if not items:
            continue

        # 해당 날짜(date_str)의 fcstTime별 TMP/PCP 채우기
        for it in items:
            if it.get("fcstDate") != date_str:
                continue
            t = it.get("fcstTime")  # "0200" 등
            cat = it.get("category")
            val = it.get("fcstValue")
            if t not in out:
                continue
            if cat == "TMP":
                try:
                    out[t]["TMP"] = float(val)
                except:
                    pass
            elif cat == "PCP":
                out[t]["PCP"] = _parse_pcp(val)

        # 충분히 채워졌으면 중단
        filled_tmp = sum(v["TMP"] is not None for v in out.values())
        if filled_tmp >= 18:  # 대충 대부분 채워지면 ok
            break

        time.sleep(0.15)

    # 남은 결측은 간단 대체: 평균/0
    tmps = [v["TMP"] for v in out.values() if v["TMP"] is not None]
    tmp_mean = float(np.mean(tmps)) if tmps else 10.0
    for k in out:
        if out[k]["TMP"] is None:
            out[k]["TMP"] = tmp_mean
        if out[k]["PCP"] is None:
            out[k]["PCP"] = 0.0

    with open(cache_path, "w", encoding="utf-8") as f:
        json.dump(out, f, ensure_ascii=False)

    return out


# =========================
# 4) 학습 데이터 만들기: (TA_YMD, HOUR) 단위로 UNIT/TEMP/RAIN 집계
#    + 공휴일 변수 추가
# =========================
def make_hourly_dataset(csv_path: str) -> pd.DataFrame:
    df = pd.read_csv(csv_path)

    df["TA_YMD"] = pd.to_datetime(df["TA_YMD"].astype(str), format="%Y%m%d", errors="coerce")
    df = df.dropna(subset=["TA_YMD"])

    # 시간대별로 집계: UNIT 합계, TEMP/RAIN 평균
    agg = (
        df.groupby(["TA_YMD", "HOUR"], as_index=False)
          .agg(
              UNIT=("UNIT", "sum"),
              TEMP=("TEMP", "mean"),
              RAIN=("RAIN", "mean"),
          )
          .sort_values(["TA_YMD", "HOUR"])
          .reset_index(drop=True)
    )

    # 공휴일
    years = sorted(agg["TA_YMD"].dt.year.unique().tolist())
    holiday_set = set()
    for y in years:
        holiday_set |= fetch_holidays_for_year(int(y))

    agg["date_yyyymmdd"] = agg["TA_YMD"].dt.strftime("%Y%m%d")
    agg["is_holiday"] = agg["date_yyyymmdd"].isin(holiday_set).astype(int)

    # 달력 feature
    dt = agg["TA_YMD"]
    agg["dow"] = dt.dt.dayofweek
    agg["month"] = dt.dt.month
    agg["day"] = dt.dt.day
    agg["weekofyear"] = dt.dt.isocalendar().week.astype(int)
    agg["is_weekend"] = (agg["dow"] >= 5).astype(int)

    doy = dt.dt.dayofyear
    agg["doy_sin"] = np.sin(2 * np.pi * doy / 365.25)
    agg["doy_cos"] = np.cos(2 * np.pi * doy / 365.25)

    return agg


# =========================
# 5) 시간대(HOUR)별 모델 학습
#    - ML: HistGradientBoostingRegressor
#    - DL: MLPRegressor (표준화 + 신경망)
# =========================
def train_models_by_hour(data: pd.DataFrame):
    feature_cols = [
        "dow", "month", "day", "weekofyear", "is_weekend", "is_holiday",
        "doy_sin", "doy_cos",
        "TEMP", "RAIN"
    ]

    models_ml = {}
    models_dl = {}

    # 시계열 holdout: 마지막 20% 날짜를 test로 (모델 학습 자체는 train만 사용)
    unique_dates = sorted(data["TA_YMD"].unique())
    split_idx = int(len(unique_dates) * 0.8)
    train_dates = set(unique_dates[:split_idx])
    test_dates = set(unique_dates[split_idx:])

    for hour in range(24):
        d = data[data["HOUR"] == hour].sort_values("TA_YMD").dropna(subset=feature_cols + ["UNIT"])

        train = d[d["TA_YMD"].isin(train_dates)]
        test = d[d["TA_YMD"].isin(test_dates)]

        if len(train) < 50 or len(test) < 20:
            continue

        X_train, y_train = train[feature_cols], train["UNIT"]
        X_test, y_test = test[feature_cols], test["UNIT"]

        # ML
        ml = HistGradientBoostingRegressor(
            learning_rate=0.06, max_depth=6, max_iter=300, random_state=42
        )
        ml.fit(X_train, y_train)
        pred_ml = ml.predict(X_test)

        # DL
        dl = Pipeline([
            ("scaler", StandardScaler()),
            ("mlp", MLPRegressor(
                hidden_layer_sizes=(64, 32),
                early_stopping=True,
                max_iter=1200,
                random_state=42
            ))
        ])
        dl.fit(X_train, y_train)
        pred_dl = dl.predict(X_test)

        # 간단 평가 출력
        ml_mae = np.mean(np.abs(y_test - pred_ml))
        dl_mae = np.mean(np.abs(y_test - pred_dl))

        print(f"[HOUR={hour:02d}] train={len(train)} test={len(test)} | MAE(ML)={ml_mae:,.1f} | MAE(DL)={dl_mae:,.1f}")

        models_ml[hour] = ml
        models_dl[hour] = dl

    return models_ml, models_dl, feature_cols


# =========================
# 6) 날짜만 입력 -> (24시간 예측 + 일합계)
#    데이터에 없는 날짜면: 기상청 API로 TMP/PCP를 채워 넣어 예측
# =========================
def predict_for_date(date_str: str, models_ml, models_dl, feature_cols, holiday_set: set,
                     lat=DEFAULT_LAT, lon=DEFAULT_LON):
    # 날짜 파싱
    s = date_str.strip()
    if len(s) == 8 and s.isdigit():
        target_date = pd.to_datetime(s, format="%Y%m%d")
    else:
        target_date = pd.to_datetime(s)

    # 공휴일
    ymd = target_date.strftime("%Y%m%d")
    is_holiday = 1 if ymd in holiday_set else 0

    # 달력 feature
    dow = int(target_date.dayofweek)
    month = int(target_date.month)
    day = int(target_date.day)
    weekofyear = int(target_date.isocalendar().week)
    is_weekend = 1 if dow >= 5 else 0
    doy = int(target_date.dayofyear)
    doy_sin = float(np.sin(2 * np.pi * doy / 365.25))
    doy_cos = float(np.cos(2 * np.pi * doy / 365.25))

    # 기상청 예보(시간별 TEMP/RAIN 대체)
    nx, ny = latlon_to_grid(lat, lon)
    hourly_weather = fetch_kma_hourly_weather(target_date, nx, ny)

    hourly_preds = []
    for hour in range(24):
        if hour not in models_ml or hour not in models_dl:
            continue

        hhmm = f"{hour:02d}00"
        temp = float(hourly_weather[hhmm]["TMP"])
        rain = float(hourly_weather[hhmm]["PCP"])

        row = pd.DataFrame([{
            "dow": dow, "month": month, "day": day, "weekofyear": weekofyear,
            "is_weekend": is_weekend, "is_holiday": is_holiday,
            "doy_sin": doy_sin, "doy_cos": doy_cos,
            "TEMP": temp, "RAIN": rain
        }], columns=feature_cols)

        pred_ml = float(models_ml[hour].predict(row)[0])
        pred_dl = float(models_dl[hour].predict(row)[0])
        pred_ml = max(0.0, pred_ml)
        pred_dl = max(0.0, pred_dl)

        hourly_preds.append({
            "date": ymd,
            "hour": hour,
            "temp": temp,
            "rain": rain,
            "pred_unit_ml": pred_ml,
            "pred_unit_dl": pred_dl,
        })

    result = pd.DataFrame(hourly_preds).sort_values("hour")
    daily_ml = result["pred_unit_ml"].sum()
    daily_dl = result["pred_unit_dl"].sum()

    return result, daily_ml, daily_dl


# =========================
# 7) 실행
# =========================
if __name__ == "__main__":
    # 1) 학습 데이터 생성
    data = make_hourly_dataset(CSV_PATH)

    # 2) 공휴일 set(예측 때 쓰려고)
    years = sorted(data["TA_YMD"].dt.year.unique().tolist())
    holiday_set = set()
    for y in years:
        holiday_set |= fetch_holidays_for_year(int(y))

    # 3) 시간대별 모델 학습
    models_ml, models_dl, feature_cols = train_models_by_hour(data)

    # 4) 날짜 입력 예측
    user_date = input("\n예측할 날짜 입력 (YYYY-MM-DD 또는 YYYYMMDD): ")
    # 예측 연도가 학습 데이터 밖이면 공휴일도 추가로 로딩
    target_year = int(pd.to_datetime(user_date).year)
    if target_year not in years:
        holiday_set |= fetch_holidays_for_year(target_year)

    hourly_df, daily_ml, daily_dl = predict_for_date(
        user_date,
        models_ml=models_ml,
        models_dl=models_dl,
        feature_cols=feature_cols,
        holiday_set=holiday_set,
        lat=DEFAULT_LAT, lon=DEFAULT_LON
    )

    print("\n[시간대별 예측 결과(일부)]")
    print(hourly_df.head(10).to_string(index=False))

    print(f"\n✅ 일합계 예상 UNIT (ML): {daily_ml:,.0f}")
    print(f"✅ 일합계 예상 UNIT (DL): {daily_dl:,.0f}")

    # 저장 원하면:
    out_path = f"pred_{pd.to_datetime(user_date).strftime('%Y%m%d')}.csv"
    hourly_df.to_csv(out_path, index=False, encoding="utf-8-sig")
    print(f"\n(저장됨) {out_path}")


[Holiday API] 2023-02 오류: 'str' object has no attribute 'get' -> 공휴일=0 처리
[Holiday API] 2023-04 오류: 'str' object has no attribute 'get' -> 공휴일=0 처리
[Holiday API] 2023-07 오류: 'str' object has no attribute 'get' -> 공휴일=0 처리
[Holiday API] 2023-11 오류: 'str' object has no attribute 'get' -> 공휴일=0 처리
[Holiday API] 2024-07 오류: 'str' object has no attribute 'get' -> 공휴일=0 처리
[Holiday API] 2024-11 오류: 'str' object has no attribute 'get' -> 공휴일=0 처리
[Holiday API] 2025-02 오류: 'str' object has no attribute 'get' -> 공휴일=0 처리
[Holiday API] 2025-04 오류: 'str' object has no attribute 'get' -> 공휴일=0 처리
[Holiday API] 2025-07 오류: 'str' object has no attribute 'get' -> 공휴일=0 처리
[Holiday API] 2025-09 오류: 'str' object has no attribute 'get' -> 공휴일=0 처리
[Holiday API] 2025-11 오류: 'str' object has no attribute 'get' -> 공휴일=0 처리
[Holiday API] 2023-02 오류: 'str' object has no attribute 'get' -> 공휴일=0 처리
[Holiday API] 2023-04 오류: 'str' object has no attribute 'get' -> 공휴일=0 처리
[Holiday API] 2023-07 오류: 'str' object



[HOUR=04] train=828 test=207 | MAE(ML)=45,363.1 | MAE(DL)=63,340.6




[HOUR=05] train=828 test=207 | MAE(ML)=54,665.2 | MAE(DL)=88,909.6




[HOUR=06] train=828 test=207 | MAE(ML)=74,052.8 | MAE(DL)=101,239.4
[HOUR=07] train=828 test=207 | MAE(ML)=70,893.5 | MAE(DL)=556,300.2
[HOUR=08] train=828 test=207 | MAE(ML)=89,916.3 | MAE(DL)=646,548.9
[HOUR=09] train=828 test=207 | MAE(ML)=108,498.1 | MAE(DL)=568,860.0
[HOUR=10] train=828 test=207 | MAE(ML)=100,151.3 | MAE(DL)=317,465.5

[시간대별 예측 결과(일부)]
    date  hour  temp  rain  pred_unit_ml  pred_unit_dl
20251223     1   0.0   0.0 265503.001392      7.333906
20251223     2   1.0   0.0 257743.073489      7.535826
20251223     3   1.0   0.0 317952.973259      7.527476
20251223     4   1.0   0.0 510322.192420 456612.111227
20251223     5   1.0   0.0 751622.573422 516725.881526
20251223     6   1.0   0.0 659850.831061 489358.110727
20251223     7   1.0   0.0 641497.531754      7.358805
20251223     8   2.0   0.0 783980.151302      7.411019
20251223     9   3.0   0.0 531352.595276      7.547920
20251223    10   5.0   0.0 497176.732305      7.287357

✅ 일합계 예상 UNIT (ML): 5,217,002
✅ 일합

In [6]:
# =========================
# 7) 실행
# =========================
if __name__ == "__main__":
    # 1) 학습 데이터 생성
    data = make_hourly_dataset(df)

    # 2) 공휴일 set(예측 때 쓰려고)
    years = sorted(data["TA_YMD"].dt.year.unique().tolist())
    holiday_set = set()
    for y in years:
        holiday_set |= fetch_holidays_for_year(int(y))

    # 3) 시간대별 모델 학습
    models_ml, models_dl, feature_cols = train_models_by_hour(data)

    # 4) 날짜 입력 예측
    user_date = input("\n예측할 날짜 입력 (YYYY-MM-DD 또는 YYYYMMDD): ")
    # 예측 연도가 학습 데이터 밖이면 공휴일도 추가로 로딩
    target_year = int(pd.to_datetime(user_date).year)
    if target_year not in years:
        holiday_set |= fetch_holidays_for_year(target_year)

    hourly_df, daily_ml, daily_dl = predict_for_date(
        user_date,
        models_ml=models_ml,
        models_dl=models_dl,
        feature_cols=feature_cols,
        holiday_set=holiday_set,
        lat=DEFAULT_LAT, lon=DEFAULT_LON
    )

    print("\n[시간대별 예측 결과(일부)]")
    print(hourly_df.head(10).to_string(index=False))

    print(f"\n✅ 일합계 예상 UNIT (ML): {daily_ml:,.0f}")
    print(f"✅ 일합계 예상 UNIT (DL): {daily_dl:,.0f}")

    # 저장 원하면:
    out_path = f"pred_{pd.to_datetime(user_date).strftime('%Y%m%d')}.csv"
    hourly_df.to_csv(out_path, index=False, encoding="utf-8-sig")
    print(f"\n(저장됨) {out_path}")


TypeError: make_hourly_dataset() takes 0 positional arguments but 1 was given