In [49]:
import pandas as pd
import numpy as np
from scipy import stats

EPS = 1e-9

In [50]:
def _safe_div(a, b, eps=EPS):
    """안전한 나눗셈"""
    return a / (b + eps)


def _rolling_slope(s, window):
    """Rolling window에서 선형회귀 기울기 계산"""
    def _slope(x):
        x = np.asarray(x, dtype=float)
        if np.any(~np.isfinite(x)) or len(x) < 2:
            return np.nan
        y = np.arange(len(x), dtype=float)
        slope, _, _, _, _ = stats.linregress(y, x)
        return slope
    
    return s.rolling(window, min_periods=window).apply(_slope, raw=True)

In [51]:
def resample_5min(
    df,
    time_col=None,
    rule="5min",
    sum_cols=None,
    mean_cols=None,
    extra_mean_cols=(),
    interp_limit: int = 12,
):
    """
    1분(or irregular) -> 5분 리샘플링.
    """
    x = df.copy()

    if time_col is not None:
        x[time_col] = pd.to_datetime(x[time_col])
        x = x.set_index(time_col)
    if not isinstance(x.index, pd.DatetimeIndex):
        raise ValueError("df must have a DatetimeIndex or provide time_col.")
    x = x.sort_index()

    # 숫자화
    all_cols = set(sum_cols or []) | set(mean_cols or []) | set(extra_mean_cols)
    for c in all_cols:
        if c in x.columns:
            x[c] = pd.to_numeric(x[c], errors="coerce")

    # 집계 dict
    agg = {}
    for c in sum_cols or []:
        if c in x.columns:
            agg[c] = "sum"
    for c in list(mean_cols or []) + list(extra_mean_cols):
        if c in x.columns:
            agg[c] = "mean"

    if not agg:
        raise ValueError("No columns found to resample.")

    # 리샘플
    out = x.resample(rule).agg(agg)

    return out

#### 수위-유량 기반 특성

- level_sum = level_A+level_B

- level_diff = level_A−level_B

- lag: *_lag1, lag2, lag3, lag6, lag12, lag36

- rolling (shift(1) 후 계산):
```
평균: _rmean{w}

표준편차: _rstd{w}

범위: _rmin{w}, _rmax{w}

변동폭: _rIQR{w} (Q90 − Q10)

추세: _rslope{w} (선형회귀 기울기)
```
-> 유입 유량의 관성, 추세, 변동성

In [52]:
def add_lag_rolling_features(
    df,
    base_cols,
    lags=(1, 2, 3, 6, 12, 36),
    roll_windows=(3, 6, 12, 36),
    slope_windows=(6, 12),
):
    """수위 lag & rolling 특성 생성 (성능 최적화)"""
    new_cols = {}
    
    for c in base_cols:
        if c not in df.columns:
            continue

        # lag
        for L in lags:
            new_cols[f"{c}_lag{L}"] = df[c].shift(L)

        # rolling (shift(1))
        s = df[c].shift(1)
        for w in roll_windows:
            r = s.rolling(w, min_periods=w)
            new_cols[f"{c}_rmean{w}"] = r.mean()
            new_cols[f"{c}_rstd{w}"] = r.std()
            new_cols[f"{c}_rmin{w}"] = r.min()
            new_cols[f"{c}_rmax{w}"] = r.max()
            new_cols[f"{c}_rIQR{w}"] = r.quantile(0.90) - r.quantile(0.10)

        for w in slope_windows:
            new_cols[f"{c}_rslope{w}"] = _rolling_slope(s, w)
    
    # 한 번에 결합
    return pd.concat([df, pd.DataFrame(new_cols, index=df.index)], axis=1)

### 강우 특성

#### 공간 통합

- RN_15m_mean / max / min / std / spread

- RN_60m_mean / …

- RN_12H_mean / …

- RN_DAY_mean / …

#### 강우 형태

- RN15_div_RN60: 단기 집중도(소나기형 vs 지속강우)

#### 선행강우지수(ARI)

- ARI_tau6(30분)

- ARI_tau12(60분)

- ARI_tau24(120분)

ARI(t) = Σ rain(t−i) · exp(−i/τ): 강우의 지연/누적 효과 반영

#### 건조/습윤 상태

- wet_flag

- dry_spell_minutes

→ 동일 강우량이라도 선행 건조 상태에 따라 유입 반응이 달라짐

In [53]:
def add_weather_rain_features(
    df,
    stations=(368, 541, 569),
    rain_cols=("RN_15m", "RN_60m", "RN_12H", "RN_DAY"),
    met_cols=("TA", "HM", "TD"),
    ari_source="RN_60m",
    ari_taus=(6, 12, 24),   # 5분 단위 tau(스텝)
    wet_thr=0.1,
):
    """강우 & 기상 특성 생성 (성능 최적화)"""
    new_cols = {}

    # 지점별 dewpoint depression + 상호작용(기상 내부)
    for sid in stations:
        ta = f"TA_{sid}"
        td = f"TD_{sid}"
        hm = f"HM_{sid}"
        if ta in df.columns and td in df.columns:
            new_cols[f"TA_minus_TD_{sid}"] = df[ta] - df[td]
            if hm in df.columns:
                new_cols[f"TAxHM_{sid}"] = df[ta] * df[hm]
                new_cols[f"(TA-TD)xHM_{sid}"] = new_cols[f"TA_minus_TD_{sid}"] * df[hm]

        rn15 = f"RN_15m_{sid}"
        rn60 = f"RN_60m_{sid}"
        if rn15 in df.columns and rn60 in df.columns:
            new_cols[f"RN15_div_RN60_{sid}"] = _safe_div(df[rn15], df[rn60])
    
    # 먼저 지점별 특성 추가
    out = pd.concat([df, pd.DataFrame(new_cols, index=df.index)], axis=1)
    new_cols = {}

    # 공간 통합 stats
    def _spatial_stats(base_name):
        cols = [f"{base_name}_{sid}" for sid in stations if f"{base_name}_{sid}" in out.columns]
        if len(cols) == 0:
            return
        new_cols[f"{base_name}_mean"] = out[cols].mean(axis=1)
        new_cols[f"{base_name}_max"] = out[cols].max(axis=1)
        new_cols[f"{base_name}_min"] = out[cols].min(axis=1)
        new_cols[f"{base_name}_std"] = out[cols].std(axis=1)
        new_cols[f"{base_name}_spread"] = new_cols[f"{base_name}_max"] - new_cols[f"{base_name}_min"]

    for rc in rain_cols:
        _spatial_stats(rc)
    for mc in met_cols:
        _spatial_stats(mc)
    _spatial_stats("TA_minus_TD")
    
    # 공간 통합 특성 추가
    out = pd.concat([out, pd.DataFrame(new_cols, index=out.index)], axis=1)
    new_cols = {}

    # dry spell / wet flag는 평균 강우로
    rain_ref = out["RN_60m_mean"] if "RN_60m_mean" in out.columns else None
    if rain_ref is not None:
        new_cols["wet_flag"] = (rain_ref > wet_thr).astype(np.int8)
        dry = (new_cols["wet_flag"] == 0).astype(np.int8)
        grp = (new_cols["wet_flag"] == 1).cumsum()
        new_cols["dry_spell_steps"] = dry.groupby(grp).cumsum()
        new_cols["dry_spell_minutes"] = new_cols["dry_spell_steps"] * 5
    
    # wet/dry 특성 추가
    out = pd.concat([out, pd.DataFrame(new_cols, index=out.index)], axis=1)
    new_cols = {}

    # ARI: shift(1)로 현재 강우 사용 금지
    m = f"{ari_source}_mean"
    rain_for_ari = out[m] if m in out.columns else rain_ref
    if rain_for_ari is not None:
        rr = rain_for_ari.shift(1)
        for tau in ari_taus:
            klen = int(6 * tau)
            w = np.exp(-np.arange(klen) / float(tau))
            w = w / (w.sum() + EPS)

            def _ari(x):
                x = np.asarray(x, dtype=float)
                if np.any(~np.isfinite(x)):
                    return np.nan
                ww = w[-len(x):]
                return float((x * ww).sum())

            new_cols[f"ARI_tau{tau}"] = rr.rolling(klen, min_periods=klen).apply(_ari, raw=True)
    
    # ARI 특성 추가
    if new_cols:
        out = pd.concat([out, pd.DataFrame(new_cols, index=out.index)], axis=1)

    return out

#### 기상 특성 (Meteorology)

- TA_minus_TD : 이슬점 감차 → 대기 포화도

- TAxHM

- (TA_minus_TD)xHM

→ 증발·토양 포화·지표 유출 조건을 간접 반영

#### 강우 × 수위 상호작용

- rain_x_levelsum_lag1 = RN_60m_mean(t−1) × level_sum(t−1)

→ 같은 비라도 수위가 높을수록 유입 증가 가능성 큼

In [54]:
def add_interaction_features(df):
    """강우 × 수위 상호작용 특성 (성능 최적화)"""
    new_cols = {}
    
    # rain_x_levelsum_lag1
    if "RN_60m_mean" in df.columns and "level_sum" in df.columns:
        new_cols["rain_x_levelsum_lag1"] = df["RN_60m_mean"].shift(1) * df["level_sum"].shift(1)
    
    # 추가 상호작용
    if "RN_15m_mean" in df.columns and "level_sum" in df.columns:
        new_cols["rain15_x_levelsum_lag1"] = df["RN_15m_mean"].shift(1) * df["level_sum"].shift(1)
    
    if "wet_flag" in df.columns and "level_sum" in df.columns:
        new_cols["wet_x_levelsum"] = df["wet_flag"].shift(1) * df["level_sum"].shift(1)
    
    return pd.concat([df, pd.DataFrame(new_cols, index=df.index)], axis=1)

#### 시간 주기성(Time Features)

하수 유입은 강한 일중·주간 패턴을 가짐.

- tod_sin, tod_cos : 하루 주기(1440분)

- dow_sin, dow_cos : 요일 주기

- is_weekend

→ 외생 변수 없이도 기본 베이스라인 성능 확보에 중요

In [55]:
def add_time_features(df):
    """시간 주기성 특성 생성 (성능 최적화)"""
    idx = df.index
    tod = (idx.hour * 60 + idx.minute).astype(float)
    dow = idx.dayofweek.astype(float)

    new_cols = {
        "tod_sin": np.sin(2 * np.pi * tod / 1440.0),
        "tod_cos": np.cos(2 * np.pi * tod / 1440.0),
        "dow_sin": np.sin(2 * np.pi * dow / 7.0),
        "dow_cos": np.cos(2 * np.pi * dow / 7.0),
        "is_weekend": (idx.dayofweek >= 5).astype(np.int8)
    }
    
    return pd.concat([df, pd.DataFrame(new_cols, index=df.index)], axis=1)

### 전체 특성 생성 파이프라인

In [56]:
def make_modelFLOW_features(
    df,
    time_col="SYS_TIME",
    do_resample=True,
    rule="5min",
    stations=(368, 541, 569),
    rain_cols=("RN_15m", "RN_60m", "RN_12H", "RN_DAY"),
    met_cols=("TA", "HM", "TD"),
    level_cols=("level_TankA", "level_TankB"),
    lags=(1, 2, 3, 6, 12, 36),
    roll_windows=(3, 6, 12, 36),
    slope_windows=(6, 12),
    ari_taus=(6, 12, 24),
    wet_thr=0.1,
):
    
    # 1. 리샘플링
    if do_resample:
        # 강수는 sum, 나머지는 mean
        sum_cols = tuple(f"{rc}_{sid}" for sid in stations for rc in rain_cols)
        mean_cols = (
            tuple(f"{mc}_{sid}" for sid in stations for mc in met_cols) +
            level_cols
        )
        df = resample_5min(df, time_col=time_col, rule=rule, sum_cols=sum_cols, mean_cols=mean_cols)
        time_col = None
    
    # 2. 인덱스 설정
    x = df.copy()
    if time_col is not None:
        x[time_col] = pd.to_datetime(x[time_col])
        x = x.set_index(time_col)
    if not isinstance(x.index, pd.DatetimeIndex):
        raise ValueError("df must have a DatetimeIndex or provide time_col.")
    x = x.sort_index()
    
    # 3. 기본 특성 생성
    # level_sum, level_diff
    if all(c in x.columns for c in level_cols):
        x["level_sum"] = x[level_cols[0]] + x[level_cols[1]]
        x["level_diff"] = x[level_cols[0]] - x[level_cols[1]]
    
    # 4. 수위 lag & rolling 특성 
    base_cols = list(level_cols) + ["level_sum", "level_diff"]
    base_cols = [c for c in base_cols if c in x.columns]
    x = add_lag_rolling_features(x, base_cols, lags, roll_windows, slope_windows)
    
    # 5. 강우 & 기상 특성
    x = add_weather_rain_features(x, stations, rain_cols, met_cols, "RN_60m", ari_taus, wet_thr)
    
    # 6. 상호작용 특성
    x = add_interaction_features(x)
    
    # 7. 시간 특성
    x = add_time_features(x)
    
    # 8. 정리
    x = x.replace([np.inf, -np.inf], np.nan)
    
    return x

In [57]:
# 데이터 로드
flow = pd.read_csv("../../data/processed/FLOW_cleaned.csv")
aws = pd.read_csv("../../data/processed/AWS_cleaned.csv")

flow['SYS_TIME'] = pd.to_datetime(flow['SYS_TIME'])
aws['SYS_TIME'] = pd.to_datetime(aws['SYS_TIME'])

flow = flow.sort_values('SYS_TIME')
aws = aws.sort_values('SYS_TIME')

# 병합 (asof merge)
df = pd.merge_asof(flow, aws, on='SYS_TIME', direction='backward', tolerance=pd.Timedelta('1min'))

In [58]:
# 특성 생성
X = make_modelFLOW_features(df, time_col="SYS_TIME", do_resample=True, rule="5min")

# 타겟 변수 생성 (5분 후 유량 예측)
y_flowA = resample_5min(df, time_col="SYS_TIME", extra_mean_cols=("flow_TankA",))["flow_TankA"].shift(-1)
y_flowB = resample_5min(df, time_col="SYS_TIME", extra_mean_cols=("flow_TankB",))["flow_TankB"].shift(-1)

# 데이터 결합
data = X.join(pd.DataFrame({"y_flowA": y_flowA, "y_flowB": y_flowB})).dropna()

In [59]:
# 데이터 확인
data.shape

(26193, 205)

In [60]:
data.to_csv("../../data/processed/modelFLOW_dataset.csv", index=True)