##프로젝트 소개
Topic : Automatic Data-pipeline agent
- 이미 운영 중인 데이터 플랫폼에 신규로 수집된 데이터가 유입이 될 때,
  신규로 수집된 비정형 데이터가 기존 DB의 어떤 테이블에 저장되는 것이
  적합한지를 분류
- 적합한 테이블을 찾으면 스키마에 맞게 정보 추출 -> 저장
- 적합한 테이블 없으면 신규 테이블 생성 -> 저장 -> Metadata 업데이트

Data:
- 신규로 수집된 비정형 데이터
- 이미 설계 및 운영 중인 DB 스키마 및 Metadata

Baseline:
- 경량 LLM 모델 혹은 최적화된 분류기 모델

Novel Hypothesis
- Base 모델은 이미 운영 중인 DB 스키마 및 Metadata 정보를 모르고,
  Domain 데이터셋이 매우 적은 상황에서,
  -> 학습 방식이 효율적일지 vs Context/Prompt Engineering이 효율적일지 비교

##1. Base Model 선정


*   어떤 LLM 모델을 사용할까?

    모델은 3가지 역할을 수행해야 함

    1). Reasoning (추론): 데이터의 의미를 파악하여 적합한 테이블을 논리적으로 연결해야 합니다. (단순 키워드 매칭 아님)


    2). Generation (생성): 적합한 테이블이 없으면 새로운 테이블 이름과 스키마를 만들어내야 합니다.


    3). Formatting (형식화): 데이터를 추출하여 DB에 넣을 수 있는 JSON 형태로 변환해야 합니다.



*   이 프로젝트에 적합한 최적화된 분류 모델이나 오픈 소스는?



추천 1순위: Qwen-2.5-7B-Instruct (강력 추천)
제작사: Alibaba Cloud

이유:

현재 7B 사이즈 모델 중 Coding(코딩)과 Structured Output(JSON 생성) 능력이 가장 압도적입니다.

데이터 파이프라인은 정확한 포맷(JSON)이 생명인데, Llama 모델보다 포맷을 덜 틀립니다.

한국어 이해도도 준수합니다.

Colab 실행법: 4-bit 양자화(Quantization) 적용 시 VRAM 약 6GB 소모. (매우 쾌적)

추천 2순위: Llama-3.1-8B-Instruct
제작사: Meta

이유:

가장 범용적이고 커뮤니티 지원이 많습니다.

일반적인 대화나 추론 능력은 매우 뛰어나지만, 복잡한 JSON 스키마를 엄격하게 지키는 능력은 Qwen보다 약간 떨어질 수 있습니다.

#1. 필수 라이브러리 설치

In [1]:
# 1. 모델 로드 및 설정 (라이브러리 설치)
!pip install -q --upgrade transformers bitsandbytes>=0.46.1 accelerate

In [2]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig


# 1. 모델 ID 설정 (Hugging Face 공식 리포지토리)
model_id = "Qwen/Qwen2.5-7B-Instruct"

# 2. 4-bit 양자화 설정 (Colab 무료 T4 GPU에서 구동하기 위함)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,              # 4비트로 로딩
    bnb_4bit_quant_type="nf4",      # 정규 분포 4비트 (성능 우수)
    bnb_4bit_compute_dtype=torch.float16  # 연산은 16비트로 수행 (속도 향상)
)

print("모델 다운로드 및 로딩 중... (약 2~3분 소요)")

# 3. 토크나이저 로드
tokenizer = AutoTokenizer.from_pretrained(model_id)

# 4. 모델 로드
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=bnb_config, # 위에서 정의한 양자화 설정 적용
    device_map="auto",              # GPU 자동 할당
    trust_remote_code=True
)

print("모델 로딩 완료!")

모델 다운로드 및 로딩 중... (약 2~3분 소요)


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/663 [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]



model.safetensors.index.json: 0.00B [00:00, ?B/s]

Downloading (incomplete total...): 0.00B [00:00, ?B/s]

Fetching 4 files:   0%|          | 0/4 [00:00<?, ?it/s]

Loading weights:   0%|          | 0/339 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/243 [00:00<?, ?B/s]

모델 로딩 완료!


In [3]:
def generate_response(prompt_text):
    messages = [
        {"role": "system", "content": "You are a helpful AI assistant."},
        {"role": "user", "content": prompt_text}
    ]

    # 프롬프트 포맷팅 (Chat Template 적용)
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )

    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)

    # 답변 생성
    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=512,  # 생성할 최대 토큰 수
        temperature=0.1,     # 창의성 조절 (낮을수록 사실적/결정적)
        top_p=0.9
    )

    # 결과 디코딩 (입력 토큰 제외하고 답변만 추출)
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]

    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    return response

# 테스트 실행
test_input = "데이터 엔지니어링에서 'Schema Matching'이 뭐야? 한 문장으로 설명해줘."
print(f"질문: {test_input}\n")
print(f"답변: {generate_response(test_input)}")

질문: 데이터 엔지니어링에서 'Schema Matching'이 뭐야? 한 문장으로 설명해줘.

답변: 데이터 엔지니어링에서 스키마 매칭은 서로 다른 출처에서 유래한 데이터의 구조를 일치시켜 비교하고 통합하는 과정입니다.


#3. 가상 데이터셋 생성
Base Model이 헷갈려서 틀릴 수밖에 없는 "모호하고(Ambiguous), 혼합된(Mixed), 노이즈가 많은(Noisy)" 데이터
    
###3-1). 가상의 RDB, VectorDB에 대한 Scheme, Metadata 생성
    

1). 영업
- 영업 실적 DB (RDB)
- 영업 전략 문서 기반의 VectorDB (본문 정보는 임베딩, 문서의 metadata, 문서의 주제 등은 metadata field)

2). 개발
- 제품 Spec DB (RDB)
- 개발 이슈 DB (VectorDB)
- 설계 문서 DB (VectorDB)

3). 품질
- 품질 이슈 DB (VectorDB)
- 시험 이력 DB (RDB)

4). 생산
- 생산 실적 DB (RDB)
- 생산 이슈 DB (VectorDB)

5). 고객
- 고객 VOC DB (VectorDB)
- 시장 동향 DB (VectorDB)
- 고객 정보 DB (RDB)

In [4]:
import json

def generate_detailed_schema():
    """
    Data Catalog 수준의 상세 스키마 정보를 생성합니다.
    각 컬럼에 Type, Length, Description을 포함하여 LLM이 정확한 ETL을 수행하도록 돕습니다.
    """

    # 반복 사용되는 VectorDB 필수 필드 정의 함수
    def get_vector_common_columns():
        return [
            {"name": "content_raw", "type": "TEXT", "length": None, "description": "비정형 데이터의 원문 텍스트 전체"},
            {"name": "contents_vector", "type": "FLOAT_ARRAY", "length": 1024, "description": "원문 텍스트의 임베딩 벡터 값"}
        ]

    schema = {
        "sales_domain": {
            "description": "영업 활동, 실적, 전략 관련 데이터",
            "tables": [
                {
                    "table_name": "tb_sales_performance",
                    "type": "RDB",
                    "description": "월별/분기별 영업 사원의 판매 실적 및 매출 데이터",
                    "columns": [
                        {"name": "sale_id", "type": "VARCHAR", "length": 20, "description": "매출 건별 고유 ID (PK)"},
                        {"name": "salesperson_id", "type": "VARCHAR", "length": 10, "description": "영업 사원 사번"},
                        {"name": "product_id", "type": "VARCHAR", "length": 10, "description": "판매된 제품 코드"},
                        {"name": "amount", "type": "DECIMAL", "length": "15,2", "description": "매출 금액 (원화)"},
                        {"name": "sale_date", "type": "DATE", "length": None, "description": "실제 판매가 이루어진 날짜 (YYYY-MM-DD)"},
                        {"name": "region_code", "type": "CHAR", "length": 3, "description": "영업 지역 코드 (예: SEL, BUS)"}
                    ]
                },
                {
                    "table_name": "vec_sales_strategy",
                    "type": "VectorDB",
                    "description": "영업 전략 보고서, 경쟁사 분석 문서 등 비정형 문서",
                    "columns": get_vector_common_columns() + [
                        {"name": "doc_id", "type": "VARCHAR", "length": 50, "description": "문서 고유 ID"},
                        {"name": "author", "type": "VARCHAR", "length": 30, "description": "문서 작성자 이름"},
                        {"name": "strategy_year", "type": "INT", "length": 4, "description": "전략 해당 연도"},
                        {"name": "target_market", "type": "VARCHAR", "length": 50, "description": "주요 타겟 시장/국가"},
                        {"name": "security_level", "type": "VARCHAR", "length": 10, "description": "보안 등급 (Public/Internal/Confidential)"}
                    ]
                }
            ]
        },
        "dev_domain": {
            "description": "제품 개발, 설계, 이슈 트래킹 관련 데이터",
            "tables": [
                {
                    "table_name": "tb_product_spec",
                    "type": "RDB",
                    "description": "제품의 하드웨어/소프트웨어 상세 스펙 정보",
                    "columns": [
                        {"name": "spec_id", "type": "INT", "length": None, "description": "스펙 정보 ID (Auto Increment)"},
                        {"name": "product_model", "type": "VARCHAR", "length": 50, "description": "제품 모델명"},
                        {"name": "cpu_type", "type": "VARCHAR", "length": 30, "description": "탑재된 CPU 정보"},
                        {"name": "memory_size", "type": "INT", "length": None, "description": "RAM 용량 (GB 단위)"},
                        {"name": "os_version", "type": "VARCHAR", "length": 20, "description": "운영체제 버전"}
                    ]
                },
                {
                    "table_name": "vec_dev_issues",
                    "type": "VectorDB",
                    "description": "개발 중 발생한 버그, 이슈 리포트 및 해결 과정 로그",
                    "columns": get_vector_common_columns() + [
                        {"name": "issue_id", "type": "VARCHAR", "length": 20, "description": "이슈 트래킹 ID (예: JIRA-101)"},
                        {"name": "reporter", "type": "VARCHAR", "length": 30, "description": "이슈 보고자"},
                        {"name": "severity", "type": "VARCHAR", "length": 10, "description": "심각도 (Critical/Major/Minor)"},
                        {"name": "module_name", "type": "VARCHAR", "length": 50, "description": "문제가 발생한 소프트웨어 모듈명"},
                        {"name": "status", "type": "VARCHAR", "length": 15, "description": "현재 처리 상태 (Open/In-Progress/Closed)"}
                    ]
                },
                {
                    "table_name": "vec_design_docs",
                    "type": "VectorDB",
                    "description": "시스템 아키텍처 설계서, API 명세서 등 기술 문서",
                    "columns": get_vector_common_columns() + [
                        {"name": "doc_id", "type": "VARCHAR", "length": 50, "description": "문서 ID"},
                        {"name": "doc_type", "type": "VARCHAR", "length": 30, "description": "문서 종류 (Architecture/API/Flowchart)"},
                        {"name": "version", "type": "VARCHAR", "length": 10, "description": "문서 버전 (v1.0)"},
                        {"name": "tech_stack", "type": "VARCHAR", "length": 100, "description": "관련된 주요 기술 스택 나열"}
                    ]
                }
            ]
        },
        "quality_domain": {
            "description": "제품 품질 테스트 및 이슈 관리 데이터",
            "tables": [
                {
                    "table_name": "vec_quality_issues",
                    "type": "VectorDB",
                    "description": "QA 과정 또는 필드에서 리포트된 품질 불량 사례",
                    "columns": get_vector_common_columns() + [
                        {"name": "report_id", "type": "VARCHAR", "length": 20, "description": "리포트 번호"},
                        {"name": "defect_type", "type": "VARCHAR", "length": 30, "description": "불량 유형 (Crash/UI/Performance)"},
                        {"name": "detected_phase", "type": "VARCHAR", "length": 20, "description": "발견 시점 (QA/Beta/Production)"},
                        {"name": "risk_level", "type": "INT", "length": 1, "description": "위험도 (1~5, 5가 최고 위험)"}
                    ]
                },
                {
                    "table_name": "tb_test_history",
                    "type": "RDB",
                    "description": "QA 테스트 수행 이력 및 결과",
                    "columns": [
                        {"name": "test_id", "type": "INT", "length": None, "description": "테스트 수행 고유 번호"},
                        {"name": "test_case_id", "type": "VARCHAR", "length": 20, "description": "수행한 테스트 케이스 ID"},
                        {"name": "result", "type": "CHAR", "length": 4, "description": "결과 (PASS/FAIL)"},
                        {"name": "execution_date", "type": "DATETIME", "length": None, "description": "실행 일시"},
                        {"name": "duration_sec", "type": "FLOAT", "length": None, "description": "소요 시간(초)"}
                    ]
                }
            ]
        },
        "production_domain": {
            "description": "공장 생산 라인 실적 및 설비 이슈 데이터",
            "tables": [
                {
                    "table_name": "tb_production_log",
                    "type": "RDB",
                    "description": "일별/라인별 생산량 및 불량률 집계",
                    "columns": [
                        {"name": "log_id", "type": "BIGINT", "length": None, "description": "로그 ID"},
                        {"name": "factory_line", "type": "VARCHAR", "length": 10, "description": "생산 라인 번호 (Line-A, Line-B)"},
                        {"name": "target_output", "type": "INT", "length": None, "description": "목표 생산량"},
                        {"name": "actual_output", "type": "INT", "length": None, "description": "실제 생산량"},
                        {"name": "work_date", "type": "DATE", "length": None, "description": "작업 일자"}
                    ]
                },
                {
                    "table_name": "vec_production_incidents",
                    "type": "VectorDB",
                    "description": "생산 설비 고장, 라인 중단 원인 분석 보고서",
                    "columns": get_vector_common_columns() + [
                        {"name": "incident_id", "type": "VARCHAR", "length": 20, "description": "사고 ID"},
                        {"name": "machine_id", "type": "VARCHAR", "length": 20, "description": "고장난 설비 ID"},
                        {"name": "downtime_minutes", "type": "INT", "length": None, "description": "가동 중단 시간(분)"},
                        {"name": "error_code", "type": "VARCHAR", "length": 10, "description": "장비 에러 코드"}
                    ]
                }
            ]
        },
        "customer_domain": {
            "description": "고객 정보 및 VOC, 시장 반응 데이터",
            "tables": [
                {
                    "table_name": "vec_customer_voc",
                    "type": "VectorDB",
                    "description": "고객의 불만 접수(VOC), 문의 메일, 상담 녹취",
                    "columns": get_vector_common_columns() + [
                        {"name": "voc_id", "type": "VARCHAR", "length": 20, "description": "VOC 접수 번호"},
                        {"name": "customer_id", "type": "VARCHAR", "length": 20, "description": "고객 ID (익명 가능)"},
                        {"name": "channel", "type": "VARCHAR", "length": 10, "description": "접수 채널 (Email/Call/App)"},
                        {"name": "sentiment_score", "type": "FLOAT", "length": None, "description": "감성 분석 점수 (0.0~1.0, 낮을수록 부정)"},
                        {"name": "received_at", "type": "DATETIME", "length": None, "description": "접수 일시"}
                    ]
                },
                {
                    "table_name": "vec_market_trends",
                    "type": "VectorDB",
                    "description": "시장 뉴스, 소셜 미디어 트렌드 분석 리포트",
                    "columns": get_vector_common_columns() + [
                        {"name": "trend_id", "type": "VARCHAR", "length": 20, "description": "트렌드 ID"},
                        {"name": "source", "type": "VARCHAR", "length": 20, "description": "출처 (News/Twitter/Blog)"},
                        {"name": "keyword", "type": "VARCHAR", "length": 30, "description": "주요 키워드"},
                        {"name": "impact_score", "type": "INT", "length": 1, "description": "시장 영향도 (1~10)"}
                    ]
                },
                {
                    "table_name": "tb_customer_info",
                    "type": "RDB",
                    "description": "고객의 기본 인적 사항 및 등급 정보",
                    "columns": [
                        {"name": "cust_id", "type": "VARCHAR", "length": 20, "description": "고객 고유 ID"},
                        {"name": "name", "type": "VARCHAR", "length": 50, "description": "고객명"},
                        {"name": "email", "type": "VARCHAR", "length": 100, "description": "이메일 주소"},
                        {"name": "membership_level", "type": "VARCHAR", "length": 10, "description": "멤버십 등급 (Gold/Silver/Bronze)"},
                        {"name": "join_date", "type": "DATE", "length": None, "description": "가입 일자"}
                    ]
                }
            ]
        }
    }
    return schema

# 상세 스키마 생성
metadata = generate_detailed_schema()

# 변경된 구조 확인 (개발 이슈 DB 예시)
print(json.dumps(metadata['dev_domain']['tables'][1], indent=2, ensure_ascii=False))

{
  "table_name": "vec_dev_issues",
  "type": "VectorDB",
  "description": "개발 중 발생한 버그, 이슈 리포트 및 해결 과정 로그",
  "columns": [
    {
      "name": "content_raw",
      "type": "TEXT",
      "length": null,
      "description": "비정형 데이터의 원문 텍스트 전체"
    },
    {
      "name": "contents_vector",
      "type": "FLOAT_ARRAY",
      "length": 1024,
      "description": "원문 텍스트의 임베딩 벡터 값"
    },
    {
      "name": "issue_id",
      "type": "VARCHAR",
      "length": 20,
      "description": "이슈 트래킹 ID (예: JIRA-101)"
    },
    {
      "name": "reporter",
      "type": "VARCHAR",
      "length": 30,
      "description": "이슈 보고자"
    },
    {
      "name": "severity",
      "type": "VARCHAR",
      "length": 10,
      "description": "심각도 (Critical/Major/Minor)"
    },
    {
      "name": "module_name",
      "type": "VARCHAR",
      "length": 50,
      "description": "문제가 발생한 소프트웨어 모듈명"
    },
    {
      "name": "status",
      "type": "VARCHAR",
      "length": 15,
      "description": "현재 처리 상

###3-2). 가상의 비정형 데이터셋 만들기 (사용 안 함)

In [None]:
!pip install faker

Collecting faker
  Downloading faker-40.4.0-py3-none-any.whl.metadata (16 kB)
Downloading faker-40.4.0-py3-none-any.whl (2.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m37.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: faker
Successfully installed faker-40.4.0


In [None]:
import random
import json
from faker import Faker

fake_ko = Faker('ko_KR')
fake_en = Faker('en_US')

def generate_nightmare_dataset(num_samples=20):
    dataset = []

    # ==========================================
    # 1. 고밀도 노이즈 생성기 (Irrelevant Contexts)
    # ==========================================

    def get_lunch_menu_chat():
        return f"""
        [Chat Log: #lunch-buddies]
        {fake_en.first_name()}: 오늘 점심 뭐 먹을래? 순대국?
        {fake_en.first_name()}: 아 어제 술 마셔서 해장이 필요해.
        {fake_en.first_name()}: 길 건너 새로 생긴 파스타집 별로더라. 가격만 비싸고.
        {fake_en.first_name()}: 그럼 그냥 구내식당 가자. 오늘 메뉴 제육볶음이래.
        {fake_en.first_name()}: ㅇㅋ 12시에 로비에서 봐.
        """

    def get_spam_footer():
        return f"""
        ----------------------------------------------------------------------
        This email and any files transmitted with it are confidential and intended
        solely for the use of the individual or entity to whom they are addressed.
        If you have received this email in error please notify the system manager.
        This message contains confidential information and is intended only for
        the individual named. If you are not the named addressee you should not
        disseminate, distribute or copy this e-mail.
        ----------------------------------------------------------------------
        [Company News] 사내 헬스장 리모델링 공사 안내 (기간: 10/1 ~ 10/15)
        [Event] 이번 달 생일자 축하 파티가 3층 휴게실에서 있습니다.
        """

    def get_prev_email_chain():
        """무의미한 이전 이메일 회신 내역 (길이 늘리기용)"""
        chain = ""
        for _ in range(3):
            chain += f"""
            -----Original Message-----
            From: {fake_en.name()} <{fake_en.email()}>
            Sent: {fake_en.date_time_this_year()}
            Subject: Re: Update request

            확인 부탁드립니다. 지난번 요청하신 자료는 아직 취합 중입니다.
            타 부서 협조가 늦어지고 있어서 조금 더 시간이 필요할 것 같습니다.
            죄송합니다. 최대한 독촉해 보겠습니다.

            > On {fake_en.date()}, {fake_en.name()} wrote:
            > > {fake_en.paragraph()}
            """
        return chain

    # ==========================================
    # 2. 도메인별 함정 시나리오 (Traps & Signals)
    # ==========================================

    # [1] Sales Strategy (함정: 개발 이슈인 척하다가 전략으로 끝남)
    def create_sales_trap():
        doc_id = f"STRAT-{random.randint(100,999)}"
        author = fake_ko.name()
        year = 2025
        target = "중동 건설 시장"

        # 미끼 데이터 (Decoy)
        wrong_amount = "$50,000"  # 매출 데이터인 척

        full_text = f"""
        {get_prev_email_chain()}

        수신: 전략기획실
        발신: {author}

        팀장님, 아까 말씀하신 '서버 다운 건'은 해결됐나요? (Dev Domain 함정)
        개발팀에서 로그 확인한다고 난리던데... 에휴.

        그건 그렇고, 이번에 김대리가 보고한 매출 실적({wrong_amount}) 말인데요. (Sales Performance 함정)
        그거 다 허수입니다. 실질적인 수익성이 너무 떨어져요.

        그래서 제가 주말 동안 고민을 좀 해봤는데, 아예 판을 바꿔야 합니다.
        [2025년도 영업 전략 수정안]을 기안합니다.

        핵심은 기존 북미 시장을 버리고 '{target}'으로 집중하는 것입니다.
        오일머니가 다시 돌고 있어서 기회가 큽니다.

        {get_lunch_menu_chat()}

        첨부된 기획서({doc_id}) 검토 부탁드립니다.
        올해({year}) 안에 쇼부 봐야 합니다.

        {get_spam_footer()}
        """
        return {
            "input_text": full_text,
            "ground_truth": {
                "domain": "sales_domain",
                "table": "vec_sales_strategy",
                "extracted_data": {
                    "doc_id": doc_id, "author": author, "strategy_year": year, "target_market": target
                }
            }
        }

    # [2] Dev Issue (함정: 그냥 잡담인 척하다가 로그가 섞임)
    def create_dev_trap():
        issue_id = f"BUG-{random.randint(1000,9999)}"
        module = "UserAuth"
        severity = "Minor"

        full_text = f"""
        [Slack #random]
        박프로: 야, 어제 축구 봤냐? 손흥민 골 대박이더라.
        이수석: 못 봤어. 어제 야근하느라... ㅠㅠ
        박프로: 무슨 일 있었어?
        이수석: 아니 그놈의 로그인 모듈이 또 말썽이라.

        [Slack #dev-alerts] (여기서부터 진짜 정보)
        System: Alert! {module} Response Time > 5000ms.

        이수석: 아 이거 봐. 또 떴네.
        박프로: 그거 저번에 고친 거 아니야?
        이수석: 롤백됐나 봐. 일단 티켓 하나 끊어놔. ({issue_id})
        박프로: 심각해?
        이수석: 아니 뭐, 유저는 잘 모를 거야. 그냥 내부 타임아웃이라. ({severity})

        박프로: ㅇㅋ. 근데 점심은 뭐 먹지?
        {get_lunch_menu_chat()}

        이수석: 로그 일단 긁어놓을게.
        Caused by: java.util.ConcurrentModificationException
        at {module}.SessionManager.validate(SessionManager.java:105)

        박프로: 알았어. 내가 나중에 볼게. 커피나 마시러 가자.
        """
        return {
            "input_text": full_text,
            "ground_truth": {
                "domain": "dev_domain",
                "table": "vec_dev_issues",
                "extracted_data": {
                    "issue_id": issue_id, "severity": severity, "module_name": module
                }
            }
        }

    # [3] Production Incident (함정: 인사팀 공지인 척)
    def create_prod_trap():
        machine = f"MCH-{random.randint(10,99)}"
        downtime = 120
        error = "E-505"

        full_text = f"""
        [전사 공지] 사내 식당 메뉴 가격 인상 안내
        ... (생략) ...

        [전사 공지] 주차장 청소 안내
        ... (생략) ...

        [긴급: 생산관리팀]
        안녕하세요. 생산 2팀입니다.
        아까 공유드린 식당 메뉴랑 별개로, 지금 라인이 섰습니다.

        범인은 {machine} 호기고요.
        갑자기 센서가 나가면서 에러코드 {error} 띄우고 죽었네요.

        지금 수리 기사님 오시려면 2시간({downtime}분)은 걸린답니다.
        오늘 야근 확정이네요. 다들 저녁 뭐 드실래요?

        참고로 저번달 실적(Production Log)은 다 채웠는데 이번 달은 망했습니다.
        부품비 결제 올려주세요.

        {get_spam_footer()}
        """
        return {
            "input_text": full_text,
            "ground_truth": {
                "domain": "production_domain",
                "table": "vec_production_incidents",
                "extracted_data": {
                    "machine_id": machine, "downtime_minutes": downtime, "error_code": error
                }
            }
        }

    # [4] Customer VOC (함정: 기술 문서인 척)
    def create_voc_trap():
        voc_id = f"V-{random.randint(1000,9999)}"
        channel = "Twitter"
        sentiment = 0.1

        full_text = f"""
        [Technical Review Meeting]
        Date: 2024-10-10
        Topic: API Latency Optimization

        Dev Team: We optimized the query. It is now 20% faster.
        QA Team: Verified. No regression found.

        Marketing Team:
        잠시만요, 기술적인 것도 중요한데 지금 SNS 반응이 최악입니다.
        API 속도가 문제가 아니라, 디자인이 구리다고 난리예요.

        특히 이 트윗({voc_id}) 좀 보세요.
        "{channel}에서 보고 샀는데 완전 속았다. 버튼 위치가 숨바꼭질 수준이다. 환불해줘!"

        감정 분석 돌려보니까 점수가 {sentiment}점 나왔습니다. 거의 욕설 수준이에요.
        개발팀, 기능 추가 멈추고 UI부터 고쳐야 합니다.

        Dev Team: That is not in our sprint backlog.
        Marketing Team: This is critical!

        {get_prev_email_chain()}
        """
        return {
            "input_text": full_text,
            "ground_truth": {
                "domain": "customer_domain",
                "table": "vec_customer_voc",
                "extracted_data": {
                    "voc_id": voc_id, "channel": channel, "sentiment_score": sentiment
                }
            }
        }

    # [5] HR (New Schema) Trap
    def create_hr_trap():
        name = fake_en.name()
        univ = "Hankook Univ"

        full_text = f"""
        Fwd: Fwd: Re: 점심 메뉴 추천

        김대리, 아까 말한 맛집 리스트 보낸다.
        1. 할매 순대국
        2. 김가네

        아 그리고, 내 지인이 이력서 하나 보냈는데 검토 좀 해줘.
        우리 팀 TO 없으면 다른 팀이라도.

        [Attached: {name}_Resume.pdf]
        - {univ} 졸업 (컴공)
        - 학점 4.0
        - 성격 좋음. 술 잘 마심.

        개발팀이나 영업팀이나 아무 데나 찔러봐 줘.
        안되면 뭐 어쩔 수 없고.

        오늘 저녁에 삼겹살 콜?
        """
        return {
            "input_text": full_text,
            "ground_truth": {
                "domain": "NEW_SCHEMA",
                "table": "NEW_SCHEMA",
                "extracted_data": {
                    "applicant_name": name, "university": univ
                }
            }
        }

    # 데이터 생성 루프
    generators = [create_sales_trap, create_dev_trap, create_prod_trap, create_voc_trap]

    for _ in range(num_samples):
        if random.random() < 0.2:
            dataset.append(create_hr_trap())
        else:
            gen_func = random.choice(generators)
            dataset.append(gen_func())

    return dataset

# ------------------------------------------------------------------
# [중요] 변수 생성부
# ------------------------------------------------------------------
print("😈 Nightmare Dataset Generator Loaded.")

# 1. 검증용 데이터셋 생성 (20개)
# 이 변수가 있어야 검증 코드가 돌아갑니다.
final_dataset = generate_nightmare_dataset(num_samples=20)
test_dataset = final_dataset[:5] # 소규모 테스트용

print(f"✅ final_dataset created (N={len(final_dataset)})")
print("   -> Contains emails, chat logs, spam footers, and fake data.")

😈 Nightmare Dataset Generator Loaded.
✅ final_dataset created (N=20)
   -> Contains emails, chat logs, spam footers, and fake data.


In [5]:
import pandas as pd
import ast
import os

def parse_custom_extracted_data(data_str):
    """
    Parses a string in the format "['key': val, ...]" into a Python dictionary.
    """
    if not isinstance(data_str, str):
        return {}

    data_str = data_str.strip()

    # 1. Handle the non-standard list-like dict syntax: ['key': val] -> {'key': val}
    if data_str.startswith('[') and data_str.endswith(']'):
        # Replace outer brackets with braces
        data_str = '{' + data_str[1:-1] + '}'

    try:
        # 2. Safely evaluate the string as a Python literal (dictionary)
        return ast.literal_eval(data_str)
    except (ValueError, SyntaxError):
        print(f"⚠️ Warning: Could not parse extracted_data: {data_str}")
        return {}

def generate_dataset_from_csvs(base_path='/content/dataset/'):
    # Define mappings
    file_metadata = {
        'tb_sales_performance.csv': {
            'domain': 'sales_domain',
            'columns': ['sale_id', 'salesperson_id', 'product_id', 'amount', 'sale_date', 'region_code']
        },
        'vec_sales_strategy.csv': {
            'domain': 'sales_domain',
            'columns': ['doc_id', 'author', 'strategy_year', 'target_market', 'security_level']
        },
        'tb_product_spec.csv': {
            'domain': 'dev_domain',
            'columns': ['spec_id', 'product_model', 'cpu_type', 'memory_size', 'os_version']
        },
        'vec_dev_issues.csv': {
            'domain': 'dev_domain',
            'columns': ['issue_id', 'reporter', 'severity', 'module_name', 'status']
        },
        'vec_design_docs.csv': {
            'domain': 'dev_domain',
            'columns': ['doc_id', 'doc_type', 'version', 'tech_stack']
        },
        'vec_quality_issues.csv': {
            'domain': 'quality_domain',
            'columns': ['report_id', 'defect_type', 'detected_phase', 'risk_level']
        },
        'vec_production_incidents.csv': {
            'domain': 'production_domain',
            'columns': ['incident_id', 'machine_id', 'downtime_minutes', 'error_code']
        },
        'vec_customer_voc.csv': {
            'domain': 'customer_domain',
            'columns': ['voc_id', 'customer_id', 'channel', 'sentiment_score', 'received_at']
        },
        'vec_market_trends.csv': {
            'domain': 'customer_domain',
            'columns': ['trend_id', 'source', 'keyword', 'impact_score']
        }
    }

    dataset = []

    print(f"📂 Reading CSV files from: {base_path}")

    for filename, meta in file_metadata.items():
        filepath = os.path.join(base_path, filename)

        if not os.path.exists(filepath):
            print(f"⚠️ Warning: File not found at {filepath}, skipping.")
            continue

        try:
            df = pd.read_csv(filepath)
            target_table = filename.replace('.csv', '')
            target_domain = meta['domain']

            for _, row in df.iterrows():
                # Construct full text input
                title = str(row.get('title', '')).strip()
                contents = str(row.get('contents', '')).strip()
                full_text = f"{title}\n\n{contents}"

                # Parse extracted data using the custom parser
                extracted_data_raw = row.get('extracted_data', '')
                extracted_data = parse_custom_extracted_data(extracted_data_raw)

                # Create dataset item
                item = {
                    "input_text": full_text,
                    "ground_truth": {
                        "domain": target_domain,
                        "table": target_table,
                        "extracted_data": extracted_data
                    }
                }
                dataset.append(item)

            print(f"✅ Loaded {len(df)} samples from {filename}")

        except Exception as e:
            print(f"❌ Error processing {filename}: {e}")

    return dataset

# --- Execution ---
final_dataset = generate_dataset_from_csvs(base_path='/content/dataset/')
test_dataset = final_dataset[:5] if len(final_dataset) > 5 else final_dataset

print(f"\n🎉 Total {len(final_dataset)} samples generated.")

if len(final_dataset) > 0:
    import json
    print("\n[Sample Data Preview]")
    print(json.dumps(final_dataset[0], indent=2, ensure_ascii=False))

📂 Reading CSV files from: /content/dataset/
✅ Loaded 5 samples from tb_sales_performance.csv
✅ Loaded 5 samples from vec_sales_strategy.csv
✅ Loaded 5 samples from tb_product_spec.csv
✅ Loaded 5 samples from vec_dev_issues.csv
✅ Loaded 5 samples from vec_design_docs.csv
✅ Loaded 5 samples from vec_quality_issues.csv
✅ Loaded 5 samples from vec_production_incidents.csv
✅ Loaded 5 samples from vec_customer_voc.csv
✅ Loaded 5 samples from vec_market_trends.csv

🎉 Total 45 samples generated.

[Sample Data Preview]
{
  "input_text": "[정기] 2025년 3분기 영남 지역 매출 결산 및 실적 분석 보고\n\n본 보고서는 2025-07-11까지 영남권역(BUS)에서 발생한 모든 영업 활동을 수치화하여 분석한 자료입니다. 해당 기간 동안 생성된 고유 매출 ID(#1029734)는 총 1,420건에 달하며, 이는 전년 동기 대비 약 18% 증가한 수치입니다. 주요 성과 지표를 살펴보면, 영업 사원 사번(salesperson_id) SP-9901부터 SP-9915까지의 인원들이 전략적으로 추진한 '동남권 중견기업 대상 솔루션 영업'이 주효했습니다. 특히 매출 금액(amount) 측면에서 단일 계약당 평균 4,500만 원 이상의 고액 매출이 3분기 중반인 8월 2025-07에 집중적으로 발생하였습니다. 판매된 제품 코드를 분석해 보면 'P-ENT-001' 모델이 전체 매출의 42%를 차지하며 영남 지역의 주력 모델로 자리매김했음을 알 수 있습니다. 지역 코드 '

#4. Agent 구현

In [None]:
import json
import torch

class DataPipelineAgent:
    def __init__(self, model, tokenizer, schema_metadata):
        self.model = model
        self.tokenizer = tokenizer
        self.schema = schema_metadata

    def _format_schema_for_prompt(self):
        """
        JSON 형태의 메타데이터를 LLM이 이해하기 쉬운 텍스트 설명문으로 변환
        """
        schema_text = ""
        for domain, d_info in self.schema.items():
            schema_text += f"\n## Domain: {domain} ({d_info['description']})\n"
            for table in d_info['tables']:
                schema_text += f"  - Table: {table['table_name']} ({table['type']})\n"
                schema_text += f"    Desc: {table['description']}\n"
                schema_text += f"    Columns: {', '.join([col['name'] for col in table['columns']])}\n"
        return schema_text

    def construct_prompt(self, input_data, mode="vanilla"):
        if mode == "vanilla":
            # [Baseline] 단순한 분류 및 추출 요청
            return f"""
            Task: Classify the input data into an existing table or suggest a new one. Extract data as JSON.

            Input Data:
            {input_data}

            Available Tables:
            {list(self.schema.keys())} (and sub-tables)

            Output Format: JSON only.
            """

        else:
            # [Context Engineering] Role, Context, CoT, Constraint 적용
            schema_context = self._format_schema_for_prompt()

            return f"""
            ### Role
            You are a Senior Data Engineer. Your goal is to map unstructured data to the correct database table based on the schema definition.

            ### Database Schema (Context)
            {schema_context}

            ### Input Data
            {input_data}

            ### Instruction (Chain of Thought)
            1. **Analyze**: Understand the semantic meaning of the input data.
            2. **Search**: Compare the input against the 'Description' and 'Columns' of the provided Schema.
            3. **Decision**:
               - If a suitable table exists (Match Confidence > 80%), select it.
               - If NO suitable table exists, suggest a NEW table name and schema.
            4. **Extraction**: Extract values fitting the target table's columns. Convert types (e.g., "Critical" -> "Critical", "1 hour" -> 60) if necessary.

            ### Output Format
            Provide ONLY the JSON output in the following format (No explanations):
            {{
                "thought_process": "Brief reasoning here...",
                "target_domain": "string",
                "target_table": "string",
                "is_new_table": boolean,
                "extracted_data": {{ "column_name": "value", ... }}
            }}
            """

    def run(self, input_text, mode="cot"):
        prompt = self.construct_prompt(input_text, mode)

        messages = [
            {"role": "system", "content": "You are a helpful AI assistant that outputs strict JSON."},
            {"role": "user", "content": prompt}
        ]

        # Chat Template 적용
        text = self.tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True
        )

        model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device)

        # 추론
        generated_ids = self.model.generate(
            **model_inputs,
            max_new_tokens=1024,
            temperature=0.1, # 일관된 포맷을 위해 낮음 유지
            top_p=0.9
        )

        # 디코딩
        generated_ids = [
            output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
        ]
        response = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

        # JSON 파싱 시도 (후처리)
        try:
            # Markdown Code block 제거 (```json ... ```)
            clean_response = response.replace("```json", "").replace("```", "").strip()
            return json.loads(clean_response)
        except json.JSONDecodeError:
            return {"error": "JSON Parsing Failed", "raw_output": response}

# Agent 인스턴스 생성
agent = DataPipelineAgent(model, tokenizer, metadata)

print("Agent 생성 완료! 테스트 준비 끝.")

Agent 생성 완료! 테스트 준비 끝.


#4-1. Agent 고도화 버전 구현

In [11]:
import json
import torch

class DataPipelineAgent:
    def __init__(self, model, tokenizer, schema_metadata):
        self.model = model
        self.tokenizer = tokenizer
        self.schema = schema_metadata

    def _format_schema_detailed(self):
        text = ""
        for domain, d_info in self.schema.items():
            text += f"\n## Domain: {domain}\n"
            text += f"   - Description: {d_info['description']}\n"
            for table in d_info['tables']:
                text += f"   - Table: {table['table_name']} ({table['type']})\n"
                text += f"     Usage: {table['description']}\n"
                col_strs = [f"{c['name']}({c['type']})" for c in table['columns']]
                text += f"     Columns: {', '.join(col_strs)}\n"
        return text

    def _format_schema_simple(self):
        text = "Available Tables:\n"
        for domain, d_info in self.schema.items():
            for table in d_info['tables']:
                text += f"- {table['table_name']} (in {domain})\n"
        return text

    def construct_prompt(self, input_data, mode="vanilla", prev_result=None):

        if mode == "vanilla":
            return f"""Task: Map input to schema. Schema: {self._format_schema_simple()} Input: {input_data} Output: JSON."""

        # === [CoT 개선] 보고서/요약본 처리 및 대표값 추출 전략 ===
        elif mode == "cot":
            return f"""
            ### Role
            Senior Data Engineer specializing in unstructured-to-structured ETL.

            ### Schema
            {self._format_schema_detailed()}

            ### Input Data
            {input_data}

            ### Instructions (Critical)
            1. **Flexible Mapping**:
               - Even if the input is a **"Report"**, **"Summary"**, or **"Guide"**, if it discusses the *content* of a specific table, MAP IT to that table.
               - Example: "Sales Analysis Report" -> Map to `tb_sales_performance`.
               - Example: "API Specification" -> Map to `vec_design_docs`.

            2. **Representative Extraction**:
               - If the text mentions multiple items (e.g., "Sales from SP-001 to SP-005"), extract the **most specific single example** (e.g., "SP-001") into the column.
               - If a value is described as an aggregate (e.g., "Average 45,000"), extract it if it fits the data type.
               - **Do NOT** be afraid of partial extraction. Extract what you see.

            3. **Noise Filtering**:
               - Ignore completely irrelevant chatter (Lunch, Parking).
               - HR/Resume data is `NEW_SCHEMA`.

            ### Output Format
            {{
                "thought_process": "1. Analyze Content... 2. Select Table (Why?)... 3. Extract Fields...",
                "target_domain": "...",
                "target_table": "...",
                "is_new_table": boolean,
                "extracted_data": {{ ... }}
            }}
            """

        # === [Review 개선] 적극적 데이터 복구 (Active Repair) ===
        elif mode == "review":
            return f"""
            ### Role
            Lead Data Architect. Correct the Junior Engineer's extraction.

            ### Schema
            {self._format_schema_detailed()}

            ### Input Data
            {input_data}

            ### Candidate Result
            {json.dumps(prev_result, indent=2, ensure_ascii=False)}

            ### Validation Logic (Strictly Follow Steps)

            **Step 1. Semantic Type Check (The "Is it a Bug or a Doc?" Test)**
            - Read the text's *INTENT* carefully.
            - **DOCUMENTATION** (Spec, Guide, Manual, Plan) -> MUST map to `vec_design_docs` or `vec_sales_strategy`.
            - **PROBLEM/EVENT** (Error, Crash, Bug, Issue, Failure) -> MUST map to `vec_dev_issues`, `vec_production_incidents`, or `vec_quality_issues`.
            - **TRANSACTION/STATS** (Sales Report, Log, Count) -> MUST map to `tb_sales_performance`, `tb_production_log`.
            - **CORRECTION**: If Input is a "Specification" but Candidate chose `vec_dev_issues`, **OVERRIDE** to `vec_design_docs`.

            **Step 2. "Better Fit" (RDB Check)**
            - Does the text contain **specific structured values** (CPU Model, RAM Size, Sales Amount) that fit perfectly into an **RDB Table** (tb_*)?
            - If extracted fields match an RDB table's columns better than a VectorDB, **OVERRIDE**.
            - *Example*: Input has 'cpu_type', 'memory_size'. Candidate chose `vec_design_docs`. -> **CORRECT to `tb_product_spec`**.

            **Step 3. "Empty Shell" Repair**
            - Is the `extracted_data` missing values (null)?
            - Go back to the text and **FILL** them. Do not leave them null if the text has the answer.

            ### Output Format
            {{
                "thought_process": "Step 1 Semantic Check: [Result]. Step 2 RDB Check: [Result]. Decision: ...",
                "target_domain": "...",
                "target_table": "...",
                "is_new_table": boolean,
                "extracted_data": {{ ... }}
            }}
            """

    def _generate(self, prompt):
        messages = [{"role": "system", "content": "You are a helpful AI assistant that outputs strict JSON."},
                    {"role": "user", "content": prompt}]
        text = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        model_inputs = self.tokenizer([text], return_tensors="pt").to(self.model.device)
        generated_ids = self.model.generate(**model_inputs, max_new_tokens=2048, temperature=0.1, top_p=0.9)
        response = self.tokenizer.batch_decode([out_ids[len(in_ids):] for in_ids, out_ids in zip(model_inputs.input_ids, generated_ids)], skip_special_tokens=True)[0]
        return self._parse_json(response)

    def _parse_json(self, text):
        try:
            clean_text = text.replace("```json", "").replace("```", "").strip()
            start, end = clean_text.find("{"), clean_text.rfind("}") + 1
            if start != -1 and end != -1: return json.loads(clean_text[start:end])
            return json.loads(clean_text)
        except:
            return {"error": "JSON Parsing Failed", "raw_output": text}

    def run(self, input_text, mode="cot", verbose=False):
        if mode == "review_resolve":
            res1 = self._generate(self.construct_prompt(input_text, mode="cot"))
            res2 = self._generate(self.construct_prompt(input_text, mode="review", prev_result=res1))
            return res2
        else:
            return self._generate(self.construct_prompt(input_text, mode=mode))

# Agent 업데이트
agent = DataPipelineAgent(model, tokenizer, metadata)
print("✅ Agent V5 (Flexible Mapping & Active Repair) is ready.")

✅ Agent V5 (Flexible Mapping & Active Repair) is ready.


#5. 검증

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score
from tqdm import tqdm

# (보조 함수들은 그대로 유지)
def calculate_extraction_score(gt_data, pred_data):
    if not isinstance(pred_data, dict): return 0.0
    target_keys = [k for k in gt_data.keys() if k != 'content_raw']
    if not target_keys: return 1.0
    match_count = 0
    total_keys = len(target_keys)
    for key in target_keys:
        gt_val = str(gt_data.get(key, "")).strip().lower()
        pred_val = str(pred_data.get(key, "")).strip().lower()
        if key not in pred_data: continue
        if gt_val in pred_val or pred_val in gt_val: match_count += 1
    return match_count / total_keys

def get_existing_tables(schema_metadata):
    tables = []
    for domain in schema_metadata.values():
        for table in domain['tables']: tables.append(table['table_name'])
    return tables

# === [수정됨] 전체 데이터 저장 함수 ===
def evaluate_and_save_full_log(agent, dataset):
    existing_tables = get_existing_tables(agent.schema)
    records = []

    print(f"🚀 총 {len(dataset)}개 데이터에 대해 [Full Log] 평가를 시작합니다...")

    for idx, item in enumerate(tqdm(dataset)):
        input_text = item['input_text']
        gt_raw_table = item['ground_truth']['table']
        gt_extracted = item['ground_truth']['extracted_data']

        # Ground Truth Label
        gt_label = gt_raw_table if gt_raw_table in existing_tables else "NEW_SCHEMA"

        # [수정] 자르지 않고 전체 텍스트 저장
        row = {
            "id": idx,
            "input_full_text": input_text,  # <--- 전체 텍스트 저장 (수정됨)
            "ground_truth_table": gt_label,
            # 편의상 정답 데이터도 JSON 문자열로 함께 저장
            "ground_truth_data": str(gt_extracted)
        }

        # 3가지 모드 실행
        modes = ["vanilla", "cot", "review_resolve"]
        for mode in modes:
            try:
                # verbose=False로 설정 (로그는 DataFrame에만 남김)
                res = agent.run(input_text, mode=mode, verbose=False)

                # 예측 테이블
                if res.get("is_new_table") is True:
                    pred_cls = "NEW_SCHEMA"
                else:
                    pred_cls = res.get("target_table", "UNKNOWN")

                if pred_cls not in existing_tables and pred_cls != "NEW_SCHEMA":
                    pred_cls = "NEW_SCHEMA"

                # 데이터 추출 점수
                pred_data = res.get("extracted_data", {})
                if pred_cls != gt_label:
                    ext_score = 0.0
                else:
                    ext_score = calculate_extraction_score(gt_extracted, pred_data)

                # 사고 과정
                thought = res.get("thought_process", "N/A")

                # 결과 기록
                row[f"{mode}_pred"] = pred_cls
                row[f"{mode}_score"] = ext_score
                row[f"{mode}_thought"] = thought
                # 추출된 데이터도 확인용으로 저장
                row[f"{mode}_extracted_data"] = str(pred_data)

            except Exception as e:
                row[f"{mode}_pred"] = "ERROR"
                row[f"{mode}_score"] = 0.0
                row[f"{mode}_thought"] = f"Error: {str(e)}"
                row[f"{mode}_extracted_data"] = "{}"

        records.append(row)

    return pd.DataFrame(records)

# === 실행 및 CSV 저장 ===
# 1. 평가 수행
df_full_results = evaluate_and_save_full_log(agent, test_dataset)

# 2. CSV 파일로 저장 (한글 깨짐 방지 utf-8-sig)
filename = "agent_full_evaluation_log_test.csv"
df_full_results.to_csv(filename, index=False, encoding='utf-8-sig')

print(f"\n✅ 전체 데이터가 '{filename}' 파일로 저장되었습니다.")
print("   이제 엑셀에서 열어보시면 입력 데이터와 사고 과정이 잘리지 않고 전부 보일 겁니다.")

🚀 총 5개 데이터에 대해 [Full Log] 평가를 시작합니다...


100%|██████████| 5/5 [06:21<00:00, 76.21s/it]


✅ 전체 데이터가 'agent_full_evaluation_log_test.csv' 파일로 저장되었습니다.
   이제 엑셀에서 열어보시면 입력 데이터와 사고 과정이 잘리지 않고 전부 보일 겁니다.





In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report, accuracy_score

def print_performance_report(df):
    """
    평가 결과 DataFrame을 입력받아 모드별 상세 성능 지표를 출력합니다.
    """
    print("\n" + "="*80)
    print(">>> 📊 FINAL PERFORMANCE REPORT (Detailed Metrics) <<<")
    print("="*80)

    # 3가지 모드 정의
    modes = ["vanilla", "cot", "review_resolve"]

    # 모든 가능한 라벨(클래스) 수집 (Classification Report용)
    all_labels = sorted(list(set(df['ground_truth_table'].unique())))

    for mode in modes:
        print(f"\n[Mode: {mode.upper()}]")
        print("-" * 60)

        # 1. 데이터 준비
        y_true = df['ground_truth_table']
        y_pred = df[f'{mode}_pred']

        # 2. Classification Metrics (Precision, Recall, F1)
        # zero_division=0: 예측되지 않은 클래스에 대한 경고 방지
        cls_report = classification_report(y_true, y_pred, labels=all_labels, zero_division=0)
        acc = accuracy_score(y_true, y_pred)

        print(f"1. Table Classification (Accuracy: {acc:.2%})")
        print(cls_report)

        # 3. Extraction Metrics (Average Score)
        # 추출 점수는 0~1 사이의 실수이므로 평균값으로 평가
        avg_ext_score = df[f'{mode}_score'].mean()
        print(f"2. Data Extraction Quality (Average Score)")
        print(f"   👉 Score: {avg_ext_score:.4f} ({avg_ext_score:.2%})")
        print("-" * 60)

    # === 종합 비교 (Summary) ===
    print("\n" + "="*80)
    print(">>> 🏆 SUMMARY & IMPROVEMENT <<<")
    print("="*80)

    score_v = df['vanilla_score'].mean()
    score_c = df['cot_score'].mean()
    score_r = df['review_resolve_score'].mean()

    print(f"1. Vanilla Extraction Score : {score_v:.2%}")
    print(f"2. CoT Extraction Score     : {score_c:.2%}")
    print(f"3. Review Extraction Score  : {score_r:.2%}")

    print("-" * 40)

    # CoT vs Review 개선율 확인
    improvement = score_r - score_c
    if improvement > 0:
        print(f"🚀 [Success] Review 모드가 CoT 대비 {improvement:+.2%}p 성능을 향상시켰습니다!")
    elif improvement == 0:
        print(f"🤔 [Same] Review 모드와 CoT 모드의 성능이 동일합니다.")
    else:
        print(f"📉 [Fail] Review 모드가 오히려 성능을 떨어뜨렸습니다. (과도한 수정 주의)")

# ==========================================
# 실행 코드
# ==========================================

# 1. (이전 단계에서 이미 수행했다면 생략 가능) 평가 및 로그 저장
# df_full_results = evaluate_and_save_full_log(agent, final_dataset)

# 2. 성능 지표 출력 함수 실행
# df_full_results 변수가 메모리에 있어야 합니다.
if 'df_full_results' in locals():
    print_performance_report(df_full_results)
else:
    print("❌ 'df_full_results' 변수가 없습니다. 먼저 평가 함수(evaluate_and_save_full_log)를 실행해주세요.")


>>> 📊 FINAL PERFORMANCE REPORT (Detailed Metrics) <<<

[Mode: VANILLA]
------------------------------------------------------------
1. Table Classification (Accuracy: 0.00%)
                      precision    recall  f1-score   support

tb_sales_performance       0.00      0.00      0.00       5.0

           micro avg       0.00      0.00      0.00       5.0
           macro avg       0.00      0.00      0.00       5.0
        weighted avg       0.00      0.00      0.00       5.0

2. Data Extraction Quality (Average Score)
   👉 Score: 0.0000 (0.00%)
------------------------------------------------------------

[Mode: COT]
------------------------------------------------------------
1. Table Classification (Accuracy: 100.00%)
                      precision    recall  f1-score   support

tb_sales_performance       1.00      1.00      1.00         5

            accuracy                           1.00         5
           macro avg       1.00      1.00      1.00         5
        weigh

In [7]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np

def visualize_dashboard_english(results, schema_metadata):
    """
    [English Version Dashboard]
    - Removes Korean characters to prevent font issues in Colab.
    - Professional layout and color scheme.
    """

    # === Data Preparation ===
    table_to_domain = {}
    for domain, info in schema_metadata.items():
        for table in info['tables']:
            table_to_domain[table['table_name']] = domain

    df = pd.DataFrame({
        'GroundTruth': results['ground_truth_cls'],
        'Pred_Vanilla': results['pred_vanilla_cls'],
        'Pred_CoT': results['pred_cot_cls'],
        'Score_Vanilla': results['score_vanilla_ext'],
        'Score_CoT': results['score_cot_ext']
    })

    def get_domain(table_name):
        return table_to_domain.get(table_name, 'New/Unknown')

    df['Domain'] = df['GroundTruth'].apply(get_domain)
    df['Correct_Vanilla'] = df['GroundTruth'] == df['Pred_Vanilla']
    df['Correct_CoT'] = df['GroundTruth'] == df['Pred_CoT']

    # === Visualization Setup ===
    # Use default font (sans-serif) which supports English perfectly
    plt.rc('font', family='sans-serif')

    # Layout settings
    fig = plt.figure(figsize=(18, 20), constrained_layout=True)
    gs = fig.add_gridspec(4, 2)

    # Professional Color Palette (Grey vs Navy Blue)
    color_v = '#95a5a6'
    color_c = '#2c3e50'
    palette = [color_v, color_c]

    # -------------------------------------------------
    # 1. Overall Performance
    # -------------------------------------------------
    ax1 = fig.add_subplot(gs[0, :])

    overall_data = {
        'Metric': ['Table Classification', 'Table Classification',
                   'Data Extraction', 'Data Extraction'],
        'Model': ['Vanilla', 'CoT', 'Vanilla', 'CoT'],
        'Value': [df['Correct_Vanilla'].mean(), df['Correct_CoT'].mean(),
                  df['Score_Vanilla'].mean(), df['Score_CoT'].mean()]
    }
    df_overall = pd.DataFrame(overall_data)

    sns.barplot(data=df_overall, x='Metric', y='Value', hue='Model',
                palette=palette, ax=ax1, edgecolor='black', linewidth=1)

    ax1.set_title('1. Overall Performance Summary', fontsize=16, fontweight='bold', pad=15)
    ax1.set_ylim(0, 1.2)
    ax1.legend(loc='upper right', fontsize=12)
    ax1.set_xlabel('')

    for p in ax1.patches:
        height = p.get_height()
        if not np.isnan(height):
            ax1.text(p.get_x() + p.get_width() / 2., height + 0.02,
                     f'{height:.1%}', ha="center", fontsize=12, fontweight='bold')

    # -------------------------------------------------
    # 2. Domain-wise Accuracy
    # -------------------------------------------------
    ax2 = fig.add_subplot(gs[1, 0])

    domain_acc = df.groupby('Domain')[['Correct_Vanilla', 'Correct_CoT']].mean().reset_index()
    domain_acc = pd.melt(domain_acc, id_vars='Domain', var_name='Model', value_name='Accuracy')
    domain_acc['Model'] = domain_acc['Model'].map({'Correct_Vanilla': 'Vanilla', 'Correct_CoT': 'CoT'})

    sns.barplot(data=domain_acc, x='Domain', y='Accuracy', hue='Model',
                palette=palette, ax=ax2, edgecolor='black')

    ax2.set_title('2. Classification Accuracy by Domain', fontsize=14, fontweight='bold')
    ax2.set_ylim(0, 1.1)
    ax2.set_xlabel('')
    ax2.tick_params(axis='x', rotation=30)
    ax2.get_legend().remove()

    # -------------------------------------------------
    # 3. Table-wise Accuracy (Horizontal)
    # -------------------------------------------------
    ax3 = fig.add_subplot(gs[1, 1])

    table_acc = df.groupby('GroundTruth')[['Correct_Vanilla', 'Correct_CoT']].mean().reset_index()
    table_acc = pd.melt(table_acc, id_vars='GroundTruth', var_name='Model', value_name='Accuracy')
    table_acc['Model'] = table_acc['Model'].map({'Correct_Vanilla': 'Vanilla', 'Correct_CoT': 'CoT'})

    sns.barplot(data=table_acc, y='GroundTruth', x='Accuracy', hue='Model',
                palette=palette, ax=ax3, orient='h', edgecolor='black')

    ax3.set_title('3. Classification Accuracy by Table', fontsize=14, fontweight='bold')
    ax3.set_xlim(0, 1.1)
    ax3.set_ylabel('')
    ax3.get_legend().remove()

    # -------------------------------------------------
    # 4. Extraction Quality
    # -------------------------------------------------
    ax4 = fig.add_subplot(gs[2, :])

    ext_score = df.groupby('GroundTruth')[['Score_Vanilla', 'Score_CoT']].mean().reset_index()
    ext_score = pd.melt(ext_score, id_vars='GroundTruth', var_name='Model', value_name='Score')
    ext_score['Model'] = ext_score['Model'].map({'Score_Vanilla': 'Vanilla', 'Score_CoT': 'CoT'})

    sns.barplot(data=ext_score, x='GroundTruth', y='Score', hue='Model',
                palette=palette, ax=ax4, edgecolor='black')

    ax4.set_title('4. Data Extraction Quality (Which table is hard?)', fontsize=14, fontweight='bold', pad=10)
    ax4.set_ylim(0, 1.2)
    ax4.set_xticklabels(ax4.get_xticklabels(), rotation=45, ha='right')
    ax4.set_xlabel('')
    ax4.legend(loc='upper right')

    # Mark Issues
    low_performers = ext_score[(ext_score['Model'] == 'CoT') & (ext_score['Score'] < 0.9)]
    if not low_performers.empty:
        tables = [t.get_text() for t in ax4.get_xticklabels()]
        for _, row in low_performers.iterrows():
            if row['GroundTruth'] in tables:
                idx = tables.index(row['GroundTruth'])
                ax4.text(idx + 0.2, row['Score'] + 0.05, 'Issue!',
                         color='red', ha='center', fontweight='bold')

    # -------------------------------------------------
    # 5. Text Report
    # -------------------------------------------------
    ax5 = fig.add_subplot(gs[3, :])
    ax5.axis('off')

    # Calculate Metrics
    try:
        worst_domain = domain_acc[domain_acc['Model']=='CoT'].sort_values('Accuracy').iloc[0]
        worst_domain_txt = f"'{worst_domain['Domain']}' (Acc: {worst_domain['Accuracy']:.1%})"
    except:
        worst_domain_txt = "None (All Perfect)"

    try:
        worst_table_row = ext_score[ext_score['Model']=='CoT'].sort_values('Score').iloc[0]
        worst_table_txt = f"'{worst_table_row['GroundTruth']}' (Score: {worst_table_row['Score']:.1%})"
    except:
        worst_table_txt = "None (All Perfect)"

    improvement = df_overall.iloc[3]['Value'] - df_overall.iloc[2]['Value']

    report_text = (
        f"=== [Analysis Report Summary] ===\n\n"
        f"1. Weakest Domain (Classification): {worst_domain_txt}\n"
        f"   -> Suggestion: Enrich the schema description or add few-shot examples for this domain.\n\n"
        f"2. Hardest Table (Extraction): {worst_table_txt}\n"
        f"   -> Suggestion: Check if the column definitions (types/formats) are ambiguous.\n\n"
        f"3. CoT Improvement: Context Engineering improved data extraction quality by {improvement:+.1%}p."
    )

    ax5.text(0.05, 0.9, report_text, fontsize=14, va='top', family='monospace', linespacing=1.8)

    plt.show()

# === Execution ===
# Using 'metadata' variable for schema information
visualize_dashboard_english(adv_results, metadata)

🚀 30개 샘플에 대한 검증을 시작합니다...


NameError: name 'evaluate_performance_advanced' is not defined

#6. 검증 : 데이터 규모 키워서

In [12]:
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score
from tqdm import tqdm

# (보조 함수들은 그대로 유지)
def calculate_extraction_score(gt_data, pred_data):
    if not isinstance(pred_data, dict): return 0.0
    target_keys = [k for k in gt_data.keys() if k != 'content_raw']
    if not target_keys: return 1.0
    match_count = 0
    total_keys = len(target_keys)
    for key in target_keys:
        gt_val = str(gt_data.get(key, "")).strip().lower()
        pred_val = str(pred_data.get(key, "")).strip().lower()
        if key not in pred_data: continue
        if gt_val in pred_val or pred_val in gt_val: match_count += 1
    return match_count / total_keys

def get_existing_tables(schema_metadata):
    tables = []
    for domain in schema_metadata.values():
        for table in domain['tables']: tables.append(table['table_name'])
    return tables

# === [수정됨] 전체 데이터 저장 함수 ===
def evaluate_and_save_full_log(agent, dataset):
    existing_tables = get_existing_tables(agent.schema)
    records = []

    print(f"🚀 총 {len(dataset)}개 데이터에 대해 [Full Log] 평가를 시작합니다...")

    for idx, item in enumerate(tqdm(dataset)):
        input_text = item['input_text']
        gt_raw_table = item['ground_truth']['table']
        gt_extracted = item['ground_truth']['extracted_data']

        # Ground Truth Label
        gt_label = gt_raw_table if gt_raw_table in existing_tables else "NEW_SCHEMA"

        # [수정] 자르지 않고 전체 텍스트 저장
        row = {
            "id": idx,
            "input_full_text": input_text,  # <--- 전체 텍스트 저장 (수정됨)
            "ground_truth_table": gt_label,
            # 편의상 정답 데이터도 JSON 문자열로 함께 저장
            "ground_truth_data": str(gt_extracted)
        }

        # 3가지 모드 실행
        modes = ["vanilla", "cot", "review_resolve"]
        for mode in modes:
            try:
                # verbose=False로 설정 (로그는 DataFrame에만 남김)
                res = agent.run(input_text, mode=mode, verbose=False)

                # 예측 테이블
                if res.get("is_new_table") is True:
                    pred_cls = "NEW_SCHEMA"
                else:
                    pred_cls = res.get("target_table", "UNKNOWN")

                if pred_cls not in existing_tables and pred_cls != "NEW_SCHEMA":
                    pred_cls = "NEW_SCHEMA"

                # 데이터 추출 점수
                pred_data = res.get("extracted_data", {})
                if pred_cls != gt_label:
                    ext_score = 0.0
                else:
                    ext_score = calculate_extraction_score(gt_extracted, pred_data)

                # 사고 과정
                thought = res.get("thought_process", "N/A")

                # 결과 기록
                row[f"{mode}_pred"] = pred_cls
                row[f"{mode}_score"] = ext_score
                row[f"{mode}_thought"] = thought
                # 추출된 데이터도 확인용으로 저장
                row[f"{mode}_extracted_data"] = str(pred_data)

            except Exception as e:
                row[f"{mode}_pred"] = "ERROR"
                row[f"{mode}_score"] = 0.0
                row[f"{mode}_thought"] = f"Error: {str(e)}"
                row[f"{mode}_extracted_data"] = "{}"

        records.append(row)

    return pd.DataFrame(records)

# === 실행 및 CSV 저장 ===
# 1. 평가 수행
df_full_results = evaluate_and_save_full_log(agent, final_dataset[:30])

# 2. CSV 파일로 저장 (한글 깨짐 방지 utf-8-sig)
filename = "agent_full_evaluation_log_final.csv"
df_full_results.to_csv(filename, index=False, encoding='utf-8-sig')

print(f"\n✅ 전체 데이터가 '{filename}' 파일로 저장되었습니다.")
print("   이제 엑셀에서 열어보시면 입력 데이터와 사고 과정이 잘리지 않고 전부 보일 겁니다.")

🚀 총 30개 데이터에 대해 [Full Log] 평가를 시작합니다...



  0%|          | 0/30 [00:00<?, ?it/s][A
  3%|▎         | 1/30 [01:25<41:06, 85.04s/it][A
  7%|▋         | 2/30 [02:26<33:15, 71.28s/it][A
 10%|█         | 3/30 [03:41<32:50, 72.97s/it][A
 13%|█▎        | 4/30 [04:56<31:53, 73.61s/it][A
 17%|█▋        | 5/30 [06:10<30:45, 73.83s/it][A
 20%|██        | 6/30 [07:32<30:36, 76.52s/it][A
 23%|██▎       | 7/30 [09:24<33:45, 88.07s/it][A
 27%|██▋       | 8/30 [10:32<29:59, 81.81s/it][A
 30%|███       | 9/30 [11:56<28:55, 82.62s/it][A
 33%|███▎      | 10/30 [13:14<27:02, 81.10s/it][A
 37%|███▋      | 11/30 [16:39<37:38, 118.87s/it][A
 40%|████      | 12/30 [17:50<31:18, 104.34s/it][A
 43%|████▎     | 13/30 [19:49<30:50, 108.86s/it][A
 47%|████▋     | 14/30 [21:00<25:57, 97.36s/it] [A
 50%|█████     | 15/30 [22:35<24:11, 96.78s/it][A
 53%|█████▎    | 16/30 [24:13<22:38, 97.01s/it][A
 57%|█████▋    | 17/30 [25:23<19:15, 88.90s/it][A
 60%|██████    | 18/30 [26:29<16:24, 82.04s/it][A
 63%|██████▎   | 19/30 [28:23<16:48, 91.68s/


✅ 전체 데이터가 'agent_full_evaluation_log_final.csv' 파일로 저장되었습니다.
   이제 엑셀에서 열어보시면 입력 데이터와 사고 과정이 잘리지 않고 전부 보일 겁니다.





#7. 평가 지표 출력

In [13]:
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report, accuracy_score

def print_performance_report(df):
    """
    평가 결과 DataFrame을 입력받아 모드별 상세 성능 지표를 출력합니다.
    """
    print("\n" + "="*80)
    print(">>> 📊 FINAL PERFORMANCE REPORT (Detailed Metrics) <<<")
    print("="*80)

    # 3가지 모드 정의
    modes = ["vanilla", "cot", "review_resolve"]

    # 모든 가능한 라벨(클래스) 수집 (Classification Report용)
    all_labels = sorted(list(set(df['ground_truth_table'].unique())))

    for mode in modes:
        print(f"\n[Mode: {mode.upper()}]")
        print("-" * 60)

        # 1. 데이터 준비
        y_true = df['ground_truth_table']
        y_pred = df[f'{mode}_pred']

        # 2. Classification Metrics (Precision, Recall, F1)
        # zero_division=0: 예측되지 않은 클래스에 대한 경고 방지
        cls_report = classification_report(y_true, y_pred, labels=all_labels, zero_division=0)
        acc = accuracy_score(y_true, y_pred)

        print(f"1. Table Classification (Accuracy: {acc:.2%})")
        print(cls_report)

        # 3. Extraction Metrics (Average Score)
        # 추출 점수는 0~1 사이의 실수이므로 평균값으로 평가
        avg_ext_score = df[f'{mode}_score'].mean()
        print(f"2. Data Extraction Quality (Average Score)")
        print(f"   👉 Score: {avg_ext_score:.4f} ({avg_ext_score:.2%})")
        print("-" * 60)

    # === 종합 비교 (Summary) ===
    print("\n" + "="*80)
    print(">>> 🏆 SUMMARY & IMPROVEMENT <<<")
    print("="*80)

    score_v = df['vanilla_score'].mean()
    score_c = df['cot_score'].mean()
    score_r = df['review_resolve_score'].mean()

    print(f"1. Vanilla Extraction Score : {score_v:.2%}")
    print(f"2. CoT Extraction Score     : {score_c:.2%}")
    print(f"3. Review Extraction Score  : {score_r:.2%}")

    print("-" * 40)

    # CoT vs Review 개선율 확인
    improvement = score_r - score_c
    if improvement > 0:
        print(f"🚀 [Success] Review 모드가 CoT 대비 {improvement:+.2%}p 성능을 향상시켰습니다!")
    elif improvement == 0:
        print(f"🤔 [Same] Review 모드와 CoT 모드의 성능이 동일합니다.")
    else:
        print(f"📉 [Fail] Review 모드가 오히려 성능을 떨어뜨렸습니다. (과도한 수정 주의)")

# ==========================================
# 실행 코드
# ==========================================

# 1. (이전 단계에서 이미 수행했다면 생략 가능) 평가 및 로그 저장
# df_full_results = evaluate_and_save_full_log(agent, final_dataset)

# 2. 성능 지표 출력 함수 실행
# df_full_results 변수가 메모리에 있어야 합니다.
if 'df_full_results' in locals():
    print_performance_report(df_full_results)
else:
    print("❌ 'df_full_results' 변수가 없습니다. 먼저 평가 함수(evaluate_and_save_full_log)를 실행해주세요.")


>>> 📊 FINAL PERFORMANCE REPORT (Detailed Metrics) <<<

[Mode: VANILLA]
------------------------------------------------------------
1. Table Classification (Accuracy: 0.00%)
                      precision    recall  f1-score   support

     tb_product_spec       0.00      0.00      0.00       5.0
tb_sales_performance       0.00      0.00      0.00       5.0
     vec_design_docs       0.00      0.00      0.00       5.0
      vec_dev_issues       0.00      0.00      0.00       5.0
  vec_quality_issues       0.00      0.00      0.00       5.0
  vec_sales_strategy       0.00      0.00      0.00       5.0

           micro avg       0.00      0.00      0.00      30.0
           macro avg       0.00      0.00      0.00      30.0
        weighted avg       0.00      0.00      0.00      30.0

2. Data Extraction Quality (Average Score)
   👉 Score: 0.0000 (0.00%)
------------------------------------------------------------

[Mode: COT]
----------------------------------------------------------