# 🏗️ 온톨로지 구축 ETL 파이프라인

실무 데이터로 온톨로지를 자동으로 구축하는 방법을 배워봅시다.

## 📋 ETL이란?

- **E**xtract (추출): 다양한 소스에서 데이터 수집
- **T**ransform (변환): 데이터를 RAG에 적합한 형태로 가공
- **L**oad (적재): 벡터 DB에 저장

### 실무 시나리오
회사의 금융 용어집, 규정 문서, FAQ를 RAG 시스템에 자동으로 적재

## 1️⃣ Extract: 다양한 소스에서 데이터 추출

### 1-1. JSON 파일에서 용어집 추출

In [None]:
# 금융 용어집 예시 데이터
glossary_data = {
    "terms": [
        {
            "term": "온투금융",
            "full_name": "온라인투자연계금융",
            "definition": "온라인 플랫폼을 통해 투자자와 대출자를 직접 연결하는 금융 서비스",
            "category": "금융상품"
        },
        {
            "term": "P2P 대출",
            "full_name": "Peer-to-Peer Lending",
            "definition": "개인 간 직접 대출을 중개하는 서비스로, 온투금융의 이전 명칭",
            "category": "금융상품"
        },
        {
            "term": "신용등급",
            "full_name": "Credit Rating",
            "definition": "개인이나 기업의 신용도를 평가한 등급 (1~10등급)",
            "category": "신용평가"
        },
        {
            "term": "연체율",
            "full_name": "Delinquency Rate",
            "definition": "대출금을 약정일에 상환하지 못한 비율",
            "category": "리스크관리"
        },
        {
            "term": "중금리",
            "full_name": "Mid-rate Interest",
            "definition": "연 10~20% 수준의 금리로, 저금리와 고금리의 중간",
            "category": "금리"
        }
    ]
}

# JSON 파일로 저장
data_dir = Path("../data")
data_dir.mkdir(exist_ok=True)

with open(data_dir / "glossary.json", "w", encoding="utf-8") as f:
    json.dump(glossary_data, f, ensure_ascii=False, indent=2)

print(f"✅ 용어집 저장: {len(glossary_data['terms'])}개 용어")

### 1-2. CSV 형태의 FAQ 데이터

In [None]:
# FAQ 데이터
faq_data = """질문,답변,카테고리
"최소 투자금액은 얼마인가요?","크플의 최소 투자금액은 10만원입니다.","투자"
"투자 수익은 언제 받나요?","매월 원리금이 상환되며, 투자자 계좌로 자동 입금됩니다.","투자"
"대출 한도는 어떻게 결정되나요?","AI 신용평가 시스템으로 개인별 신용도를 분석하여 최대 3,000만원까지 결정됩니다.","대출"
"중도 상환 수수료가 있나요?","크플 대출은 중도상환 수수료가 없습니다.","대출"
"투자 원금 손실 가능성이 있나요?","대출자의 연체나 부도 시 원금 손실 가능성이 있으며, 이는 투자 위험에 해당합니다.","리스크"
"""

with open(data_dir / "faq.csv", "w", encoding="utf-8") as f:
    f.write(faq_data)

print("✅ FAQ 데이터 저장")

## 2️⃣ Transform: 데이터 변환 및 구조화

### 2-1. JSON 용어집 → 문서 변환

In [None]:
def transform_glossary(json_path):
    """용어집 JSON을 RAG 문서로 변환"""
    with open(json_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    
    documents = []
    for term_info in data["terms"]:
        # 구조화된 문서 생성
        doc = f"""[용어] {term_info['term']}
[영문/전체명] {term_info['full_name']}
[정의] {term_info['definition']}
[분류] {term_info['category']}"""
        documents.append(doc)
    
    return documents

glossary_docs = transform_glossary(data_dir / "glossary.json")

print(f"✅ 용어집 변환 완료: {len(glossary_docs)}개 문서")
print(f"\n예시:\n{glossary_docs[0]}")

### 2-2. CSV FAQ → 문서 변환

In [None]:
import csv

def transform_faq(csv_path):
    """FAQ CSV를 RAG 문서로 변환"""
    documents = []
    
    with open(csv_path, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            doc = f"""[FAQ - {row['카테고리']}]
질문: {row['질문']}
답변: {row['답변']}"""
            documents.append(doc)
    
    return documents

faq_docs = transform_faq(data_dir / "faq.csv")

print(f"✅ FAQ 변환 완료: {len(faq_docs)}개 문서")
print(f"\n예시:\n{faq_docs[0]}")

### 2-3. 텍스트 전처리 (선택사항)

In [None]:
def preprocess_text(text):
    """텍스트 정제"""
    # 불필요한 공백 제거
    text = " ".join(text.split())
    # 특수문자 정리 (필요시)
    # text = re.sub(r'[^가-힣a-zA-Z0-9\s\[\]\-:,.]', '', text)
    return text.strip()

# 모든 문서 전처리
all_docs = glossary_docs + faq_docs
processed_docs = [preprocess_text(doc) for doc in all_docs]

print(f"✅ 전처리 완료: 총 {len(processed_docs)}개 문서")

## 3️⃣ Load: 벡터 DB에 적재

### 3-1. 기존 데이터 초기화 (선택)

In [None]:
# 깨끗하게 시작하려면 주석 해제
# store.clear()
# print("🗑️ 기존 데이터 삭제")

print(f"현재 저장된 문서: {store.count()}개")

### 3-2. 배치 적재

In [None]:
# 문서 적재
rag.add_documents(processed_docs)

print(f"✅ 적재 완료!")
print(f"총 문서 수: {store.count()}개")
print(f"- 용어집: {len(glossary_docs)}개")
print(f"- FAQ: {len(faq_docs)}개")

## 4️⃣ 검증: 구축된 온톨로지 테스트

### 4-1. 용어 검색 테스트

In [None]:
test_questions = [
    "온투금융이 뭐야?",
    "중금리가 뭔가요?",
    "P2P 대출 설명해줘",
]

print("🧪 용어 검색 테스트\n" + "="*80)

for q in test_questions:
    answer = rag.query(q, top_k=2)
    print(f"\n질문: {q}")
    print(f"답변: {answer}")
    print("-"*80)

### 4-2. FAQ 검색 테스트

In [None]:
faq_questions = [
    "최소 투자금액 알려줘",
    "대출 한도는 얼마야?",
    "중도 상환 수수료 있어?",
]

print("🧪 FAQ 검색 테스트\n" + "="*80)

for q in faq_questions:
    answer = rag.query(q, top_k=2)
    print(f"\n질문: {q}")
    print(f"답변: {answer}")
    print("-"*80)

## 5️⃣ 고급: 자동 업데이트 파이프라인

### 5-1. 증분 업데이트 (새 데이터만 추가)

In [None]:
def incremental_update(new_data_path):
    """새로운 데이터만 추가"""
    # 1. Extract
    with open(new_data_path, "r", encoding="utf-8") as f:
        new_data = json.load(f)
    
    # 2. Transform
    new_docs = transform_glossary(new_data_path)
    
    # 3. Load
    rag.add_documents(new_docs)
    
    return len(new_docs)

# 예시: 새 용어 추가
new_terms = {
    "terms": [
        {
            "term": "에어팩",
            "full_name": "AIRPACK",
            "definition": "PFCT의 AI 기반 신용 리스크 관리 B2B 솔루션",
            "category": "제품"
        }
    ]
}

with open(data_dir / "new_terms.json", "w", encoding="utf-8") as f:
    json.dump(new_terms, f, ensure_ascii=False, indent=2)

added = incremental_update(data_dir / "new_terms.json")
print(f"✅ {added}개 새 용어 추가")
print(f"총 문서 수: {store.count()}개")

### 5-2. 전체 ETL 파이프라인 함수

In [None]:
def run_etl_pipeline(data_sources, clear_existing=False):
    """전체 ETL 파이프라인 실행"""
    print("🚀 ETL 파이프라인 시작\n")
    
    # 0. 초기화 (선택)
    if clear_existing:
        store.clear()
        print("🗑️ 기존 데이터 삭제")
    
    all_documents = []
    
    # 1. Extract & Transform
    for source in data_sources:
        source_type = source["type"]
        source_path = source["path"]
        
        print(f"📥 처리 중: {source_path}")
        
        if source_type == "glossary":
            docs = transform_glossary(source_path)
        elif source_type == "faq":
            docs = transform_faq(source_path)
        else:
            print(f"⚠️ 알 수 없는 타입: {source_type}")
            continue
        
        all_documents.extend(docs)
        print(f"  ✓ {len(docs)}개 문서 변환")
    
    # 2. Preprocess
    processed = [preprocess_text(doc) for doc in all_documents]
    print(f"\n🔧 전처리 완료: {len(processed)}개 문서")
    
    # 3. Load
    rag.add_documents(processed)
    print(f"\n✅ 적재 완료: 총 {store.count()}개 문서")
    
    return store.count()

# 사용 예시
data_sources = [
    {"type": "glossary", "path": data_dir / "glossary.json"},
    {"type": "faq", "path": data_dir / "faq.csv"},
]

# total = run_etl_pipeline(data_sources, clear_existing=True)
print("\n💡 위 코드 주석을 해제하면 전체 파이프라인이 실행됩니다.")

## 6️⃣ 모니터링 및 품질 관리

### 6-1. 문서 통계

In [None]:
def get_statistics():
    """문서 통계 확인"""
    results = store.get_all()
    docs = results['documents']
    
    total = len(docs)
    avg_length = sum(len(doc) for doc in docs) / total if total > 0 else 0
    
    # 카테고리별 분류
    categories = {}
    for doc in docs:
        if "[용어]" in doc:
            categories["용어집"] = categories.get("용어집", 0) + 1
        elif "[FAQ" in doc:
            categories["FAQ"] = categories.get("FAQ", 0) + 1
        else:
            categories["기타"] = categories.get("기타", 0) + 1
    
    print("📊 문서 통계")
    print("="*50)
    print(f"총 문서 수: {total}개")
    print(f"평균 문서 길이: {avg_length:.0f}자")
    print(f"\n카테고리별 분포:")
    for cat, count in categories.items():
        print(f"  - {cat}: {count}개 ({count/total*100:.1f}%)")

get_statistics()

### 6-2. 검색 품질 테스트

In [None]:
def test_search_quality(test_cases):
    """검색 품질 평가"""
    print("🎯 검색 품질 테스트\n" + "="*80)
    
    for i, (question, expected_keyword) in enumerate(test_cases, 1):
        answer = rag.query(question, top_k=2)
        
        # 기대 키워드가 답변에 포함되는지 확인
        is_correct = expected_keyword.lower() in answer.lower()
        status = "✅" if is_correct else "❌"
        
        print(f"\n{i}. {status} 질문: {question}")
        print(f"   기대 키워드: {expected_keyword}")
        print(f"   답변: {answer[:100]}...")
        print("-"*80)

# 테스트 케이스
test_cases = [
    ("온투금융이 뭐야?", "온라인투자연계금융"),
    ("최소 투자금액은?", "10만원"),
    ("중금리 설명해줘", "10~20%"),
]

test_search_quality(test_cases)

## 🎯 정리

### ETL 파이프라인 구축 완료! 🎉

**배운 내용:**
1. ✅ 다양한 형식(JSON, CSV)에서 데이터 추출
2. ✅ 구조화된 문서로 변환
3. ✅ 벡터 DB에 효율적으로 적재
4. ✅ 증분 업데이트 구현
5. ✅ 품질 모니터링

### 실무 적용 팁

1. **자동화**: 크론잡이나 스케줄러로 주기적 실행
2. **버전 관리**: 문서 변경 이력 추적
3. **A/B 테스트**: 다른 청크 크기, 전처리 방법 비교
4. **모니터링**: 검색 품질 지표 추적

### 다음 단계

- PDF, Word 문서 처리
- 웹 크롤링 자동화
- 실시간 업데이트 파이프라인
- 멀티모달 데이터 (이미지, 표) 처리