In [1]:
from dataclasses import dataclass
import numpy as np
import polars as pl

# Directory holding the raw parquet inputs.
# NOTE(review): machine-specific absolute path; the notebook only runs as-is
# on a machine that has this directory.
MAC_DIR = '/Users/igwanhyeong/PycharmProjects/data_research/raw_data/'

# Synthetic part 'A': eight consecutive months in 2024 with qty 0.0 .. 7.0.
A_dict = {
    'oper_part_no': ['A'] * 8,
    'demand_dt': list(range(202401, 202409)),
    'demand_qty': list(map(float, range(8))),
    'gbm_cd': ['VD'] * 8,
}
A_pl = pl.DataFrame(A_dict)

# Synthetic part 'B': same shape as 'A', shifted to 2025.
B_dict = {
    'oper_part_no': ['B'] * 8,
    'demand_dt': list(range(202501, 202509)),
    'demand_qty': list(map(float, range(8))),
    'gbm_cd': ['VD'] * 8,
}
B_pl = pl.DataFrame(B_dict)

# Real monthly demand, tagged with the business-unit code 'VD'.
target_dyn_demand_monthly = (
    pl.read_parquet(MAC_DIR + 'target_dyn_demand_monthly.parquet')
    .with_columns(pl.lit('VD').alias('gbm_cd'))
)

# Stack real and synthetic rows, then switch to the short column names used
# by the rest of the notebook.
df = (
    pl.concat([target_dyn_demand_monthly, A_pl, B_pl])
    .rename({'oper_part_no': 'part_no', 'demand_dt': 'yyyymm', 'demand_qty': 'qty'})
)

In [2]:
# Split the demand table into one frame per business unit (gbm_cd).
# Each key is read from its own partition: DataFrame.unique() gives no row-order
# guarantee by default, so zipping its result with partition_by(...) could
# attach a frame to the wrong business-unit code once several codes exist.
frames = df.partition_by('gbm_cd', maintain_order = True)
keys = [frame['gbm_cd'][0] for frame in frames]
by_bu = dict(zip(keys, frames))

In [3]:
# ltb/forecasting/short_series/policy_config.py
from dataclasses import dataclass
from typing import Optional

@dataclass
class ShortSeriesPolicyConfig:
    """Tunable settings for the short-series forecasting policy.

    Field order and defaults are part of the constructor interface.
    """
    history_threshold: int = 6               # series with fewer points than this are routed to ShortSeries
    donor_k: int = 5                         # number of similar donors to use
    donor_min_k: int = 3                     # minimum number of donors that must be found
    donor_same_group_only: bool = True       # search donors only within the same group (BU / category, etc.)

    # Hierarchical share: EMA + shrinkage
    share_lambda: float = 0.7                # EMA weight (0.6-0.8 recommended)
    share_floor: float = 0.0                 # lower bound on the share
    share_ceil: float = 1.0                  # upper bound on the share

    # Global pooled baseline (simplified Theta / damped trend)
    damped_r_min: float = 0.85
    damped_r_max: float = 0.98

    # Guardrails / winsorization
    max_step_up: float = 0.10                # +10%
    max_step_down: float = 0.40              # -40%
    winsor_q_low: float = 0.05
    winsor_q_high: float = 0.95
    winsor_mul: float = 4.0
    winsor_growth: float = 3.0               # factor that limits abnormal growth
    floor_qty: float = 0.0                   # floor for mandatory stock / minimum demand (set if applicable)

    # Uncertainty quantification defaults
    quantiles: tuple = (0.1, 0.5, 0.9)

    # Miscellaneous
    random_state: Optional[int] = 42

In [4]:
'''
donor_bank.py
'''
from typing import List, Tuple, Dict, Any

@dataclass
class Donor:
    """A candidate donor: one demand series plus the group it belongs to."""
    group_id: str
    series: np.ndarray # full time series (length >= 5 recommended)

class DonorBank:
    """Pool of donor series used to forecast a short target series by analogy.

    The target's observed values are slid across every donor series. The
    best-correlated window of each donor marks an analog, and the donor values
    that follow that window are transferred to the target as its forecast.

    The comparison window always has the same length as the target, because
    np.corrcoef needs both inputs to have equal length.
    """

    def __init__(self, donors: List["Donor"]) -> None:
        self.donors = donors

    @staticmethod
    def _best_k_windows(target_5: np.ndarray, donor_series: np.ndarray, k: int) -> Tuple[List[int], List[float]]:
        """Find the k donor windows that correlate best with the target.

        Args:
            target_5: observed target values; its length sets the window length.
            donor_series: one donor's full series.
            k: maximum number of windows to return.

        Returns:
            (starts, scores): window start indices and their Pearson
            correlations, best first. Both lists are empty when the donor
            cannot hold one window plus at least one following value.
        """
        target = np.asarray(target_5, dtype=float)
        donor = np.asarray(donor_series, dtype=float)
        width = len(target)

        # A window is only useful if at least one donor value follows it,
        # so the last admissible start is len(donor) - width - 1.
        if width == 0 or len(donor) <= width:
            return [], []

        target_is_flat = np.std(target) == 0
        starts: List[int] = []
        scores: List[float] = []
        for start in range(len(donor) - width):
            window = donor[start: start + width]
            if target_is_flat or np.std(window) == 0:
                # Correlation is undefined for a constant series.
                corr = 0.0
            else:
                corr = float(np.corrcoef(target, window)[0, 1])
                if not np.isfinite(corr):
                    # A NaN score would sort to the front of the reversed argsort.
                    corr = 0.0
            starts.append(start)
            scores.append(corr)

        order = np.argsort(scores)[::-1][:k]
        return [starts[i] for i in order], [scores[i] for i in order]

    def find_analogs(
            self,
            target_5: np.ndarray,
            meta: Dict[str, Any],
            k: int = 5,
            same_group_only: bool = True
    ) -> List[Dict[str, Any]]:
        """Return up to k donors, each with its best-matching window.

        Args:
            target_5: observed target values.
            meta: target metadata; 'group_id' is read when same_group_only is set.
            k: maximum number of donors to return.
            same_group_only: restrict the search to donors whose group_id
                equals meta['group_id'].

        Returns:
            A list of dicts with keys 'group_id', 'series', 'start' and
            'score', sorted by descending score.
        """
        group_id = meta.get('group_id', None)
        candidates = [d for d in self.donors if (not same_group_only) or (d.group_id == group_id)]

        matches = []
        for d in candidates:
            idxs, scores = self._best_k_windows(target_5, d.series, k = 1)
            if not idxs:
                continue
            matches.append({
                'group_id': d.group_id,
                'series': d.series,
                'start': idxs[0],
                'score': scores[0]
            })

        matches = sorted(matches, key = lambda match: match['score'], reverse = True)[:k]
        return matches

    @staticmethod
    def analog_transfer(
            target_5: np.ndarray,
            matches: List[Dict[str, Any]],
            horizon: int,
            scale_mode: str = 'last2_mean'
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Build a forecast from the values that follow each matched window.

        Args:
            target_5: observed target values (same array used for matching).
            matches: output of find_analogs.
            horizon: number of steps to forecast.
            scale_mode: 'last2_mean' rescales the donor average by the ratio of
                the target's recent mean to the donors' mean over their first
                two future steps; any other value leaves the scale at 1.

        Returns:
            (y_hat, donor_stack): the scaled donor average of shape (horizon,)
            and the unscaled donor futures of shape (K, horizon). With no
            matches, returns an empty array and an empty (0, horizon) stack.
        """
        if not matches:
            return np.array([]), np.empty((0, horizon))

        # The matched window has the target's length, so the donor "future"
        # starts right after start + len(target).
        width = len(target_5)

        donor_preds = []
        for m in matches:
            ds = m['series']
            future_start = m['start'] + width
            future = ds[future_start: future_start + horizon]
            if len(future) < horizon:
                if len(future) == 0:
                    future = np.zeros(horizon, dtype=float)
                else:
                    # Donor ends early: hold its last value flat.
                    last = future[-1]
                    future = np.concatenate([future, np.full(horizon - len(future), last, dtype = float)])
            donor_preds.append(future)
        donor_stack = np.vstack(donor_preds)

        if scale_mode == 'last2_mean':
            t_mean = float(np.mean(target_5[-2:])) if np.any(target_5[-2:]) else float(np.mean(target_5))
            d_means = np.mean(donor_stack[:, :2], axis = 1)
            d_mean = float(np.mean(d_means)) if np.any(d_means) else max(float(np.mean(donor_stack[:, :2])), 1e-6)
            s = t_mean / max(d_mean, 1e-6)
        else:
            s = 1.0

        y_hat = s * np.mean(donor_stack, axis = 0)
        return y_hat, donor_stack

In [5]:
'''
hierarchical_allocate.py
'''

def ema_shrunk_share(last_share: float, group_mean_share: float, lam: float) -> float:
    """Blend the latest share with the mean share and clamp the result to [0, 1].

    `lam` is the weight on `last_share`; `1 - lam` goes to `group_mean_share`.
    """
    recent = float(last_share)
    anchor = float(group_mean_share)
    blended = lam * recent + (1.0 - lam) * anchor
    return float(np.clip(blended, 0.0, 1.0))

def allocate_from_group_forecast(
        group_forecast: np.ndarray,
        last_share: float,
        group_mean_share: float,
        lam: float,
        floor: float = 0.0,
        ceil: float = 1.0
) -> np.ndarray:
    """Allocate a group-level forecast to one item using its shrunk share.

    Args:
        group_forecast: group-level forecast path.
        last_share: the item's most recent share of the group.
        group_mean_share: the mean share the latest value is shrunk towards.
        lam: weight on `last_share` in the blend.
        floor: lower bound applied to the blended share.
        ceil: upper bound applied to the blended share.

    Returns:
        `group_forecast` multiplied by the bounded share, as a float array of
        the same shape as `group_forecast`.
    """
    s_next = ema_shrunk_share(last_share, group_mean_share, lam)
    s_next = float(np.clip(s_next, floor, ceil))
    # Return the allocated forecast, not the bare share: the caller stacks the
    # result with other forecast paths of the same length.
    return np.asarray(group_forecast, dtype=float) * s_next

In [6]:
# Conversions between yyyymm integers and (year, month), plus month arithmetic.
def split_yyyymm(yyyymm: int) -> tuple[int, int]:
    """Split an integer such as 202403 into ``(2024, 3)``."""
    year, month = divmod(yyyymm, 100)
    return year, month

def join_yyyymm(year: int, month: int) -> int:
    """Pack a year and a month into a single yyyymm integer."""
    return month + 100 * year

def add_months_yyyymm(yyyymm: int, k: int) -> int:
    """Shift a yyyymm integer by ``k`` months (``k`` may be zero or negative)."""
    year, month = divmod(yyyymm, 100)
    # Use a zero-based month so the year carry falls straight out of divmod.
    year_carry, month_index = divmod(month + k - 1, 12)
    return (year + year_carry) * 100 + (month_index + 1)

def month_range_yyyymm(start_yyyymm: int, end_yyyymm: int) -> list[int]:
    """List every yyyymm from start through end, both ends included.

    Returns an empty list when start is later than end.
    """
    months = []
    month = start_yyyymm
    while True:
        if month > end_yyyymm:
            break
        months.append(month)
        month = add_months_yyyymm(month, 1)
    return months


def fill_missing_months_per_part(df_bu: pl.DataFrame) -> pl.DataFrame:
    """Expand one business unit's demand to a dense part_no x yyyymm grid.

    The month axis runs from the earliest to the latest yyyymm found anywhere
    in `df_bu`. Every part gets one row per month on that axis, and months
    without an observation get qty 0.0. Only part_no, yyyymm and qty are kept.

    NOTE(review): because the grid is shared, every part comes out with the
    same number of rows, regardless of when its own demand starts or ends.
    """
    # Month span of the whole business unit.
    first_ym, last_ym = df_bu.select(
        pl.col("yyyymm").min().alias("min_ym"),
        pl.col("yyyymm").max().alias("max_ym")
    ).row(0)
    calendar = pl.DataFrame({"yyyymm": month_range_yyyymm(int(first_ym), int(last_ym))})

    # Full part_no x yyyymm grid.
    grid = df_bu.select("part_no").unique().join(calendar, how="cross")

    # Attach observed quantities, zero-fill the gaps, and order by part and month.
    observed = df_bu.select("part_no", "yyyymm", "qty")
    dense = grid.join(observed, on=["part_no", "yyyymm"], how="left")
    dense = dense.with_columns(pl.col("qty").fill_null(0.0))
    return dense.sort(["part_no", "yyyymm"])

# Dense (zero-filled) demand table for each business unit.
filled_by_bu = {
    bu_code: fill_missing_months_per_part(bu_frame)
    for bu_code, bu_frame in by_bu.items()
}

In [7]:
# Forecast horizon in months.
H = 120

# Policy settings, spelled out explicitly; these values equal the
# ShortSeriesPolicyConfig defaults.
cfg = ShortSeriesPolicyConfig(
    history_threshold = 6,
    donor_k = 5,
    donor_min_k = 3,
    max_step_up = 0.10,
    max_step_down = 0.40,
    winsor_q_low = 0.05,
    winsor_q_high = 0.95,
    winsor_mul = 4.0,
    winsor_growth = 3.0,
    floor_qty = 0.0,
)


# Open question (author): should this function require len(series) >= 9 + H,
# so that every donor has a full horizon of values after its matched window?
def build_donor_bank(df_bu_filled: pl.DataFrame, group_id: str, min_len: int = 9) -> DonorBank:
    """Collect every sufficiently long part series of one group into a DonorBank.

    Args:
        df_bu_filled: table with part_no, yyyymm and qty columns.
        group_id: group label stored on every donor.
        min_len: minimum series length for a part to qualify as a donor.

    Returns:
        A DonorBank holding one Donor per qualifying part, in first-appearance
        order of part_no.
    """
    donors = []
    # maintain_order=True keeps donor order identical between runs; group_by
    # iteration order is otherwise unspecified, which would make tie-breaking
    # between equally scored donors non-reproducible.
    for _, part_rows in df_bu_filled.group_by('part_no', maintain_order = True):
        series = part_rows.sort('yyyymm')['qty'].to_numpy()
        if len(series) >= min_len:
            donors.append(Donor(group_id = group_id, series = series.astype(float)))
    return DonorBank(donors)

def build_group_series(df_bu_filled: pl.DataFrame) -> tuple[np.ndarray, list[int]]:
    """Sum qty over all parts per month.

    Returns the monthly totals and the matching yyyymm labels, both in
    ascending yyyymm order.
    """
    monthly_total = pl.col('qty').sum().alias('qty')
    totals = df_bu_filled.group_by('yyyymm').agg(monthly_total).sort('yyyymm')
    qty = totals['qty'].to_numpy()
    months = totals['yyyymm'].to_list()
    return qty, months

In [8]:
# Forecast month labels (fcst_yyyymm).
def forecast_months(last_yyyymm: int, H: int) -> list[int]:
    """Return the H yyyymm values that immediately follow `last_yyyymm`."""
    return [add_months_yyyymm(last_yyyymm, step) for step in range(1, H + 1)]

# Share computation (item and group joined on the same yyyymm).
def compute_shares_item_vs_group(item_tbl: pl.DataFrame, group_tbl: pl.DataFrame) -> np.ndarray:
    """Return the item's share of the group quantity per month, oldest first.

    Args:
        item_tbl: the item's yyyymm / qty rows.
        group_tbl: the group's yyyymm / qty rows.

    Returns:
        One share per month present in both tables, in ascending yyyymm order.
        The group quantity is clipped to at least 1e-6 to avoid division by zero.
    """
    shares = (
        item_tbl
        .join(group_tbl.rename({'qty': 'qty_grp'}), on = 'yyyymm', how = 'inner')
        # Callers read the last element as the most recent share, so fix the
        # order explicitly instead of relying on the join's output order.
        .sort('yyyymm')
        .select((pl.col('qty') / pl.col('qty_grp').clip(lower_bound = 1e-6)).alias('share'))
    )
    return shares['share'].to_numpy()

def simple_damped_trend_forecast(
    history: np.ndarray,
    horizon: int,
    r: float
) -> np.ndarray:
    """
    Simplified stand-in for a Theta / damped-trend model.

    - level: mean of the last two observations (the single value when only
      one observation exists)
    - slope: median of all first differences of the history (0 when fewer
      than two observations exist)
    - damping: step h adds slope * (1 + r + ... + r**(h-1))

    Args:
        history: observed values, oldest first.
        horizon: number of steps to forecast.
        r: damping ratio (the config uses values between 0.85 and 0.98).
           r == 1 means no damping, i.e. a plain linear trend.

    Returns:
        Forecast array of shape (horizon,).
    """
    h = np.asarray(history, dtype=float)
    level = float(np.mean(h[-2:])) if len(h) >= 2 else float(np.mean(h))
    diffs = np.diff(h) if len(h) >= 2 else np.array([0.0])
    slope = float(np.median(diffs)) if len(diffs) else 0.0

    steps = np.arange(1, horizon + 1, dtype=float)
    if r == 1.0:
        # The geometric-sum formula below is 0/0 at r == 1; its limit is h.
        damp = steps
    else:
        damp = (1.0 - np.power(r, steps)) / (1.0 - r)  # sum of r**i for i < h
    return level + slope * damp

def choose_r_by_group(r_min: float, r_max: float) -> float:
    """Pick the damping ratio for a group.

    This could depend on each group's error profile; the skeleton simply
    returns the midpoint of the allowed range.
    """
    midpoint = 0.5 * (r_min + r_max)
    return midpoint

In [9]:
# ltb/forecasting/short_series/guardrails.py
from __future__ import annotations
import numpy as np
from typing import Tuple

def apply_step_caps(
    y_hat: np.ndarray,
    last_level: float,
    max_step_up: float,
    max_step_down: float
) -> np.ndarray:
    """Limit the step-to-step change of a forecast path.

    Each value is clipped to
    [previous * (1 - max_step_down), previous * (1 + max_step_up)], where
    `previous` is the previously capped value and `last_level` anchors the
    first step.

    NOTE(review): with last_level == 0 both bounds are 0, so the whole path
    is clipped to 0.
    """
    up_factor = 1.0 + max_step_up
    down_factor = 1.0 - max_step_down

    path = np.empty(len(y_hat), dtype=float)
    anchor = float(last_level)
    for i, proposed in enumerate(y_hat):
        anchor = float(np.clip(proposed, anchor * down_factor, anchor * up_factor))
        path[i] = anchor
    return path

def apply_winsor_and_floor(
    y_hat: np.ndarray,
    winsor_q: Tuple[float, float],
    winsor_mul: float,
    winsor_growth: float,
    floor_qty: float
) -> np.ndarray:
    """Winsorize a forecast path, limit its growth, and apply a floor.

    Steps, in order:
      1. With three or more points, clip to
         [q_low - winsor_mul * spread, q_high + winsor_mul * spread], where
         q_low / q_high are the `winsor_q` quantiles of the path itself and
         spread = q_high - q_low.
      2. Cap each value at `winsor_growth` times the (already capped)
         previous value.
      3. Raise values below `floor_qty` to it (skipped when floor_qty is None).

    The input array is not modified.
    """
    values = np.array(y_hat, dtype=float)  # fresh float copy
    n = len(values)
    if n >= 3:
        q_low, q_high = np.quantile(values, winsor_q)
        margin = winsor_mul * (q_high - q_low)
        values = np.clip(values, q_low - margin, q_high + margin)

    # Dampen excessive growth relative to the previous step.
    for i in range(1, n):
        ceiling = values[i - 1] * winsor_growth
        if ceiling < values[i]:
            values[i] = ceiling

    if floor_qty is None:
        return values
    return np.maximum(values, float(floor_qty))

def quantiles_from_donors(
    donor_stack: np.ndarray,
    scale_factor: float = 1.0,
    qs=(0.1, 0.5, 0.9)
) -> dict:
    """
    Compute per-step quantiles directly from the donor distribution.

    donor_stack: (K, H) array, one row per donor.
    Returns {q: scale_factor * quantile over donors, shape (H,)} for each q
    in `qs`, or an empty dict when the stack is empty.
    """
    if donor_stack.size == 0:
        return {}
    return {q: scale_factor * np.quantile(donor_stack, q, axis=0) for q in qs}

def symmetric_quantile_fan(
    median: np.ndarray,
    spread_ratio: float = 0.3,  # default spread ratio
    qs=(0.1, 0.5, 0.9)
) -> dict:
    """Build quantile paths as fixed relative bands around the median.

    q == 0.5 maps to `median` itself (same object); every q below 0.5 maps to
    median * (1 - spread_ratio) floored at 0; every other q maps to
    median * (1 + spread_ratio).
    """
    def band_for(q):
        if q == 0.5:
            return median
        if q < 0.5:
            return np.maximum(0.0, median * (1.0 - spread_ratio))
        return median * (1.0 + spread_ratio)

    return {q: band_for(q) for q in qs}

In [10]:
# ltb/forecasting/short_series/short_series_forecaster.py
from __future__ import annotations
import numpy as np
from typing import Dict, Any, Tuple

class ShortSeriesForecaster:
    """
    Forecaster for items whose history is shorter than the policy threshold.

    Routing order:
      1) Donor transfer (when enough donors are found) -> donor-ensemble forecast
      2) Group forecast (if given) x share (EMA + shrinkage) -> allocation
      3) Global pooled baseline (simplified Theta / damped trend)
      4) Guardrails (step caps / winsorization / growth limit / floor)
      5) Quantiles (donor-based, or a symmetric fan when there are no donors)
    """
    def __init__(self, config: ShortSeriesPolicyConfig, donor_bank: DonorBank) -> None:
        self.cfg = config
        self.donor_bank = donor_bank

    def _apply_guardrails(self, path: np.ndarray, last_level: float) -> np.ndarray:
        """Run one path through the step caps, then winsorization/growth/floor."""
        capped = apply_step_caps(
            path, last_level,
            max_step_up=self.cfg.max_step_up,
            max_step_down=self.cfg.max_step_down
        )
        return apply_winsor_and_floor(
            capped,
            winsor_q=(self.cfg.winsor_q_low, self.cfg.winsor_q_high),
            winsor_mul=self.cfg.winsor_mul,
            winsor_growth=self.cfg.winsor_growth,
            floor_qty=self.cfg.floor_qty
        )

    @staticmethod
    def _donor_scale(y_donor: np.ndarray, donor_stack: np.ndarray) -> float:
        """Recover the scale that maps the raw donor mean onto `y_donor`.

        analog_transfer returns y_donor = scale * mean(donor_stack, axis=0),
        so the ratio at the step with the largest donor mean gives the scale.
        Falls back to 1.0 when the donor mean is zero everywhere.
        """
        donor_mean = np.mean(donor_stack, axis=0)
        if donor_mean.size == 0:
            return 1.0
        j = int(np.argmax(np.abs(donor_mean)))
        if donor_mean[j] == 0:
            return 1.0
        return float(y_donor[j] / donor_mean[j])

    def forecast(
        self,
        history: np.ndarray,               # shape: (T,), 1 <= T < history_threshold
        meta: Dict[str, Any],              # {'group_id', 'group_mean_share', 'last_share', ...}
        horizon: int,
        group_forecast: np.ndarray | None  # shape: (H,) or None
    ) -> Dict[float, np.ndarray]:
        """Forecast `horizon` steps for one short series.

        Args:
            history: 1-D observed values, oldest first.
            meta: item metadata; 'group_id' is used for donor search, and
                'last_share' / 'group_mean_share' enable the hierarchical
                component when `group_forecast` is given.
            horizon: number of steps to forecast.
            group_forecast: group-level forecast path, or None.

        Returns:
            {quantile: path of shape (horizon,)}. Key 0.5 always holds the
            guarded ensemble forecast.

        Raises:
            AssertionError: if history is not 1-D, is empty, or is not shorter
                than cfg.history_threshold.
        """
        assert history.ndim == 1, "history must be 1D array"
        T = len(history)
        # Level and trend are undefined without at least one observation.
        assert T >= 1, "history must contain at least one observation"
        assert T < self.cfg.history_threshold, "history too long for ShortSeriesForecaster"

        # ---- 1) Donor transfer
        y_donor = None
        donor_stack = np.empty((0, horizon))
        matches = self.donor_bank.find_analogs(
            target_5=history,
            meta=meta,
            k=self.cfg.donor_k,
            same_group_only=self.cfg.donor_same_group_only
        )
        if len(matches) >= self.cfg.donor_min_k:
            y_donor, donor_stack = self.donor_bank.analog_transfer(
                target_5=history,
                matches=matches,
                horizon=horizon
            )

        # ---- 2) Hierarchical allocation
        y_hier = None
        if group_forecast is not None and "last_share" in meta and "group_mean_share" in meta:
            y_hier = allocate_from_group_forecast(
                group_forecast=group_forecast,
                last_share=float(meta["last_share"]),
                group_mean_share=float(meta["group_mean_share"]),
                lam=self.cfg.share_lambda,
                floor=self.cfg.share_floor,
                ceil=self.cfg.share_ceil
            )

        # ---- 3) Global baseline
        r = choose_r_by_group(self.cfg.damped_r_min, self.cfg.damped_r_max)
        y_base = simple_damped_trend_forecast(history, horizon, r)

        # ---- 4) Ensemble (plain weighted sum; swap in meta-learning if needed)
        # Priority: donor > hierarchy > baseline. Weights are renormalised
        # over whichever components are available.
        weights = []
        candidates = []
        if y_donor is not None and len(y_donor):
            candidates.append(y_donor)
            weights.append(0.6)
        if y_hier is not None:
            candidates.append(y_hier)
            weights.append(0.3)
        candidates.append(y_base)
        weights.append(0.1)

        weights = np.array(weights, dtype=float)
        weights = weights / weights.sum()
        stacked = np.vstack(candidates)  # (n_comp, H)
        y_hat = np.sum(stacked * weights[:, None], axis=0)

        # ---- 5) Guardrails
        last_level = float(np.mean(history[-2:])) if len(history) >= 2 else float(history[-1])
        y_hat = self._apply_guardrails(y_hat, last_level)

        # ---- 6) Quantiles
        if donor_stack.size > 0:
            # Donor-based quantiles. The donor stack is on the donors' own
            # scale while y_donor was rescaled to the target, so apply the
            # same scale before taking quantiles.
            scale = self._donor_scale(y_donor, donor_stack)
            q_map = quantiles_from_donors(donor_stack, scale_factor=scale, qs=self.cfg.quantiles)
            # The final ensemble forecast replaces the donor median.
            q_map[0.5] = y_hat
            # Same guardrails on every quantile path, for consistency.
            for q in q_map:
                q_map[q] = self._apply_guardrails(q_map[q], last_level)
            # The median blends in non-donor components, so the donor bands may
            # not bracket it; keep lower <= median <= upper at every step.
            median = q_map[0.5]
            for q in q_map:
                if q < 0.5:
                    q_map[q] = np.minimum(q_map[q], median)
                elif q > 0.5:
                    q_map[q] = np.maximum(q_map[q], median)
        else:
            # No donors -> symmetric fan around the forecast.
            q_map = symmetric_quantile_fan(y_hat, spread_ratio=0.3, qs=self.cfg.quantiles)

        return q_map  # {0.1: np.ndarray(H,), 0.5: ..., 0.9: ...}

In [11]:
outputs = []  # rows of (gbm_cd, part_no, fcst_yyyymm, q10, q50, q90)

for bu, df_bu in by_bu.items():
    # 7-1) Zero-filled table for this business unit
    df_bu_filled = filled_by_bu[bu]

    # 7-2) Group (business-unit) series and its forecast
    group_s, group_months = build_group_series(df_bu_filled)
    group_fcst = None
    if len(group_s) >= 2:
        r = choose_r_by_group(cfg.damped_r_min, cfg.damped_r_max)
        group_fcst = simple_damped_trend_forecast(group_s, H, r)

    # 7-3) Forecast month labels (fcst_yyyymm)
    last_ym = max(group_months) if group_months else None
    fcst_months = forecast_months(int(last_ym), H) if last_ym is not None else []

    # 7-4) Donor bank
    donor_bank = build_donor_bank(df_bu_filled, group_id=bu)
    ssf = ShortSeriesForecaster(cfg, donor_bank)

    # 7-5) Group totals per month (used for shares)
    group_tbl = (df_bu_filled.group_by("yyyymm")
                 .agg(pl.col("qty").sum().alias("qty"))
                 .sort("yyyymm"))

    # 7-6) Per-part forecast
    for pno, g in df_bu_filled.group_by("part_no"):
        item_tbl = g.sort("yyyymm").select("yyyymm", "qty")
        history = item_tbl["qty"].to_numpy()
        T = len(history)
        # NOTE(review): df_bu_filled pads every part to the BU-wide month
        # range, so T is the same for all parts of a BU. The short-series
        # branch below therefore only runs when the whole BU spans fewer than
        # cfg.history_threshold months. Confirm whether T should instead count
        # months since the part's first observed demand.

        # Share features
        shares = compute_shares_item_vs_group(item_tbl, group_tbl) if T > 0 else np.array([], float)
        last_share = float(shares[-1]) if shares.size else 0.0
        group_mean_share = float(np.mean(shares[-min(T, 6):])) if shares.size else 0.0

        meta = {"group_id": bu, "last_share": last_share, "group_mean_share": group_mean_share}

        if T < cfg.history_threshold:
            q_map = ssf.forecast(history=history, meta=meta, horizon=H, group_forecast=group_fcst)
            q10 = q_map.get(0.1)
            q50 = q_map.get(0.5)
            q90 = q_map.get(0.9)
        else:
            # Long items should be routed to the existing model (e.g. PatchMixer/Titan).
            # Temporary stand-in: group forecast x share (before EMA shrinkage),
            # or the damped baseline when no group forecast exists.
            if group_fcst is not None:
                y = group_fcst * (0.7 * last_share + 0.3 * group_mean_share)
            else:
                r = choose_r_by_group(cfg.damped_r_min, cfg.damped_r_max)
                y = simple_damped_trend_forecast(history, H, r)
            q10, q50, q90 = y * 0.8, y, y * 1.2

        # Collect results
        for i, ym in enumerate(fcst_months):
            outputs.append((bu, pno, ym, float(q10[i]), float(q50[i]), float(q90[i])))

# `outputs` is a list of row tuples; state the orientation explicitly instead
# of letting polars infer it.
fcst_df = pl.DataFrame(
    outputs,
    schema=["gbm_cd", "part_no", "fcst_yyyymm", "q10", "q50", "q90"],
    orient="row",
)

  fcst_df = pl.DataFrame(outputs, schema=["gbm_cd", "part_no", "fcst_yyyymm", "q10", "q50", "q90"])


In [12]:
# part_no was collected from the group_by key, which arrives wrapped in a
# container, so take its first element to get the plain part number.
# The dtype guard keeps this cell safe to re-run: once part_no is already a
# string, x[0] would keep only its first character.
if fcst_df.schema['part_no'] != pl.Utf8:
    fcst_df = (fcst_df
                .with_columns(pl.col('part_no').map_elements(lambda x: x[0], return_dtype = pl.Utf8).alias('part_no')))
fcst_df

gbm_cd,part_no,fcst_yyyymm,q10,q50,q90
str,str,i64,f64,f64,f64
"""VD""","""0001-1001""",202703,0.036268,0.045336,0.054403
"""VD""","""0001-1001""",202704,0.036281,0.045351,0.054421
"""VD""","""0001-1001""",202705,0.036292,0.045365,0.054438
"""VD""","""0001-1001""",202706,0.036302,0.045378,0.054453
"""VD""","""0001-1001""",202707,0.036312,0.04539,0.054468
…,…,…,…,…,…
"""VD""","""ZZ90239""",203610,0.0,0.0,0.0
"""VD""","""ZZ90239""",203611,0.0,0.0,0.0
"""VD""","""ZZ90239""",203612,0.0,0.0,0.0
"""VD""","""ZZ90239""",203701,0.0,0.0,0.0


In [16]:
# Inspect the forecast rows of synthetic part "B".
fcst_df.filter(pl.col('part_no').eq("B"))

gbm_cd,part_no,fcst_yyyymm,q10,q50,q90
str,str,i64,f64,f64,f64
"""VD""","""B""",202703,0.0,0.0,0.0
"""VD""","""B""",202704,0.0,0.0,0.0
"""VD""","""B""",202705,0.0,0.0,0.0
"""VD""","""B""",202706,0.0,0.0,0.0
"""VD""","""B""",202707,0.0,0.0,0.0
…,…,…,…,…,…
"""VD""","""B""",203610,0.0,0.0,0.0
"""VD""","""B""",203611,0.0,0.0,0.0
"""VD""","""B""",203612,0.0,0.0,0.0
"""VD""","""B""",203701,0.0,0.0,0.0
