# 📄 Smart CLM 문서 처리 파이프라인

Smart CLM 시스템의 문서 처리 및 RAG 평가를 위한 통합 노트북입니다.

## 🏗️ 시스템 구성
- **문서 처리 엔진**: PDF 문서 분석 및 구조화
- **로컬 스토리지**: 파일 시스템 기반 데이터 저장
- **벡터 저장소**: 로컬 파일 기반 벡터 데이터 관리
- **Jupyter Notebook**: 문서 처리 및 평가 도구

## 📋 주요 기능
1. **docs 폴더 스캔**: PDF 파일 자동 탐색
2. **문서 분석**: 내장 PDF 처리 엔진으로 분석
3. **로컬 저장**: 파일 시스템에 결과 저장
4. **RAG 평가**: 문서 처리 품질 및 검색 성능 평가
5. **결과 시각화**: 처리 결과 및 성능 지표 대시보드

## 🚀 사용 방법
1. `docs/` 폴더에 처리할 PDF 파일 배치
2. 노트북 셀 순서대로 실행
3. 결과 확인 및 분석


In [1]:
# 0. 의존성 설치
!pip install ragas docling streamlit

Collecting ragas
  Using cached ragas-0.2.15-py3-none-any.whl.metadata (9.0 kB)
Collecting docling
  Using cached docling-2.41.0-py3-none-any.whl.metadata (10 kB)
Collecting streamlit
  Using cached streamlit-1.46.1-py3-none-any.whl.metadata (9.0 kB)
Collecting tiktoken (from ragas)
  Using cached tiktoken-0.9.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Collecting langchain_openai (from ragas)
  Using cached langchain_openai-0.3.27-py3-none-any.whl.metadata (2.3 kB)
Collecting openai>1 (from ragas)
  Using cached openai-1.95.1-py3-none-any.whl.metadata (29 kB)
Collecting docling-core<3.0.0,>=2.42.0 (from docling-core[chunking]<3.0.0,>=2.42.0->docling)
  Using cached docling_core-2.42.0-py3-none-any.whl.metadata (6.5 kB)
Collecting docling-parse<5.0.0,>=4.0.0 (from docling)
  Using cached docling_parse-4.1.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.6 kB)
Collecting docling-ibm-models<4,>=3.6.0 (from docling)
  Using cached

## 🛠️ 1. 환경 설정 및 라이브러리 import

In [3]:
# 필수 라이브러리 import
import os
import json
import logging
import time
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
import warnings
warnings.filterwarnings('ignore')

# 한글 폰트 설정
plt.rcParams['font.family'] = 'DejaVu Sans'
plt.rcParams['axes.unicode_minus'] = False

# 시각화 스타일 설정
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (12, 8)

# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 기본 디렉토리 설정
ROOT_DIR = Path(".")
DOCS_DIR = ROOT_DIR / "docs"
RESULTS_DIR = ROOT_DIR / "results"
PROCESSED_DIR = ROOT_DIR / "processed"
VECTORS_DIR = ROOT_DIR / "vectors"
CHUNKS_DIR = ROOT_DIR / "chunks"

# 디렉토리 생성
for dir_path in [DOCS_DIR, RESULTS_DIR, PROCESSED_DIR, VECTORS_DIR, CHUNKS_DIR]:
    dir_path.mkdir(exist_ok=True)

print("✅ 환경 설정 완료")
print(f"📁 문서 폴더: {DOCS_DIR}")
print(f"📊 결과 폴더: {RESULTS_DIR}")
print(f"💾 처리 폴더: {PROCESSED_DIR}")
print(f"🔢 벡터 폴더: {VECTORS_DIR}")
print(f"📑 청크 폴더: {CHUNKS_DIR}")

# docs 폴더 상태 확인
pdf_files = list(DOCS_DIR.glob("*.pdf"))
print(f"\n📄 docs 폴더 내 PDF 파일: {len(pdf_files)}개")
for pdf_file in pdf_files:
    file_size = pdf_file.stat().st_size
    print(f"  - {pdf_file.name}: {file_size:,} bytes")


✅ 환경 설정 완료
📁 문서 폴더: docs
📊 결과 폴더: results
💾 처리 폴더: processed
🔢 벡터 폴더: vectors
📑 청크 폴더: chunks

📄 docs 폴더 내 PDF 파일: 0개


## 📄 2. PDF 처리 엔진 구현

In [None]:
# docling 라이브러리 import
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import DocumentConverter, PdfFormatOption

class DocumentProcessingConfig:
    def __init__(
        self,
        use_ocr: bool = True,
        extract_tables: bool = True,
        extract_images: bool = True,
        extract_outline: bool = True,
        min_chunk_size: int = 100,
        max_chunk_size: int = 1000,
        overlap_size: int = 50,
        language: str = "kor",
    ):
        self.use_ocr = use_ocr
        self.extract_tables = extract_tables
        self.extract_images = extract_images
        self.extract_outline = extract_outline
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
        self.overlap_size = overlap_size
        self.language = language
    
    def to_dict(self) -> Dict:
        return {
            "use_ocr": self.use_ocr,
            "extract_tables": self.extract_tables,
            "extract_images": self.extract_images,
            "extract_outline": self.extract_outline,
            "min_chunk_size": self.min_chunk_size,
            "max_chunk_size": self.max_chunk_size,
            "overlap_size": self.overlap_size,
            "language": self.language,
        }
    
    @classmethod
    def from_dict(cls, config_dict: Dict) -> "DocumentProcessingConfig":
        return cls(**config_dict)

def quick_layout_analysis(doc_analysis) -> dict:
    """
    빠른 레이아웃 분석으로 문서 구조 파악
    """
    try:
        # 문서 특성 분석
        text_content = doc_analysis.export_to_text()
        text_length = len(text_content.strip())

        analysis = {
            "has_tables": len(doc_analysis.tables) > 0,
            "table_count": len(doc_analysis.tables),
            "has_images": len(doc_analysis.pictures) > 0,
            "image_count": len(doc_analysis.pictures),
            "is_scanned": text_length < 100,  # 텍스트가 거의 없으면 스캔 문서
            "text_length": text_length,
            "is_text_heavy": text_length > 5000,  # 텍스트 위주 문서
            "page_count": len(doc_analysis.pages),
        }

        logger.info("📊 분석 결과:")
        logger.info(f"  - 페이지: {analysis['page_count']}페이지")
        logger.info(
            f"  - 텍스트: {analysis['text_length']}자 ({'스캔' if analysis['is_scanned'] else '디지털'})"
        )
        logger.info(f"  - 테이블: {analysis['table_count']}개")
        logger.info(f"  - 이미지: {analysis['image_count']}개")

        return analysis

    except Exception as e:
        logger.error(f"레이아웃 분석 실패: {str(e)}")
        return {
            "has_tables": True,  # 안전을 위해 True
            "table_count": 0,
            "has_images": True,  # 안전을 위해 True
            "image_count": 0,
            "is_scanned": False,
            "text_length": 0,
            "is_text_heavy": False,
            "page_count": 1,
        }

def create_optimized_pipeline(config: DocumentProcessingConfig) -> PdfPipelineOptions:
    """
    설정에 따라 최적의 파이프라인 옵션 생성
    """
    options = PdfPipelineOptions()

    # OCR 설정
    options.do_ocr = config.use_ocr
    if config.use_ocr:
        options.ocr_options.lang = ["ko", "en"]
        logger.info("🔍 OCR 활성화")
    else:
        logger.info("📄 OCR 비활성화")

    # 테이블 처리 설정
    options.do_table_structure = config.extract_tables
    if config.extract_tables:
        options.table_structure_options.do_cell_matching = True
        logger.info("📊 테이블 구조 인식 활성화")
    else:
        logger.info("📝 테이블 구조 인식 비활성화")

    # 이미지 처리 설정
    options.generate_picture_images = config.extract_images
    if config.extract_images:
        options.images_scale = 2.0
        logger.info("🖼️ 이미지 처리 활성화")
    else:
        logger.info("📝 이미지 처리 비활성화")

    # 페이지 이미지는 기본적으로 비활성화 (용량 절약)
    options.generate_page_images = False

    return options

class DocumentProcessor:
    def __init__(self, config: DocumentProcessingConfig):
        self.config = config
        self.pipeline = create_optimized_pipeline(config)
        self.converter = DocumentConverter(
            format_options={
                InputFormat.PDF: PdfFormatOption(pipeline_options=self.pipeline)
            }
        )
    
    def process_document(self, pdf_path: Path) -> Dict:
        """문서를 처리하고 결과를 반환합니다."""
        results = {
            "metadata": {},
            "tables": [],
            "images": [],
            "outline": [],
            "chunks": [],
            "errors": []
        }
        
        try:
            # 1. PDF 파일 처리
            start_time = time.time()
            doc_analysis = self.converter.convert(str(pdf_path)).document
            
            # 2. 레이아웃 분석
            layout_info = quick_layout_analysis(doc_analysis)
            results["metadata"]["layout"] = layout_info
            
            # 3. 테이블 추출
            if self.config.extract_tables:
                tables = self._extract_tables(doc_analysis)
                results["tables"] = tables
            
            # 4. 이미지 추출
            if self.config.extract_images:
                images = self._extract_images(doc_analysis)
                results["images"] = images
            
            # 5. 아웃라인 추출
            if self.config.extract_outline:
                outline = self._extract_outline(doc_analysis)
                results["outline"] = outline
            
            # 6. 청크 생성
            chunks = self._create_chunks(doc_analysis)
            results["chunks"] = chunks
            
            # 7. 메타데이터 추가
            processing_time = time.time() - start_time
            results["metadata"].update({
                "filename": pdf_path.name,
                "page_count": len(doc_analysis.pages),
                "processing_time": f"{processing_time:.2f}초",
                "processing_config": self.config.to_dict()
            })
            
            logger.info(f"✅ 문서 처리 완료 ({processing_time:.2f}초)")
            
        except Exception as e:
            error_msg = f"문서 처리 실패: {str(e)}"
            logger.error(error_msg)
            results["errors"].append(error_msg)
        
        return results
    
    def _extract_tables(self, doc_analysis) -> List[Dict]:
        """테이블 정보를 추출합니다."""
        tables = []
        for page_idx, page in enumerate(doc_analysis.pages):
            page_tables = page.find_tables()
            for table_idx, table in enumerate(page_tables):
                table_info = {
                    "page": page_idx + 1,
                    "table_idx": table_idx,
                    "content": table.extract(),
                    "bbox": table.bbox.to_dict()
                }
                tables.append(table_info)
        return tables
    
    def _extract_images(self, doc_analysis) -> List[Dict]:
        """이미지 정보를 추출합니다."""
        images = []
        for page_idx, page in enumerate(doc_analysis.pages):
            page_images = page.find_images()
            for img_idx, img in enumerate(page_images):
                img_info = {
                    "page": page_idx + 1,
                    "image_idx": img_idx,
                    "bbox": img.bbox.to_dict()
                }
                images.append(img_info)
        return images
    
    def _extract_outline(self, doc_analysis) -> List[Dict]:
        """문서 아웃라인을 추출합니다."""
        outline = []
        if hasattr(doc_analysis, "outline"):
            for item in doc_analysis.outline:
                outline_item = {
                    "title": item.title,
                    "level": item.level,
                    "page": item.page
                }
                outline.append(outline_item)
        return outline
    
    def _create_chunks(self, doc_analysis) -> List[Dict]:
        """텍스트를 청크로 분할합니다."""
        chunks = []
        current_chunk = ""
        current_page = 1
        
        for page_idx, page in enumerate(doc_analysis.pages):
            page_text = page.extract_text()
            words = page_text.split()
            
            for word in words:
                if len(current_chunk) + len(word) + 1 <= self.config.max_chunk_size:
                    current_chunk += word + " "
                else:
                    if len(current_chunk) >= self.config.min_chunk_size:
                        chunk_info = {
                            "text": current_chunk.strip(),
                            "page": current_page,
                            "size": len(current_chunk)
                        }
                        chunks.append(chunk_info)
                    
                    current_chunk = word + " "
                    current_page = page_idx + 1
            
            # 페이지 끝에서 청크 저장
            if len(current_chunk) >= self.config.min_chunk_size:
                chunk_info = {
                    "text": current_chunk.strip(),
                    "page": current_page,
                    "size": len(current_chunk)
                }
                chunks.append(chunk_info)
                current_chunk = ""
        
        return chunks

# 기본 설정으로 프로세서 생성
default_config = DocumentProcessingConfig()
processor = DocumentProcessor(default_config)
print("✅ 문서 처리기가 준비되었습니다.")

INFO:__main__:🔍 OCR 활성화
INFO:__main__:📊 테이블 구조 인식 활성화
INFO:__main__:🖼️ 이미지 처리 활성화


✅ 문서 처리기가 준비되었습니다.


In [5]:
def process_documents(
    config: Optional[Dict] = None,
    input_dir: Optional[Path] = None,
    output_dir: Optional[Path] = None
) -> List[Dict]:
    """여러 문서를 일괄 처리합니다."""
    # 설정 로드
    if config is None:
        processing_config = default_config
    else:
        processing_config = DocumentProcessingConfig.from_dict(config)
    
    # 입/출력 경로 설정
    input_dir = input_dir or DOCS_DIR
    output_dir = output_dir or PROCESSED_DIR
    
    # PDF 파일 목록 가져오기
    pdf_files = list(input_dir.glob("*.pdf"))
    print(f"총 {len(pdf_files)}개의 PDF 파일을 처리합니다.")
    
    # 결과 저장용 리스트
    results = []
    
    # 각 파일 처리
    for pdf_file in pdf_files:
        print(f"\n{pdf_file.name} 처리 중...")
        
        try:
            # 문서 처리
            result = processor.process_document(pdf_file)
            
            # 결과 저장
            output_file = output_dir / f"{pdf_file.stem}_processed.json"
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(result, f, ensure_ascii=False, indent=2)
            
            print(f"처리 완료: {output_file}")
            results.append(result)
            
        except Exception as e:
            print(f"처리 실패: {str(e)}")
            results.append({
                "filename": pdf_file.name,
                "error": str(e)
            })
    
    return results

# 사용 예시
custom_config = {
    "use_ocr": True,
    "extract_tables": True,
    "extract_images": True,
    "extract_outline": True,
    "min_chunk_size": 200,
    "max_chunk_size": 800,
    "overlap_size": 100,
    "language": "kor"
}

print("\n문서 처리를 시작하려면 다음과 같이 실행하세요:")
print("results = process_documents(config=custom_config)")
print("또는 기본 설정으로 실행:")
print("results = process_documents()")



문서 처리를 시작하려면 다음과 같이 실행하세요:
results = process_documents(config=custom_config)
또는 기본 설정으로 실행:
results = process_documents()


In [6]:
def extract_tables_info(document) -> Dict[str, Any]:
    """문서에서 테이블 정보를 추출합니다."""
    tables_info = []

    if hasattr(document, "tables") and document.tables:
        for idx, table in enumerate(document.tables):
            try:
                table_data = {
                    "table_index": idx + 1,
                    "rows": getattr(table, "num_rows", 0),
                    "cols": getattr(table, "num_cols", 0),
                    "markdown": table.export_to_markdown()
                    if hasattr(table, "export_to_markdown")
                    else "",
                    "html": table.export_to_html()
                    if hasattr(table, "export_to_html")
                    else "",
                }
                # 페이지 번호 추출
                if hasattr(table, "prov") and table.prov:
                    for prov in table.prov:
                        if hasattr(prov, "page_no"):
                            table_data["page"] = prov.page_no
                            break
                tables_info.append(table_data)
            except Exception as e:
                logger.warning(f"테이블 {idx} 처리 오류: {e}")

    return {"count": len(tables_info), "data": tables_info}


def extract_images_info(document) -> Dict[str, Any]:
    """문서에서 이미지 정보를 추출합니다."""
    images_info = []

    if hasattr(document, "pictures") and document.pictures:
        for idx, picture in enumerate(document.pictures):
            try:
                image_data = {
                    "image_index": idx + 1,
                    "caption": picture.caption_text(doc=document)
                    if hasattr(picture, "caption_text")
                    else "",
                    "has_image": hasattr(picture, "image")
                    and picture.image is not None,
                }
                # 페이지 번호 추출
                if hasattr(picture, "prov") and picture.prov:
                    for prov in picture.prov:
                        if hasattr(prov, "page_no"):
                            image_data["page"] = prov.page_no
                            break
                images_info.append(image_data)
            except Exception as e:
                logger.warning(f"이미지 {idx} 처리 오류: {e}")

    return {
        "count": len(images_info),
        "data": images_info,
        "processing": {"advanced_enabled": False, "ocr_enabled": False},
    }


def extract_outline_info(result) -> Dict[str, Any]:
    """모든 문서에서 아웃라인 정보를 추출합니다."""
    outline_data = _extract_outline_from_text(result)
    logger.info(f"아웃라인 {len(outline_data)}개 추출 완료")

    return {
        "enabled": len(outline_data) > 0,
        "reason": "전체 문서 아웃라인 추출",
        "data": outline_data,
    }


def _extract_outline_from_text(result) -> List[Dict[str, Any]]:
    """section_header 라벨만 필터링해서 아웃라인 추출 (내부 함수)"""
    outline_data = []

    try:
        document = result.document

        logger.info("=== section_header 라벨 기반 아웃라인 추출 ===")
        logger.info(f"document.pages 타입: {type(document.pages)}")
        logger.info(
            f"document.pages 길이: {len(document.pages) if hasattr(document.pages, '__len__') else 'N/A'}"
        )

        if hasattr(document, "texts"):
            for item in document.texts:
                # section_header 라벨만 찾기
                if hasattr(item, "label") and item.label == "section_header":
                    text_content = ""
                    if hasattr(item, "text") and item.text:
                        text_content = item.text.strip()

                    if text_content:
                        logger.info(f"제목 발견: '{text_content}'")

                        # 위치 정보 추출
                        page_idx = 0
                        y_position = 0.0
                        bbox_info = {}

                        if hasattr(item, "prov") and item.prov:
                            for prov in item.prov:
                                if hasattr(prov, "page_no"):
                                    page_idx = prov.page_no - 1  # 1-based to 0-based

                                if hasattr(prov, "bbox"):
                                    bbox_info, y_position = _extract_bbox_info(
                                        prov.bbox, document, page_idx
                                    )
                                    break

                        # 첫 번째면 introduction, 나머지는 main body
                        element_type = (
                            "introduction" if len(outline_data) == 0 else "main body"
                        )

                        outline_item = {
                            "title": text_content,
                            "pageIndex": page_idx,
                            "y": y_position,
                            "bbox": bbox_info,
                            "dest": f"Section_{len(outline_data) + 2}",
                            "size": 23.3333,
                            "type": element_type,
                            "items": [],
                        }
                        outline_data.append(outline_item)

        logger.info(f"section_header로 찾은 아웃라인: {len(outline_data)}개")

    except Exception as e:
        logger.error(f"아웃라인 추출 에러: {e}")
        import traceback
        traceback.print_exc()

    return outline_data


def _extract_bbox_info(bbox, document, page_idx: int) -> tuple[Dict[str, Any], float]:
    """bbox 정보를 추출하고 정규화합니다."""
    # 페이지 크기 구하기 (안전하게)
    page_width = 595  # A4 기본 너비
    page_height = 842  # A4 기본 높이
    y_position = 0.0

    try:
        if hasattr(document, "pages") and page_idx < len(document.pages):
            # 다양한 접근 방식 시도
            if isinstance(document.pages, list):
                page = document.pages[page_idx]
            elif isinstance(document.pages, dict):
                page = document.pages.get(page_idx) or document.pages.get(str(page_idx))
            else:
                page = None

            if page and hasattr(page, "size"):
                if hasattr(page.size, "height"):
                    page_height = page.size.height
                if hasattr(page.size, "width"):
                    page_width = page.size.width
    except (KeyError, IndexError, AttributeError) as e:
        logger.warning(f"페이지 정보 접근 오류: {e}, 기본값 사용")

    # bbox 전체 정보 수집
    bbox_info = {}

    # 다양한 bbox 속성 확인
    if (
        hasattr(bbox, "l")
        and hasattr(bbox, "r")
        and hasattr(bbox, "t")
        and hasattr(bbox, "b")
    ):
        # l, r, t, b 형식
        bbox_info = {
            "left": bbox.l / page_width,
            "right": bbox.r / page_width,
            "top": bbox.t / page_height,
            "bottom": bbox.b / page_height,
            "width": (bbox.r - bbox.l) / page_width,
            "height": (bbox.b - bbox.t) / page_height,
        }
        y_position = bbox.t / page_height

    return bbox_info, y_position


## 📋 3. 문서 처리 파이프라인 정의

In [7]:
def process_documents():
    """문서 처리 파이프라인 실행"""
    logger.info("🚀 문서 처리 파이프라인 시작")
    
    # 1. docs 폴더의 PDF 파일 목록 가져오기
    pdf_files = list(DOCS_DIR.glob("*.pdf"))
    logger.info(f"📁 처리할 PDF 파일: {len(pdf_files)}개")
    
    # 2. 각 PDF 파일 처리
    for pdf_file in pdf_files:
        try:
            logger.info(f"\n{'='*50}")
            logger.info(f"📄 처리 시작: {pdf_file.name}")
            
            # 2.1. PDF 파일 분석
            result = pdf_service.analyze_pdf(str(pdf_file))
            
            # 2.2. 청크 생성
            chunks = []
            
            # 2.2.1. 텍스트 기반 청크
            text_content = result["markdown_content"]
            if text_content:
                # 섹션 단위로 분할
                sections = text_content.split("\n## ")
                for section in sections:
                    if section.strip():
                        chunks.append({
                            "type": "text",
                            "content": section.strip(),
                            "metadata": {
                                "source": pdf_file.name,
                                "type": "text_section"
                            }
                        })
            
            # 2.2.2. 테이블 기반 청크
            for table in result["tables"]["data"]:
                if table["markdown"]:
                    chunks.append({
                        "type": "table",
                        "content": table["markdown"],
                        "metadata": {
                            "source": pdf_file.name,
                            "type": "table",
                            "page": table.get("page", 1),
                            "rows": table["rows"],
                            "cols": table["cols"]
                        }
                    })
            
            # 2.2.3. 이미지 기반 청크 (캡션이 있는 경우)
            for image in result["images"]["data"]:
                if image["caption"]:
                    chunks.append({
                        "type": "image",
                        "content": image["caption"],
                        "metadata": {
                            "source": pdf_file.name,
                            "type": "image_caption",
                            "page": image.get("page", 1)
                        }
                    })
            
            # 2.3. 청크 저장
            chunk_file = CHUNKS_DIR / f"chunks_{pdf_file.stem}.json"
            with open(chunk_file, 'w', encoding='utf-8') as f:
                json.dump(chunks, f, ensure_ascii=False, indent=2)
            
            logger.info(f"✅ 청크 생성 완료: {len(chunks)}개")
            logger.info(f"  - 저장 위치: {chunk_file}")
            
            # 2.4. 벡터 저장 (향후 구현)
            # TODO: 벡터 저장 로직 구현
            
        except Exception as e:
            logger.error(f"❌ 처리 실패 ({pdf_file.name}): {str(e)}")
            continue
    
    logger.info("\n🎉 모든 문서 처리 완료")

# 파이프라인 준비
print("✅ 파이프라인 준비 완료")

✅ 파이프라인 준비 완료


## 🚀 4. 파이프라인 실행

In [8]:
# 파이프라인 실행
process_documents()

INFO:__main__:🚀 문서 처리 파이프라인 시작
INFO:__main__:📁 처리할 PDF 파일: 0개
INFO:__main__:
🎉 모든 문서 처리 완료


## 🧪 3. RAG 평가 클래스 정의

In [11]:
# Smart CLM 청킹 로직
class HierarchicalChunker:
    """
    Docling에서 추출한 마크다운을 기반으로 계층적 청킹을 수행하는 클래스
    """

    def __init__(self):
        # 헤더 정의 (헤더 레벨과 메타데이터 키 이름)
        self.headers_to_split_on = [
            ("#", "Header 1"),
            ("##", "Header 2"),
            ("###", "Header 3"),
            ("####", "Header 4"),
        ]
        
        # Child 문서용 텍스트 분할기 설정
        self.chunk_size = 500
        self.chunk_overlap = 50

    def chunk_markdown(self, markdown_content: str, filename: str = "document") -> Dict[str, Any]:
        """
        마크다운 콘텐츠를 계층적으로 청킹합니다.
        """
        # 단순화된 청킹 로직 (실제 구현을 기반으로 간소화)
        lines = markdown_content.split('\n')
        chunks = []
        current_chunk = []
        current_headers = {}
        
        for line in lines:
            # 헤더 감지
            if line.startswith('#'):
                if current_chunk:
                    # 이전 청크 저장
                    chunk_content = '\n'.join(current_chunk)
                    if len(chunk_content.strip()) > 0:
                        chunks.append({
                            'content': chunk_content,
                            'headers': current_headers.copy(),
                            'char_count': len(chunk_content)
                        })
                    current_chunk = []
                
                # 헤더 레벨 파악
                level = len(line) - len(line.lstrip('#'))
                header_text = line.lstrip('#').strip()
                current_headers[f'header_{level}'] = header_text
                
                # 하위 레벨 헤더 초기화
                for i in range(level + 1, 5):
                    if f'header_{i}' in current_headers:
                        del current_headers[f'header_{i}']
            
            current_chunk.append(line)
        
        # 마지막 청크 저장
        if current_chunk:
            chunk_content = '\n'.join(current_chunk)
            if len(chunk_content.strip()) > 0:
                chunks.append({
                    'content': chunk_content,
                    'headers': current_headers.copy(),
                    'char_count': len(chunk_content)
                })
        
        return {
            'success': True,
            'filename': filename,
            'chunks': chunks,
            'summary': {
                'total_chunks': len(chunks),
                'average_size': sum(c['char_count'] for c in chunks) // len(chunks) if chunks else 0
            }
        }

class SmartCLMRAGEvaluator:
    """
    Smart CLM RAG 시스템 전용 평가기
    
    계약서 도메인에 특화된 RAG 평가 메트릭과 방법론을 제공합니다.
    """
    
    # def __init__(self, api_client: SmartCLMAPIClient):
    #     self.api_client = api_client
    #     self.chunker = HierarchicalChunker()
    #     self.evaluation_results = {}
        
    def evaluate_chunking_quality(self, contract_id: int) -> Dict[str, Any]:
        """
        청킹 품질 평가
        
        Args:
            contract_id: 평가할 계약서 ID
            
        Returns:
            청킹 품질 메트릭 딕셔너리
        """
        print(f"📊 계약서 {contract_id} 청킹 품질 평가 시작")
        
        try:
            # API를 통해 청크 데이터 조회
            chunks_data = self.api_client.get_contract_chunks(contract_id)
            
            if not chunks_data:
                print(f"⚠️ API에서 데이터를 가져올 수 없습니다. 목업 데이터를 사용합니다.")
                chunks_data = self._create_mock_chunks_data(contract_id)
            
            if not chunks_data:
                print(f"⚠️ 계약서 {contract_id}의 청크 데이터가 없습니다")
                return {"error": "No chunks found"}
            
            # 청킹 메트릭 계산
            char_counts = [chunk.get('char_count', 500) for chunk in chunks_data]
            
            metrics = {
                "total_chunks": len(chunks_data),
                "parent_chunks": len([c for c in chunks_data if c.get('chunk_type') == "parent"]),
                "child_chunks": len([c for c in chunks_data if c.get('chunk_type') == "child"]),
                "avg_chunk_length": float(np.mean(char_counts)) if char_counts else 0,
                "std_chunk_length": float(np.std(char_counts)) if char_counts else 0,
                "chunks_with_embeddings": len([c for c in chunks_data if c.get('embedding') is not None]),
                "hierarchical_coverage": self._calculate_hierarchical_coverage_from_data(chunks_data)
            }
            
            print(f"✅ 청킹 평가 완료: {metrics['total_chunks']}개 청크, 임베딩 {metrics['chunks_with_embeddings']}개")
            return metrics
            
        except Exception as e:
            print(f"❌ 청킹 평가 오류: {e}")
            return {"error": str(e)}
    
    def _create_mock_chunks_data(self, contract_id: int) -> List[Dict[str, Any]]:
        """목업 청크 데이터 생성 (딕셔너리 형태)"""
        mock_chunks = []
        for i in range(15):  # 15개 청크 생성
            chunk_data = {
                'id': i + 1,
                'contract_id': contract_id,
                'chunk_type': 'parent' if i < 3 else 'child',
                'char_count': int(np.random.randint(300, 800)),
                'embedding': [0.1] * 1024 if i < 10 else None,
                'header_1': f"제{i+1}조" if i < 5 else None,
                'header_2': f"항목 {i+1}" if i < 8 else None,
                'header_3': None,
                'header_4': None,
                'content': f"계약서 {contract_id} 청크 {i+1} 내용입니다. 이것은 목업 데이터입니다."
            }
            mock_chunks.append(chunk_data)
        return mock_chunks
    
    def _calculate_hierarchical_coverage_from_data(self, chunks_data: List[Dict[str, Any]]) -> float:
        """계층 구조 커버리지 계산 (딕셔너리 데이터용)"""
        headers = ["header_1", "header_2", "header_3", "header_4"]
        coverage_scores = []
        
        for header in headers:
            chunks_with_header = len([c for c in chunks_data if c.get(header) is not None])
            coverage = chunks_with_header / len(chunks_data) if chunks_data else 0
            coverage_scores.append(coverage)
        
        return float(np.mean(coverage_scores))
    
    def evaluate_retrieval_accuracy(self, test_queries: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        검색 정확도 평가
        
        Args:
            test_queries: 테스트 쿼리 리스트 [{"query": "...", "expected_chunks": [...], "contract_id": ...}]
            
        Returns:
            검색 정확도 메트릭
        """
        print(f"🔍 {len(test_queries)}개 쿼리로 검색 정확도 평가 시작")
        
        precision_scores = []
        recall_scores = []
        f1_scores = []
        
        for i, test_case in enumerate(test_queries):
            query = test_case["query"]
            expected_chunks = set(test_case.get("expected_chunks", []))
            contract_id = test_case.get("contract_id", 1)
            
            # API 또는 시뮬레이션을 통한 벡터 검색
            retrieved_chunks = self._perform_vector_search(query, contract_id)
            retrieved_chunk_ids = set([str(chunk.get('id', i)) for chunk in retrieved_chunks])
            
            # 정밀도, 재현율, F1 계산
            precision = 0.0
            recall = 0.0
            f1 = 0.0
            
            if retrieved_chunk_ids:
                precision = len(expected_chunks & retrieved_chunk_ids) / len(retrieved_chunk_ids)
                precision_scores.append(precision)
            
            if expected_chunks:
                recall = len(expected_chunks & retrieved_chunk_ids) / len(expected_chunks)
                recall_scores.append(recall)
            
            if precision > 0 or recall > 0:
                f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
                f1_scores.append(f1)
            
            print(f"  쿼리 {i+1}: P={precision:.3f}, R={recall:.3f}, F1={f1:.3f}")
        
        results = {
            "avg_precision": float(np.mean(precision_scores)) if precision_scores else 0.0,
            "avg_recall": float(np.mean(recall_scores)) if recall_scores else 0.0,
            "avg_f1": float(np.mean(f1_scores)) if f1_scores else 0.0,
            "total_queries": len(test_queries)
        }
        
        print(f"✅ 검색 평가 완료: P={results['avg_precision']:.3f}, R={results['avg_recall']:.3f}, F1={results['avg_f1']:.3f}")
        return results
    
    def _perform_vector_search(self, query: str, contract_id: int, top_k: int = 5) -> List[Dict[str, Any]]:
        """벡터 검색 수행 (API 우선, 실패시 목업 데이터)"""
        # 먼저 API를 통한 실제 검색 시도
        search_results = self.api_client.search_chunks(query, contract_id, top_k)
        
        if search_results:
            return search_results
        else:
            # API 실패시 목업 데이터 반환
            print(f"  ⚠️ API 검색 실패, 목업 데이터 사용")
            mock_chunks = self._create_mock_chunks_data(contract_id)
            return mock_chunks[:top_k]

print("✅ SmartCLMRAGEvaluator 클래스 정의 완료")

✅ SmartCLMRAGEvaluator 클래스 정의 완료


## 📝 4. 테스트 데이터 생성 및 평가 실행

In [13]:
# RAG 평가기 인스턴스 생성 (API 클라이언트와 함께)
# evaluator = SmartCLMRAGEvaluator(api_client)

# 계약서 도메인 특화 테스트 쿼리 생성
test_queries = [
    {
        "query": "계약 기간은 언제까지인가요?",
        "expected_chunks": ["1", "2", "3"],
        "contract_id": 1,
        "category": "기본정보"
    },
    {
        "query": "위약금 조항이 있나요?",
        "expected_chunks": ["5", "6"],
        "contract_id": 1,
        "category": "리스크"
    },
    {
        "query": "지급 조건은 어떻게 되나요?",
        "expected_chunks": ["4", "7", "8"],
        "contract_id": 1,
        "category": "재무"
    },
    {
        "query": "계약 해지 사유는 무엇인가요?",
        "expected_chunks": ["9", "10"],
        "contract_id": 1,
        "category": "해지"
    },
    {
        "query": "손해배상 책임은 누구에게 있나요?",
        "expected_chunks": ["11", "12"],
        "contract_id": 1,
        "category": "법적책임"
    }
]

print(f"📊 생성된 테스트 쿼리: {len(test_queries)}개")
for i, query in enumerate(test_queries):
    print(f"  {i+1}. [{query['category']}] {query['query'][:30]}...")

# 평가 결과 저장용 딕셔너리
evaluation_results = {}

📊 생성된 테스트 쿼리: 5개
  1. [기본정보] 계약 기간은 언제까지인가요?...
  2. [리스크] 위약금 조항이 있나요?...
  3. [재무] 지급 조건은 어떻게 되나요?...
  4. [해지] 계약 해지 사유는 무엇인가요?...
  5. [법적책임] 손해배상 책임은 누구에게 있나요?...


In [15]:
# 1. 청킹 품질 평가 실행
print("🔍 === 청킹 품질 평가 시작 ===")
chunking_results = evaluator.evaluate_chunking_quality(contract_id=1)
evaluation_results['chunking'] = chunking_results

print("\n📊 청킹 평가 결과:")
print(f"  - 총 청크 수: {chunking_results.get('total_chunks', 0)}")
print(f"  - Parent 청크: {chunking_results.get('parent_chunks', 0)}")
print(f"  - Child 청크: {chunking_results.get('child_chunks', 0)}")
print(f"  - 평균 청크 길이: {chunking_results.get('avg_chunk_length', 0):.1f}자")
print(f"  - 임베딩 보유 청크: {chunking_results.get('chunks_with_embeddings', 0)}")
print(f"  - 계층 구조 커버리지: {chunking_results.get('hierarchical_coverage', 0):.3f}")

# 청킹 품질 점수 계산
chunk_quality_score = 0
if chunking_results.get('total_chunks', 0) > 0:
    embedding_ratio = chunking_results.get('chunks_with_embeddings', 0) / chunking_results.get('total_chunks', 1)
    hierarchy_score = chunking_results.get('hierarchical_coverage', 0)
    chunk_quality_score = (embedding_ratio * 0.6 + hierarchy_score * 0.4) * 100

print(f"\n✅ 청킹 품질 종합 점수: {chunk_quality_score:.1f}/100")

🔍 === 청킹 품질 평가 시작 ===

📊 청킹 평가 결과:


NameError: name 'chunking_results' is not defined

In [27]:
!docker run pgvector/pgvector:0.8.0-pg16 .

docker: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?.
See 'docker run --help'.


In [25]:
os.getcwd()

'/home/sagemaker-user'