## 정형 지표화

In [None]:
import numpy as np
import pandas as pd
from datetime import datetime, UTC
from typing import Optional, Any

# ─────────────────────────────────────────────────────────────────────────────
# 정형 데이터 지표 레지스트리
# ─────────────────────────────────────────────────────────────────────────────

STRUCTURED_REGISTRY = {
    "electricity_usage": {
        "display_name": "전력 사용량",
        "constants": {},
        "metrics": [
            {"metric_key": "electricity_total_kwh", "op": "sum", "field": "flow_kwh", "unit": "kwh", "is_per_hour": False},
            {"metric_key": "electricity_avg_hourly_kwh", "op": "mean", "field": "flow_kwh", "unit": "kwh_per_hour", "is_per_hour": True},
            {"metric_key": "electricity_peak_hourly_kwh", "op": "max", "field": "flow_kwh", "unit": "kwh_per_hour", "is_per_hour": True},
        ],
    },
    "citygas_usage": {
        "display_name": "도시가스 사용량",
        "constants": {"gj_per_m3": 0.043, "tco2e_per_m3": 0.00245},
        "metrics": [
            {"metric_key": "citygas_total_m3", "op": "sum", "field": "flow_m3", "unit": "m3", "is_per_hour": False},
            {"metric_key": "citygas_avg_hourly_m3", "op": "mean", "field": "flow_m3", "unit": "m3_per_hour", "is_per_hour": True},
            {"metric_key": "citygas_peak_hourly_m3", "op": "max", "field": "flow_m3", "unit": "m3_per_hour", "is_per_hour": True},
            {"metric_key": "citygas_total_energy_gj", "op": "sum_mul_const", "field": "flow_m3", "const": "gj_per_m3", "unit": "gj", "is_per_hour": False},
            {"metric_key": "scope1_total_tco2e", "op": "sum_mul_const", "field": "flow_m3", "const": "tco2e_per_m3", "unit": "tco2e", "is_per_hour": False},
        ],
    },
    "water_usage": {
        "display_name": "수도 사용량",
        "constants": {},
        "metrics": [
            {"metric_key": "water_total_m3", "op": "sum", "field": "flow_m3", "unit": "m3", "is_per_hour": False},
            {"metric_key": "water_avg_hourly_m3", "op": "mean", "field": "flow_m3", "unit": "m3_per_hour", "is_per_hour": True},
            {"metric_key": "water_peak_hourly_m3", "op": "max", "field": "flow_m3", "unit": "m3_per_hour", "is_per_hour": True},
            {"metric_key": "water_usage_cv", "op": "cv", "field": "flow_m3", "unit": "ratio", "is_per_hour": False},
        ],
    },
}

# ─────────────────────────────────────────────────────────────────────────────
# 연산 함수
# ─────────────────────────────────────────────────────────────────────────────

def aggregate(series: np.ndarray, op: str, constant: Optional[float] = None) -> Optional[float]:
    """집계 연산 수행"""
    ops = {
        "sum": lambda s, c: float(np.nansum(s)),
        "mean": lambda s, c: float(np.nanmean(s)),
        "max": lambda s, c: float(np.nanmax(s)),
        "min": lambda s, c: float(np.nanmin(s)),
        "count": lambda s, c: float(np.count_nonzero(~np.isnan(s))),
        "cv": lambda s, c: float(np.nanstd(s) / np.nanmean(s)) if np.nanmean(s) != 0 else None,
        "sum_mul_const": lambda s, c: float(np.nansum(s) * c) if c else None,
    }
    return ops.get(op, lambda s, c: None)(series, constant)


def parse_granularity_hours(granularity: str) -> float:
    """시간 세분화 -> 시간 단위 변환"""
    mapping = {"10min": 10/60, "15min": 15/60, "30min": 30/60, "hourly": 1.0, "day": 24.0, "week": 168.0, "month": 720.0}
    return mapping.get((granularity or "").strip().lower(), 1.0)


def convert_unit(from_unit: str, to_unit: str) -> float:
    """단위 변환 배수 반환"""
    conversions = {("wh", "kwh"): 0.001, ("mwh", "kwh"): 1000.0, ("l", "m3"): 0.001, ("m3", "l"): 1000.0}
    f, t = (from_unit or "").lower(), (to_unit or "").lower()
    return conversions.get((f, t), 1.0) if f and f not in ("-", "time") and f != t else 1.0


# ─────────────────────────────────────────────────────────────────────────────
# 정형 데이터 지표화 메인 함수
# ─────────────────────────────────────────────────────────────────────────────

def calculate_structured_metrics(input1: dict, input2: dict) -> dict:
    """
    정형 데이터 지표화
    
    Args:
        input1: {slotName, kind, ext, period_start, period_end, dataframe}
        input2: {status, file_path, payload: {time_granularity, unit_schema, validated_fields}, processed_at}
    
    Returns:
        지표화 결과 딕셔너리
    """
    slot_name = input1["slotName"]
    if slot_name not in STRUCTURED_REGISTRY:
        raise ValueError(f"Unknown slot: {slot_name}")
    
    config = STRUCTURED_REGISTRY[slot_name]
    df = input1["dataframe"].copy()
    payload = input2["payload"]
    
    # 1) 컬럼명 교체
    validated_fields = payload.get("validated_fields", [])
    if len(validated_fields) == len(df.columns):
        df.columns = validated_fields
    
    # 2) timestamp 처리 및 기간 필터링
    if "timestamp" in df.columns:
        df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
        df = df.dropna(subset=["timestamp"])
        
        start = pd.to_datetime(input1.get("period_start"), errors="coerce")
        end = pd.to_datetime(input1.get("period_end"), errors="coerce")
        if pd.notna(start):
            df = df[df["timestamp"] >= start]
        if pd.notna(end):
            df = df[df["timestamp"] < end]
    
    if df.empty:
        return _build_structured_output(input1, input2, [])
    
    # 3) 단위 변환
    unit_schema = payload.get("unit_schema", [])
    unit_map = {validated_fields[i]: unit_schema[i] for i in range(min(len(validated_fields), len(unit_schema)))}
    
    for m in config["metrics"]:
        field = m["field"]
        if field in df.columns:
            target_unit = m["unit"].replace("_per_hour", "") if m["unit"].endswith("_per_hour") else m["unit"]
            mult = convert_unit(unit_map.get(field, "-"), target_unit)
            df[field] = pd.to_numeric(df[field], errors="coerce") * mult
    
    # 4) 지표 계산
    constants = config.get("constants", {})
    granularity = payload.get("time_granularity", "hourly")
    interval_hours = parse_granularity_hours(granularity)
    
    metrics = []
    for m in config["metrics"]:
        field = m["field"]
        if field not in df.columns:
            continue
        
        series = df[field].to_numpy(dtype=float)
        const_val = constants.get(m.get("const")) if m["op"] == "sum_mul_const" else None
        value = aggregate(series, m["op"], const_val)
        
        # per_hour 처리
        if m.get("is_per_hour") and value is not None:
            if m["op"] in ("sum", "sum_mul_const"):
                value = value / interval_hours if interval_hours else value
            elif m["op"] == "max" and "timestamp" in df.columns:
                hourly = df.set_index("timestamp")[field].resample("H").sum()
                value = float(np.nanmax(hourly.to_numpy())) if len(hourly) else None
            elif m["op"] == "mean" and "timestamp" in df.columns:
                hourly = df.set_index("timestamp")[field].resample("H").sum()
                value = float(np.nanmean(hourly.to_numpy())) if len(hourly) else None
        
        metric_key = f"{m['metric_key']} ({granularity})" if m.get("unit") == "ratio" else m["metric_key"]
        metrics.append({"metric_key": metric_key, "value": value, "unit": m["unit"], "source_field": field})
    
    return _build_structured_output(input1, input2, metrics)


def _build_structured_output(input1: dict, input2: dict, metrics: list) -> dict:
    """정형 데이터 출력 구조 생성"""
    return {
        "status": "OK",
        "data_type": "structured",
        "period": {"start": input1.get("period_start"), "end": input1.get("period_end")},
        "source": {"slotName": input1["slotName"], "file_path": input2.get("file_path"), "kind": input1.get("kind")},
        "metrics": metrics,
        "processed_at": input2.get("processed_at") or datetime.now(UTC).isoformat(),
    }


# ─────────────────────────────────────────────────────────────────────────────
# 테스트
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # 샘플 데이터 생성
    sample_df = df = pd.read_excel('test_file/xlsx_elec.xlsx')
    
    test_input1 = {
        "slotName": "electricity_usage",
        "kind": "EXCEL",
        "ext": "xlsx",
        "period_start": "2025-10-01T00:00:00",
        "period_end": "2025-12-31T04:00:00",
        "dataframe": sample_df
    }
    
    test_input2 = {
        "status": "PASS",
        "file_path": "성광벤드_전기요금_검증용_정형증빙.xlsx",
        "payload": {
            "time_granularity": "15min",
            "unit_schema": ["time", "kWh", "-", "-", "-", "-"],
            "validated_fields": ["timestamp", "flow_kwh", "Lagging_Current_Reactive_Power_kVarh", 
                                 "Leading_Current_Reactive_Power_kVarh", "Lagging_Current_Power_Factor", 
                                 "Leading_Current_Power_Factor"]
        },
        "processed_at": "2026-01-20T07:03:51Z"
    }
    
    result = calculate_structured_metrics(test_input1, test_input2)
    print("=== 정형 데이터 지표화 결과 ===")
    import json
    print(json.dumps(result, indent=2, ensure_ascii=False, default=str))

=== 정형 데이터 지표화 결과 ===
{
  "status": "OK",
  "data_type": "structured",
  "period": {
    "start": "2025-10-01T00:00:00",
    "end": "2025-12-31T04:00:00"
  },
  "source": {
    "slotName": "electricity_usage",
    "file_path": "성광벤드_전기요금_검증용_정형증빙.xlsx",
    "kind": "EXCEL"
  },
  "metrics": [
    {
      "metric_key": "electricity_total_kwh",
      "value": 134339.9999999997,
      "unit": "kwh",
      "source_field": "flow_kwh"
    },
    {
      "metric_key": "electricity_avg_hourly_kwh",
      "value": 186.58333333333292,
      "unit": "kwh_per_hour",
      "source_field": "flow_kwh"
    },
    {
      "metric_key": "electricity_peak_hourly_kwh",
      "value": 303.710042310757,
      "unit": "kwh_per_hour",
      "source_field": "flow_kwh"
    }
  ],
  "processed_at": "2026-01-20T07:03:51Z"
}


  hourly = df.set_index("timestamp")[field].resample("H").sum()
  hourly = df.set_index("timestamp")[field].resample("H").sum()


## 비정형 지표화

In [None]:
import os
import json
import re
from datetime import datetime, UTC
from typing import Any, Optional
from dotenv import load_dotenv
from openai import OpenAI

# .env 로드
load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL1", "gpt-4o-mini")

# ─────────────────────────────────────────────────────────────────────────────
# 비정형 데이터 지표 레지스트리
# ─────────────────────────────────────────────────────────────────────────────

UNSTRUCTURED_REGISTRY = {
    "iso_14001_certificate": {
        "display_name": "ISO 14001 환경경영시스템 인증서",
        "category": "E",
        "extraction_prompt": "ISO 14001 인증서에서 다음 정보를 추출하세요.",
        "summary_prompt": "인증 범위, 적용 사업장, 특이사항 등 지표에 포함되지 않은 중요 정보를 요약하세요.",
        "metrics": [
            {"metric_key": "iso14001_certified", "type": "boolean", "prompt_hint": "유효한 ISO 14001 인증서인지 확인"},
            {"metric_key": "iso14001_cert_number", "type": "text", "prompt_hint": "인증서 번호"},
            {"metric_key": "iso14001_issue_date", "type": "date", "prompt_hint": "발급일 (YYYY-MM-DD)"},
            {"metric_key": "iso14001_expiry_date", "type": "date", "prompt_hint": "만료일 (YYYY-MM-DD)"},
            {"metric_key": "iso14001_certifying_body", "type": "text", "prompt_hint": "인증기관명"},
        ],
    },
    "iso_45001_certificate": {
        "display_name": "ISO 45001 안전보건경영시스템 인증서",
        "category": "S",
        "extraction_prompt": "ISO 45001 인증서에서 다음 정보를 추출하세요.",
        "summary_prompt": "인증 범위, 적용 사업장, 안전보건 관련 특이사항 등을 요약하세요.",
        "metrics": [
            {"metric_key": "iso45001_certified", "type": "boolean", "prompt_hint": "유효한 ISO 45001 인증서인지 확인"},
            {"metric_key": "iso45001_cert_number", "type": "text", "prompt_hint": "인증서 번호"},
            {"metric_key": "iso45001_issue_date", "type": "date", "prompt_hint": "발급일 (YYYY-MM-DD)"},
            {"metric_key": "iso45001_expiry_date", "type": "date", "prompt_hint": "만료일 (YYYY-MM-DD)"},
            {"metric_key": "iso45001_scope", "type": "text", "prompt_hint": "인증 범위"},
        ],
    },
    "safety_training_log": {
        "display_name": "안전보건교육일지",
        "category": "S",
        "extraction_prompt": "안전보건교육일지에서 교육 정보를 추출하세요.",
        "summary_prompt": "교육 내용의 핵심 사항, 교육 대상 부서, 강사 정보, 특이사항 등을 요약하세요.",
        "metrics": [
            {"metric_key": "training_date", "type": "date", "prompt_hint": "교육 실시일 (YYYY-MM-DD)"},
            {"metric_key": "training_hours", "type": "number", "unit": "hours", "prompt_hint": "총 교육 시간"},
            {"metric_key": "training_participants", "type": "number", "unit": "persons", "prompt_hint": "참석 인원"},
            {"metric_key": "training_type", "type": "category", "prompt_hint": "정기교육/신규채용시교육/작업내용변경시교육/특별교육"},
            {"metric_key": "training_topic", "type": "text", "prompt_hint": "교육 주제"},
        ],
    },
    "ethics_code": {
        "display_name": "윤리강령",
        "category": "G",
        "extraction_prompt": "윤리강령 문서에서 핵심 정책 포함 여부를 확인하세요.",
        "summary_prompt": "윤리강령의 주요 원칙, 적용 대상, 위반 시 제재 조항, 기타 특이사항 등을 요약하세요.",
        "metrics": [
            {"metric_key": "ethics_code_exists", "type": "boolean", "prompt_hint": "윤리강령 문서인지 확인"},
            {"metric_key": "anti_corruption_clause", "type": "boolean", "prompt_hint": "반부패 조항 포함 여부"},
            {"metric_key": "child_labor_prohibition", "type": "boolean", "prompt_hint": "아동노동 금지 조항 포함 여부"},
            {"metric_key": "forced_labor_prohibition", "type": "boolean", "prompt_hint": "강제노동 금지 조항 포함 여부"},
            {"metric_key": "whistleblower_protection", "type": "boolean", "prompt_hint": "내부고발자 보호 조항 포함 여부"},
        ],
    },
    "no_child_forced_labor_policy": {
        "display_name": "아동/강제노동 금지 정책",
        "category": "S",
        "extraction_prompt": "아동/강제노동 금지 정책 문서를 분석하세요.",
        "summary_prompt": "정책의 구체적 이행 방안, 모니터링 체계, 협력사 적용 여부, 위반 시 조치 등을 요약하세요.",
        "metrics": [
            {"metric_key": "policy_exists", "type": "boolean", "prompt_hint": "정책 문서 존재 여부"},
            {"metric_key": "child_labor_prohibition", "type": "boolean", "prompt_hint": "아동노동 금지 명시"},
            {"metric_key": "forced_labor_prohibition", "type": "boolean", "prompt_hint": "강제노동 금지 명시"},
            {"metric_key": "policy_effective_date", "type": "date", "prompt_hint": "정책 시행일"},
            {"metric_key": "policy_scope", "type": "text", "prompt_hint": "정책 적용 범위"},
        ],
    },
    "air_quality_report": {
        "display_name": "대기 측정기록부",
        "category": "E",
        "extraction_prompt": "대기 측정기록부에서 오염물질 데이터를 추출하세요.",
        "summary_prompt": "측정 장소, 측정 방법, 배출시설 정보, 법적 기준 대비 현황, 개선 필요 사항 등을 요약하세요.",
        "metrics": [
            {"metric_key": "measurement_date", "type": "date", "prompt_hint": "측정일 (YYYY-MM-DD)"},
            {"metric_key": "nox_emission", "type": "number", "unit": "ppm", "prompt_hint": "NOx 측정값"},
            {"metric_key": "sox_emission", "type": "number", "unit": "ppm", "prompt_hint": "SOx 측정값"},
            {"metric_key": "dust_emission", "type": "number", "unit": "mg/m3", "prompt_hint": "먼지 측정값"},
            {"metric_key": "compliance_status", "type": "boolean", "prompt_hint": "배출허용기준 충족 여부"},
        ],
    },
}

# ─────────────────────────────────────────────────────────────────────────────
# 텍스트 추출 헬퍼
# ─────────────────────────────────────────────────────────────────────────────

def extract_text_from_content(content: dict) -> str:
    """input1['content']에서 전체 텍스트 추출"""
    texts = []
    pages = content.get("pages", {})
    for page_key in sorted(pages.keys()):
        page = pages[page_key]
        if page.get("text"):
            texts.append(page["text"])
        # 테이블 데이터도 텍스트로 변환
        tables = page.get("tables", {})
        for table_key in sorted(tables.keys()):
            table = tables[table_key]
            for row in table.get("rows", []):
                texts.append(" | ".join(str(cell) for cell in row))
    return "\n".join(texts)


# ─────────────────────────────────────────────────────────────────────────────
# JSON 스키마 및 프롬프트 빌더
# ─────────────────────────────────────────────────────────────────────────────

def get_json_type(extraction_type: str) -> str:
    """추출 타입을 JSON 스키마 타입으로 변환"""
    type_map = {
        "number": "number",
        "boolean": "boolean",
        "category": "string",
        "date": "string",
        "text": "string",
    }
    return type_map.get(extraction_type, "string")


def build_json_schema(metrics: list[dict], include_summary: bool = True) -> dict:
    """지표 정의로부터 JSON 스키마 생성"""
    properties = {}
    
    # metrics 필드들
    for m in metrics:
        json_type = get_json_type(m["type"])
        properties[m["metric_key"]] = {
            "type": [json_type, "null"],
            "description": m["prompt_hint"],
        }
    
    # summary 필드 추가
    if include_summary:
        properties["summary"] = {
            "type": ["string", "null"],
            "description": "지표에 담지 못한 추가 정보, 맥락, 특이사항 요약",
        }
    
    required = list(properties.keys())
    
    return {
        "type": "object",
        "properties": properties,
        "required": required,
        "additionalProperties": False,
    }


def build_llm_prompt(config: dict, text_content: str) -> tuple[str, str]:
    """LLM용 시스템 프롬프트와 유저 프롬프트 생성"""
    
    metrics = config["metrics"]
    schema = build_json_schema(metrics, include_summary=True)
    
    # 각 필드별 상세 설명 생성
    field_descriptions = []
    for m in metrics:
        type_info = m["type"]
        desc = f"  - \"{m['metric_key']}\": {m['prompt_hint']}"
        
        if type_info == "boolean":
            desc += " → true 또는 false로 응답"
        elif type_info == "date":
            desc += " → 반드시 YYYY-MM-DD 형식 (예: 2024-03-15)"
        elif type_info == "number":
            desc += " → 숫자만 (단위 제외)"
            if m.get("unit"):
                desc += f" [단위: {m['unit']}]"
        elif type_info == "category":
            desc += " → 지정된 카테고리 중 하나만"
        elif type_info == "text":
            desc += " → 텍스트 문자열"
        
        field_descriptions.append(desc)
    
    # summary 힌트
    summary_hint = config.get("summary_prompt", "지표에 포함되지 않은 중요 정보나 맥락을 요약하세요.")
    
    system_prompt = f"""당신은 ESG(환경·사회·지배구조) 문서 분석 전문가입니다.
주어진 문서에서 정보를 정확하게 추출하여 JSON 형식으로 반환합니다.

## 핵심 규칙 (반드시 준수)

1. **출력 형식**: 반드시 유효한 JSON만 출력하세요. 다른 텍스트, 설명, 마크다운 없이 순수 JSON만 반환합니다.

2. **필드별 타입 규칙**:
   - boolean: true 또는 false (문자열 "true" 아님)
   - date: "YYYY-MM-DD" 형식 문자열 (예: "2024-03-15")
   - number: 숫자 (정수 또는 소수, 단위 제외)
   - text: 문자열

3. **null 처리**: 문서에서 해당 정보를 찾을 수 없거나 확인할 수 없으면 반드시 null을 사용하세요.

4. **정확성**: 추측하지 마세요. 문서에 명시된 정보만 추출합니다.

## 추출할 지표 필드

{chr(10).join(field_descriptions)}

## summary 필드 (필수)

  - "summary": {summary_hint}
    → 위 지표들에 담기지 않은 추가 정보, 문서의 맥락, 특이사항, 주의할 점 등을 2-4문장으로 요약
    → 지표로 이미 추출된 내용은 반복하지 말 것
    → 문서에서 추가로 발견한 중요 정보가 없으면 null
    
## JSON 스키마
````json
{json.dumps(schema, ensure_ascii=False, indent=2)}
```"""

    user_prompt = f"""## 작업

{config['extraction_prompt']}

## 분석할 문서

<document>
{text_content}
</document>

## 응답

위 문서를 분석하여 JSON 형식으로 응답하세요.
- 각 지표 필드에 해당하는 값을 추출하세요.
- summary 필드에는 지표에 담지 못한 추가 정보를 요약하세요.
- JSON 외의 다른 텍스트는 포함하지 마세요."""

    return system_prompt, user_prompt


# ─────────────────────────────────────────────────────────────────────────────
# LLM 호출
# ─────────────────────────────────────────────────────────────────────────────

def call_llm(system_prompt: str, user_prompt: str) -> dict:
    """OpenAI API 호출"""
    if not OPENAI_API_KEY:
        raise ValueError("OPENAI_API_KEY가 설정되지 않았습니다. .env 파일을 확인하세요.")
    
    client = OpenAI(api_key=OPENAI_API_KEY)
    
    response = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        temperature=0,
        max_tokens=2000,
        response_format={"type": "json_object"},
    )
    
    content = response.choices[0].message.content
    
    try:
        return json.loads(content)
    except json.JSONDecodeError as e:
        json_match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', content)
        if json_match:
            return json.loads(json_match.group(1))
        raise ValueError(f"LLM 응답을 JSON으로 파싱할 수 없습니다: {e}\n응답: {content}")


# ─────────────────────────────────────────────────────────────────────────────
# 비정형 데이터 지표화 메인 함수
# ─────────────────────────────────────────────────────────────────────────────

def calculate_unstructured_metrics(input1: dict, input2: dict) -> dict:
    """
    비정형 데이터 지표화
    
    Args:
        input1: {slotName, kind, ext, mode, period_start, period_end, content}
        input2: {status, file_name, validated_fields, processed_at}
    
    Returns:
        지표화 결과 딕셔너리
    """
    slot_name = input1["slotName"]
    if slot_name not in UNSTRUCTURED_REGISTRY:
        raise ValueError(f"Unknown slot: {slot_name}")
    
    config = UNSTRUCTURED_REGISTRY[slot_name]
    
    # 1) 텍스트 추출
    text_content = extract_text_from_content(input1.get("content", {}))
    
    if not text_content.strip():
        return _build_unstructured_output(input1, input2, [], None, config)
    
    # 2) LLM 프롬프트 생성 및 호출
    system_prompt, user_prompt = build_llm_prompt(config, text_content)
    extracted = call_llm(system_prompt, user_prompt)
    
    # 3) summary 분리
    summary = extracted.pop("summary", None)
    
    # 4) 지표 결과 변환 및 검증
    metrics = []
    for m in config["metrics"]:
        key = m["metric_key"]
        raw_value = extracted.get(key)
        validated_value = _validate_extracted_value(raw_value, m["type"])
        
        metrics.append({
            "metric_key": key,
            "value": validated_value,
            "unit": m.get("unit"),
            "extraction_type": m["type"],
        })
    
    return _build_unstructured_output(input1, input2, metrics, summary, config)


def _validate_extracted_value(value: Any, value_type: str) -> Any:
    """추출 값 검증"""
    if value is None:
        return None
    
    if value_type == "date":
        try:
            datetime.strptime(str(value), "%Y-%m-%d")
            return str(value)
        except ValueError:
            return None
    
    elif value_type == "number":
        try:
            return float(value)
        except (ValueError, TypeError):
            return None
    
    elif value_type == "boolean":
        if isinstance(value, bool):
            return value
        if str(value).lower() in ("true", "yes", "1"):
            return True
        if str(value).lower() in ("false", "no", "0"):
            return False
        return None
    
    return str(value) if value else None


def _build_unstructured_output(
    input1: dict, 
    input2: dict, 
    metrics: list, 
    summary: Optional[str],
    config: dict
) -> dict:
    """비정형 데이터 출력 구조 생성"""
    return {
        "status": "OK",
        "data_type": "unstructured",
        "period": {"start": input1.get("period_start"), "end": input1.get("period_end")},
        "source": {
            "slotName": input1["slotName"],
            "file_name": input2.get("file_name"),
            "kind": input1.get("kind"),
            "mode": input1.get("mode"),
        },
        "metrics": metrics,
        "summary": summary,  # LLM이 생성한 추가 정보 요약
        "meta": {
            "display_name": config["display_name"],
            "category": config["category"],
        },
        "processed_at": input2.get("processed_at") or datetime.now(UTC).isoformat(),
    }


# ─────────────────────────────────────────────────────────────────────────────
# 테스트
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("=" * 60)
    print("비정형 데이터 지표화 테스트")
    print(f"모델: {OPENAI_MODEL}")
    print("=" * 60)
    
    test_input1 = {
        "slotName": "no_child_forced_labor_policy",
        "kind": "PDF",
        "ext": "pdf",
        "mode": "ocr",
        "period_start": "2025-01-01T00:00:00",
        "period_end": "2025-12-31T23:59:59",
        "content": {
            "pageCount": 1,
            "pages": {
                "page1": {
                    "text": """아동 및 강제노동 금지 정책

주식회사 OO (이하 "회사")는 인권 존중의 가치를 바탕으로 
아동노동 및 강제노동을 엄격히 금지합니다.

제1조 (시행일)
본 정책은 2023년 1월 1일부터 시행됩니다.

제2조 (아동노동 금지)
1. 회사는 만 18세 미만의 아동을 고용하지 않습니다.
2. 채용 시 신분증을 통해 연령을 반드시 확인합니다.

제3조 (강제노동 금지)
1. 모든 근로는 자발적이어야 하며, 강제노동을 금지합니다.
2. 신분증, 여권 등 신분서류를 압류하지 않습니다.
3. 퇴직의 자유를 보장합니다.

제4조 (적용범위)
본 정책은 회사의 전 사업장 및 1차 협력사에 적용됩니다.
2차 이하 협력사에 대해서는 권고 사항으로 적용합니다.

제5조 (위반 시 조치)
본 정책 위반 시 즉시 시정 조치하며, 
중대한 위반의 경우 거래 중단을 포함한 제재를 가할 수 있습니다.

제6조 (모니터링)
연 1회 이상 정기 점검을 실시하며, 
필요 시 현장 실사를 진행합니다.

인사팀 문의: hr@company.com
""",
                    "tableCount": 0,
                    "tables": {}
                }
            }
        }
    }
    
    test_input2 = {
        "status": "PASS",
        "file_name": "no_child_forced_labor_policy",
        "validated_fields": ["pageCount_valid", "text_present", "min_text_len_ok", "keyword_min_hits_ok"],
        "processed_at": "2026-01-20T04:47:31Z"
    }
    
    try:
        result = calculate_unstructured_metrics(test_input1, test_input2)
        print("\n=== 비정형 데이터 지표화 결과 ===")
        print(json.dumps(result, indent=2, ensure_ascii=False))
        
        print("\n--- 지표 ---")
        for m in result['metrics']:
            print(f"  {m['metric_key']}: {m['value']}")
        
        print("\n--- Summary ---")
        print(f"  {result['summary']}")
        
    except Exception as e:
        print(f"\n오류 발생: {e}")
        import traceback
        traceback.print_exc()


비정형 데이터 지표화 테스트
모델: gpt-4o-mini

=== 비정형 데이터 지표화 결과 ===
{
  "status": "OK",
  "data_type": "unstructured",
  "period": {
    "start": "2025-01-01T00:00:00",
    "end": "2025-12-31T23:59:59"
  },
  "source": {
    "slotName": "no_child_forced_labor_policy",
    "file_name": "no_child_forced_labor_policy",
    "kind": "PDF",
    "mode": "ocr"
  },
  "metrics": [
    {
      "metric_key": "policy_exists",
      "value": true,
      "unit": null,
      "extraction_type": "boolean"
    },
    {
      "metric_key": "child_labor_prohibition",
      "value": true,
      "unit": null,
      "extraction_type": "boolean"
    },
    {
      "metric_key": "forced_labor_prohibition",
      "value": true,
      "unit": null,
      "extraction_type": "boolean"
    },
    {
      "metric_key": "policy_effective_date",
      "value": "2023-01-01",
      "unit": null,
      "extraction_type": "date"
    },
    {
      "metric_key": "policy_scope",
      "value": "회사의 전 사업장 및 1차 협력사에 적용되며, 2차 이하 협력사에 대해서는

## 분기로직 (통합 지표화)

In [None]:
from typing import Optional
import json

# ─────────────────────────────────────────────────────────────────────────────
# 데이터 타입 판별
# ─────────────────────────────────────────────────────────────────────────────

STRUCTURED_KINDS = {"EXCEL", "CSV", "TSV", "XLSX", "XLS"}
UNSTRUCTURED_KINDS = {"PDF", "IMAGE", "DOCX", "DOC", "PNG", "JPG", "JPEG"}


def detect_data_type(input1: dict) -> str:
    """
    input1의 kind를 기반으로 데이터 타입 판별
    
    Returns:
        "structured" | "unstructured"
    """
    kind = (input1.get("kind") or "").upper()
    
    if kind in STRUCTURED_KINDS:
        return "structured"
    elif kind in UNSTRUCTURED_KINDS:
        return "unstructured"
    else:
        # ext로 추가 판별
        ext = (input1.get("ext") or "").lower()
        if ext in ("xlsx", "xls", "csv", "tsv"):
            return "structured"
        elif ext in ("pdf", "docx", "doc", "png", "jpg", "jpeg"):
            return "unstructured"
        else:
            raise ValueError(f"Cannot determine data type for kind={kind}, ext={ext}")


# ─────────────────────────────────────────────────────────────────────────────
# 통합 지표화 함수
# ─────────────────────────────────────────────────────────────────────────────

def calculate_metrics(input1: dict, input2: dict) -> dict:
    """통합 지표화 함수 - 데이터 타입에 따라 자동 분기"""
    
    if input2.get("status") != "PASS":
        return {
            "status": "SKIPPED",
            "reason": f"Validation failed: {input2.get('status')}",
            "source": {"slotName": input1.get("slotName")},
            "metrics": [],
        }
    
    data_type = detect_data_type(input1)
    
    try:
        if data_type == "structured":
            return calculate_structured_metrics(input1, input2)
        else:
            return calculate_unstructured_metrics(input1, input2) 
    
    except Exception as e:
        return {
            "status": "ERROR",
            "error": str(e),
            "error_type": type(e).__name__,
            "source": {"slotName": input1.get("slotName"), "kind": input1.get("kind")},
            "metrics": [],
        }


# ─────────────────────────────────────────────────────────────────────────────
# 배치 처리
# ─────────────────────────────────────────────────────────────────────────────

def calculate_metrics_batch(
    items: list[tuple[dict, dict]]
) -> list[dict]:

    results = []
    for input1, input2 in items:
        result = calculate_metrics(input1, input2)
        results.append(result)
    return results


# ─────────────────────────────────────────────────────────────────────────────
# 값 포맷팅 헬퍼 함수
# ─────────────────────────────────────────────────────────────────────────────

def format_metric_value(value, decimals: int = 4) -> str:
    """지표 값을 문자열로 포맷팅"""
    if value is None:
        return "N/A"
    elif isinstance(value, bool):
        return str(value)
    elif isinstance(value, (int, float)):
        return f"{value:.{decimals}f}"
    else:
        return str(value)


# ─────────────────────────────────────────────────────────────────────────────
# 테스트
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    import pandas as pd
    import numpy as np
    
    print("=" * 60)
    print("통합 지표화 테스트")
    print("=" * 60)
    
    # 정형 데이터 테스트
    print("\n[1] 정형 데이터 (전력 사용량)")
    sample_df = df = pd.read_excel('test_file/xlsx_elec.xlsx')
    structured_input1 = {
        "slotName": "electricity_usage",
        "kind": "EXCEL",
        "ext": "xlsx",
        "period_start": "2025-10-01T00:00:00",
        "period_end": "2025-12-31T04:00:00",
        "dataframe": sample_df
    }
    structured_input2 = {
        "status": "PASS",
        "file_path": "electricity_data.xlsx",
        "payload": {
            "time_granularity": "15min",
            "unit_schema": ["time", "kWh", "-", "-", "-", "-"],
            "validated_fields": ["timestamp", "flow_kwh", "col3", "col4", "col5", "col6"]
        },
        "processed_at": "2026-01-20T07:03:51Z"
    }
    
    result1 = calculate_metrics(structured_input1, structured_input2)
    print(f"  데이터 타입: {result1['data_type']}")
    print(f"  지표 개수: {len(result1['metrics'])}")
    for m in result1['metrics']:
        val_str = format_metric_value(m['value'])
        print(f"    - {m['metric_key']}: {val_str} {m['unit']}")
    
    # 비정형 데이터 테스트
    print("\n[2] 비정형 데이터 (윤리강령)")
    unstructured_input1 = {
        "slotName": "no_child_forced_labor_policy",
        "kind": "PDF",
        "ext": "pdf",
        "mode": "ocr",
        "period_start": "2025-01-01T00:00:00",
        "period_end": "2025-12-31T23:59:59",
        "content": {
            "pageCount": 1,
            "pages": {
                "page1": {
                    "text": """아동 및 강제노동 금지 정책

주식회사 OO (이하 "회사")는 인권 존중의 가치를 바탕으로 
아동노동 및 강제노동을 엄격히 금지합니다.

제1조 (시행일)
본 정책은 2023년 1월 1일부터 시행됩니다.

제2조 (아동노동 금지)
1. 회사는 만 18세 미만의 아동을 고용하지 않습니다.
2. 채용 시 신분증을 통해 연령을 반드시 확인합니다.

제3조 (강제노동 금지)
1. 모든 근로는 자발적이어야 하며, 강제노동을 금지합니다.
2. 신분증, 여권 등 신분서류를 압류하지 않습니다.
3. 퇴직의 자유를 보장합니다.

제4조 (적용범위)
본 정책은 회사의 전 사업장 및 1차 협력사에 적용됩니다.
2차 이하 협력사에 대해서는 권고 사항으로 적용합니다.

제5조 (위반 시 조치)
본 정책 위반 시 즉시 시정 조치하며, 
중대한 위반의 경우 거래 중단을 포함한 제재를 가할 수 있습니다.

제6조 (모니터링)
연 1회 이상 정기 점검을 실시하며, 
필요 시 현장 실사를 진행합니다.

인사팀 문의: hr@company.com
""",
                    "tableCount": 0,
                    "tables": {}
                }
            }
        }
    }
    unstructured_input2 = {
        "status": "PASS",
        "file_name": "no_child_forced_labor_policy",
        "validated_fields": ["pageCount_valid", "text_present", "min_text_len_ok", "keyword_min_hits_ok"],
        "processed_at": "2026-01-20T04:47:31Z"
    }
    
    result2 = calculate_metrics(unstructured_input1, unstructured_input2)
    print(f"  데이터 타입: {result2['data_type']}")
    print(f"  지표 개수: {len(result2['metrics'])}")
    
    
    for m in result2['metrics']:
        val_str = format_metric_value(m['value'])
        print(f"    - {m['metric_key']}: {val_str}")
    
    # 검증 실패 테스트
    print("\n[3] 검증 실패 케이스")
    failed_input2 = {"status": "FAIL", "file_name": "test"}
    result3 = calculate_metrics(unstructured_input1, failed_input2)
    print(f"  상태: {result3['status']}")
    print(f"  사유: {result3['reason']}")
    
    print("\n" + "=" * 60)
    print("테스트 완료")
    print("=" * 60)

통합 지표화 테스트

[1] 정형 데이터 (전력 사용량)


  hourly = df.set_index("timestamp")[field].resample("H").sum()
  hourly = df.set_index("timestamp")[field].resample("H").sum()



[2] 비정형 데이터 (윤리강령)
  데이터 타입: unstructured
  지표 개수: 5
    - policy_exists: True
    - child_labor_prohibition: True
    - forced_labor_prohibition: True
    - policy_effective_date: 2023-01-01
    - policy_scope: 회사의 전 사업장 및 1차 협력사에 적용되며, 2차 이하 협력사에 대해서는 권고 사항으로 적용됩니다.

[3] 검증 실패 케이스
  상태: SKIPPED
  사유: Validation failed: FAIL

테스트 완료


In [34]:
result2

{'status': 'OK',
 'data_type': 'unstructured',
 'period': {'start': '2025-01-01T00:00:00', 'end': '2025-12-31T23:59:59'},
 'source': {'slotName': 'no_child_forced_labor_policy',
  'file_name': 'no_child_forced_labor_policy',
  'kind': 'PDF',
  'mode': 'ocr'},
 'metrics': [{'metric_key': 'policy_exists',
   'value': True,
   'unit': None,
   'extraction_type': 'boolean'},
  {'metric_key': 'child_labor_prohibition',
   'value': True,
   'unit': None,
   'extraction_type': 'boolean'},
  {'metric_key': 'forced_labor_prohibition',
   'value': True,
   'unit': None,
   'extraction_type': 'boolean'},
  {'metric_key': 'policy_effective_date',
   'value': '2023-01-01',
   'unit': None,
   'extraction_type': 'date'},
  {'metric_key': 'policy_scope',
   'value': '회사의 전 사업장 및 1차 협력사에 적용되며, 2차 이하 협력사에 대해서는 권고 사항으로 적용됩니다.',
   'unit': None,
   'extraction_type': 'text'}],
 'summary': '정기 점검을 연 1회 이상 실시하며, 필요 시 현장 실사를 진행합니다. 정책 위반 시 즉시 시정 조치를 취하고, 중대한 위반의 경우 거래 중단 등의 제재를 가할 수 있습니다.',
 'meta': {'displ