In [6]:
# -*- coding: utf-8 -*-
"""
CSV(일 단위 날짜) ← NetCDF(시간 단위) 기상자료 결합
+ 결측값을 '가장 가까운 위/경도 격자'로 대체(Imputation) + flag

- CSV: wildfire_2019_2023_combine.csv
  - 날짜: RCPT_DT (예: 2019.1.20 / 2019.01.20 모두 허용)
  - 위치: latitude, longitude
- NC 폴더: /Users/igangsan/Desktop/climate (확장자 없이 저장된 NetCDF도 자동 감지)
- 출력: wildfire_with_daily_climate_imputed.csv
"""

import os
import re
import numpy as np
import pandas as pd
import xarray as xr

# ================== 경로/칼럼 설정 ==================
CSV_PATH = "/Users/igangsan/Desktop/forest_geo_added.csv"

CLIMATE_DIR   = "/Users/igangsan/Desktop/climate"


OUT_PATH = "/Users/igangsan/Desktop/forest_geo_climate_added.csv"

DATE_COL = "rcpt_dt"
LAT_COL  = "latitude"
LON_COL  = "longitude"

# 사용할 변수(있으면 사용)
VARS_WANT = ["t2m","d2m","u10","v10","tp","ssrd","pev"]
# 일 단위 결과 칼럼 이름(변환/집계 정의)
OUT_COLS = [
    "t2m_mean_C","t2m_min_C","t2m_max_C",
    "d2m_mean_C","d2m_min_C","d2m_max_C",
    "u10_mean_ms","v10_mean_ms","wind_speed_mean","wind_dir_deg",
    "tp_sum_mm","pev_sum_mm",
    "ssrd_sum_Jm2","ssrd_sum_MJm2",
]
# ====================================================

# ---------- 유틸 함수들 ----------
def k_to_c(x): return x - 273.15
def wind_speed(u, v): return np.sqrt(u**2 + v**2)
def wind_dir_from_uv(u, v):  # 0=북, 90=동 (바람 '불어오는' 방향)
    return (np.degrees(np.arctan2(-u, -v)) + 360.0) % 360.0

# 기존 to_date_series_flex() 완전히 교체
def to_date_series_flex(s: pd.Series) -> pd.Series:
    """
    문자열에서 '연-월-일'의 처음 3개 숫자 블록만 사용해 date로 변환.
    허용 예: 
      - 2019-01-20 04:23:23
      - 2019.1.20
      - 2019/01/20 4:10:00 PM
      - 20190120, 20190120042323, 201901200423
    """
    s = s.astype(str).str.strip()

    def parse_one(x: str):
        x = x.strip()
        if not x:
            return pd.NaT
        # 1) 순수 숫자: YYYYMMDD[HHMM[SS]]
        if x.isdigit():
            if len(x) >= 14:
                return pd.to_datetime(x[:14], format="%Y%m%d%H%M%S", errors="coerce")
            if len(x) == 12:
                return pd.to_datetime(x, format="%Y%m%d%H%M", errors="coerce")
            if len(x) == 8:
                return pd.to_datetime(x, format="%Y%m%d", errors="coerce")
        # 2) 그 외: 처음 3개의 숫자 토큰만 추출(연,월,일)
        toks = re.findall(r"\d+", x)
        if len(toks) >= 3:
            y, m, d = toks[0], toks[1], toks[2]
            ts = pd.to_datetime(f"{y}-{m}-{d}", errors="coerce")
            if ts is not None:
                return ts
        # 3) 일반 파서 (T/점/슬래시/공백 섞여도)
        x2 = x.replace("T", " ")
        ts = pd.to_datetime(x2, errors="coerce", infer_datetime_format=True)
        if pd.isna(ts):
            ts = pd.to_datetime(x2.replace(".", "-").replace("/", "-"),
                                errors="coerce", infer_datetime_format=True)
        return ts

    ts = s.apply(parse_one)
    return ts.dt.date

def looks_like_netcdf(path: str) -> bool:
    try:
        with open(path, "rb") as f:
            head = f.read(8)
        if head.startswith(b"CDF"):              # NetCDF classic/64-bit
            return True
        if head == b"\x89HDF\r\n\x1a\n":         # NetCDF4(HDF5)
            return True
    except Exception:
        pass
    if path.lower().endswith(".nc"):
        return True
    return False

def open_dataset_any(path: str):
    for eng in ["h5netcdf","scipy","netcdf4"]:
        try:
            return xr.open_dataset(path, engine=eng)
        except Exception:
            pass
    try:
        return xr.open_dataset(path, engine="cfgrib")   # (설치되어 있으면)
    except Exception:
        pass
    return None

def list_nc_files(dir_path: str):
    out = []
    for name in os.listdir(dir_path):
        p = os.path.join(dir_path, name)
        if os.path.isfile(p) and not name.startswith(".") and looks_like_netcdf(p):
            out.append(p)
    return sorted(out)

def nearest_index(arr_1d: np.ndarray, value: float) -> int:
    return int(np.nanargmin(np.abs(arr_1d - value)))

# ---------- CSV 로드 ----------
df = pd.read_csv(CSV_PATH)
missing = {DATE_COL, LAT_COL, LON_COL} - set(df.columns)
if missing:
    raise ValueError(f"CSV에 필요한 칼럼이 없습니다: {missing}")

df["_date"] = to_date_series_flex(df[DATE_COL])
if df["_date"].isna().any():
    examples = df.loc[df["_date"].isna(), DATE_COL].unique()[:5]
    raise ValueError(f"날짜 파싱 실패 예시(최대 5개): {examples}")

# ---------- NetCDF 열기 & 일 집계 ----------
nc_files = list_nc_files(CLIMATE_DIR)
if not nc_files:
    raise FileNotFoundError(f"NetCDF 후보 파일을 찾지 못했습니다: {CLIMATE_DIR}")

datasets = []
for fp in nc_files:
    ds1 = open_dataset_any(fp)
    if ds1 is not None:
        datasets.append(ds1)
if not datasets:
    raise RuntimeError("NetCDF 파일을 열 수 없습니다.")

ds = xr.combine_by_coords(datasets, combine_attrs="override")

time_name = "valid_time" if "valid_time" in ds.coords else ("time" if "time" in ds.coords else None)
if time_name is None:
    raise KeyError("Dataset에 시간 좌표(valid_time/time)가 없습니다.")

vars_present = [v for v in VARS_WANT if v in ds.data_vars]
if not vars_present:
    raise KeyError(f"필요 변수({VARS_WANT})가 없습니다. 실제 변수: {list(ds.data_vars)}")

agg = {}

if "t2m" in vars_present:
    agg["t2m_mean_C"] = k_to_c(ds["t2m"]).resample({time_name:"1D"}).mean()
    agg["t2m_min_C"]  = k_to_c(ds["t2m"]).resample({time_name:"1D"}).min()
    agg["t2m_max_C"]  = k_to_c(ds["t2m"]).resample({time_name:"1D"}).max()

if "d2m" in vars_present:
    agg["d2m_mean_C"] = k_to_c(ds["d2m"]).resample({time_name:"1D"}).mean()
    agg["d2m_min_C"]  = k_to_c(ds["d2m"]).resample({time_name:"1D"}).min()
    agg["d2m_max_C"]  = k_to_c(ds["d2m"]).resample({time_name:"1D"}).max()

if "u10" in vars_present and "v10" in vars_present:
    u_daily = ds["u10"].resample({time_name:"1D"}).mean()
    v_daily = ds["v10"].resample({time_name:"1D"}).mean()
    agg["u10_mean_ms"]     = u_daily
    agg["v10_mean_ms"]     = v_daily
    agg["wind_speed_mean"] = wind_speed(u_daily, v_daily)
    agg["wind_dir_deg"]    = xr.apply_ufunc(
        wind_dir_from_uv, u_daily, v_daily,
        vectorize=True, dask="parallelized", output_dtypes=[float]
    )

if "tp" in vars_present:
    agg["tp_sum_mm"]  = ds["tp"].resample({time_name:"1D"}).sum() * 1000.0
if "pev" in vars_present:
    agg["pev_sum_mm"] = ds["pev"].resample({time_name:"1D"}).sum() * 1000.0
if "ssrd" in vars_present:
    ssrd_daily = ds["ssrd"].resample({time_name:"1D"}).sum()
    agg["ssrd_sum_Jm2"]  = ssrd_daily
    agg["ssrd_sum_MJm2"] = ssrd_daily / 1e6

if not agg:
    raise RuntimeError("집계할 변수가 없습니다.")

daily = xr.Dataset(agg)

# 날짜 좌표를 단순 date로 바꿔 정합 ↑
daily_dates = pd.to_datetime(daily[time_name].values).date
daily = daily.assign_coords({time_name: daily_dates})

# ---------- 최근접 격자 결합 (1차) ----------
lat_vals = daily.latitude.values
lon_vals = daily.longitude.values
dates_avail = set(daily[time_name].values.tolist())  # python date 객체 집합

def pick_point(lat, lon, date_obj):
    """최근접 격자 1개에서 값 추출 (date가 없으면 NaN 반환)"""
    if (pd.isna(lat) or pd.isna(lon) or pd.isna(date_obj) or (date_obj not in dates_avail)):
        return {k: np.nan for k in daily.data_vars}
    try:
        ilat = nearest_index(lat_vals, float(lat))
        ilon = nearest_index(lon_vals, float(lon))
        p = daily.sel({time_name: date_obj}).isel(latitude=ilat, longitude=ilon)
        return {k: float(p[k].values) for k in daily.data_vars}
    except Exception:
        return {k: np.nan for k in daily.data_vars}

# 1차 값 붙이기
rows = df[[LAT_COL, LON_COL, "_date"]].to_dict(orient="records")
first_values = [pick_point(r[LAT_COL], r[LON_COL], r["_date"]) for r in rows]
met_df = pd.DataFrame(first_values)

# ---------- 결측 대체(가장 가까운 격자) ----------
# 주변 이웃 탐색 반경 설정(격자 인덱스 기준): 1~3칸까지 확대 탐색
MAX_RADIUS = 3
var_list = list(daily.data_vars)

# per-variable imputation flag 컬럼 준비
for v in var_list:
    met_df[f"{v}_imputed"] = False
# 행 전체에 대한 flag/반경 기록
met_df["any_imputed"] = False
met_df["impute_radius"] = 0  # 0=원래 최근접으로 채워짐(대체 없음)

# 빠르게 접근하기 위해 daily를 날짜별로 나눠두기
daily_by_date = {d: daily.sel({time_name: d}) for d in dates_avail}

def fill_from_neighbors(lat, lon, date_obj, current_row):
    """현재 행의 각 변수에 대해 NaN이면 반경을 늘리며 이웃 격자에서 첫 유효값으로 대체"""
    if (pd.isna(lat) or pd.isna(lon) or pd.isna(date_obj) or (date_obj not in daily_by_date)):
        return current_row, 0  # 대체 불가
    
    plane = daily_by_date[date_obj]  # (latitude, longitude) 2D 필드들
    nlat = plane.sizes["latitude"]
    nlon = plane.sizes["longitude"]

    # 기준 인덱스
    try:
        i0 = nearest_index(lat_vals, float(lat))
        j0 = nearest_index(lon_vals, float(lon))
    except Exception:
        return current_row, 0

    used_radius = 0
    # 변수별로 결측이면 이웃 탐색
    for v in var_list:
        if np.isnan(current_row[v]):
            found = False
            # 반경 1..MAX_RADIUS 탐색
            for r in range(1, MAX_RADIUS+1):
                # r-정사각형 테두리 이웃 스캔(바깥 테두리만)
                candidates = []
                for di in range(-r, r+1):
                    for dj in [-r, r]:
                        candidates.append((i0+di, j0+dj))
                for dj in range(-r+1, r):
                    for di in [-r, r]:
                        candidates.append((i0+di, j0+dj))
                # 후보 중 유효값 찾기
                for (ii, jj) in candidates:
                    if 0 <= ii < nlat and 0 <= jj < nlon:
                        val = float(plane[v].isel(latitude=ii, longitude=jj).values)
                        if not np.isnan(val):
                            current_row[v] = val
                            current_row[f"{v}_imputed"] = True
                            used_radius = max(used_radius, r)
                            found = True
                            break
                if found:
                    break
    return current_row, used_radius

# 결측이 있는 행들에 대해 이웃 격자 대체 수행
mask_missing_any = met_df[var_list].isna().any(axis=1)
for idx in met_df.index[mask_missing_any]:
    lat = df.at[idx, LAT_COL]
    lon = df.at[idx, LON_COL]
    date_obj = df.at[idx, "_date"]
    row_vals = met_df.loc[idx, var_list + [f"{v}_imputed" for v in var_list] + ["any_imputed","impute_radius"]].to_dict()
    # dict에서 필요한 키만 활용
    row_core = {k: row_vals[k] for k in var_list}
    row_core, radius_used = fill_from_neighbors(lat, lon, date_obj, row_core)
    # 결과 반영
    for v in var_list:
        met_df.at[idx, v] = row_core[v]
    if radius_used > 0:
        met_df.at[idx, "any_imputed"] = True
        met_df.at[idx, "impute_radius"] = radius_used
        for v in var_list:
            # 위에서 True로 바꿨을 수 있으니 그대로 둠; False면 유지
            pass

# ---------- 원본 단위 → 사람이 쓰기 좋은 칼럼으로 rename ----------
# daily.data_vars 기준으로 met_df는 원래 변수명(t2m, tp, ...)가 아니라
# 이미 우리가 만들어둔 일 집계 칼럼(agg) 이름들로 들어와 있음.
# (위 pick_point/plane가 daily(agg)에서 읽으므로 met_df 컬럼은 agg 키들 = OUT_COLS)
# 다만 엔진/환경에 따라 일부만 존재할 수 있으니 존재하는 것만 유지.

keep_cols = [c for c in OUT_COLS if c in met_df.columns]  # 안전하게

# ---------- 최종 병합 & 저장 ----------
# (원본 df에서 _date 제거 + 기상데이터 + imputation flag)

out = pd.concat(
    [df.drop(columns=["_date"]), met_df[keep_cols + ["any_imputed", "impute_radius"]]],
    axis=1
)

out.to_csv(OUT_PATH, index=False, encoding="utf-8-sig")
print(f"[완료] 대체+플래그 결과 저장: {OUT_PATH}")

# 간단 요약
n_any_imp = int(out["any_imputed"].sum())
print(f" - any_imputed=True 행 수: {n_any_imp}")
print(" - impute_radius 분포:")
print(out["impute_radius"].value_counts(dropna=False).sort_index())


[완료] 대체+플래그 결과 저장: /Users/igangsan/Desktop/forest_geo_climate_added.csv
 - any_imputed=True 행 수: 251
 - impute_radius 분포:
impute_radius
0    3309
1     235
2      16
Name: count, dtype: int64
