(셀 1) 필요한 모든 import를 한 곳에 묶기

In [1]:
!pip -q install "openai>=1.40.0" python-dotenv pandas openpyxl numpy

In [2]:
# ===== Cell 1: Imports (ONLY HERE) =====
import os, re, json
from datetime import datetime, timezone
from typing import Optional, Dict, List, Tuple, Any

import numpy as np
import pandas as pd
from dotenv import load_dotenv, find_dotenv

(셀 2) 상수/정책/로그/.env/시간함수 “여기만” (전체 코드)

In [3]:
# ===== Cell 2: Constants / Policies / Logging / Env =====

# Master switch for log(): when False, log() returns without printing anything.
DEBUG = True

def log(layer: str, msg: str, **kwargs):
    """Print a "[layer] msg" line followed by one indented line per keyword argument.

    Silent when the module-level DEBUG flag is falsy.
    """
    if DEBUG:
        print(f"[{layer}] {msg}")
        for key, value in kwargs.items():
            print(f"  - {key}: {value}")

def now_utc_iso() -> str:
    """Return the current UTC time at second precision as ISO-8601 with a trailing 'Z'."""
    moment = datetime.now(timezone.utc).replace(microsecond=0)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")

# Load .env if one is found; override=False keeps variables already set in the process env.
load_dotenv(find_dotenv(), override=False)
# Explicit check rather than `assert`: assert statements are stripped under `python -O`,
# which would silently disable this guard.
if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY가 .env에 없습니다.")
log("INIT", ".env loaded", OPENAI_API_KEY="SET")

# Supported slots (layer0_input_sanity rejects any other slotName).
SLOT_NAMES = ["electricity_usage", "citygas_usage", "water_usage"]

# Unit-token normalization (key: lowercase token matched as a substring, value: canonical unit).
# Insertion order matters: detect_unit_from_name returns the first key it finds, so
# "kwh"/"mwh" must stay ahead of their substring "wh".
UNIT_TOKENS = {
    "kwh": "kWh", "mwh": "MWh", "wh": "Wh",
    "mj": "MJ", "gj": "GJ",
    "m3": "m3", "㎥": "m3",
    "tco2e": "tCO2e", "co2e": "tCO2e",
    "kvarh": "kVarh",
}

# Column-name patterns per slotName, consumed by rule_score(). Entries that start with
# r"\b" (or contain "(" / "[") are case-insensitive regexes; all others are case-insensitive
# substring checks. Each matching entry adds 1 to a column's score for that role.
# The "unit" lists also contain bare unit tokens, so e.g. "Usage_kWh" counts as a unit column.
# NOTE(review): in Python regex "_" and Hangul are word characters, so r"\bdate\b" does NOT
# match names like "date_kst" or "검침date" — only the substring entries catch those.
PATTERNS_BY_SLOT = {
    "electricity_usage": {
        "date":  [r"\btimestamp\b", r"\bdatetime\b", r"\bdate\b", r"\btime\b", "일시", "시각", "연월일", "년월일", "일자", "검침일"],
        "flow":  ["사용량", "전력", "전기사용", "usage", r"\bconsumption\b", r"\bpower\b"],
        "cum":   ["누적", "cumulative", "meter", "계량"],
        "unit":  [r"\bunit\b", r"\buom\b", "단위", "kwh", "mwh", "wh", "mj", "gj", "kvarh"],
    },
    "citygas_usage": {
        "date":  [r"\btimestamp\b", r"\bdatetime\b", r"\bdate\b", r"\btime\b", "일시", "시각", "연월일", "년월일", "일자", "검침일"],
        "flow":  ["사용량", "가스", "도시가스", r"\bflow\b", r"\bvolume\b", "usage"],
        "cum":   ["누적", "cumulative", "meter", "계량"],
        "unit":  [r"\bunit\b", r"\buom\b", "단위", "m3", "㎥", "gj", "mj", "tco2e", "co2e"],
    },
    "water_usage": {
        "date":  [r"\btimestamp\b", r"\bdatetime\b", r"\bdate\b", r"\btime\b", "일시", "시각", "연월일", "년월일", "일자", "검침일"],
        "flow":  ["사용량", "수도", "용수", r"\bflow\b", r"\bvolume\b", "usage"],
        "cum":   ["누적", "cumulative", "meter", "계량"],
        "unit":  [r"\bunit\b", r"\buom\b", "단위", "m3", "㎥"],
    },
}

# Units L3 accepts for each slot's flow column; any other resolved unit is UNIT_MISMATCH.
EXPECTED_UNITS_BY_SLOT = {
    "electricity_usage": {"kWh"},
    "citygas_usage": {"m3"},
    "water_usage": {"m3"},
}

# Fallback unit when no unit token is found at all (used unless FAIL_IF_UNIT_UNRESOLVED).
SLOT_DEFAULT_UNIT = {
    "electricity_usage": "kWh",
    "citygas_usage": "m3",
    "water_usage": "m3",
}

# Final standard (output) column names per slot; consumed by build_output_schema.
SLOT_OUTPUT_SCHEMA = {
    "electricity_usage": {
        "timestamp": "timestamp",
        "flow": "flow_kwh",
        "cum": None,  # electricity: no cumulative output column by default
    },
    "citygas_usage": {
        "timestamp": "timestamp",
        "flow": "flow_m3",
        "cum": None,  # city gas: cum not required (can be extended if needed)
    },
    "water_usage": {
        "timestamp": "timestamp",
        "flow": "flow_m3",
        "cum": "cumulative_meter_m3",  # water: a detected cum column is emitted as a core field
    },
}

# Minimum parse rates (fraction of rows) for the timestamp column and the numeric flow column.
TS_THRESHOLD = 0.95
NUM_THRESHOLD = 0.95

# Policies
FAIL_IF_UNIT_UNRESOLVED = False  # False: fall back to SLOT_DEFAULT_UNIT when no unit is detected
FAIL_IF_DUP_TS = True            # True: duplicate timestamps inside the period fail L4

# LLM option
USE_LLM = False  # default False in production; set True when needed (gates every LLM call in L2)

# unit_schema labels
TIME_UNIT_LABEL = "time"
UNKNOWN_UNIT_LABEL = "-"


[INIT] .env loaded
  - OPENAI_API_KEY: SET


(셀 3) 유틸 “import 없음 / 상수 재정의 없음” (전체 코드)

In [4]:
# ===== Cell 3: Utilities (NO imports, NO constant re-definitions) =====

def rule_score(colname: str, patterns: List[str]) -> int:
    r"""Count how many patterns match a column name.

    A pattern is treated as a case-insensitive regular expression when it starts with
    ``\b`` or contains ``(`` or ``[``; otherwise it is a case-insensitive substring test.
    """
    name = str(colname).lower()

    def _matches(pattern: str) -> bool:
        is_regex = pattern.startswith(r"\b") or "(" in pattern or "[" in pattern
        if is_regex:
            return re.search(pattern, name, flags=re.IGNORECASE) is not None
        return pattern.lower() in name

    return sum(1 for pattern in patterns if _matches(pattern))

def parse_rate_numeric(series: pd.Series) -> float:
    """Return the fraction of values that parse as numbers (0.0 for an empty series).

    Values are stringified, thousands-separator commas removed and whitespace stripped
    before ``pd.to_numeric(errors="coerce")``.
    """
    if len(series) == 0:
        return 0.0
    as_text = series.astype(str).str.replace(",", "", regex=False).str.strip()
    numeric = pd.to_numeric(as_text, errors="coerce")
    return float(numeric.notna().mean())

def detect_unit_from_name(colname: str) -> Optional[str]:
    """Return the canonical unit of the first UNIT_TOKENS key found in the column name.

    Matching is a lowercase substring test in UNIT_TOKENS insertion order; None if no token hits.
    """
    lowered = str(colname).lower()
    return next((unit for token, unit in UNIT_TOKENS.items() if token in lowered), None)

def detect_unit_from_unit_column(
    unit_series: pd.Series,
    tokens: Optional[Dict[str, str]] = None,
) -> Optional[str]:
    """Infer the dominant unit written in a unit column.

    Each of the first 500 non-null values is lowercased and mapped to the canonical unit of
    the FIRST token (in dict order) it contains — the same rule detect_unit_from_name uses.
    The most frequent canonical unit wins (ties resolved by pandas' sorted ``mode``).

    Args:
        unit_series: column whose values are unit labels (e.g. "kWh", "㎥").
        tokens: token -> canonical-unit table; defaults to UNIT_TOKENS.

    Returns:
        The most common canonical unit, or None when no value contains a known token.
    """
    table = UNIT_TOKENS if tokens is None else tokens
    # dropna() must come BEFORE astype(str): afterwards NaN is the literal string "nan".
    values = unit_series.dropna().astype(str).str.lower().tolist()
    hits: List[str] = []
    for value in values[:500]:
        # Stop at the first matching token. "kwh" also contains "wh"; counting every
        # substring hit added a spurious "Wh" per "kWh", and the sorted mode then
        # reported "Wh" (uppercase sorts first) -> false UNIT_MISMATCH for electricity.
        unit = next((canon for token, canon in table.items() if token in value), None)
        if unit is not None:
            hits.append(unit)
    if not hits:
        return None
    return pd.Series(hits).mode().iloc[0]

def parse_period(period_start: str, period_end: str) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Parse the reporting-window bounds into naive Timestamps (timezone dropped, wall time kept).

    Raises:
        ValueError: when period_end is earlier than period_start; parse errors from
            pd.to_datetime propagate unchanged.
    """
    start, end = (
        pd.to_datetime(bound, errors="raise").tz_localize(None)
        for bound in (period_start, period_end)
    )
    if end < start:
        raise ValueError("period_end가 period_start보다 빠릅니다.")
    return start, end

def normalize_datetime_series(raw: pd.Series, period_start: pd.Timestamp) -> Tuple[pd.Series, float, Dict[str, Any], pd.Series]:
    """Parse a raw timestamp column, repairing rows pandas cannot parse by itself.

    Pass 1: ``pd.to_datetime(errors="coerce")`` on the stripped string form; tz-aware
    results are made naive. Pass 2: every NaT row is re-parsed by ``_fix_one``, which maps
    Korean markers (년/월/일/시/분) and "." / "-" / "," separators onto "Y/M/D H:M". When the
    first number is not a 4-digit year, the year is taken from ``period_start``. Seconds
    are ignored in pass 2.

    Args:
        raw: candidate date column.
        period_start: only used to supply a missing year.

    Returns:
        (parsed, parse_rate, debug, normalized_str):
        parsed — naive datetime Series (NaT where unparseable);
        parse_rate — non-NaT fraction (0.0 when empty);
        debug — {"parse_rate", "filled_from_custom"} where the latter counts rows that
        pass 2 actually repaired;
        normalized_str — parsed formatted as "%Y/%m/%d %H:%M".
    """
    s = raw.astype(str).str.strip()

    ts1 = pd.to_datetime(s, errors="coerce")
    if ts1.dt.tz is not None:
        ts1 = ts1.dt.tz_localize(None)

    def _fix_one(x: str) -> Optional[pd.Timestamp]:
        if x is None:
            return None
        x = str(x).strip()
        if not x:
            return None

        x2 = (x.replace("년", "/").replace("월", "/").replace("일", " ")
               .replace("시", ":").replace("분", ""))
        x2 = x2.replace(".", "/").replace("-", "/").replace(",", " ")
        x2 = re.sub(r"\s+", " ", x2).strip()

        nums = re.findall(r"\d+", x2)
        if len(nums) < 2:
            return None

        # A leading 4-digit group is the year; otherwise borrow the year from period_start.
        if len(nums[0]) == 4:
            year = int(nums[0]); idx = 1
        else:
            year = int(period_start.year); idx = 0

        try:
            month = int(nums[idx]); day = int(nums[idx + 1])
            hour = int(nums[idx + 2]) if len(nums) >= idx + 3 else 0
            minute = int(nums[idx + 3]) if len(nums) >= idx + 4 else 0
            if 0 <= hour < 24 and 0 <= minute < 60:
                return pd.Timestamp(year=year, month=month, day=day, hour=hour, minute=minute)
        except (IndexError, ValueError, OverflowError):
            # IndexError: no day component; ValueError: out-of-range month/day or
            # out-of-bounds timestamp; OverflowError: absurdly large numbers.
            return None
        return None

    ts2 = ts1.copy()
    mask_na = ts2.isna()
    filled = 0
    if mask_na.any():
        fixed = pd.to_datetime(s[mask_na].map(_fix_one), errors="coerce")
        ts2.loc[mask_na] = fixed
        # Count rows actually repaired, not merely the rows that were attempted.
        filled = int(fixed.notna().sum())

    rate = float(ts2.notna().mean()) if len(ts2) else 0.0
    normalized_str = ts2.dt.strftime("%Y/%m/%d %H:%M")
    debug = {"parse_rate": rate, "filled_from_custom": filled}
    return ts2, rate, debug, normalized_str

def detect_time_granularity(ts_parsed: pd.Series) -> Optional[str]:
    """Classify the dominant sampling interval of a timestamp Series.

    Uses the mode of consecutive gaps (after dropping NaT and sorting). Returns one of
    "10min", "15min", "30min", "hourly", "day", "week", or None when there are fewer
    than 3 timestamps or the typical gap matches none of those buckets.
    """
    ordered = ts_parsed.dropna().sort_values()
    if len(ordered) < 3:
        return None
    gaps = ordered.diff().dropna().dt.total_seconds()
    if gaps.empty:
        return None
    typical = float(gaps.mode().iloc[0])

    # (nominal seconds, tolerance seconds, label), checked in this order.
    buckets = (
        (600, 60, "10min"),
        (900, 60, "15min"),
        (1800, 120, "30min"),
        (3600, 120, "hourly"),
        (86400, 3600, "day"),
        (604800, 86400, "week"),
    )
    for nominal, tolerance, label in buckets:
        if abs(typical - nominal) <= tolerance:
            return label
    return None

def granularity_to_pandas_freq(gran: str) -> Optional[str]:
    """Translate a detect_time_granularity label into a pandas frequency alias.

    Sub-hourly labels are already valid aliases. Labels without a mapping here
    (e.g. "week") and unknown input return None.
    """
    if gran in ("10min", "15min", "30min"):
        return gran
    return {"hourly": "h", "day": "d"}.get(gran)

# Single place where validated_fields / unit_schema / rename_map are generated.
def build_output_schema(
    df: pd.DataFrame,
    slotName: str,
    date_col: str,
    flow_col: str,
    cum_col: Optional[str],
    resolved_flow_unit: str
) -> Tuple[List[str], List[str], Dict[str, str]]:
    """Build the output field list, its parallel unit list and the source->output rename map.

    - Core columns: timestamp and the slot's flow column. The cumulative column is also
      core, but only when the slot's schema defines a cum output name AND cum_col is a
      column distinct from date_col / flow_col.
    - Every other column keeps its original name. On a clash with an output name already
      in use, "__orig" is appended until the name is unique.
    - unit_schema[i] is the unit of validated_fields[i]: TIME_UNIT_LABEL for the timestamp,
      resolved_flow_unit for flow, "m3" for cum, UNKNOWN_UNIT_LABEL ("-") for the rest.

    Returns:
        (validated_fields, unit_schema, rename_map)
    """
    schema = SLOT_OUTPUT_SCHEMA[slotName]
    rename_map: Dict[str, str] = {}

    out_ts = schema.get("timestamp", "timestamp")
    out_flow = schema["flow"]
    out_cum = schema.get("cum", None)  # only water_usage defines one (cumulative_meter_m3)

    # core rename
    rename_map[date_col] = out_ts
    rename_map[flow_col] = out_flow

    # One source column fills one role only: if cum_col resolved to the date/flow column,
    # mapping it again would overwrite that rename and emit a field with no own source.
    cum_is_core = (
        cum_col is not None
        and out_cum is not None
        and cum_col not in (date_col, flow_col)
    )
    if cum_is_core:
        rename_map[cum_col] = out_cum

    # validated_fields = [core...] + [others...]
    validated_fields: List[str] = [out_ts, out_flow]
    if cum_is_core:
        validated_fields.append(out_cum)

    # Other columns keep their names; loop the suffix so "x__orig" cannot collide either.
    reserved = set(validated_fields)
    for c in df.columns:
        c = str(c)
        if c in rename_map:
            continue
        out_name = c
        while out_name in reserved:
            out_name = f"{out_name}__orig"
        rename_map[c] = out_name
        reserved.add(out_name)
        validated_fields.append(out_name)

    # unit_schema: units for core fields only, "-" for everything else.
    unit_schema: List[str] = []
    for name in validated_fields:
        if name == out_ts:
            unit_schema.append(TIME_UNIT_LABEL)
        elif name == out_flow:
            unit_schema.append(resolved_flow_unit)
        elif cum_is_core and name == out_cum:
            # Hard-coded to match the only cum output in SLOT_OUTPUT_SCHEMA (cumulative_meter_m3).
            unit_schema.append("m3")
        else:
            unit_schema.append(UNKNOWN_UNIT_LABEL)

    return validated_fields, unit_schema, rename_map


(셀 4) strict 출력 셀 (전체 코드)

In [5]:
# ===== Cell 4: Strict Output Helpers =====

def pass_output_strict(file_path: str, time_granularity: str, unit_schema: list, validated_fields: list):
    """Build the strict PASS envelope: status, file_path, payload and a UTC processed_at stamp."""
    payload = {
        "time_granularity": time_granularity,
        "unit_schema": unit_schema,
        "validated_fields": validated_fields,
    }
    result = {"status": "PASS", "file_path": file_path, "payload": payload}
    result["processed_at"] = now_utc_iso()
    return result

def fail_output_strict(file_path: str, code: str, message: str, location: str):
    """Build the strict FAIL envelope carrying one error {code, message, location}."""
    error = dict(code=code, message=message, location=location)
    return dict(
        status="FAIL",
        error=error,
        file_path=file_path,
        processed_at=now_utc_iso(),
    )

def print_full(result: dict):
    """Pretty-print a result dict as 2-space-indented JSON, keeping non-ASCII text as-is."""
    rendered = json.dumps(result, ensure_ascii=False, indent=2)
    print(rendered)


Cell 5 — L0 입력 검증 (layer0_input_sanity)
역할(쉬운 설명)

“데이터프레임 검증 파이프라인에 들어오기 전에” 입력이 말이 되는지 초기 안전검사를 한다.

여기서 FAIL이면 뒤 레이어(L1~L4)는 돌릴 필요가 없어서 가장 싸고 빠른 방어선임.

하는 일(구체)

obj가 dict인지

필수키 존재 (slotName, kind, ext, period_start, period_end, dataframe)

slotName이 지원 슬롯인지

dataframe이 pandas.DataFrame인지, 행/열이 충분한지

period_start/end가 날짜로 파싱되는지 + 역전 여부

In [6]:
# ===== Cell 5: L0 input sanity =====

def layer0_input_sanity(obj: dict) -> Tuple[Optional[dict], Optional[dict]]:
    """L0: cheap structural sanity checks on the raw request, before any DataFrame work.

    Checks, in order:
    - obj is a dict;
    - all required keys are present;
    - slotName is in SLOT_NAMES;
    - "dataframe" is a pandas DataFrame with >= 1 row and >= 2 columns;
    - period_start/period_end parse and are not reversed (via parse_period).

    Returns:
        (normalized, None) on success: a shallow copy of obj plus the parsed bounds under
        "_period_start_ts" / "_period_end_ts". Otherwise (None, error), where error is
        {"code", "message", "location"}.
    """
    layer = "L0"

    def _reject(code: str, message: str, location: str) -> Tuple[None, dict]:
        return None, {"code": code, "message": message, "location": location}

    if not isinstance(obj, dict):
        log(layer, "BAD_INPUT_TYPE", got=str(type(obj)))
        return _reject("BAD_INPUT", "입력이 dict가 아닙니다.", "input")

    required = ("slotName", "kind", "ext", "period_start", "period_end", "dataframe")
    missing = [key for key in required if key not in obj]
    if missing:
        log(layer, "MISSING_KEYS", missing=missing)
        return _reject("MISSING_KEYS", f"필수 키 누락: {missing}", "input")

    slot = obj["slotName"]
    if slot not in SLOT_NAMES:
        log(layer, "UNKNOWN_SLOT", slotName=slot)
        return _reject("UNKNOWN_SLOT", f"지원하지 않는 slotName: {slot}", "input.slotName")

    frame = obj["dataframe"]
    if not isinstance(frame, pd.DataFrame):
        log(layer, "BAD_DF_TYPE", got=str(type(frame)))
        return _reject("BAD_DF", "dataframe이 pandas.DataFrame이 아닙니다.", "input.dataframe")

    n_rows, n_cols = frame.shape
    if n_rows < 1 or n_cols < 2:
        log(layer, "NO_DATA", shape=frame.shape)
        return _reject("NO_DATA", f"데이터 행/열이 부족합니다. shape={frame.shape}", "input.dataframe")

    try:
        ps, pe = parse_period(obj["period_start"], obj["period_end"])
    except Exception as exc:  # any parse/ordering failure becomes a structured BAD_PERIOD
        exc_name = type(exc).__name__
        log(layer, "BAD_PERIOD", exc=exc_name)
        return _reject("BAD_PERIOD", f"기간 파싱 실패: {exc_name}", "input.period_start/end")

    log(layer, "PASS", slotName=slot, shape=frame.shape, period_start=str(ps), period_end=str(pe))
    return {**obj, "_period_start_ts": ps, "_period_end_ts": pe}, None


Cell 6 — L1 DF 정규화 (layer1_normalize_df)
역할(쉬운 설명)

각 회사 파일은 컬럼명이 “ 일시 ”, "timestamp "처럼 지저분할 수 있음.

L1은 **컬럼명을 표준화(strip)**해서 이후 단계가 안정적으로 돌아가게 만든다.

추가로 프로젝트 퀄리티를 올리기 위해:

빈 컬럼명

strip 후 중복되는 컬럼명

중 하나라도 발견하면 즉시 FAIL시킨다.

왜 도움이 되나?

중복 컬럼이 있으면 L2에서 date/flow 찾을 때 잘못된 컬럼 선택이 생기고,

rename_map을 만들 때 충돌이 생기고,

unit_schema 길이/순서가 뒤틀릴 수 있다.
→ 그러니 L1에서 FAIL시키는 게 “품질 게이트”로 좋음.

In [7]:
# ===== Cell 6: L1 normalize DF =====

def layer1_normalize_df(df: pd.DataFrame) -> Tuple[Optional[pd.DataFrame], Optional[dict]]:
    """L1: strip column names and reject names that become empty or duplicated.

    Returns:
        (copy_of_df_with_stripped_columns, None) on success, else
        (None, {"code": "L1_INVALID_COLUMNS", "message", "location"}).
    """
    layer = "L1"
    stripped = [str(col).strip() for col in df.columns]

    if "" in stripped:
        log(layer, "EMPTY_COLUMN_NAME", columns=stripped)
        return None, {"code":"L1_INVALID_COLUMNS", "message":"빈 컬럼명이 존재합니다(strip 후 '').", "location":"df.columns"}

    if len(set(stripped)) != len(stripped):
        log(layer, "DUPLICATE_COLUMNS_AFTER_NORMALIZE", columns=stripped)
        return None, {"code":"L1_INVALID_COLUMNS", "message":"L1 정규화 이후 중복 컬럼명이 발생했습니다(strip 후 동일).", "location":"df.columns"}

    normalized = df.copy()
    normalized.columns = stripped
    log(layer, "PASS", columns=stripped)
    return normalized, None


Cell 7 — LLM 컬럼 분류 함수(“프로덕션 안전형” 최소 구현)
역할(쉬운 설명)

룰과 파싱률로도 date/flow를 못 찾는 “애매한 컬럼명” 상황에서,

LLM이 “이 컬럼은 date/flow/cum/unit 중 무엇인지”를 매핑 형태로만 반환하도록 강제한다.

핵심 원칙(프로젝트 규칙)

LLM이 “설명 문장” 쓰면 안 됨 → 오직 JSON mapping만

결과를 그대로 믿지 말고 → L2/L3에서 파싱률로 최종 방어

In [8]:
# ===== Cell 7: LLM classify (safe contract) =====

def llm_classify_columns_min_tokens(columns: List[str], df: pd.DataFrame, slotName: str) -> dict:
    """Classify columns as date/flow/cum/unit/other via an LLM — currently a disabled stub.

    Contract: return exactly {"mapping": {column_name: label, ...}}, e.g.
    {"mapping": {"colA": "date", "colB": "flow", "colC": "other"}}, and no free text.
    layer2_find_columns ignores names that are not DataFrame columns, and L3 still
    re-checks the picks by parse rate.

    This placeholder never calls a model and always returns an empty mapping. L2
    therefore continues to its parse-rate fallback and, if that also fails, to
    REQUIRED_FIELD_NOT_FOUND. To wire in a real call, replace only this function's body.
    """
    mapping: Dict[str, str] = {}
    return {"mapping": mapping}


Cell 8 — L2 컬럼 탐지(룰 → (LLM 옵션) → 파싱률 fallback)
역할(쉬운 설명)

데이터프레임에서 “날짜 컬럼(date)”과 “사용량 컬럼(flow)”을 반드시 찾아내는 단계

회사마다 컬럼명이 달라서:

룰 기반 점수로 후보 뽑고

그래도 부족하면 LLM(옵션)

마지막으로 값 형태(파싱률)로 선택한다

중요한 기능(팀 질문 대비)

force_llm=True면 룰 결과와 무관하게 LLM 호출 경로로 들어감 (테스트용)

auto_llm=True는 “룰이 date/flow를 찾지 못했을 때만” 자동 호출하는 운영형 옵션

단, 두 경우 모두 실제 LLM 호출은 USE_LLM=True일 때만 일어남 (기본값 False → 곧바로 값 형태 fallback으로 진행)

In [9]:
# ===== Cell 8: L2 find columns =====

def layer2_find_columns(
    df: pd.DataFrame,
    slotName: str,
    topn: int = 10,
    auto_llm: bool = True,
    force_llm: bool = False
) -> Tuple[Optional[str], Dict[str, Optional[str]], List[str], Optional[dict], dict]:
    """L2: pick the date column and the flow (plus optional cum) column.

    Steps:
    1. Rule scores on column names.
    2. Optional LLM mapping (only when USE_LLM).
    3. Value-shape fallback by datetime / numeric parse rate.

    A source column is never assigned to more than one of date / flow / cum. The
    required roles (date, flow) win over the optional cum role.

    Args:
        df: DataFrame with L1-normalized column names.
        slotName: key into PATTERNS_BY_SLOT.
        topn: number of top-scoring columns per role kept as candidates.
        auto_llm: request the LLM step only when rules miss date or flow.
        force_llm: always request the LLM step (the call itself still needs USE_LLM).

    Returns:
        (date_col, {"flow": flow_col, "cum": cum_col}, unit_cols, error, debug).
        On failure: date_col is None, both value columns are None, and error is
        {"code", "message", "location"}.
    """

    layer = "L2"
    pats = PATTERNS_BY_SLOT[slotName]
    cols = [str(c) for c in df.columns]

    # (column, date_score, flow_score, unit_score, cum_score)
    scored = []
    for c in cols:
        scored.append((
            c,
            rule_score(c, pats["date"]),
            rule_score(c, pats["flow"]),
            rule_score(c, pats["unit"]),
            rule_score(c, pats["cum"])
        ))

    date_candidates = [c for c, sd, _, _, _ in sorted(scored, key=lambda x: x[1], reverse=True)[:topn] if sd > 0]
    flow_candidates = [c for c, _, sf, _, _ in sorted(scored, key=lambda x: x[2], reverse=True)[:topn] if sf > 0]
    cum_candidates  = [c for c, _, _, _, sc in sorted(scored, key=lambda x: x[4], reverse=True)[:topn] if sc > 0]
    unit_cols       = [c for c, _, _, su, _ in scored if su > 0]

    debug = {
        "scored_top10": sorted(scored, key=lambda x: sum(x[1:]), reverse=True)[:10],
        "date_candidates": date_candidates,
        "flow_candidates": flow_candidates,
        "cum_candidates": cum_candidates,
        "unit_cols": unit_cols,
        "used_llm": False,
        "llm_reason": None,
        "llm_raw": None
    }

    def _pick(candidates: List[str], *taken: Optional[str]) -> Optional[str]:
        # First candidate not already assigned to another role (one column = one role).
        return next((c for c in candidates if c not in taken), None)

    date_col = _pick(date_candidates)
    flow_col = _pick(flow_candidates, date_col)
    cum_col  = _pick(cum_candidates, date_col, flow_col)

    # (1) Decide whether the LLM step is requested.
    need_llm = False
    if force_llm:
        need_llm = True
        debug["llm_reason"] = "force_llm"
        log(layer, "AUTO_LLM_TRIGGERED", reason="force_llm", USE_LLM=USE_LLM)
    elif auto_llm and (not date_col or not flow_col):
        # Rules failed and auto_llm is on, so the LLM may be tried.
        need_llm = True
        debug["llm_reason"] = "rule_missing"
        log(layer, "AUTO_LLM_TRIGGERED", reason="rule_missing", USE_LLM=USE_LLM)

    # (2) Run the LLM (only when globally enabled).
    if need_llm and USE_LLM:
        debug["used_llm"] = True
        llm_res = llm_classify_columns_min_tokens(cols, df, slotName)
        debug["llm_raw"] = llm_res

        mapping = (llm_res.get("mapping", {}) or {})
        date_llm = [c for c, lab in mapping.items() if lab == "date" and c in cols]
        flow_llm = [c for c, lab in mapping.items() if lab == "flow" and c in cols]
        cum_llm  = [c for c, lab in mapping.items() if lab == "cum" and c in cols]
        unit_llm = [c for c, lab in mapping.items() if lab == "unit" and c in cols]

        date_col = date_col or _pick(date_llm, flow_col)
        flow_col = flow_col or _pick(flow_llm, date_col)
        cum_col  = cum_col  or _pick(cum_llm, date_col, flow_col)
        # Order-preserving de-duplication: set() order depends on per-process string hashing,
        # and L3 uses the FIRST unit column that yields a unit.
        unit_cols = list(dict.fromkeys(unit_cols + unit_llm))

    # (3) Fallback by value shape.
    if not date_col:
        # NOTE: pd.to_datetime reads numeric columns as epoch nanoseconds, so numeric columns
        # also score ~1.0 here. L3 re-parses the chosen column from its string form and
        # re-applies TS_THRESHOLD.
        date_rates = sorted(
            [(c, float(pd.to_datetime(df[c], errors="coerce").notna().mean())) for c in cols if c != flow_col],
            key=lambda x: x[1], reverse=True
        )
        debug["date_fallback_top5"] = date_rates[:5]
        date_col = date_rates[0][0] if (date_rates and date_rates[0][1] >= TS_THRESHOLD) else None

    if not flow_col:
        val_rates = sorted(
            [(c, parse_rate_numeric(df[c])) for c in cols if c != date_col],
            key=lambda x: x[1], reverse=True
        )
        debug["flow_fallback_top5"] = val_rates[:5]
        flow_col = val_rates[0][0] if (val_rates and val_rates[0][1] >= NUM_THRESHOLD) else None

    # Required roles win over the optional cum role if a later step reused its column.
    if cum_col in (date_col, flow_col):
        cum_col = None

    if not date_col or not flow_col:
        return None, {"flow": None, "cum": None}, unit_cols, {
            "code":"REQUIRED_FIELD_NOT_FOUND",
            "message":"필수 컬럼(date/flow)을 찾지 못했습니다.",
            "location":"columns"
        }, debug

    log(layer, "picked", date_col=date_col, flow_col=flow_col, cum_col=cum_col, unit_cols=unit_cols)
    return date_col, {"flow": flow_col, "cum": cum_col}, unit_cols, None, debug


Cell 9 — L3 값 검증(파싱률/단위/해상도)
역할(쉬운 설명)

L2가 “이 컬럼이 date/flow 같아요”라고 고른 것을 진짜로 사용 가능한지 최종 확인하는 단계

여기서 통과해야만 L4(기간 커버리지)로 간다.

하는 일(구체)

date_col의 datetime 파싱률 체크 (TS_THRESHOLD)

flow_col의 숫자 파싱률 체크 (NUM_THRESHOLD)

시간 간격을 보고 time_granularity 판정

단위 추정(단위 컬럼 값 → flow 컬럼명에 포함된 단위 토큰 → 슬롯 기본값)

기대 단위와 다르면 UNIT_MISMATCH FAIL

In [10]:
# ===== Cell 9: L3 validate values =====

def layer3_validate_values(
    df: pd.DataFrame,
    slotName: str,
    date_col: str,
    value_cols: Dict[str, Optional[str]],
    unit_cols: List[str],
    period_start: pd.Timestamp
) -> Tuple[Optional[dict], dict]:
    """L3: confirm the L2 picks are usable and resolve the flow unit.

    Fails, in order, on:
    - TS_PARSE_LOW (date parse rate < TS_THRESHOLD);
    - VALUE_PARSE_LOW (flow numeric rate < NUM_THRESHOLD);
    - TIME_GRAIN_UNKNOWN;
    - UNIT_UNRESOLVED (only if FAIL_IF_UNIT_UNRESOLVED);
    - UNIT_MISMATCH.

    The unit is taken, in priority order, from the first unit column whose values name
    one, then a token in the flow column's name, then SLOT_DEFAULT_UNIT.

    Returns:
        (error_or_None, debug). debug always has ts_rate / flow_num_rate /
        time_granularity; on success it also has "_ts_parsed" and "resolved_unit".
    """
    flow_col = value_cols["flow"]

    ts_parsed, ts_rate, _ts_dbg, _ts_norm = normalize_datetime_series(df[date_col], period_start=period_start)
    num_rate = parse_rate_numeric(df[flow_col])
    granularity = detect_time_granularity(ts_parsed)

    debug: Dict[str, Any] = {
        "ts_rate": ts_rate,
        "flow_num_rate": num_rate,
        "time_granularity": granularity,
    }

    def _err(code: str, message: str, column: str) -> Tuple[dict, dict]:
        return {"code": code, "message": message, "location": f"col:{column}"}, debug

    if ts_rate < TS_THRESHOLD:
        return _err("TS_PARSE_LOW", f"날짜 파싱률 낮음(ts_rate={ts_rate:.2f})", date_col)
    if num_rate < NUM_THRESHOLD:
        return _err("VALUE_PARSE_LOW", f"숫자 파싱률 낮음(num_rate={num_rate:.2f})", flow_col)
    if granularity is None:
        return _err("TIME_GRAIN_UNKNOWN", "시간 해상도 판별 불가", date_col)

    # Unit priority: unit-column values -> flow column name -> slot default.
    resolved_unit = next(
        (found for found in (detect_unit_from_unit_column(df[uc]) for uc in (unit_cols or [])) if found),
        None,
    )
    if resolved_unit is None:
        resolved_unit = detect_unit_from_name(flow_col) or None
    if resolved_unit is None:
        if FAIL_IF_UNIT_UNRESOLVED:
            return _err("UNIT_UNRESOLVED", "단위 판별 불가", flow_col)
        resolved_unit = SLOT_DEFAULT_UNIT[slotName]

    expected_units = EXPECTED_UNITS_BY_SLOT[slotName]
    if resolved_unit not in expected_units:
        return _err(
            "UNIT_MISMATCH",
            f"단위 불일치: got={resolved_unit}, expected={list(expected_units)[0]}",
            flow_col,
        )

    debug["_ts_parsed"] = ts_parsed
    debug["resolved_unit"] = resolved_unit
    return None, debug


Cell 10 — L4 기간 커버리지(누락/중복/결측 FAIL)
역할(쉬운 설명)

“분기 보고”의 핵심은 기간 전체가 빠짐없이 채워져 있는지 여부다.

L4는 지정한 period_start~period_end에 대해:

timestamp 누락

timestamp 중복

flow 값 결측

중 하나라도 있으면 FAIL시키는 품질 게이트임 (중복 timestamp는 FAIL_IF_DUP_TS=True일 때만 FAIL).

In [11]:
# ===== Cell 10: L4 period coverage =====

# NOTE: granularity_to_pandas_freq is defined once in Cell 3 (Utilities) and is used by
# layer4_validate_period_coverage below; it is intentionally not redefined in this cell.

def align_ts_for_compare(ts: pd.Series, gran: str) -> pd.Series:
    """Floor timestamps onto the grid of a granularity label before comparing to the expected range.

    Labels without a grid here (e.g. "week" or None) return ts unchanged.
    """
    floor_rules = (
        ("10min", "10min"),
        ("15min", "15min"),
        ("30min", "30min"),
        ("hourly", "h"),
        ("day", "d"),
    )
    for label, freq in floor_rules:
        if gran == label:
            return ts.dt.floor(freq)
    return ts

def layer4_validate_period_coverage(
    df: pd.DataFrame,
    ts_parsed: pd.Series,
    flow_col: str,
    period_start: pd.Timestamp,
    period_end: pd.Timestamp,
    granularity: str
) -> Tuple[Optional[dict], dict]:
    """L4: require complete, duplicate-free, value-complete coverage of [period_start, period_end].

    Timestamps are floored to the granularity grid and compared with
    ``pd.date_range(period_start, period_end, freq)``. Fails, in order, on:
    - PERIOD_OUT_OF_RANGE (no rows inside the window);
    - DUPLICATE_TIMESTAMPS (only if FAIL_IF_DUP_TS);
    - TIME_GRAIN_UNKNOWN (no pandas freq for the label);
    - PERIOD_MISSING_TIMESTAMPS;
    - PERIOD_VALUE_MISSING (a non-numeric/empty flow value in the window).

    Returns:
        (error_or_None, debug). debug carries rows_in_period, duplicate_ts_count and,
        when those checks are reached, missing_ts_count / missing_ts_sample /
        missing_value_count.
    """

    layer = "L4"
    debug: Dict[str, Any] = {}

    ts_aligned = align_ts_for_compare(ts_parsed, granularity)

    # NOTE(review): both bounds are inclusive and compared after flooring. If period_end is a
    # bare date (parsed as midnight), later rows on that day fall outside the window and are
    # not checked. Confirm callers pass the last expected timestamp as period_end.
    mask = (ts_aligned >= period_start) & (ts_aligned <= period_end)
    df_in = df.loc[mask]  # read-only below, so no copy needed
    ts_in = ts_aligned.loc[mask]

    debug["rows_in_period"] = int(df_in.shape[0])
    if df_in.shape[0] == 0:
        return {"code":"PERIOD_OUT_OF_RANGE", "message":"지정 기간 내 데이터가 없습니다.", "location":"period"}, debug

    ts_valid = ts_in.dropna()
    dup_count = int(ts_valid.duplicated().sum())
    debug["duplicate_ts_count"] = dup_count
    if FAIL_IF_DUP_TS and dup_count > 0:
        return {"code":"DUPLICATE_TIMESTAMPS", "message":"기간 내 중복 timestamp 존재", "location":"timestamp"}, debug

    freq = granularity_to_pandas_freq(granularity)
    if freq is None:
        return {"code":"TIME_GRAIN_UNKNOWN", "message":"시간 해상도 판별 불가(L4)", "location":"time_granularity"}, debug

    expected = pd.date_range(start=period_start, end=period_end, freq=freq)
    missing = sorted(set(expected) - set(ts_valid))
    debug["missing_ts_count"] = len(missing)
    debug["missing_ts_sample"] = [str(t) for t in missing[:5]]  # earliest gaps, for diagnosis
    if missing:
        return {"code":"PERIOD_MISSING_TIMESTAMPS", "message":"기간 내 timestamp 누락", "location":"timestamp"}, debug

    val_num = pd.to_numeric(df_in[flow_col].astype(str).str.replace(",", "", regex=False).str.strip(), errors="coerce")
    missing_values = int(val_num.isna().sum())
    debug["missing_value_count"] = missing_values
    if missing_values > 0:
        return {"code":"PERIOD_VALUE_MISSING", "message":"기간 내 flow 값 누락", "location":f"col:{flow_col}"}, debug

    log(layer, "PASS", rows_in_period=debug["rows_in_period"])
    return None, debug


(셀 11) validate_structured_upstream (교체/전체 코드)

In [12]:
# ===== Cell 11: Main validate (STRICT output only) =====

# def layer0_input_sanity(obj: dict) -> Tuple[Optional[dict], Optional[dict]]:
#     layer = "L0"

#     if not isinstance(obj, dict):
#         log(layer, "BAD_INPUT_TYPE", got=str(type(obj)))
#         return None, {"code":"BAD_INPUT", "message":"입력이 dict가 아닙니다.", "location":"input"}

#     required = ["slotName", "kind", "ext", "period_start", "period_end", "dataframe"]
#     missing = [k for k in required if k not in obj]
#     if missing:
#         log(layer, "MISSING_KEYS", missing=missing)
#         return None, {"code":"MISSING_KEYS", "message":f"필수 키 누락: {missing}", "location":"input"}

#     slot = obj["slotName"]
#     if slot not in SLOT_NAMES:
#         log(layer, "UNKNOWN_SLOT", slotName=slot)
#         return None, {"code":"UNKNOWN_SLOT", "message":f"지원하지 않는 slotName: {slot}", "location":"input.slotName"}

#     df = obj["dataframe"]
#     if not isinstance(df, pd.DataFrame):
#         log(layer, "BAD_DF_TYPE", got=str(type(df)))
#         return None, {"code":"BAD_DF", "message":"dataframe이 pandas.DataFrame이 아닙니다.", "location":"input.dataframe"}

#     if df.shape[0] < 1 or df.shape[1] < 2:
#         log(layer, "NO_DATA", shape=df.shape)
#         return None, {"code":"NO_DATA", "message":f"데이터 행/열이 부족합니다. shape={df.shape}", "location":"input.dataframe"}

#     try:
#         ps, pe = parse_period(obj["period_start"], obj["period_end"])
#     except Exception as e:
#         log(layer, "BAD_PERIOD", exc=type(e).__name__)
#         return None, {"code":"BAD_PERIOD", "message":f"기간 파싱 실패: {type(e).__name__}", "location":"input.period_start/end"}

#     log(layer, "PASS", slotName=slot, shape=df.shape, period_start=str(ps), period_end=str(pe))

#     normalized = dict(obj)
#     normalized["_period_start_ts"] = ps
#     normalized["_period_end_ts"] = pe
#     return normalized, None


# def layer1_normalize_df(df: pd.DataFrame) -> Tuple[Optional[pd.DataFrame], Optional[dict]]:
#     """
#     L1: 컬럼명 strip + (중복/빈 컬럼명) 감지 시 FAIL
#     """
#     layer = "L1"
#     df2 = df.copy()
#     cols = [str(c).strip() for c in df2.columns]

#     # 빈 컬럼명 체크
#     if any(c == "" for c in cols):
#         log(layer, "EMPTY_COLUMN_NAME", columns=cols)
#         return None, {"code":"L1_INVALID_COLUMNS", "message":"빈 컬럼명이 존재합니다(strip 후 '').", "location":"df.columns"}

#     # 중복 컬럼명 체크
#     if pd.Series(cols).duplicated().any():
#         log(layer, "DUPLICATE_COLUMNS_AFTER_NORMALIZE", columns=cols)
#         return None, {"code":"L1_INVALID_COLUMNS", "message":"L1 정규화 이후 중복 컬럼명이 발생했습니다(strip 후 동일).", "location":"df.columns"}

#     df2.columns = cols
#     log(layer, "PASS", columns=cols)
#     return df2, None


# def layer2_find_columns(df: pd.DataFrame, slotName: str, topn: int = 10) -> Tuple[Optional[str], Dict[str, Optional[str]], List[str], Optional[dict], dict]:
#     layer = "L2"
#     pats = PATTERNS_BY_SLOT[slotName]
#     cols = [str(c) for c in df.columns]

#     scored = []
#     for c in cols:
#         scored.append((
#             c,
#             rule_score(c, pats["date"]),
#             rule_score(c, pats["flow"]),
#             rule_score(c, pats["unit"]),
#             rule_score(c, pats["cum"])
#         ))

#     date_candidates = [c for c, sd, _, _, _ in sorted(scored, key=lambda x: x[1], reverse=True)[:topn] if sd > 0]
#     flow_candidates = [c for c, _, sf, _, _ in sorted(scored, key=lambda x: x[2], reverse=True)[:topn] if sf > 0]
#     cum_candidates  = [c for c, _, _, _, sc in sorted(scored, key=lambda x: x[4], reverse=True)[:topn] if sc > 0]
#     unit_cols       = [c for c, _, _, su, _ in scored if su > 0]

#     debug = {
#         "scored_top10": sorted(scored, key=lambda x: sum(x[1:]), reverse=True)[:10],
#         "date_candidates": date_candidates,
#         "flow_candidates": flow_candidates,
#         "cum_candidates": cum_candidates,
#         "unit_cols": unit_cols,
#         "used_llm": False,
#     }

#     date_col = date_candidates[0] if date_candidates else None
#     flow_col = flow_candidates[0] if flow_candidates else None
#     cum_col  = cum_candidates[0] if cum_candidates else None

#     # 룰 부족 시: fallback (LLM은 여기서 쓰려면 별도 llm_classify...를 연결하면 됨)
#     if not date_col:
#         date_rates = sorted(
#             [(c, float(pd.to_datetime(df[c], errors="coerce").notna().mean())) for c in cols],
#             key=lambda x: x[1], reverse=True
#         )
#         debug["date_fallback_top5"] = date_rates[:5]
#         date_col = date_rates[0][0] if (date_rates and date_rates[0][1] >= TS_THRESHOLD) else None

#     if not flow_col:
#         val_rates = sorted([(c, parse_rate_numeric(df[c])) for c in cols], key=lambda x: x[1], reverse=True)
#         debug["flow_fallback_top5"] = val_rates[:5]
#         flow_col = val_rates[0][0] if (val_rates and val_rates[0][1] >= NUM_THRESHOLD) else None

#     if not date_col or not flow_col:
#         return None, {"flow": None, "cum": None}, unit_cols, {
#             "code":"REQUIRED_FIELD_NOT_FOUND",
#             "message":"필수 컬럼(date/flow)을 찾지 못했습니다.",
#             "location":"columns"
#         }, debug

#     log(layer, "picked", date_col=date_col, flow_col=flow_col, cum_col=cum_col, unit_cols=unit_cols)
#     return date_col, {"flow": flow_col, "cum": cum_col}, unit_cols, None, debug


# def layer3_validate_values(
#     df: pd.DataFrame,
#     slotName: str,
#     date_col: str,
#     value_cols: Dict[str, Optional[str]],
#     unit_cols: List[str],
#     period_start: pd.Timestamp
# ) -> Tuple[Optional[dict], dict]:
#     layer = "L3"
#     debug: Dict[str, Any] = {}

#     flow_col = value_cols["flow"]
#     cum_col  = value_cols.get("cum")

#     ts_parsed, ts_rate, ts_dbg, ts_norm_str = normalize_datetime_series(df[date_col], period_start=period_start)
#     flow_num_rate = parse_rate_numeric(df[flow_col])

#     debug["ts_rate"] = ts_rate
#     debug["flow_num_rate"] = flow_num_rate
#     debug["time_granularity"] = detect_time_granularity(ts_parsed)

#     if ts_rate < TS_THRESHOLD:
#         return {"code":"TS_PARSE_LOW", "message":f"날짜 파싱률이 낮습니다(ts_rate={ts_rate:.2f}).", "location":f"col:{date_col}"}, debug
#     if flow_num_rate < NUM_THRESHOLD:
#         return {"code":"VALUE_PARSE_LOW", "message":f"flow 숫자 파싱률이 낮습니다(num_rate={flow_num_rate:.2f}).", "location":f"col:{flow_col}"}, debug
#     if debug["time_granularity"] is None:
#         return {"code":"TIME_GRAIN_UNKNOWN", "message":"시간 해상도를 판별할 수 없습니다.", "location":f"col:{date_col}"}, debug

#     # 단위 결정: unit_col -> flow_col_name -> slot_default
#     resolved_unit = None
#     for uc in unit_cols or []:
#         u = detect_unit_from_unit_column(df[uc])
#         if u:
#             resolved_unit = u
#             break
#     if resolved_unit is None:
#         u = detect_unit_from_name(flow_col)
#         if u:
#             resolved_unit = u
#     if resolved_unit is None:
#         if FAIL_IF_UNIT_UNRESOLVED:
#             return {"code":"UNIT_UNRESOLVED", "message":"단위를 판별할 수 없습니다.", "location":f"col:{flow_col}"}, debug
#         resolved_unit = SLOT_DEFAULT_UNIT[slotName]

#     # 단위 불일치 FAIL
#     expected_units = EXPECTED_UNITS_BY_SLOT[slotName]
#     if resolved_unit not in expected_units:
#         return {"code":"UNIT_MISMATCH", "message":f"단위 불일치: got={resolved_unit}, expected={list(expected_units)[0]}", "location":f"col:{flow_col}"}, debug

#     debug["_ts_parsed"] = ts_parsed
#     debug["resolved_unit"] = resolved_unit
#     return None, debug


# def align_ts_for_compare(ts: pd.Series, gran: str) -> pd.Series:
#     if gran in ("10min", "15min", "30min"):
#         mins = int(gran.replace("min", ""))
#         return ts.dt.floor(f"{mins}min")
#     if gran == "hourly":
#         return ts.dt.floor("h")
#     if gran == "day":
#         return ts.dt.floor("d")
#     return ts


# def layer4_validate_period_coverage(
#     df: pd.DataFrame,
#     ts_parsed: pd.Series,
#     flow_col: str,
#     period_start: pd.Timestamp,
#     period_end: pd.Timestamp,
#     granularity: str
# ) -> Tuple[Optional[dict], dict]:
#     layer = "L4"
#     debug: Dict[str, Any] = {}

#     ts_aligned = align_ts_for_compare(ts_parsed, granularity)
#     mask = (ts_aligned >= period_start) & (ts_aligned <= period_end)
#     df_in = df.loc[mask].copy()
#     ts_in = ts_aligned.loc[mask]

#     debug["rows_in_period"] = int(df_in.shape[0])
#     if df_in.shape[0] == 0:
#         return {"code":"PERIOD_OUT_OF_RANGE", "message":"지정 기간 내 데이터가 없습니다.", "location":"period"}, debug

#     dup_count = int(pd.Series(ts_in.dropna().values).duplicated().sum())
#     debug["duplicate_ts_count"] = dup_count
#     if FAIL_IF_DUP_TS and dup_count > 0:
#         return {"code":"DUPLICATE_TIMESTAMPS", "message":"기간 내 중복 timestamp가 존재합니다.", "location":"timestamp"}, debug

#     freq = granularity_to_pandas_freq(granularity)
#     if freq is None:
#         return {"code":"TIME_GRAIN_UNKNOWN", "message":"시간 해상도를 판별할 수 없습니다(L4).", "location":"time_granularity"}, debug

#     expected = pd.date_range(start=period_start, end=period_end, freq=freq)
#     exp_set = set(expected.tolist())
#     act_set = set(ts_in.dropna().tolist())
#     missing = sorted(list(exp_set - act_set))
#     if missing:
#         return {"code":"PERIOD_MISSING_TIMESTAMPS", "message":"기간 내 timestamp 누락", "location":"timestamp"}, debug

#     val_num = pd.to_numeric(df_in[flow_col].astype(str).str.replace(",", "", regex=False).str.strip(), errors="coerce")
#     if int(val_num.isna().sum()) > 0:
#         return {"code":"PERIOD_VALUE_MISSING", "message":"기간 내 flow 값 누락", "location":f"col:{flow_col}"}, debug

#     log(layer, "PASS", rows_in_period=debug["rows_in_period"])
#     return None, debug


def validate_structured_upstream(obj: dict, file_path: str = "upstream.xlsx"):
    """Run the layered validation pipeline (L0 -> L4) on one structured input.

    Each layer returns an error dict with ``code``/``message``/``location`` keys
    (or a falsy value). The first error short-circuits the pipeline and is
    converted into a strict FAIL result. If every layer passes, a strict PASS
    result is built from the unified output/unit schema.

    Args:
        obj: Raw input; validated and normalized by ``layer0_input_sanity``.
            The normalized dict it returns provides ``slotName``,
            ``dataframe``, ``_period_start_ts`` and ``_period_end_ts``.
        file_path: Label forwarded to the FAIL/PASS output builders.

    Returns:
        The value returned by ``fail_output_strict`` or ``pass_output_strict``.
    """
    def _fail(err: dict):
        # Every layer uses the same error-dict shape, so conversion is uniform.
        return fail_output_strict(file_path, err["code"], err["message"], err["location"])

    # Only call obj.get when obj is a dict: a malformed input must reach L0 and
    # be reported as a FAIL there, not crash this log line with AttributeError.
    slot_hint = obj.get("slotName") if isinstance(obj, dict) else None
    log("MAIN", "start", slotName=slot_hint, file_path=file_path)

    # L0: input sanity check + normalization
    norm, err0 = layer0_input_sanity(obj)
    if err0:
        return _fail(err0)

    slotName = norm["slotName"]
    df_raw = norm["dataframe"]
    ps, pe = norm["_period_start_ts"], norm["_period_end_ts"]

    # L1: dataframe / column-name normalization
    df, err1 = layer1_normalize_df(df_raw)
    if err1:
        return _fail(err1)

    # L2: locate the date / value / unit columns for this slot
    date_col, value_cols, unit_cols, err2, _ = layer2_find_columns(df, slotName=slotName)
    if err2:
        return _fail(err2)

    # L3: value/unit checks; its debug dict carries the parsed timestamps,
    # the time granularity and the resolved flow unit used further below.
    err3, dbg3 = layer3_validate_values(df, slotName, date_col, value_cols, unit_cols, ps)
    if err3:
        return _fail(err3)

    flow_col = value_cols["flow"]
    time_gran = dbg3["time_granularity"]

    # L4: coverage of the requested period
    err4, _ = layer4_validate_period_coverage(df, dbg3["_ts_parsed"], flow_col, ps, pe, time_gran)
    if err4:
        return _fail(err4)

    # Output schema and unit schema come from a single function so they stay in sync.
    validated_fields, unit_schema, _rename_map = build_output_schema(
        df=df,
        slotName=slotName,
        date_col=date_col,
        flow_col=flow_col,
        cum_col=value_cols.get("cum"),
        resolved_flow_unit=dbg3["resolved_unit"],
    )

    return pass_output_strict(file_path, time_gran, unit_schema, validated_fields)


테스트 셀들(12~19)

Cell 12 — 테스트 데이터 생성기(현실성 + 규모)
역할

전기(15분 간격), 가스/수도(1시간 간격) 데이터를 현실적인 규모의 df로 만든다. Cell 13의 기간(2025-10-01 00:00 ~ 2025-12-31 04:00, 양 끝 포함) 기준으로 전기 8,753행, 가스/수도 2,189행이 생성된다.

“파일 I/O 없이도” 파이프라인 검증 가능.

In [13]:
# ===== Cell 12: Test data generators =====

def make_ts_range(period_start: str, period_end: str, freq: str) -> pd.DatetimeIndex:
    """Return timestamps from period_start to period_end, both ends included, spaced by freq."""
    start, end = (pd.to_datetime(bound) for bound in (period_start, period_end))
    return pd.date_range(start=start, end=end, freq=freq)

def gen_electricity_df(period_start: str, period_end: str, *, rng: Optional[np.random.Generator] = None) -> pd.DataFrame:
    """Build a synthetic 15-minute electricity DataFrame over [period_start, period_end].

    Columns: ``date``, ``Usage_kWh`` (uniform in [50, 180)), two reactive-power
    ``*_kVarh`` columns (uniform in [0, 30)) and two power-factor columns
    (uniform in [0.7, 1.0)); all values rounded to 3 decimals.

    Args:
        period_start, period_end: Inclusive bounds, passed to ``make_ts_range``.
        rng: Optional random source exposing ``uniform(low, high, size)``,
            e.g. ``np.random.default_rng(0)``, for reproducible test data.
            ``None`` keeps the previous behaviour (NumPy's global random state).
    """
    draw = (np.random if rng is None else rng).uniform
    ts = make_ts_range(period_start, period_end, "15min")
    n = len(ts)
    # Dict literals evaluate left to right, so the draw order (and therefore the
    # output under a fixed seed) matches the original implementation.
    # NOTE(review): the '.' in "Lagging_Current_Reactive.Power_kVarh" differs from
    # its sibling column; presumably it mirrors a real export - confirm before renaming.
    return pd.DataFrame({
        "date": ts,
        "Usage_kWh": np.round(draw(50, 180, n), 3),
        "Lagging_Current_Reactive.Power_kVarh": np.round(draw(0, 30, n), 3),
        "Leading_Current_Reactive_Power_kVarh": np.round(draw(0, 30, n), 3),
        "Lagging_Current_Power_Factor": np.round(draw(0.7, 1.0, n), 3),
        "Leading_Current_Power_Factor": np.round(draw(0.7, 1.0, n), 3),
    })

def gen_water_df(period_start: str, period_end: str, *, rng: Optional[np.random.Generator] = None) -> pd.DataFrame:
    """Build a synthetic hourly water-meter DataFrame over [period_start, period_end].

    Columns: ``timestamp``, ``flow_m3`` (uniform in [5, 40), rounded to 3
    decimals) and ``cumulative_meter_m3`` (rounded running sum of the flow).

    Args:
        period_start, period_end: Inclusive bounds, passed to ``make_ts_range``.
        rng: Optional random source exposing ``uniform(low, high, size)``,
            e.g. ``np.random.default_rng(0)``, for reproducible test data.
            ``None`` keeps the previous behaviour (NumPy's global random state).
    """
    draw = (np.random if rng is None else rng).uniform
    ts = make_ts_range(period_start, period_end, "h")
    flow = np.round(draw(5, 40, len(ts)), 3)
    cumulative = np.round(np.cumsum(flow), 3)
    return pd.DataFrame({
        "timestamp": ts,
        "flow_m3": flow,
        "cumulative_meter_m3": cumulative,
    })

def gen_citygas_df(period_start: str, period_end: str, *, rng: Optional[np.random.Generator] = None) -> pd.DataFrame:
    """Build a synthetic hourly city-gas DataFrame over [period_start, period_end].

    Columns: ``timestamp``, ``flow_m3`` (uniform in [10, 60)),
    ``calorific_MJ_per_m3`` (uniform in [38, 43)), ``energy_MJ`` (flow x calorific)
    and ``CO2e_t`` (energy x an arbitrary test-only factor).

    Args:
        period_start, period_end: Inclusive bounds, passed to ``make_ts_range``.
        rng: Optional random source exposing ``uniform(low, high, size)``,
            e.g. ``np.random.default_rng(0)``, for reproducible test data.
            ``None`` keeps the previous behaviour (NumPy's global random state).
    """
    draw = (np.random if rng is None else rng).uniform
    ts = make_ts_range(period_start, period_end, "h")
    n = len(ts)
    flow = np.round(draw(10, 60, n), 3)
    calorific = np.round(draw(38, 43, n), 3)  # rough MJ/m3 range
    energy = np.round(flow * calorific, 3)     # MJ
    co2e = np.round(energy * 0.000056, 6)      # arbitrary factor, test data only
    return pd.DataFrame({
        "timestamp": ts,
        "flow_m3": flow,
        "calorific_MJ_per_m3": calorific,
        "energy_MJ": energy,
        "CO2e_t": co2e,
    })


Cell 13 — PASS 테스트 3종(전기/수도/가스)
역할

“실제 운영 성공 케이스” 3개를 한 번에 보여줌

In [14]:
# ===== Cell 13: PASS tests (3 slots) =====
# Each case prints the full result and then asserts PASS, so a regression
# fails the cell instead of silently printing a FAIL payload.

PERIOD_START = "2025-10-01T00:00:00"
PERIOD_END   = "2025-12-31T04:00:00"

# (1) Electricity - expected PASS
obj_e = {
    "slotName": "electricity_usage",
    "kind": "EXCEL",
    "ext": "xlsx",
    "period_start": PERIOD_START,
    "period_end": PERIOD_END,
    "dataframe": gen_electricity_df(PERIOD_START, PERIOD_END)
}
print("=== [PASS] 전기요금 ===")
res = validate_structured_upstream(obj_e, file_path="성광벤드_전기요금_검증용_정형증빙.xlsx")
print_full(res)
assert res["status"] == "PASS", res.get("error")

# (2) Water - expected PASS
obj_w = {
    "slotName": "water_usage",
    "kind": "EXCEL",
    "ext": "xlsx",
    "period_start": PERIOD_START,
    "period_end": PERIOD_END,
    "dataframe": gen_water_df(PERIOD_START, PERIOD_END)
}
print("\n=== [PASS] 수도요금 ===")
res = validate_structured_upstream(obj_w, file_path="성광벤드_수도요금_검증용_정형증빙.xlsx")
print_full(res)
assert res["status"] == "PASS", res.get("error")

# (3) City gas - expected PASS
obj_g = {
    "slotName": "citygas_usage",
    "kind": "EXCEL",
    "ext": "xlsx",
    "period_start": PERIOD_START,
    "period_end": PERIOD_END,
    "dataframe": gen_citygas_df(PERIOD_START, PERIOD_END)
}
print("\n=== [PASS] 도시가스요금 ===")
res = validate_structured_upstream(obj_g, file_path="성광벤드_도시가스요금_검증용_정형증빙.xlsx")
print_full(res)
assert res["status"] == "PASS", res.get("error")


=== [PASS] 전기요금 ===
[MAIN] start
  - slotName: electricity_usage
  - file_path: 성광벤드_전기요금_검증용_정형증빙.xlsx
[L0] PASS
  - slotName: electricity_usage
  - shape: (8753, 6)
  - period_start: 2025-10-01 00:00:00
  - period_end: 2025-12-31 04:00:00
[L1] PASS
  - columns: ['date', 'Usage_kWh', 'Lagging_Current_Reactive.Power_kVarh', 'Leading_Current_Reactive_Power_kVarh', 'Lagging_Current_Power_Factor', 'Leading_Current_Power_Factor']
[L2] picked
  - date_col: date
  - flow_col: Usage_kWh
  - cum_col: None
  - unit_cols: ['Usage_kWh', 'Lagging_Current_Reactive.Power_kVarh', 'Leading_Current_Reactive_Power_kVarh']
[L4] PASS
  - rows_in_period: 8753
{
  "status": "PASS",
  "file_path": "성광벤드_전기요금_검증용_정형증빙.xlsx",
  "payload": {
    "time_granularity": "15min",
    "unit_schema": [
      "time",
      "kWh",
      "-",
      "-",
      "-",
      "-"
    ],
    "validated_fields": [
      "timestamp",
      "flow_kwh",
      "Lagging_Current_Reactive.Power_kVarh",
      "Leading_Current_Reactive_Po

Cell 14 — FAIL(L0) 테스트 2종
역할

입력 단계에서 FAIL이 제대로 발생하는지 증명

In [15]:
# ===== Cell 14: FAIL tests - L0 =====
# Each case asserts the exact expected error code, not just "something printed".

print("=== [FAIL 기대] L0 - MISSING_KEYS ===")
obj = {"kind":"EXCEL","ext":"xlsx","period_start":PERIOD_START,"period_end":PERIOD_END,"dataframe":pd.DataFrame({"a":[1],"b":[2]})}
res = validate_structured_upstream(obj, file_path="L0_missing_keys.xlsx")
print_full(res)
assert res["status"] == "FAIL" and res["error"]["code"] == "MISSING_KEYS", res

print("\n=== [FAIL 기대] L0 - BAD_DF ===")
obj = {"slotName":"electricity_usage","kind":"EXCEL","ext":"xlsx","period_start":PERIOD_START,"period_end":PERIOD_END,"dataframe":"NOT_DF"}
res = validate_structured_upstream(obj, file_path="L0_bad_df.xlsx")
print_full(res)
assert res["status"] == "FAIL" and res["error"]["code"] == "BAD_DF", res


=== [FAIL 기대] L0 - MISSING_KEYS ===
[MAIN] start
  - slotName: None
  - file_path: L0_missing_keys.xlsx
[L0] MISSING_KEYS
  - missing: ['slotName']
{
  "status": "FAIL",
  "error": {
    "code": "MISSING_KEYS",
    "message": "필수 키 누락: ['slotName']",
    "location": "input"
  },
  "file_path": "L0_missing_keys.xlsx",
  "processed_at": "2026-01-21T00:29:36Z"
}

=== [FAIL 기대] L0 - BAD_DF ===
[MAIN] start
  - slotName: electricity_usage
  - file_path: L0_bad_df.xlsx
[L0] BAD_DF_TYPE
  - got: <class 'str'>
{
  "status": "FAIL",
  "error": {
    "code": "BAD_DF",
    "message": "dataframe이 pandas.DataFrame이 아닙니다.",
    "location": "input.dataframe"
  },
  "file_path": "L0_bad_df.xlsx",
  "processed_at": "2026-01-21T00:29:36Z"
}


Cell 15 — FAIL(L1) 중복/빈 컬럼명
역할

L1 품질 게이트(중복/빈 컬럼명)가 실제로 FAIL을 만드는지 증명

In [16]:
# ===== Cell 15: FAIL tests - L1 =====
# Both cases must be rejected by the L1 column-name gate.

print("=== [FAIL 기대] L1 - 중복 컬럼명(strip 후 동일) ===")
# "A" and " A " collide once column names are stripped.
df = pd.DataFrame({"A":[1]*10, " A ":[2]*10, "B":[3]*10})
obj = {"slotName":"electricity_usage","kind":"EXCEL","ext":"xlsx","period_start":PERIOD_START,"period_end":PERIOD_END,"dataframe":df}
res = validate_structured_upstream(obj, file_path="L1_dup_cols.xlsx")
print_full(res)
assert res["status"] == "FAIL" and res["error"]["code"] == "L1_INVALID_COLUMNS", res

print("\n=== [FAIL 기대] L1 - 빈 컬럼명 ===")
df = pd.DataFrame({"":[1]*10, "B":[2]*10})
obj = {"slotName":"electricity_usage","kind":"EXCEL","ext":"xlsx","period_start":PERIOD_START,"period_end":PERIOD_END,"dataframe":df}
res = validate_structured_upstream(obj, file_path="L1_empty_col.xlsx")
print_full(res)
assert res["status"] == "FAIL" and res["error"]["code"] == "L1_INVALID_COLUMNS", res


=== [FAIL 기대] L1 - 중복 컬럼명(strip 후 동일) ===
[MAIN] start
  - slotName: electricity_usage
  - file_path: L1_dup_cols.xlsx
[L0] PASS
  - slotName: electricity_usage
  - shape: (10, 3)
  - period_start: 2025-10-01 00:00:00
  - period_end: 2025-12-31 04:00:00
[L1] DUPLICATE_COLUMNS_AFTER_NORMALIZE
  - columns: ['A', 'A', 'B']
{
  "status": "FAIL",
  "error": {
    "code": "L1_INVALID_COLUMNS",
    "message": "L1 정규화 이후 중복 컬럼명이 발생했습니다(strip 후 동일).",
    "location": "df.columns"
  },
  "file_path": "L1_dup_cols.xlsx",
  "processed_at": "2026-01-21T00:29:37Z"
}

=== [FAIL 기대] L1 - 빈 컬럼명 ===
[MAIN] start
  - slotName: electricity_usage
  - file_path: L1_empty_col.xlsx
[L0] PASS
  - slotName: electricity_usage
  - shape: (10, 2)
  - period_start: 2025-10-01 00:00:00
  - period_end: 2025-12-31 04:00:00
[L1] EMPTY_COLUMN_NAME
  - columns: ['', 'B']
{
  "status": "FAIL",
  "error": {
    "code": "L1_INVALID_COLUMNS",
    "message": "빈 컬럼명이 존재합니다(strip 후 '').",
    "location": "df.columns"
  },
  "fi

Cell 16 — FAIL(L2) 필수 컬럼 탐지 실패
역할

L2에서 date/flow 후보를 못 찾으면 REQUIRED_FIELD_NOT_FOUND로 FAIL 나는지 증명

In [17]:
# ===== Cell 16: FAIL tests - L2 =====
# Neither column names nor values give a date/flow signal. With USE_LLM
# presumably False at this point, L2 must fail to find the required columns.

print("=== [FAIL 기대] L2 - REQUIRED_FIELD_NOT_FOUND ===")
df = pd.DataFrame({
    "비정상_날짜필드": ["x"]*480,
    "알수없는_수치": ["foo"]*480,
    "메모": ["memo"]*480
})
obj = {"slotName":"electricity_usage","kind":"EXCEL","ext":"xlsx","period_start":"2026-02-01T00:00:00","period_end":"2026-02-05T23:45:00","dataframe":df}
res = validate_structured_upstream(obj, file_path="L2_required_not_found.xlsx")
print_full(res)
assert res["status"] == "FAIL" and res["error"]["code"] == "REQUIRED_FIELD_NOT_FOUND", res


=== [FAIL 기대] L2 - REQUIRED_FIELD_NOT_FOUND ===
[MAIN] start
  - slotName: electricity_usage
  - file_path: L2_required_not_found.xlsx
[L0] PASS
  - slotName: electricity_usage
  - shape: (480, 3)
  - period_start: 2026-02-01 00:00:00
  - period_end: 2026-02-05 23:45:00
[L1] PASS
  - columns: ['비정상_날짜필드', '알수없는_수치', '메모']
[L2] AUTO_LLM_TRIGGERED
  - reason: rule_missing
  - USE_LLM: False
{
  "status": "FAIL",
  "error": {
    "code": "REQUIRED_FIELD_NOT_FOUND",
    "message": "필수 컬럼(date/flow)을 찾지 못했습니다.",
    "location": "columns"
  },
  "file_path": "L2_required_not_found.xlsx",
  "processed_at": "2026-01-21T00:29:38Z"
}


Cell 17 — FAIL(L3) 단위 불일치(전기인데 m3)
역할

L3에서 단위 mismatch가 제대로 FAIL 처리되는지 증명

In [18]:
# ===== Cell 17: FAIL tests - L3 (UNIT_MISMATCH) =====

print("=== [FAIL 기대] L3 - UNIT_MISMATCH ===")
ts = pd.date_range("2026-01-10 00:00:00", "2026-01-23 23:45:00", freq="15min")
df = pd.DataFrame({
    "일시": ts,
    "사용량(m3)": np.round(np.random.uniform(10, 60, len(ts)), 3),  # electricity slot, but the flow column is labelled m3
    "비고": ["x"]*len(ts)
})
obj = {"slotName":"electricity_usage","kind":"EXCEL","ext":"xlsx","period_start":"2026-01-10T00:00:00","period_end":"2026-01-23T23:45:00","dataframe":df}
res = validate_structured_upstream(obj, file_path="L3_unit_mismatch.xlsx")
print_full(res)
assert res["status"] == "FAIL" and res["error"]["code"] == "UNIT_MISMATCH", res


=== [FAIL 기대] L3 - UNIT_MISMATCH ===
[MAIN] start
  - slotName: electricity_usage
  - file_path: L3_unit_mismatch.xlsx
[L0] PASS
  - slotName: electricity_usage
  - shape: (1344, 3)
  - period_start: 2026-01-10 00:00:00
  - period_end: 2026-01-23 23:45:00
[L1] PASS
  - columns: ['일시', '사용량(m3)', '비고']
[L2] picked
  - date_col: 일시
  - flow_col: 사용량(m3)
  - cum_col: None
  - unit_cols: []
{
  "status": "FAIL",
  "error": {
    "code": "UNIT_MISMATCH",
    "message": "단위 불일치: got=m3, expected=kWh",
    "location": "col:사용량(m3)"
  },
  "file_path": "L3_unit_mismatch.xlsx",
  "processed_at": "2026-01-21T00:29:39Z"
}


Cell 18 — FAIL(L4) 기간 내 timestamp 1개 누락
역할

L4에서 “기간 커버리지 100%”가 실제로 강제되는지 증명

In [19]:
# ===== Cell 18: FAIL tests - L4 (missing timestamp) =====

print("=== [FAIL 기대] L4 - PERIOD_MISSING_TIMESTAMPS ===")
ps = "2026-01-24T00:00:00"
pe = "2026-01-31T23:45:00"
ts = pd.date_range(pd.to_datetime(ps), pd.to_datetime(pe), freq="15min")
df = pd.DataFrame({
    "일시": ts,
    "사용량(kWh)": np.round(np.random.uniform(50, 180, len(ts)), 3),
    "역률": np.round(np.random.uniform(0.7, 1.0, len(ts)), 3),
})
df = df.drop(index=100).reset_index(drop=True)  # remove exactly one mid-period timestamp
obj = {"slotName":"electricity_usage","kind":"EXCEL","ext":"xlsx","period_start":ps,"period_end":pe,"dataframe":df}
res = validate_structured_upstream(obj, file_path="L4_missing_timestamp.xlsx")
print_full(res)
assert res["status"] == "FAIL" and res["error"]["code"] == "PERIOD_MISSING_TIMESTAMPS", res


=== [FAIL 기대] L4 - PERIOD_MISSING_TIMESTAMPS ===
[MAIN] start
  - slotName: electricity_usage
  - file_path: L4_missing_timestamp.xlsx
[L0] PASS
  - slotName: electricity_usage
  - shape: (767, 3)
  - period_start: 2026-01-24 00:00:00
  - period_end: 2026-01-31 23:45:00
[L1] PASS
  - columns: ['일시', '사용량(kWh)', '역률']
[L2] picked
  - date_col: 일시
  - flow_col: 사용량(kWh)
  - cum_col: None
  - unit_cols: ['사용량(kWh)']
{
  "status": "FAIL",
  "error": {
    "code": "PERIOD_MISSING_TIMESTAMPS",
    "message": "기간 내 timestamp 누락",
    "location": "timestamp"
  },
  "file_path": "L4_missing_timestamp.xlsx",
  "processed_at": "2026-01-21T00:29:40Z"
}


Cell 20 — LLM 스텁/스파이 유틸리티 (Cell 19보다 먼저 실행해야 함)

In [23]:
# ===== Cell 20: LLM Stub/Spy utilities =====

# Original function objects keyed by global name, kept so they can be restored later.
_LLM_ORIG_FUNCS = dict()
# Bookkeeping flags for whether a stub / spy is currently installed.
_LLM_STUB_INSTALLED = _LLM_SPY_INSTALLED = False

# Shared call counter. It is a one-key dict so nested closures can bump it
# without a `global` statement.
_LLM_CALL_COUNTER = dict(n=0)

def llm_calls() -> int:
    """Return the number of stub/spy invocations recorded since the last reset."""
    calls = _LLM_CALL_COUNTER["n"]
    return int(calls)

def reset_llm_calls():
    """Set the stub/spy invocation counter back to zero."""
    _LLM_CALL_COUNTER.update(n=0)

def install_llm_stub(mapping: dict, llm_func_name: str = "llm_classify_columns_min_tokens"):
    """Replace the LLM column classifier with a deterministic stub.

    The stub ignores its inputs and always returns ``{"mapping": <mapping>}``.
    On each call it bumps the shared call counter and logs the call.
    Example mapping: ``{"A": "date", "B": "flow", "C": "other"}``.

    The mapping is copied once at install time, so later changes to the caller's
    dict cannot alter stub answers. Each call hands out a fresh copy, so callers
    mutating a returned result cannot corrupt later calls either.

    Args:
        mapping: Column name -> role label that the stub returns.
        llm_func_name: Name of the module-global function to replace.

    Raises:
        NameError: If ``llm_func_name`` is not defined in this module's globals.
    """
    global _LLM_STUB_INSTALLED

    if llm_func_name not in globals():
        raise NameError(f"{llm_func_name} is not defined in globals(). 먼저 Cell 7(LLM 함수) 정의를 확인하세요.")

    # Keep only the first (true) original, so restore_llm works even after stacking stub/spy.
    if llm_func_name not in _LLM_ORIG_FUNCS:
        _LLM_ORIG_FUNCS[llm_func_name] = globals()[llm_func_name]

    frozen_mapping = dict(mapping)

    def _stub(columns, df, slotName):
        _LLM_CALL_COUNTER["n"] += 1
        log("LLM_STUB", "CALLED", n=_LLM_CALL_COUNTER["n"], slotName=slotName, columns=len(columns))
        # The stub only ever returns the mapping (no explanatory text).
        return {"mapping": dict(frozen_mapping)}

    globals()[llm_func_name] = _stub
    _LLM_STUB_INSTALLED = True
    print("LLM stub installed.")

def install_llm_spy(llm_func_name: str = "llm_classify_columns_min_tokens"):
    """Wrap the real LLM function so every call is counted, then delegated.

    The wrapped target is the stored original, i.e. the first function ever
    recorded for ``llm_func_name``, not whatever is currently installed.

    Raises:
        NameError: If ``llm_func_name`` is not defined in this module's globals.
    """
    global _LLM_SPY_INSTALLED

    namespace = globals()
    if llm_func_name not in namespace:
        raise NameError(f"{llm_func_name} is not defined in globals().")

    wrapped = _LLM_ORIG_FUNCS.setdefault(llm_func_name, namespace[llm_func_name])

    def _spy(columns, df, slotName):
        _LLM_CALL_COUNTER["n"] += 1
        log("LLM_SPY", "CALLED", n=_LLM_CALL_COUNTER["n"], slotName=slotName, columns=len(columns))
        return wrapped(columns, df, slotName)

    namespace[llm_func_name] = _spy
    _LLM_SPY_INSTALLED = True
    print("LLM spy installed.")

def restore_llm(llm_func_name: str = "llm_classify_columns_min_tokens"):
    """Put back the original function replaced by a stub/spy and clear both flags.

    If nothing was recorded for ``llm_func_name``, only the flags are cleared.
    """
    global _LLM_STUB_INSTALLED, _LLM_SPY_INSTALLED

    absent = object()
    original = _LLM_ORIG_FUNCS.pop(llm_func_name, absent)
    if original is not absent:
        globals()[llm_func_name] = original

    _LLM_STUB_INSTALLED = _LLM_SPY_INSTALLED = False
    print("Restored.")


Cell 19 — LLM “호출 여부”를 확실히 증명하는 테스트(스파이/스텁 이용)
역할

팀원이 제일 많이 묻는 게 이거임:
“LLM이 진짜 호출되긴 해?”

이 셀은 별도의 force_llm 파라미터 없이, USE_LLM=True로 설정하고 룰 기반 탐지가 실패하도록 애매한 컬럼(A/B/C)을 만들어 LLM 경로를 타게 한다.
그리고 Cell 20의 stub 호출 카운트(llm_calls())로 ‘진짜 호출됐다’를 증명한다.

In [24]:
# ===== Cell 19: LLM call proof (NO force_llm param) =====
# The stub install and USE_LLM=True are undone in `finally`, so an exception
# anywhere below can never leave later cells running against the stub.

reset_llm_calls()
USE_LLM = True

try:
    # 1) Install a stub so the "LLM" always labels A=date, B=flow, C=other.
    install_llm_stub(mapping={"A":"date", "B":"flow", "C":"other"}, llm_func_name="llm_classify_columns_min_tokens")

    # 2) Starve the rule/fallback detectors: column names A/B/C and deliberately
    #    ambiguous values (weak signals).
    #    - A: looks like dates, but some entries are broken -> low datetime parse rate
    #    - B: looks numeric, but some entries are text -> low numeric parse rate
    ps = "2026-02-01T00:00:00"
    pe = "2026-02-05T23:45:00"
    ts = pd.date_range(pd.to_datetime(ps), pd.to_datetime(pe), freq="15min")

    A = ts.astype(str).tolist()
    # corrupt some dates
    for i in np.random.choice(len(A), size=40, replace=False):
        A[i] = "날짜아님"

    B = np.round(np.random.uniform(10, 60, len(ts)), 3).astype(object)
    # corrupt some numbers
    for i in np.random.choice(len(B), size=40, replace=False):
        B[i] = "수치아님"

    df = pd.DataFrame({"A": A, "B": B, "C": ["memo"]*len(ts)})

    # 3) Call L2 directly: it calls the LLM when rules fall short and USE_LLM is True.
    date_col, value_cols, unit_cols, err2, dbg = layer2_find_columns(df, slotName="electricity_usage")

    print("used_llm =", dbg.get("used_llm"))
    print("date_col =", date_col)
    print("flow_col =", value_cols.get("flow"))
    print("LLM_CALLS =", llm_calls())
    print("err2 =", err2)

    # The proof itself: the LLM path ran exactly once and its mapping was adopted.
    assert dbg.get("used_llm"), dbg
    assert llm_calls() == 1, llm_calls()
    assert err2 is None and date_col == "A" and value_cols.get("flow") == "B", err2
finally:
    restore_llm()
    USE_LLM = False

LLM stub installed.
[L2] AUTO_LLM_TRIGGERED
  - reason: rule_missing
  - USE_LLM: True
[LLM_STUB] CALLED
  - n: 1
  - slotName: electricity_usage
  - columns: 3
[L2] picked
  - date_col: A
  - flow_col: B
  - cum_col: None
  - unit_cols: []
used_llm = True
date_col = A
flow_col = B
LLM_CALLS = 1
err2 = None
Restored.


LLM 최종 확인 — 스텁 매핑으로 전체 파이프라인(L0~L4)이 PASS까지 통과하는지 확인

In [25]:
# (1) Requires Cell 20 to have been run first.
# The stub and USE_LLM=True are undone in `finally`, so a failure here cannot
# leak the stub into later cells.
reset_llm_calls()
USE_LLM = True

try:
    # (2) Stub the LLM so it always labels A=date, B=flow, C=other.
    install_llm_stub(mapping={"A":"date", "B":"flow", "C":"other"})

    # (3) Ambiguous column names A/B/C, but values clean enough to pass L3/L4.
    ps = "2026-02-01T00:00:00"
    pe = "2026-02-05T23:45:00"
    ts = pd.date_range(pd.to_datetime(ps), pd.to_datetime(pe), freq="15min")

    df = pd.DataFrame({
        "A": ts,  # clear datetime signal
        "B": np.round(np.random.uniform(50, 180, len(ts)), 3),  # clear numeric signal
        "C": ["memo"] * len(ts)
    })

    obj = {
        "slotName": "electricity_usage",
        "kind": "EXCEL",
        "ext": "xlsx",
        "period_start": ps,
        "period_end": pe,
        "dataframe": df
    }

    res = validate_structured_upstream(obj, file_path="LLM_FINAL_OUTPUT.xlsx")
    print_full(res)

    print("LLM_CALLS =", llm_calls())

    # End-to-end proof: the LLM mapping was used exactly once and the pipeline passed.
    assert res["status"] == "PASS", res.get("error")
    assert llm_calls() == 1, llm_calls()
finally:
    restore_llm()
    USE_LLM = False


LLM stub installed.
[MAIN] start
  - slotName: electricity_usage
  - file_path: LLM_FINAL_OUTPUT.xlsx
[L0] PASS
  - slotName: electricity_usage
  - shape: (480, 3)
  - period_start: 2026-02-01 00:00:00
  - period_end: 2026-02-05 23:45:00
[L1] PASS
  - columns: ['A', 'B', 'C']
[L2] AUTO_LLM_TRIGGERED
  - reason: rule_missing
  - USE_LLM: True
[LLM_STUB] CALLED
  - n: 1
  - slotName: electricity_usage
  - columns: 3
[L2] picked
  - date_col: A
  - flow_col: B
  - cum_col: None
  - unit_cols: []
[L4] PASS
  - rows_in_period: 480
{
  "status": "PASS",
  "file_path": "LLM_FINAL_OUTPUT.xlsx",
  "payload": {
    "time_granularity": "15min",
    "unit_schema": [
      "time",
      "kWh",
      "-"
    ],
    "validated_fields": [
      "timestamp",
      "flow_kwh",
      "C"
    ]
  },
  "processed_at": "2026-01-21T00:38:44Z"
}
LLM_CALLS = 1
Restored.
