# RAG 전처리 파이프라인 설계

이 노트북은 RAG 시스템의 전처리 파이프라인을 설계하고 검증합니다.

## 목차

0. **Introduction & Setup** - 환경 설정 및 라이브러리 로드
1. **File Parser Design** - 다양한 파일 형식 파싱 (PDF, DOCX, XLSX, TXT, JSON)
2. **Text Normalization** - 텍스트 정규화 및 전처리
3. **Semantic Chunking Design** - LangChain 기반 시맨틱 청킹
4. **Metadata Management** - 메타데이터 스키마 및 관리
5. **Weaviate Integration** - 벡터 저장소 확장 및 통합
6. **End-to-End Pipeline** - 전체 파이프라인 테스트

---
## Section 0: Introduction & Setup

### 0.1 목적 및 개요

**목적**: RAG 시스템에서 사용할 문서 전처리 파이프라인 설계 및 검증

**처리 흐름**:
```
파일 입력 → 파서 선택 → 텍스트 추출 → 정규화 → 시맨틱 청킹 → 메타데이터 추가 → Weaviate 저장
```

**지원 파일 형식**:
- PDF: pdfplumber
- DOCX: python-docx
- XLSX: openpyxl
- TXT: 기본 파일 읽기
- JSON: json 모듈

### 0.2 필요 라이브러리 설치

In [5]:
# 라이브러리 설치 확인 (이미 pyproject.toml에 추가됨)
# !uv sync

### 0.3 환경 변수 로드 및 임포트

In [6]:
# 표준 라이브러리
import os
import json
import re
from pathlib import Path
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from abc import ABC, abstractmethod
import uuid

# 파일 파싱 라이브러리
import pdfplumber
from docx import Document
from openpyxl import load_workbook

# LangChain
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings

# Pydantic
from pydantic import BaseModel, Field

# 환경 변수
from dotenv import load_dotenv
load_dotenv()

print("모든 라이브러리 로드 완료!")

모든 라이브러리 로드 완료!


In [7]:
# 테스트 데이터 경로 설정
TEST_DATA_DIR = Path("./test_data")
print(f"테스트 데이터 디렉토리: {TEST_DATA_DIR.absolute()}")
print(f"\n테스트 파일 목록:")
for f in TEST_DATA_DIR.iterdir():
    print(f"  - {f.name} ({f.stat().st_size:,} bytes)")

테스트 데이터 디렉토리: /mnt/data1/work/sm-ai-v2/my-rag-server/dev_v1/test_data

테스트 파일 목록:
  - sample.pdf (3,957 bytes)
  - sample.xlsx (7,661 bytes)
  - sample.docx (37,950 bytes)
  - sample.json (3,516 bytes)
  - sample.txt (2,625 bytes)


---
## Section 1: File Parser Design

### 1.1 RawDocument 데이터 클래스

파서의 출력을 표준화하기 위한 데이터 구조

In [8]:
@dataclass
class RawDocument:
    """파서에서 추출된 원본 문서 데이터"""
    content: str                          # 추출된 전체 텍스트
    source: str                           # 파일 경로
    file_type: str                        # 파일 확장자 (pdf, docx, xlsx, txt, json)
    file_name: str                        # 파일 이름
    metadata: Dict[str, Any] = field(default_factory=dict)  # 파일별 메타데이터
    pages: Optional[List[str]] = None     # 페이지별 텍스트 (PDF, DOCX)
    sheets: Optional[Dict[str, str]] = None  # 시트별 텍스트 (XLSX)
    
    def __post_init__(self):
        if self.pages is None:
            self.pages = []
        if self.sheets is None:
            self.sheets = {}
    
    def __repr__(self):
        return (
            f"RawDocument(\n"
            f"  file_name='{self.file_name}',\n"
            f"  file_type='{self.file_type}',\n"
            f"  content_length={len(self.content)},\n"
            f"  pages={len(self.pages)},\n"
            f"  sheets={list(self.sheets.keys()) if self.sheets else []},\n"
            f"  metadata={self.metadata}\n"
            f")"
        )

### 1.2 BaseParser 추상 클래스

In [9]:
class BaseParser(ABC):
    """모든 파서의 기본 클래스"""
    
    @property
    @abstractmethod
    def supported_extensions(self) -> List[str]:
        """지원하는 파일 확장자 목록"""
        pass
    
    @abstractmethod
    def parse(self, file_path: str) -> RawDocument:
        """파일을 파싱하여 RawDocument 반환"""
        pass
    
    def can_parse(self, file_path: str) -> bool:
        """파일 파싱 가능 여부 확인"""
        ext = Path(file_path).suffix.lower().lstrip('.')
        return ext in self.supported_extensions
    
    def _get_file_info(self, file_path: str) -> Dict[str, Any]:
        """파일 기본 정보 추출"""
        path = Path(file_path)
        stat = path.stat()
        return {
            "file_name": path.name,
            "file_type": path.suffix.lower().lstrip('.'),
            "file_size": stat.st_size,
            "created_at": datetime.fromtimestamp(stat.st_ctime).isoformat(),
            "modified_at": datetime.fromtimestamp(stat.st_mtime).isoformat(),
        }

### 1.3 PDFParser (pdfplumber)

In [10]:
class PDFParser(BaseParser):
    """PDF 파일 파서"""
    
    @property
    def supported_extensions(self) -> List[str]:
        return ["pdf"]
    
    def parse(self, file_path: str) -> RawDocument:
        file_info = self._get_file_info(file_path)
        pages = []
        
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                text = page.extract_text() or ""
                pages.append(text)
                
                # 테이블 추출 (있는 경우)
                tables = page.extract_tables()
                for table in tables:
                    if table:
                        table_text = self._table_to_text(table)
                        if table_text not in text:
                            pages[-1] += f"\n\n[Table]\n{table_text}"
            
            file_info["page_count"] = len(pdf.pages)
            file_info["pdf_metadata"] = pdf.metadata or {}
        
        content = "\n\n".join(pages)
        
        return RawDocument(
            content=content,
            source=str(Path(file_path).absolute()),
            file_type="pdf",
            file_name=file_info["file_name"],
            metadata=file_info,
            pages=pages
        )
    
    def _table_to_text(self, table: List[List]) -> str:
        """테이블을 텍스트로 변환"""
        rows = []
        for row in table:
            cells = [str(cell or "").strip() for cell in row]
            rows.append(" | ".join(cells))
        return "\n".join(rows)

### 1.4 DOCXParser (python-docx)

In [11]:
class DOCXParser(BaseParser):
    """DOCX 파일 파서"""
    
    @property
    def supported_extensions(self) -> List[str]:
        return ["docx"]
    
    def parse(self, file_path: str) -> RawDocument:
        file_info = self._get_file_info(file_path)
        doc = Document(file_path)
        
        paragraphs = []
        
        # 문단 추출
        for para in doc.paragraphs:
            text = para.text.strip()
            if text:
                paragraphs.append(text)
        
        # 테이블 추출
        for table in doc.tables:
            table_text = self._extract_table(table)
            if table_text:
                paragraphs.append(f"[Table]\n{table_text}")
        
        content = "\n\n".join(paragraphs)
        
        # 문서 속성 추출
        props = doc.core_properties
        file_info["title"] = props.title or ""
        file_info["author"] = props.author or ""
        file_info["paragraph_count"] = len(doc.paragraphs)
        file_info["table_count"] = len(doc.tables)
        
        return RawDocument(
            content=content,
            source=str(Path(file_path).absolute()),
            file_type="docx",
            file_name=file_info["file_name"],
            metadata=file_info,
            pages=paragraphs  # 문단을 페이지처럼 취급
        )
    
    def _extract_table(self, table) -> str:
        """테이블을 텍스트로 변환"""
        rows = []
        for row in table.rows:
            cells = [cell.text.strip() for cell in row.cells]
            rows.append(" | ".join(cells))
        return "\n".join(rows)

### 1.5 XLSXParser (openpyxl)

In [12]:
class XLSXParser(BaseParser):
    """XLSX 파일 파서"""
    
    @property
    def supported_extensions(self) -> List[str]:
        return ["xlsx", "xls"]
    
    def parse(self, file_path: str) -> RawDocument:
        file_info = self._get_file_info(file_path)
        wb = load_workbook(file_path, read_only=True, data_only=True)
        
        sheets = {}
        all_content = []
        
        for sheet_name in wb.sheetnames:
            ws = wb[sheet_name]
            sheet_content = self._extract_sheet(ws)
            
            if sheet_content.strip():
                sheets[sheet_name] = sheet_content
                all_content.append(f"[Sheet: {sheet_name}]\n{sheet_content}")
        
        content = "\n\n".join(all_content)
        
        file_info["sheet_count"] = len(wb.sheetnames)
        file_info["sheet_names"] = wb.sheetnames
        
        wb.close()
        
        return RawDocument(
            content=content,
            source=str(Path(file_path).absolute()),
            file_type="xlsx",
            file_name=file_info["file_name"],
            metadata=file_info,
            sheets=sheets
        )
    
    def _extract_sheet(self, ws) -> str:
        """시트 내용을 텍스트로 변환"""
        rows = []
        for row in ws.iter_rows(values_only=True):
            cells = [str(cell) if cell is not None else "" for cell in row]
            if any(cells):  # 빈 행 제외
                rows.append(" | ".join(cells))
        return "\n".join(rows)

### 1.6 TXTParser

In [13]:
class TXTParser(BaseParser):
    """TXT 파일 파서"""
    
    @property
    def supported_extensions(self) -> List[str]:
        return ["txt", "md", "rst"]
    
    def parse(self, file_path: str) -> RawDocument:
        file_info = self._get_file_info(file_path)
        
        # 인코딩 자동 감지 시도
        encodings = ['utf-8', 'cp949', 'euc-kr', 'latin-1']
        content = None
        
        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    content = f.read()
                file_info["encoding"] = encoding
                break
            except UnicodeDecodeError:
                continue
        
        if content is None:
            raise ValueError(f"파일 인코딩을 감지할 수 없습니다: {file_path}")
        
        file_info["line_count"] = content.count('\n') + 1
        file_info["char_count"] = len(content)
        
        return RawDocument(
            content=content,
            source=str(Path(file_path).absolute()),
            file_type=file_info["file_type"],
            file_name=file_info["file_name"],
            metadata=file_info
        )

### 1.7 JSONParser

In [14]:
class JSONParser(BaseParser):
    """JSON 파일 파서"""
    
    @property
    def supported_extensions(self) -> List[str]:
        return ["json"]
    
    def parse(self, file_path: str) -> RawDocument:
        file_info = self._get_file_info(file_path)
        
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        # JSON을 읽기 쉬운 텍스트로 변환
        content = self._json_to_text(data)
        
        file_info["json_type"] = type(data).__name__
        if isinstance(data, dict):
            file_info["top_level_keys"] = list(data.keys())
        elif isinstance(data, list):
            file_info["item_count"] = len(data)
        
        return RawDocument(
            content=content,
            source=str(Path(file_path).absolute()),
            file_type="json",
            file_name=file_info["file_name"],
            metadata=file_info
        )
    
    def _json_to_text(self, data: Any, prefix: str = "") -> str:
        """JSON 데이터를 텍스트로 변환 (계층적 구조 유지)"""
        lines = []
        
        if isinstance(data, dict):
            for key, value in data.items():
                if isinstance(value, (dict, list)):
                    lines.append(f"{prefix}{key}:")
                    lines.append(self._json_to_text(value, prefix + "  "))
                else:
                    lines.append(f"{prefix}{key}: {value}")
        elif isinstance(data, list):
            for i, item in enumerate(data):
                if isinstance(item, (dict, list)):
                    lines.append(f"{prefix}[{i}]:")
                    lines.append(self._json_to_text(item, prefix + "  "))
                else:
                    lines.append(f"{prefix}- {item}")
        else:
            lines.append(f"{prefix}{data}")
        
        return "\n".join(lines)

### 1.8 UnifiedFileParser (통합 인터페이스)

In [15]:
class UnifiedFileParser:
    """통합 파일 파서 - 파일 확장자에 따라 적절한 파서 선택"""
    
    def __init__(self):
        self._parsers: List[BaseParser] = [
            PDFParser(),
            DOCXParser(),
            XLSXParser(),
            TXTParser(),
            JSONParser(),
        ]
    
    def get_supported_extensions(self) -> List[str]:
        """지원하는 모든 파일 확장자 반환"""
        extensions = []
        for parser in self._parsers:
            extensions.extend(parser.supported_extensions)
        return extensions
    
    def parse(self, file_path: str) -> RawDocument:
        """파일을 파싱하여 RawDocument 반환"""
        for parser in self._parsers:
            if parser.can_parse(file_path):
                return parser.parse(file_path)
        
        ext = Path(file_path).suffix
        raise ValueError(f"지원하지 않는 파일 형식입니다: {ext}")
    
    def parse_directory(self, dir_path: str, recursive: bool = False) -> List[RawDocument]:
        """디렉토리 내 모든 지원 파일 파싱"""
        documents = []
        path = Path(dir_path)
        
        pattern = "**/*" if recursive else "*"
        
        for file_path in path.glob(pattern):
            if file_path.is_file():
                ext = file_path.suffix.lower().lstrip('.')
                if ext in self.get_supported_extensions():
                    try:
                        doc = self.parse(str(file_path))
                        documents.append(doc)
                        print(f"  ✓ 파싱 완료: {file_path.name}")
                    except Exception as e:
                        print(f"  ✗ 파싱 실패: {file_path.name} - {e}")
        
        return documents

### 1.9 파서 테스트

In [16]:
# 통합 파서 생성
parser = UnifiedFileParser()
print(f"지원 확장자: {parser.get_supported_extensions()}")

지원 확장자: ['pdf', 'docx', 'xlsx', 'xls', 'txt', 'md', 'rst', 'json']


In [17]:
# 테스트 파일 파싱
print("=" * 50)
print("테스트 파일 파싱 결과")
print("=" * 50)

documents = parser.parse_directory(str(TEST_DATA_DIR))

print(f"\n총 {len(documents)}개 파일 파싱 완료")

테스트 파일 파싱 결과
  ✓ 파싱 완료: sample.pdf
  ✓ 파싱 완료: sample.xlsx
  ✓ 파싱 완료: sample.docx
  ✓ 파싱 완료: sample.json
  ✓ 파싱 완료: sample.txt

총 5개 파일 파싱 완료


In [18]:
# 각 문서 상세 정보 출력
for doc in documents:
    print("\n" + "=" * 50)
    print(doc)
    print("\n--- 내용 미리보기 (처음 500자) ---")
    print(doc.content[:500] + "..." if len(doc.content) > 500 else doc.content)


RawDocument(
  file_name='sample.pdf',
  file_type='pdf',
  content_length=1957,
  pages=2,
  sheets=[],
  metadata={'file_name': 'sample.pdf', 'file_type': 'pdf', 'file_size': 3957, 'created_at': '2025-12-11T10:09:32.435347', 'modified_at': '2025-12-09T16:20:56.788867', 'page_count': 2, 'pdf_metadata': {'Author': 'anonymous', 'CreationDate': "D:20251209162056+09'00'", 'Creator': 'ReportLab PDF Library - www.reportlab.com', 'Keywords': '', 'ModDate': "D:20251209162056+09'00'", 'Producer': 'ReportLab PDF Library - www.reportlab.com', 'Subject': 'unspecified', 'Title': 'untitled', 'Trapped': 'False'}}
)

--- 내용 미리보기 (처음 500자) ---
RAG System Technical Report
1. Executive Summary
This document describes the RAG (Retrieval-Augmented Generation) system
architecture and implementation details. The system is designed to enhance
LLM responses by retrieving relevant information from a vector database.
Key Features:
- Multi-format document parsing (PDF, DOCX, XLSX, TXT, JSON)
- Semantic chunking

---
## Section 2: Text Normalization

### 2.1 TextNormalizer 클래스

In [19]:
class TextNormalizer:
    """텍스트 정규화 클래스"""
    
    def __init__(
        self,
        remove_extra_whitespace: bool = True,
        remove_extra_newlines: bool = True,
        remove_special_chars: bool = False,
        lowercase: bool = False,
        min_line_length: int = 0
    ):
        self.remove_extra_whitespace = remove_extra_whitespace
        self.remove_extra_newlines = remove_extra_newlines
        self.remove_special_chars = remove_special_chars
        self.lowercase = lowercase
        self.min_line_length = min_line_length
    
    def normalize(self, text: str) -> str:
        """텍스트 정규화 수행"""
        result = text
        
        # 1. 여러 줄바꿈을 2개로 통일
        if self.remove_extra_newlines:
            result = re.sub(r'\n{3,}', '\n\n', result)
        
        # 2. 여러 공백을 하나로
        if self.remove_extra_whitespace:
            result = re.sub(r'[ \t]+', ' ', result)
            result = re.sub(r' +\n', '\n', result)  # 줄 끝 공백 제거
        
        # 3. 특수 문자 제거 (선택적)
        if self.remove_special_chars:
            # 한글, 영문, 숫자, 기본 구두점만 유지
            result = re.sub(r'[^가-힣a-zA-Z0-9\s.,!?\-:;()\[\]"\']', '', result)
        
        # 4. 소문자 변환 (선택적)
        if self.lowercase:
            result = result.lower()
        
        # 5. 짧은 줄 필터링 (선택적)
        if self.min_line_length > 0:
            lines = result.split('\n')
            lines = [line for line in lines if len(line.strip()) >= self.min_line_length or line.strip() == '']
            result = '\n'.join(lines)
        
        return result.strip()
    
    def normalize_document(self, doc: RawDocument) -> RawDocument:
        """RawDocument의 내용을 정규화"""
        normalized_content = self.normalize(doc.content)
        normalized_pages = [self.normalize(p) for p in doc.pages] if doc.pages else None
        normalized_sheets = {k: self.normalize(v) for k, v in doc.sheets.items()} if doc.sheets else None
        
        return RawDocument(
            content=normalized_content,
            source=doc.source,
            file_type=doc.file_type,
            file_name=doc.file_name,
            metadata={**doc.metadata, "normalized": True},
            pages=normalized_pages,
            sheets=normalized_sheets
        )

### 2.2 정규화 테스트

In [20]:
# 정규화기 생성
normalizer = TextNormalizer(
    remove_extra_whitespace=True,
    remove_extra_newlines=True,
    min_line_length=3
)

# 테스트 텍스트
test_text = """
이것은    테스트   문장입니다.



여러 줄바꿈이     있습니다.

a
b

긴 문장은 유지됩니다.
"""

print("=== 원본 ===")
print(repr(test_text))
print("\n=== 정규화 후 ===")
print(repr(normalizer.normalize(test_text)))

=== 원본 ===
'\n이것은    테스트   문장입니다.\n\n\n\n여러 줄바꿈이     있습니다.\n\na\nb\n\n긴 문장은 유지됩니다.\n'

=== 정규화 후 ===
'이것은 테스트 문장입니다.\n\n여러 줄바꿈이 있습니다.\n\n\n긴 문장은 유지됩니다.'


In [21]:
# 문서 정규화
normalized_docs = [normalizer.normalize_document(doc) for doc in documents]

# 비교
for orig, norm in zip(documents, normalized_docs):
    orig_len = len(orig.content)
    norm_len = len(norm.content)
    reduction = (1 - norm_len / orig_len) * 100 if orig_len > 0 else 0
    print(f"{orig.file_name}: {orig_len:,} → {norm_len:,} ({reduction:.1f}% 감소)")

sample.pdf: 1,957 → 1,957 (0.0% 감소)
sample.xlsx: 1,178 → 1,178 (0.0% 감소)
sample.docx: 878 → 878 (0.0% 감소)
sample.json: 2,111 → 1,731 (18.0% 감소)
sample.txt: 1,307 → 1,306 (0.1% 감소)


---
## Section 3: Semantic Chunking Design

### 3.1 LangChain SemanticChunker 소개

SemanticChunker는 임베딩 유사도를 기반으로 텍스트를 의미 단위로 분할합니다.

**Breakpoint 전략:**
- `percentile`: 상위 N%의 유사도 차이에서 분할 (기본값)
- `standard_deviation`: 평균 + N*표준편차 이상에서 분할
- `interquartile`: IQR 기반 분할
- `gradient`: 유사도 변화율 기반 분할

### 3.2 Chunk 데이터 클래스

In [22]:
@dataclass
class Chunk:
    """청킹된 텍스트 단위"""
    content: str                    # 청크 텍스트
    chunk_id: str                   # 고유 ID
    chunk_index: int                # 문서 내 순서
    doc_id: str                     # 원본 문서 ID
    source: str                     # 원본 파일 경로
    file_name: str                  # 파일 이름
    file_type: str                  # 파일 형식
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    @property
    def char_count(self) -> int:
        return len(self.content)
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            "content": self.content,
            "chunk_id": self.chunk_id,
            "chunk_index": self.chunk_index,
            "doc_id": self.doc_id,
            "source": self.source,
            "file_name": self.file_name,
            "file_type": self.file_type,
            "char_count": self.char_count,
            **self.metadata
        }

### 3.3 ChunkingService 클래스

In [23]:
class ChunkingService:
    """시맨틱 청킹 서비스"""
    
    def __init__(
        self,
        embedding_model: str = "text-embedding-3-small",
        breakpoint_type: str = "percentile",
        breakpoint_threshold: float = 95,
        min_chunk_size: int = 100,
        max_chunk_size: int = 2000
    ):
        self.embedding_model = embedding_model
        self.breakpoint_type = breakpoint_type
        self.breakpoint_threshold = breakpoint_threshold
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
        
        # 임베딩 모델 초기화
        self._embeddings = OpenAIEmbeddings(model=embedding_model)
        
        # SemanticChunker 초기화
        self._chunker = SemanticChunker(
            embeddings=self._embeddings,
            breakpoint_threshold_type=breakpoint_type,
            breakpoint_threshold_amount=breakpoint_threshold
        )
    
    def chunk_text(self, text: str, doc_id: str, source: str, file_name: str, file_type: str) -> List[Chunk]:
        """텍스트를 시맨틱 청킹"""
        # SemanticChunker로 분할
        langchain_docs = self._chunker.create_documents([text])
        
        chunks = []
        for i, lc_doc in enumerate(langchain_docs):
            content = lc_doc.page_content
            
            # 최소 크기 미달 청크 병합은 여기서 처리 가능
            # (현재는 그대로 유지)
            
            chunk = Chunk(
                content=content,
                chunk_id=str(uuid.uuid4()),
                chunk_index=i,
                doc_id=doc_id,
                source=source,
                file_name=file_name,
                file_type=file_type,
                metadata={
                    "chunking_method": "semantic",
                    "breakpoint_type": self.breakpoint_type,
                    "breakpoint_threshold": self.breakpoint_threshold
                }
            )
            chunks.append(chunk)
        
        return chunks
    
    def chunk_document(self, doc: RawDocument) -> List[Chunk]:
        """RawDocument를 시맨틱 청킹"""
        doc_id = str(uuid.uuid4())
        return self.chunk_text(
            text=doc.content,
            doc_id=doc_id,
            source=doc.source,
            file_name=doc.file_name,
            file_type=doc.file_type
        )
    
    def get_chunk_stats(self, chunks: List[Chunk]) -> Dict[str, Any]:
        """청킹 통계 반환"""
        if not chunks:
            return {"count": 0}
        
        sizes = [c.char_count for c in chunks]
        return {
            "count": len(chunks),
            "total_chars": sum(sizes),
            "avg_size": sum(sizes) / len(sizes),
            "min_size": min(sizes),
            "max_size": max(sizes),
            "sizes": sizes
        }

### 3.4 Breakpoint 전략 비교

In [24]:
# 테스트용 긴 텍스트 선택
test_doc = max(normalized_docs, key=lambda d: len(d.content))
print(f"테스트 문서: {test_doc.file_name} ({len(test_doc.content):,} 글자)")

# Percentile 전략 테스트
chunker_percentile = ChunkingService(
    breakpoint_type="percentile",
    breakpoint_threshold=95
)

chunks_percentile = chunker_percentile.chunk_document(test_doc)
stats_percentile = chunker_percentile.get_chunk_stats(chunks_percentile)

print(f"\n=== Percentile (95) ===")
print(f"청크 수: {stats_percentile['count']}")
print(f"평균 크기: {stats_percentile['avg_size']:.0f}자")
print(f"최소/최대: {stats_percentile['min_size']} / {stats_percentile['max_size']}자")

테스트 문서: sample.pdf (1,957 글자)

=== Percentile (95) ===
청크 수: 2
평균 크기: 978자
최소/최대: 862 / 1093자


In [25]:
# 다른 threshold 값 비교
print("\n=== Threshold 비교 ===")
print(f"{'Threshold':<12} {'청크 수':<10} {'평균 크기':<12} {'최소':<8} {'최대':<8}")
print("-" * 50)

for threshold in [90, 92, 95, 97, 99]:
    chunker = ChunkingService(breakpoint_type="percentile", breakpoint_threshold=threshold)
    chunks = chunker.chunk_document(test_doc)
    stats = chunker.get_chunk_stats(chunks)
    print(f"{threshold:<12} {stats['count']:<10} {stats['avg_size']:<12.0f} {stats['min_size']:<8} {stats['max_size']:<8}")


=== Threshold 비교 ===
Threshold    청크 수       평균 크기        최소       최대      
--------------------------------------------------
90           3          651          218      1093    
92           3          651          218      1093    
95           2          978          862      1093    
97           2          978          862      1093    
99           2          978          862      1093    


### 3.5 청킹 품질 검증

In [26]:
# 청크 내용 미리보기
print("=== 청크 내용 미리보기 ===")
for i, chunk in enumerate(chunks_percentile[:5]):
    print(f"\n--- Chunk {i} ({chunk.char_count}자) ---")
    preview = chunk.content[:200] + "..." if len(chunk.content) > 200 else chunk.content
    print(preview)

=== 청크 내용 미리보기 ===

--- Chunk 0 (1093자) ---
RAG System Technical Report
1. Executive Summary
This document describes the RAG (Retrieval-Augmented Generation) system
architecture and implementation details. The system is designed to enhance
LLM ...

--- Chunk 1 (862자) ---
3. Performance Metrics
The system has been evaluated on the following metrics:
+-------------------+----------+--------+
| Metric | Value | Target |
+-------------------+----------+--------+
| Precisi...


In [27]:
# 모든 문서 청킹
print("\n=== 전체 문서 청킹 ===")
all_chunks = []

for doc in normalized_docs:
    chunks = chunker_percentile.chunk_document(doc)
    all_chunks.extend(chunks)
    stats = chunker_percentile.get_chunk_stats(chunks)
    print(f"{doc.file_name}: {stats['count']}개 청크, 평균 {stats['avg_size']:.0f}자")

print(f"\n총 {len(all_chunks)}개 청크 생성")


=== 전체 문서 청킹 ===
sample.pdf: 2개 청크, 평균 978자
sample.xlsx: 1개 청크, 평균 1178자
sample.docx: 2개 청크, 평균 436자
sample.json: 2개 청크, 평균 864자
sample.txt: 2개 청크, 평균 650자

총 9개 청크 생성


---
## Section 4: Metadata Management

### 4.1 Pydantic 메타데이터 모델

In [28]:
class DocumentMetadata(BaseModel):
    """문서 메타데이터 스키마"""
    doc_id: str = Field(description="문서 고유 ID")
    source: str = Field(description="파일 경로")
    file_name: str = Field(description="파일 이름")
    file_type: str = Field(description="파일 형식")
    file_size: int = Field(default=0, description="파일 크기 (bytes)")
    created_at: datetime = Field(default_factory=datetime.now, description="생성 일시")
    page_count: Optional[int] = Field(default=None, description="페이지 수 (PDF, DOCX)")
    sheet_count: Optional[int] = Field(default=None, description="시트 수 (XLSX)")
    
    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }


class ChunkMetadata(BaseModel):
    """청크 메타데이터 스키마"""
    chunk_id: str = Field(description="청크 고유 ID")
    doc_id: str = Field(description="원본 문서 ID")
    chunk_index: int = Field(description="문서 내 순서")
    total_chunks: int = Field(description="문서의 전체 청크 수")
    source: str = Field(description="원본 파일 경로")
    file_name: str = Field(description="파일 이름")
    file_type: str = Field(description="파일 형식")
    char_count: int = Field(description="청크 글자 수")
    page_number: Optional[int] = Field(default=None, description="페이지 번호 (PDF)")
    sheet_name: Optional[str] = Field(default=None, description="시트 이름 (XLSX)")
    created_at: datetime = Field(default_factory=datetime.now, description="생성 일시")
    
    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

/tmp/ipykernel_295674/155222958.py:1: PydanticDeprecatedSince20: Support for class-based `config` is deprecated, use ConfigDict instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.12/migration/
  class DocumentMetadata(BaseModel):
/tmp/ipykernel_295674/155222958.py:18: PydanticDeprecatedSince20: Support for class-based `config` is deprecated, use ConfigDict instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.12/migration/
  class ChunkMetadata(BaseModel):


In [29]:
# 메타데이터 생성 테스트
sample_chunk = all_chunks[0]

chunk_meta = ChunkMetadata(
    chunk_id=sample_chunk.chunk_id,
    doc_id=sample_chunk.doc_id,
    chunk_index=sample_chunk.chunk_index,
    total_chunks=len([c for c in all_chunks if c.doc_id == sample_chunk.doc_id]),
    source=sample_chunk.source,
    file_name=sample_chunk.file_name,
    file_type=sample_chunk.file_type,
    char_count=sample_chunk.char_count
)

print("=== ChunkMetadata 예시 ===")
print(chunk_meta.model_dump_json(indent=2))

=== ChunkMetadata 예시 ===
{
  "chunk_id": "847ce983-2e63-4e81-8898-789f3753b122",
  "doc_id": "29e35b99-7915-4450-a642-2b72b571c378",
  "chunk_index": 0,
  "total_chunks": 2,
  "source": "/mnt/data1/work/sm-ai-v2/my-rag-server/dev_v1/test_data/sample.pdf",
  "file_name": "sample.pdf",
  "file_type": "pdf",
  "char_count": 1093,
  "page_number": null,
  "sheet_name": null,
  "created_at": "2025-12-11T16:26:20.000206"
}


---
## Section 5: Weaviate Integration

### 5.1 확장된 컬렉션 스키마

기존 스키마에 메타데이터 속성을 추가합니다.

In [30]:
# Weaviate 설정 (참고용 - 실제 연결은 Section 6에서)
WEAVIATE_SCHEMA = {
    "class": "DocumentChunk",
    "description": "RAG 시스템의 문서 청크",
    "properties": [
        # 기존 속성
        {"name": "content", "dataType": ["text"], "description": "청크 텍스트"},
        
        # 신규 메타데이터 속성
        {"name": "chunk_id", "dataType": ["text"], "description": "청크 고유 ID"},
        {"name": "doc_id", "dataType": ["text"], "description": "문서 고유 ID"},
        {"name": "chunk_index", "dataType": ["int"], "description": "문서 내 순서"},
        {"name": "total_chunks", "dataType": ["int"], "description": "문서의 전체 청크 수"},
        {"name": "source", "dataType": ["text"], "description": "파일 경로"},
        {"name": "file_name", "dataType": ["text"], "description": "파일 이름"},
        {"name": "file_type", "dataType": ["text"], "description": "파일 형식"},
        {"name": "char_count", "dataType": ["int"], "description": "글자 수"},
        {"name": "page_number", "dataType": ["int"], "description": "페이지 번호"},
        {"name": "sheet_name", "dataType": ["text"], "description": "시트 이름"},
        {"name": "created_at", "dataType": ["date"], "description": "생성 일시"},
    ],
    "vectorizer": "none",  # 외부 임베딩 사용
}

print("확장된 Weaviate 스키마:")
for prop in WEAVIATE_SCHEMA["properties"]:
    print(f"  - {prop['name']}: {prop['dataType'][0]}")

확장된 Weaviate 스키마:
  - content: text
  - chunk_id: text
  - doc_id: text
  - chunk_index: int
  - total_chunks: int
  - source: text
  - file_name: text
  - file_type: text
  - char_count: int
  - page_number: int
  - sheet_name: text
  - created_at: date


### 5.2 Weaviate 연결 테스트

In [31]:
# Weaviate 연결 (옵션)
# 실제 Weaviate 서버가 실행 중인 경우에만 실행

WEAVIATE_ENABLED = False  # True로 변경하여 Weaviate 연동 테스트

if WEAVIATE_ENABLED:
    import weaviate
    from weaviate.classes.config import Property, DataType
    
    client = weaviate.connect_to_local(
        host=os.getenv("WEAVIATE_HOST", "localhost"),
        port=int(os.getenv("WEAVIATE_PORT", 8080))
    )
    
    print(f"Weaviate 연결 상태: {client.is_ready()}")
else:
    print("Weaviate 연동 테스트 비활성화 (WEAVIATE_ENABLED = False)")

Weaviate 연동 테스트 비활성화 (WEAVIATE_ENABLED = False)


---
## Section 6: End-to-End Pipeline

### 6.1 PreprocessingPipeline 클래스

In [32]:
@dataclass
class PreprocessingResult:
    """전처리 결과"""
    document: RawDocument           # 원본 문서
    chunks: List[Chunk]             # 청킹 결과
    metadata: DocumentMetadata      # 문서 메타데이터
    stats: Dict[str, Any]           # 통계
    success: bool = True            # 처리 성공 여부
    error: Optional[str] = None     # 에러 메시지


class PreprocessingPipeline:
    """전처리 파이프라인 - 파일 → 청크 → 메타데이터 전체 처리"""
    
    def __init__(
        self,
        chunking_service: ChunkingService,
        normalizer: TextNormalizer,
        file_parser: UnifiedFileParser
    ):
        self.chunking_service = chunking_service
        self.normalizer = normalizer
        self.file_parser = file_parser
    
    def process_file(self, file_path: str) -> PreprocessingResult:
        """단일 파일 전처리"""
        try:
            # 1. 파싱
            raw_doc = self.file_parser.parse(file_path)
            
            # 2. 정규화
            normalized_doc = self.normalizer.normalize_document(raw_doc)
            
            # 3. 청킹
            chunks = self.chunking_service.chunk_document(normalized_doc)
            
            # 4. 청크 메타데이터 보강
            total_chunks = len(chunks)
            for chunk in chunks:
                chunk.metadata["total_chunks"] = total_chunks
            
            # 5. 문서 메타데이터 생성
            doc_metadata = DocumentMetadata(
                doc_id=chunks[0].doc_id if chunks else str(uuid.uuid4()),
                source=normalized_doc.source,
                file_name=normalized_doc.file_name,
                file_type=normalized_doc.file_type,
                file_size=normalized_doc.metadata.get("file_size", 0),
                page_count=normalized_doc.metadata.get("page_count"),
                sheet_count=normalized_doc.metadata.get("sheet_count")
            )
            
            # 6. 통계
            stats = self.chunking_service.get_chunk_stats(chunks)
            stats["original_length"] = len(raw_doc.content)
            stats["normalized_length"] = len(normalized_doc.content)
            
            return PreprocessingResult(
                document=normalized_doc,
                chunks=chunks,
                metadata=doc_metadata,
                stats=stats,
                success=True
            )
            
        except Exception as e:
            return PreprocessingResult(
                document=None,
                chunks=[],
                metadata=None,
                stats={},
                success=False,
                error=str(e)
            )
    
    def process_directory(self, dir_path: str, recursive: bool = False) -> List[PreprocessingResult]:
        """디렉토리 내 모든 파일 전처리"""
        results = []
        path = Path(dir_path)
        pattern = "**/*" if recursive else "*"
        
        supported_exts = self.file_parser.get_supported_extensions()
        
        for file_path in path.glob(pattern):
            if file_path.is_file():
                ext = file_path.suffix.lower().lstrip('.')
                if ext in supported_exts:
                    result = self.process_file(str(file_path))
                    results.append(result)
                    
                    if result.success:
                        print(f"✓ {file_path.name}: {result.stats['count']}개 청크")
                    else:
                        print(f"✗ {file_path.name}: {result.error}")
        
        return results

### 6.2 전체 파이프라인 테스트

In [33]:
# 파이프라인 생성
pipeline = PreprocessingPipeline(
    chunking_service=ChunkingService(
        breakpoint_type="percentile",
        breakpoint_threshold=95
    ),
    normalizer=TextNormalizer(
        remove_extra_whitespace=True,
        remove_extra_newlines=True
    ),
    file_parser=UnifiedFileParser()
)

print("파이프라인 생성 완료!")

파이프라인 생성 완료!


In [34]:
# 전체 테스트 데이터 처리
print("=" * 50)
print("전체 파이프라인 테스트")
print("=" * 50)

results = pipeline.process_directory(str(TEST_DATA_DIR))

print("\n" + "=" * 50)
print("처리 결과 요약")
print("=" * 50)

total_chunks = 0
for result in results:
    if result.success:
        total_chunks += result.stats['count']
        print(f"\n{result.metadata.file_name}:")
        print(f"  - 문서 ID: {result.metadata.doc_id[:8]}...")
        print(f"  - 청크 수: {result.stats['count']}")
        print(f"  - 평균 크기: {result.stats['avg_size']:.0f}자")
        print(f"  - 원본 → 정규화: {result.stats['original_length']:,} → {result.stats['normalized_length']:,}자")

print(f"\n총 청크 수: {total_chunks}")

전체 파이프라인 테스트
✓ sample.pdf: 2개 청크
✓ sample.xlsx: 1개 청크
✓ sample.docx: 2개 청크
✓ sample.json: 2개 청크
✓ sample.txt: 2개 청크

처리 결과 요약

sample.pdf:
  - 문서 ID: 8b3fc43a...
  - 청크 수: 2
  - 평균 크기: 978자
  - 원본 → 정규화: 1,957 → 1,957자

sample.xlsx:
  - 문서 ID: 0a591f2d...
  - 청크 수: 1
  - 평균 크기: 1178자
  - 원본 → 정규화: 1,178 → 1,178자

sample.docx:
  - 문서 ID: 5b0c6bc9...
  - 청크 수: 2
  - 평균 크기: 436자
  - 원본 → 정규화: 878 → 878자

sample.json:
  - 문서 ID: 5879379b...
  - 청크 수: 2
  - 평균 크기: 864자
  - 원본 → 정규화: 2,111 → 1,731자

sample.txt:
  - 문서 ID: 885b84f1...
  - 청크 수: 2
  - 평균 크기: 650자
  - 원본 → 정규화: 1,307 → 1,306자

총 청크 수: 9


### 6.3 결과 확인

In [35]:
# 첫 번째 결과의 청크 상세 정보
if results and results[0].success:
    result = results[0]
    print(f"=== {result.metadata.file_name} 청크 상세 ===")
    
    for i, chunk in enumerate(result.chunks[:3]):
        print(f"\n--- Chunk {i} ---")
        print(f"ID: {chunk.chunk_id[:8]}...")
        print(f"크기: {chunk.char_count}자")
        print(f"내용 미리보기:")
        preview = chunk.content[:150] + "..." if len(chunk.content) > 150 else chunk.content
        print(preview)

=== sample.pdf 청크 상세 ===

--- Chunk 0 ---
ID: 0f9b6f29...
크기: 1093자
내용 미리보기:
RAG System Technical Report
1. Executive Summary
This document describes the RAG (Retrieval-Augmented Generation) system
architecture and implementati...

--- Chunk 1 ---
ID: b187e39d...
크기: 862자
내용 미리보기:
3. Performance Metrics
The system has been evaluated on the following metrics:
+-------------------+----------+--------+
| Metric | Value | Target |
+...


---
## v2 모듈화 계획

노트북 검증 완료 후, 다음 구조로 모듈화:

```
dev_v2/
├── services/
│   ├── chunking.py        # ChunkingService
│   └── file_parser.py     # UnifiedFileParser, 개별 파서
├── schemas/
│   └── preprocessing.py   # RawDocument, Chunk, 메타데이터 모델
├── preprocessing/
│   ├── pipeline.py        # PreprocessingPipeline
│   └── normalizer.py      # TextNormalizer
└── config/
    └── settings.py        # PreprocessingSettings 추가
```

---
## 검증 체크리스트

- [ ] 각 파서가 정상 동작하는가?
- [ ] 텍스트 정규화가 올바른가?
- [ ] Semantic Chunking 품질이 적절한가? (평균 200~500자)
- [ ] 메타데이터가 정확히 추출되는가?
- [ ] End-to-End 파이프라인이 동작하는가?