In [None]:
# Install runtime dependencies: document readers (python-docx, pymupdf, python-pptx),
# the Chroma vector store and embedding/model libraries. Only transformers/torch have
# lower-bound version constraints; the others are unpinned.
!pip install python-docx chromadb sentence-transformers "transformers>=4.32.0" "torch>=2.0.0" "accelerate" pymupdf python-pptx

Collecting python-docx
  Downloading python_docx-1.2.0-py3-none-any.whl.metadata (2.0 kB)
Collecting chromadb
  Downloading chromadb-1.0.15-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.0 kB)
Collecting pymupdf
  Downloading pymupdf-1.26.3-cp39-abi3-manylinux_2_28_x86_64.whl.metadata (3.4 kB)
Collecting python-pptx
  Downloading python_pptx-1.0.2-py3-none-any.whl.metadata (2.5 kB)
Collecting pybase64>=1.4.1 (from chromadb)
  Downloading pybase64-1.4.2-cp311-cp311-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.whl.metadata (8.7 kB)
Collecting posthog<6.0.0,>=2.4.0 (from chromadb)
  Downloading posthog-5.4.0-py3-none-any.whl.metadata (5.7 kB)
Collecting onnxruntime>=1.14.1 (from chromadb)
  Downloading onnxruntime-1.22.1-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.6 kB)
Collecting opentelemetry-api>=1.2.0 (from chromadb)
  Downloading opentelemetry_api-1.36.0-py3-none-any.whl.metadata (1.5 kB)
Collectin

In [None]:
import os
import re
import docx
import chromadb
from chromadb.utils import embedding_functions
import sys
import hashlib
import gc
from typing import List, Dict, Any, Optional, Tuple, Union, Set
import logging
from pathlib import Path
from docx.opc.exceptions import PackageNotFoundError
from collections import defaultdict
from dataclasses import dataclass, field
import json
import threading
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
from datetime import datetime
import pickle
import sqlite3
from contextlib import contextmanager
import uuid
import functools
from queue import Queue
import multiprocessing as mp

# DOCX
from docx.document import Document
from docx.table import Table as DocxTable
from docx.text.paragraph import Paragraph
from docx.oxml.text.paragraph import CT_P
from docx.oxml.table import CT_Tbl

# PDF
import fitz

# PPTX
from pptx import Presentation
from pptx.table import Table as PptxTable

In [None]:
# Logging setup: INFO-level messages go to the console and to a UTF-8 log file.
# The log file lives under /content on Colab; elsewhere it falls back to the current
# working directory so this cell does not fail with FileNotFoundError.
LOG_FILE_PATH = os.path.join(
    "/content" if os.path.isdir("/content") else os.getcwd(),
    "legal_processing.log",
)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler(LOG_FILE_PATH, encoding='utf-8'),
    ],
    # basicConfig() is a silent no-op when the root logger already has handlers,
    # which hosted notebook kernels may install; force=True replaces them (Py 3.8+).
    force=True,
)
logger = logging.getLogger(__name__)

In [None]:
@dataclass
class LegalStructure:
    """One structural unit parsed from a legal text (chapter, article, paragraph, ...).

    Fields:
        structure_type: kind of unit, e.g. 'chapter', 'article', 'paragraph', 'item'.
        number: the unit's number as text (e.g. '3' or '2의3').
        title: heading / remaining text captured after the number.
        level: nesting depth; a smaller value is higher in the document hierarchy.
        parent_ref: "type:number" of the enclosing unit, or None at top level.
        cross_refs: references to other provisions or laws found in the text.
        original_chunk_id: id of the chunk this unit opened, for tracing sub-chunks.
    """

    # Required identification fields (positional order is part of the interface).
    structure_type: str
    number: str
    title: str
    level: int

    # Optional linkage; each instance gets its own cross_refs list.
    parent_ref: Optional[str] = None
    cross_refs: List[str] = field(default_factory=list)
    original_chunk_id: Optional[str] = None

In [None]:
@dataclass
class ChunkMetadata:
    """Extended metadata record describing one chunk.

    The two timestamps default to the moment the record is created. The date and
    amendment fields default to None; presumably they are filled from the parser's
    temporal extraction (TODO confirm at the call site).
    """

    # --- provenance ---
    source_file: str
    file_hash: str
    title: str

    # --- structure ---
    structure_info: Dict[str, Any]
    hierarchy_path: List[str]
    parent_structures: List[Dict[str, str]]

    # --- size and position ---
    char_count: int
    word_count: int
    chunk_index: int

    # --- sub-chunk tracking (parent_chunk_id links a split piece to its original) ---
    is_sub_chunk: bool = False
    sub_index: int = 0
    parent_chunk_id: Optional[str] = None

    # --- bookkeeping timestamps ---
    creation_time: datetime = field(default_factory=datetime.now)
    last_updated: datetime = field(default_factory=datetime.now)

    # --- temporal metadata of the source law ---
    effective_date: Optional[str] = None
    publication_date: Optional[str] = None
    amendment_info: Optional[str] = None

In [None]:
class Config:
    """Configuration settings for the document-processing pipeline."""

    @staticmethod
    def get_data_directory() -> str:
        """Return the input-document directory for the current environment.

        On Colab (detected via the ``COLAB_RELEASE_TAG`` environment variable) with
        Google Drive mounted at ``/content/drive``, a fixed Drive folder is used;
        otherwise the directory falls back to ``<cwd>/data``.
        """
        if 'COLAB_RELEASE_TAG' in os.environ:
            colab_path = "/content/drive/MyDrive/pj/test"
            # colab_path = "/content/drive/MyDrive/pj/data"
            if os.path.exists("/content/drive"):
                return colab_path
        return os.path.join(os.getcwd(), "data")

    # Resolved once, when the class body executes. ``__func__`` unwraps the
    # staticmethod: a bare staticmethod object is only callable from Python 3.10 on.
    DOCX_DIRECTORY: str = get_data_directory.__func__()
    DB_PATH: str = "/content/chroma_db"
    COLLECTION_NAME: str = "legal_manuals"
    # COLLECTION_NAME: str = "legal_docs"
    METADATA_DB_PATH: str = "/content/legal_metadata.db"

    # Embedding models (alternatives kept commented out for quick switching)
    EMBEDDING_MODELS: List[str] = [
        "jhgan/ko-sroberta-multitask",
        # "BAAI/bge-m3",
        # "nlpai-lab/KURE",
        # "intfloat/multilingual-e5-large-instruct",
    ]
    PRIMARY_MODEL: str = "jhgan/ko-sroberta-multitask"
    # LEGAL_SPECIALIZED_MODEL: str = "bongsoo/kpf-bert-base"  # legal-domain model

    # Chroma collection metadata ("hnsw:space" selects the distance metric).
    METADATA: Dict[str, str] = {"hnsw:space": "cosine"}

    # Chunking settings (lengths are in characters)
    MIN_CHUNK_LENGTH: int = 50      # shorter chunks are dropped during post-processing
    MAX_CHUNK_LENGTH: int = 1500    # longer chunks are split (tuned for statute articles)
    OVERLAP_SIZE: int = 200         # upper bound on sentence overlap carried between sub-chunks
    MAX_SENTENCE_LENGTH: int = 1200
    BATCH_SIZE: int = 100
    MAX_WORD_SPLIT: int = 400

    # Performance settings
    MAX_WORKERS: int = min(4, (os.cpu_count() or 1))
    PROCESSING_TIMEOUT: int = 300  # seconds (5 minutes)
    ENABLE_PARALLEL: bool = True
    CHUNK_CACHE_SIZE: int = 1000

In [None]:
class LegalSentenceSplitter:
    """Sentence splitter specialised for Korean legal documents.

    Periods that belong to references ("제3조."), enumerations ("1. 2") or common
    abbreviations are shielded with placeholders before splitting. A '.', '!' or
    '?' followed by whitespace only ends a sentence when ``_is_sentence_end``
    accepts the text in front of it.
    """

    def __init__(self):
        # Contexts in legal text where a period does NOT end a sentence.
        self.abbreviation_patterns = [
            r'\b(?:제|항|호|목|조|장|절|편|부|별표|별지)\s*\d+(?:\s*의\s*\d+)?\s*\.',
            r'\b\d+\.\s*(?=\d)',  # numbering such as "1. 2"
            r'\b[A-Z]\.',  # single upper-case initial
            r'(?:등|기타|포함|예시|단서|다만|그러나|다른|이외|기준|대상)\.',
            r'(?:법|시행령|시행규칙|고시|훈령|예규)\.',
        ]

        # Endings that mark a real sentence end. Each is a literal word ending
        # anchored at the end of the candidate text. A character class such as
        # ``[이다]`` would instead accept ANY single listed syllable before the period.
        self.sentence_end_patterns = [
            r'다음과\s*같습니다\.$',
            r'이다\.$',
            r'한다\.$',
            r'된다\.$',
            r'않는다\.$',
            r'있다\.$',
            r'없다\.$',
        ]

        self.compiled_abbrev = [re.compile(p) for p in self.abbreviation_patterns]
        self.compiled_sent_end = [re.compile(p) for p in self.sentence_end_patterns]

    def split_sentences(self, text: str) -> List[str]:
        """Split ``text`` into sentences using legal-document heuristics.

        Returns a list of stripped, non-empty sentences, or [] for blank input.
        """
        if not text.strip():
            return []

        # Shield non-terminal periods behind placeholders (which contain no '.').
        protected_text = text
        placeholders = {}

        for i, pattern in enumerate(self.compiled_abbrev):
            matches = list(pattern.finditer(protected_text))
            for match in reversed(matches):  # right-to-left so earlier offsets stay valid
                placeholder = f"__ABBREV_{i}_{len(placeholders)}__"
                placeholders[placeholder] = match.group()
                protected_text = protected_text[:match.start()] + placeholder + protected_text[match.end():]

        sentences = []
        current_sentence = ""

        # Split on punctuation followed by whitespace. The capturing group keeps the
        # punctuation, so parts alternate [text, punct, text, punct, ..., text].
        parts = re.split(r'([.!?])\s+', protected_text)

        i = 0
        while i < len(parts):
            current_sentence += parts[i]

            if i + 1 < len(parts) and parts[i + 1] in '.!?':
                current_sentence += parts[i + 1]

                if self._is_sentence_end(current_sentence):
                    sentences.append(current_sentence.strip())
                    current_sentence = ""
                else:
                    current_sentence += " "  # not a real end: keep accumulating

                i += 2
            else:
                i += 1

        if current_sentence.strip():
            sentences.append(current_sentence.strip())

        # Restore the shielded text.
        restored_sentences = []
        for sentence in sentences:
            restored = sentence
            for placeholder, original in placeholders.items():
                restored = restored.replace(placeholder, original)

            if restored.strip():
                restored_sentences.append(restored.strip())

        return restored_sentences

    def _is_sentence_end(self, text: str) -> bool:
        """Return True if ``text`` (ending in '.', '!' or '?') ends a sentence."""
        text = text.strip()

        # '!' and '?' are unambiguous terminators; the checks below only handle '.'.
        if text.endswith(('!', '?')):
            return True

        if any(pattern.search(text) for pattern in self.compiled_sent_end):
            return True

        if len(text) < 10:  # too short to be a sentence on its own
            return False

        # Characteristic endings of statute provisions.
        legal_endings = ['한다', '이다', '된다', '않는다', '있다', '없다', '같다', '바와 같다']
        return any(text.endswith(ending + '.') for ending in legal_endings)

In [None]:
class FileHashManager:
    """Tracks file content hashes and per-chunk bookkeeping in a SQLite database."""

    # Read size used when hashing files; larger reads cut per-call overhead.
    _HASH_READ_SIZE = 1 << 16

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    @contextmanager
    def _connect(self):
        """Yield a connection: commit on success, roll back on error, always close.

        A ``sqlite3.Connection`` used as a context manager only manages the
        transaction and does NOT close the connection, so it must be closed here.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_db(self):
        """Create the ``file_metadata`` and ``chunk_metadata`` tables if missing."""
        with self._connect() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS file_metadata (
                    file_path TEXT PRIMARY KEY,
                    file_hash TEXT NOT NULL,
                    last_modified REAL NOT NULL,
                    chunk_count INTEGER DEFAULT 0,
                    processing_time REAL DEFAULT 0,
                    last_processed TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            conn.execute('''
                CREATE TABLE IF NOT EXISTS chunk_metadata (
                    chunk_id TEXT PRIMARY KEY,
                    parent_chunk_id TEXT,
                    file_path TEXT NOT NULL,
                    chunk_index INTEGER NOT NULL,
                    is_sub_chunk BOOLEAN DEFAULT FALSE,
                    creation_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (file_path) REFERENCES file_metadata (file_path)
                )
            ''')

    def get_file_hash(self, file_path: str) -> str:
        """Return the SHA-256 hex digest of the file's contents.

        If the file cannot be read, logs an error and returns a digest of
        path + mtime + size instead. Errors from ``os.stat`` in that fallback
        propagate to the caller.
        """
        digest = hashlib.sha256()
        try:
            with open(file_path, "rb") as f:
                for block in iter(lambda: f.read(self._HASH_READ_SIZE), b""):
                    digest.update(block)
        except Exception as e:
            logger.error(f"파일 해시 계산 실패 {file_path}: {e}")
            # Fallback hash derived from modification time and size.
            stat = os.stat(file_path)
            fallback_string = f"{file_path}_{stat.st_mtime}_{stat.st_size}"
            return hashlib.sha256(fallback_string.encode()).hexdigest()

        return digest.hexdigest()

    def should_process_file(self, file_path: str) -> bool:
        """Return True if the file is new, or its mtime (>1 s) or content hash changed.

        Any error is logged and treated as "needs processing". The file is only
        hashed when the stored row exists and the mtime alone is inconclusive.
        """
        try:
            current_mtime = os.path.getmtime(file_path)

            with self._connect() as conn:
                row = conn.execute(
                    'SELECT file_hash, last_modified FROM file_metadata WHERE file_path = ?',
                    (file_path,)
                ).fetchone()

            if row is None:
                return True  # new file

            stored_hash, stored_mtime = row
            if abs(current_mtime - stored_mtime) > 1:
                return True
            return self.get_file_hash(file_path) != stored_hash

        except Exception as e:
            logger.error(f"파일 처리 필요성 확인 실패 {file_path}: {e}")
            return True  # on error, process to be safe

    def update_file_metadata(self, file_path: str, chunk_count: int, processing_time: float):
        """Upsert the file's current hash, mtime, chunk count and processing time."""
        try:
            file_hash = self.get_file_hash(file_path)
            mtime = os.path.getmtime(file_path)

            with self._connect() as conn:
                conn.execute('''
                    INSERT OR REPLACE INTO file_metadata
                    (file_path, file_hash, last_modified, chunk_count, processing_time)
                    VALUES (?, ?, ?, ?, ?)
                ''', (file_path, file_hash, mtime, chunk_count, processing_time))

        except Exception as e:
            logger.error(f"파일 메타데이터 업데이트 실패 {file_path}: {e}")

    def register_chunks(self, file_path: str, chunk_ids: List[str], parent_chunk_map: Dict[str, str]):
        """Replace the file's chunk rows with ``chunk_ids`` in a single transaction.

        ``parent_chunk_map`` maps a sub-chunk id to its parent chunk id; ids present
        in it are stored with ``is_sub_chunk`` set. Errors are logged, not raised.
        """
        rows = []
        for index, chunk_id in enumerate(chunk_ids):
            parent_id = parent_chunk_map.get(chunk_id)
            rows.append((chunk_id, parent_id, file_path, index, parent_id is not None))

        try:
            with self._connect() as conn:
                # Delete and re-insert atomically so a failure keeps the old rows.
                conn.execute('DELETE FROM chunk_metadata WHERE file_path = ?', (file_path,))
                conn.executemany('''
                    INSERT INTO chunk_metadata
                    (chunk_id, parent_chunk_id, file_path, chunk_index, is_sub_chunk)
                    VALUES (?, ?, ?, ?, ?)
                ''', rows)

        except Exception as e:
            logger.error(f"청크 메타데이터 등록 실패 {file_path}: {e}")

In [None]:
class EnhancedLegalParser:
    """Regex-based parser for the structure of Korean statutes.

    Recognises the following:
    - headings: 장/절/조, circled paragraph numbers, "1." items, "가." subitems, 부칙, 별표/별지;
    - cross references: 제N조/항/호 and 「law name」;
    - effective, publication and amendment dates.
    """

    # Hierarchy depth per structure type (smaller = higher in the document).
    _LEVEL_MAP = {
        'chapter': 1, 'section': 2, 'article': 3,
        'paragraph': 4, 'item': 5, 'subitem': 6,
        'appendix': 1, 'attachment': 2
    }

    def __init__(self):
        # Heading patterns; tried in insertion order, first match wins.
        self.patterns = {
            'chapter': re.compile(r"^\s*제\s*([0-9]+(?:의[0-9]+)?)\s*장\s*(.*)$", re.MULTILINE),
            'section': re.compile(r"^\s*제\s*([0-9]+(?:의[0-9]+)?)\s*절\s*(.*)$", re.MULTILINE),
            'article': re.compile(r"^\s*제\s*([0-9]+(?:의[0-9]+)?)\s*조\s*(?:\(([^)]+)\))?\s*(.*)$", re.MULTILINE),
            'paragraph': re.compile(r"^\s*(①|②|③|④|⑤|⑥|⑦|⑧|⑨|⑩|⑪|⑫|⑬|⑭|⑮|⑯|⑰|⑱|⑲|⑳)\s*(.*)$", re.MULTILINE),
            'item': re.compile(r"^\s*([0-9]+)\.\s*(.*)$", re.MULTILINE),
            'subitem': re.compile(r"^\s*([가-힣])\.\s*(.*)$", re.MULTILINE),
            'appendix': re.compile(r"^\s*부\s*칙\s*(.*)$", re.MULTILINE),
            'attachment': re.compile(r"^\s*(\[별표\s*[0-9]+.*?\]|별지\s*제[0-9]+호서식)\s*(.*)$", re.MULTILINE),
        }

        self.cross_ref_patterns = {
            'article_ref': re.compile(r"제\s*([0-9]+(?:의[0-9]+)?)\s*조"),
            'paragraph_ref': re.compile(r"제\s*([0-9]+)\s*항"),
            'item_ref': re.compile(r"제\s*([0-9]+)\s*호"),
            'law_ref': re.compile(r"「([^」]+)」"),
        }

        # Circled paragraph markers -> numeric strings.
        self.paragraph_map = {
            '①': '1', '②': '2', '③': '3', '④': '4', '⑤': '5',
            '⑥': '6', '⑦': '7', '⑧': '8', '⑨': '9', '⑩': '10',
            '⑪': '11', '⑫': '12', '⑬': '13', '⑭': '14', '⑮': '15',
            '⑯': '16', '⑰': '17', '⑱': '18', '⑲': '19', '⑳': '20'
        }

        # Temporal metadata patterns (group 1 is the extracted value).
        self.temporal_patterns = {
            # Handles both '[시행 2025. 4. 22.]' and '시행일: 2025. 4. 22.'
            "effective_date": re.compile(r"\[?\s*시행(?:일)?\s*:?\s*(\d{4}\.\s*\d{1,2}\.\s*\d{1,2})\.?\s*\]?"),

            # Publication date from '[법률 제...호, 2025. 4. 22., ...]'
            "publication_date": re.compile(r"법률\s*제[0-9]+호\s*,\s*(\d{4}\.\s*\d{1,2}\.\s*\d{1,2})\.?"),

            # Law number, date and amendment kind together
            "amendment_info": re.compile(r"(법률\s*제[0-9]+호,\s*\d{4}\.\s*\d{1,2}\.\s*\d{1,2}\.?,\s*.*?개정)"),
        }

    # LegalStructure annotations are quoted so this class can be defined even if the
    # cell defining LegalStructure has not run yet (notebook cells can run out of order).
    def parse_structure(self, text: str) -> Optional["LegalStructure"]:
        """Return a LegalStructure if the stripped ``text`` starts with a heading, else None."""
        text = text.strip()
        if not text:
            return None

        for structure_type, pattern in self.patterns.items():
            match = pattern.match(text)
            if match:
                return self._create_structure(structure_type, match, text)

        return None

    def _create_structure(self, structure_type: str, match, original_text: str) -> "LegalStructure":
        """Build a LegalStructure (number, title, level, cross refs) from a heading match."""
        if structure_type == 'paragraph':
            number = self.paragraph_map.get(match.group(1), match.group(1))
            title = match.group(2).strip() if match.group(2) else ""
        elif structure_type in ['chapter', 'section', 'article']:
            number = match.group(1)
            if structure_type == 'article' and match.lastindex >= 3:
                # Article title = "(subtitle) remaining text", e.g. "목적 이 법은 ..."
                subtitle = match.group(2) if match.group(2) else ""
                content = match.group(3) if match.group(3) else ""
                title = f"{subtitle} {content}".strip() if subtitle else content.strip()
            else:
                title = match.group(2).strip() if match.lastindex >= 2 else ""
        elif structure_type in ['item', 'subitem']:
            number = match.group(1)
            title = match.group(2).strip() if match.group(2) else ""
        else:
            # appendix / attachment: no ordinal of their own
            number = "1"
            title = match.group(1).strip() if match.group(1) else ""

        cross_refs = self._extract_cross_references(original_text)

        return LegalStructure(
            structure_type=structure_type,
            number=number,
            title=title,
            level=self._LEVEL_MAP.get(structure_type, 7),
            cross_refs=cross_refs
        )

    def _extract_cross_references(self, text: str) -> List[str]:
        """Return de-duplicated cross references ("조:5", "항:2", "호:1", "법률:민법").

        Order is deterministic: grouped by reference kind (article, paragraph, item,
        law) and then by first occurrence. A ``set`` here would make the order vary
        between interpreter runs because of string-hash randomisation.
        """
        prefixes = {
            'article_ref': '조',
            'paragraph_ref': '항',
            'item_ref': '호',
            'law_ref': '법률',
        }
        refs = []
        for ref_type, pattern in self.cross_ref_patterns.items():
            prefix = prefixes.get(ref_type)
            if prefix is None:
                continue
            refs.extend(f"{prefix}:{match}" for match in pattern.findall(text))
        return list(dict.fromkeys(refs))

    def extract_temporal_info(self, text: str) -> Dict[str, Optional[str]]:
        """Return the first effective/publication/amendment value found in ``text`` (or None)."""
        info = {
            "effective_date": None,
            "publication_date": None,
            "amendment_info": None,
        }
        for key, pattern in self.temporal_patterns.items():
            match = pattern.search(text)
            if match:
                info[key] = match.group(1).strip()
        return info

In [None]:
class EnhancedDocumentProcessor:
    """개선된 문서 처리기"""

    def __init__(self, file_path: str, hash_manager: FileHashManager):
        """Bind the processor to a single input document.

        Args:
            file_path: path of the DOCX / PDF / PPTX file to process.
            hash_manager: FileHashManager used to hash the file.

        Raises:
            FileNotFoundError: if ``file_path`` does not exist.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"지정된 파일을 찾을 수 없습니다: {file_path}")

        # Input identity
        self.file_path = file_path
        self.file_name = os.path.basename(file_path)
        self.hash_manager = hash_manager

        # Parsing helpers
        self.parser = EnhancedLegalParser()
        self.sentence_splitter = LegalSentenceSplitter()

        # Per-document state
        self.structure_hierarchy = []   # stack of currently open headings (LegalStructure)
        self.temporal_metadata = {}     # dates / amendment info found near the top of the file
        self.chunk_id_counter = 0       # sequence number used by _generate_chunk_id
        self.parent_chunk_map = {}      # presumably sub-chunk id -> parent chunk id (filled elsewhere)

    # 테이블을 Markdown으로 변환하는 헬퍼 함수
    def _convert_docx_table_to_markdown(self, table: DocxTable) -> str:
        """Render a python-docx table as a Markdown table, using the first row as header.

        Cell text is flattened to one line and literal ``|`` characters are escaped
        so they cannot add spurious columns. Returns "" (after logging a warning)
        when the table structure raises IndexError, e.g. a table with no rows.
        """
        def cell_text(cell) -> str:
            # Newlines would end the Markdown row; unescaped pipes would split the cell.
            return cell.text.strip().replace('\n', ' ').replace('|', '\\|')

        try:
            header_cells = [cell_text(cell) for cell in table.rows[0].cells]
            lines = [
                "| " + " | ".join(header_cells) + " |",
                "| " + " | ".join(["---"] * len(header_cells)) + " |",
            ]
            for row in table.rows[1:]:
                lines.append("| " + " | ".join(cell_text(cell) for cell in row.cells) + " |")
        except IndexError:
            logger.warning(f"'{self.file_name}'에서 비정상적인 테이블 구조 발견, 건너뜁니다.")
            return ""
        return "\n".join(lines) + "\n\n"

    def _convert_pptx_table_to_markdown(self, table: PptxTable) -> str:
        """Render a python-pptx table as a Markdown table, using the first row as header.

        Cells are read by (row, column) index over ``len(table.columns)`` columns.
        Cell text is flattened to one line and literal ``|`` characters are escaped.
        Any exception is logged as a warning and yields "".
        """
        def cell_text(r: int, c: int) -> str:
            # Newlines would end the Markdown row; unescaped pipes would split the cell.
            return table.cell(r, c).text.strip().replace('\n', ' ').replace('|', '\\|')

        try:
            num_cols = len(table.columns)
            header_cells = [cell_text(0, c) for c in range(num_cols)]
            lines = [
                "| " + " | ".join(header_cells) + " |",
                "| " + " | ".join(["---"] * len(header_cells)) + " |",
            ]
            for r in range(1, len(table.rows)):
                lines.append("| " + " | ".join(cell_text(r, c) for c in range(num_cols)) + " |")
        except Exception as e:
            logger.warning(f"'{self.file_name}'의 PPTX 테이블 변환 중 오류: {e}")
            return ""
        return "\n".join(lines) + "\n\n"

    def _extract_initial_metadata_from_text(self, text: str):
        """Collect temporal metadata from the first 30 lines of ``text``.

        For each key of ``self.parser.temporal_patterns`` the first non-empty value
        wins. Scanning stops early once every key has a value. The result (keys
        without a value stay None) replaces ``self.temporal_metadata``.
        """
        found = dict.fromkeys(self.parser.temporal_patterns)

        for line in text.split('\n')[:30]:
            if all(found.values()):
                break
            for key, value in self.parser.extract_temporal_info(line).items():
                if value and not found.get(key):
                    found[key] = value

        self.temporal_metadata = found

    def _get_text_blocks(self) -> List[str]:
        """Extract the document as a list of text blocks, dispatching on file extension.

        Extraction per format:
        - DOCX: one block per body paragraph or table (tables as Markdown), in document order.
        - PDF: the sorted page text, split into lines.
        - PPTX: one block per table or text frame, slide by slide.

        As a side effect, fills ``self.temporal_metadata`` from the leading text.
        Unsupported extensions log a warning and return [].
        """
        suffix = Path(self.file_path).suffix.lower()

        def read_docx() -> List[str]:
            document = docx.Document(self.file_path)
            # For DOCX the document's own leading paragraphs are the best metadata source.
            head = "\n".join(p.text for p in document.paragraphs[:15])
            self._extract_initial_metadata_from_text(head)
            collected = []
            for element in document.element.body:
                if isinstance(element, CT_P):
                    collected.append(Paragraph(element, document).text.strip())
                elif isinstance(element, CT_Tbl):
                    collected.append(self._convert_docx_table_to_markdown(DocxTable(element, document)))
            return collected

        def read_pdf() -> List[str]:
            with fitz.open(self.file_path) as pdf:
                page_texts = [page.get_text(sort=True) + "\n" for page in pdf]
            full_text = "".join(page_texts)
            self._extract_initial_metadata_from_text(full_text)
            return full_text.split('\n')

        def read_pptx() -> List[str]:
            prs = Presentation(self.file_path)
            collected = []
            for slide in prs.slides:
                for shape in slide.shapes:
                    if shape.has_table:
                        piece = self._convert_pptx_table_to_markdown(shape.table)
                    elif shape.has_text_frame:
                        piece = shape.text_frame.text.strip()
                    else:
                        continue
                    collected.append(piece)
            self._extract_initial_metadata_from_text("".join(piece + "\n" for piece in collected))
            return collected

        readers = {'.docx': read_docx, '.pdf': read_pdf, '.pptx': read_pptx}
        reader = readers.get(suffix)
        if reader is None:
            logger.warning(f"지원하지 않는 파일 형식입니다: {self.file_name}")
            return []
        return reader()


    def extract_structured_chunks(self) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
        """Split the file (DOCX, PDF or PPTX) into structure-aware chunks.

        Every block that parses as a legal heading closes the current chunk and
        opens a new one; other blocks are appended to the current chunk. Text before
        the first heading goes into a synthetic 'preamble' chunk. Finished chunks
        are filtered and split by ``_post_process_chunks_advanced``.

        Returns:
            ``(chunks, parent_chunk_map)``. Returns ``([], {})`` when reading fails
            or no text blocks are extracted.
        """
        # Reset per-document state so a second call on the same processor does not
        # inherit the heading stack or sub-chunk map of the previous run. The id
        # counter is deliberately kept so chunk ids stay unique across calls.
        self.structure_hierarchy = []
        self.parent_chunk_map = {}

        try:
            text_blocks = self._get_text_blocks()
            if not text_blocks:
                logger.warning(f"'{self.file_name}'에서 텍스트를 추출하지 못했습니다.")
                return [], {}
        except Exception as e:
            logger.error(f"'{self.file_name}' 파일 읽기 또는 파싱 실패: {e}", exc_info=True)
            return [], {}

        chunks = []
        current_chunk = None
        file_hash = self.hash_manager.get_file_hash(self.file_path)
        print(f"    -> '{self.file_name}': 청크 추출 및 분석 시작...")

        for text in text_blocks:
            text = text.strip()
            if not text:
                continue

            structure = self.parser.parse_structure(text)
            if structure:
                # A heading closes the current chunk and starts a new one.
                if current_chunk and current_chunk['content']:
                    chunks.append(self._finalize_chunk(current_chunk, file_hash))

                self._update_hierarchy(structure)
                chunk_id = self._generate_chunk_id()
                structure.original_chunk_id = chunk_id
                current_chunk = {
                    'chunk_id': chunk_id, 'structure': structure, 'content': [text],
                    'hierarchy_path': self._get_hierarchy_path(),
                    'parent_structures': self._get_parent_structures()
                }
            elif not current_chunk:
                # Text before the first heading: synthetic preamble chunk.
                chunk_id = self._generate_chunk_id()
                current_chunk = {
                    'chunk_id': chunk_id,
                    'structure': LegalStructure('preamble', '0', '서문', 0, original_chunk_id=chunk_id),
                    'content': [text], 'hierarchy_path': [], 'parent_structures': []
                }
            else:
                current_chunk['content'].append(text)

        if current_chunk and current_chunk['content']:
            chunks.append(self._finalize_chunk(current_chunk, file_hash))

        processed_chunks = self._post_process_chunks_advanced(chunks)
        logger.info(f"'{self.file_name}'에서 {len(processed_chunks)}개의 청크를 추출했습니다.")
        gc.collect()
        print(f"    -> '{self.file_name}': 청크 {len(processed_chunks)}개 추출 완료.")
        return processed_chunks, self.parent_chunk_map

    def _generate_chunk_id(self) -> str:
        """Return a new id of the form ``<file name>_<4-digit counter>_<unix seconds>``.

        Increments ``self.chunk_id_counter`` as a side effect.
        NOTE(review): the counter starts at 0 for every processor instance, so two
        files with the same base name processed within the same second would get
        identical ids.
        """
        self.chunk_id_counter = self.chunk_id_counter + 1
        seconds = int(time.time())
        return "{}_{:04d}_{}".format(self.file_name, self.chunk_id_counter, seconds)

    def _update_hierarchy(self, structure: LegalStructure):
        """Push ``structure`` onto the stack of open headings.

        Headings at the same or a deeper level are closed first. The remaining
        top of the stack, if any, becomes the parent and is recorded as
        ``"type:number"`` in ``structure.parent_ref``.
        """
        self.structure_hierarchy = list(
            filter(lambda open_heading: open_heading.level < structure.level, self.structure_hierarchy)
        )

        if self.structure_hierarchy:
            parent = self.structure_hierarchy[-1]
            structure.parent_ref = f"{parent.structure_type}:{parent.number}"

        self.structure_hierarchy.append(structure)

    def _get_hierarchy_path(self) -> List[str]:
        """Return the open headings, outermost first, as ``"type:number"`` strings."""
        return ["%s:%s" % (heading.structure_type, heading.number) for heading in self.structure_hierarchy]

    def _get_parent_structures(self) -> List[Dict[str, str]]:
        """Describe every open heading except the innermost one.

        Each entry has keys 'type', 'number', 'title' and 'level'. Note that
        'level' holds an int despite the ``Dict[str, str]`` annotation.
        """
        ancestors = self.structure_hierarchy[:-1]
        return [
            dict(type=a.structure_type, number=a.number, title=a.title, level=a.level)
            for a in ancestors
        ]

    def _finalize_chunk(self, chunk: Dict[str, Any], file_hash: str) -> Dict[str, Any]:
        """Convert an in-progress chunk into its final dictionary form.

        The block texts are joined with newlines and prefixed with a
        ``[<type>:<number> <title>]`` header line, so every chunk carries its
        structural context. ``char_count``/``word_count`` measure the body only,
        without the header. The document's temporal metadata is merged in.
        """
        content = "\n".join(chunk['content'])
        structure = chunk['structure']

        chunk_title = f"{structure.structure_type}:{structure.number} {structure.title}".strip()

        # Header and body separated by a real newline. An escaped backslash-n here
        # would embed the two characters '\' and 'n' in every chunk's text.
        final_content = f"[{chunk_title}]\n{content}"

        final_chunk = {
            'chunk_id': chunk['chunk_id'],
            'title': chunk_title,
            'content': final_content,
            'structure_info': {
                'type': structure.structure_type,
                'number': structure.number,
                'level': structure.level,
                'parent_ref': structure.parent_ref,
                'cross_refs': structure.cross_refs,
                'original_chunk_id': structure.original_chunk_id
            },
            'hierarchy_path': chunk['hierarchy_path'],
            'parent_structures': chunk['parent_structures'],
            'char_count': len(content),
            'word_count': len(content.split()),
            'file_hash': file_hash
        }

        # Attach the temporal metadata extracted from the document header.
        final_chunk.update(self.temporal_metadata)

        return final_chunk

    def _post_process_chunks_advanced(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Drop too-short chunks and split over-long ones, preserving order.

        Chunk handling:
        - body shorter than ``Config.MIN_CHUNK_LENGTH``: discarded (the count is logged);
        - body longer than ``Config.MAX_CHUNK_LENGTH``: split by ``_split_chunk_advanced``;
        - otherwise: passed through unchanged.
        """
        min_length = Config.MIN_CHUNK_LENGTH
        max_length = Config.MAX_CHUNK_LENGTH

        processed = []
        dropped_titles = []

        for chunk in chunks:
            if chunk['char_count'] < min_length:
                # NOTE(review): this discards legal text outright (e.g. short items
                # such as "1. 삭제" or bare headings). Consider merging into a
                # neighbouring chunk instead of dropping.
                dropped_titles.append(chunk['title'])
                continue

            if chunk['char_count'] > max_length:
                processed.extend(self._split_chunk_advanced(chunk, max_length))
            else:
                processed.append(chunk)

        if dropped_titles:
            # Make the loss visible instead of silent.
            logger.info(f"'{self.file_name}': 최소 길이({min_length}자) 미만의 청크 {len(dropped_titles)}개를 제외했습니다.")
            logger.debug(f"'{self.file_name}' 제외된 청크: {dropped_titles}")

        return processed

    def _split_chunk_advanced(self, chunk: Dict[str, Any], max_length: int) -> List[Dict[str, Any]]:
        """Split an over-long chunk along the next structural level down.

        Strategy by chunk type:
        - article: split by paragraph markers (①, ②, ...);
        - paragraph: split by items;
        - anything else: split by sentences.
        """
        structure_type = chunk['structure_info']['type']

        if structure_type == 'article':
            return self._split_by_paragraphs_advanced(chunk, max_length)
        if structure_type == 'paragraph':
            return self._split_by_items_advanced(chunk, max_length)
        return self._split_by_sentences_advanced(chunk, max_length)

    def _split_by_sentences_advanced(self, chunk: Dict[str, Any], max_length: int) -> List[Dict[str, Any]]:
        """Split a chunk at sentence boundaries into sub-chunks of about ``max_length`` chars.

        Sentences from ``self.sentence_splitter`` are packed greedily and joined with
        single spaces. When a sub-chunk is emitted, up to two of its trailing
        sentences are repeated at the start of the next sub-chunk, provided their
        joined length stays below ``Config.OVERLAP_SIZE``. A sub-chunk can still
        exceed ``max_length`` when one sentence (plus its overlap) is longer than
        that. Text with at most one sentence is delegated to
        ``_split_single_long_sentence``.
        """
        sentences = self.sentence_splitter.split_sentences(chunk['content'])

        if len(sentences) <= 1:
            # A single over-long sentence: fall back to semantic / word splitting.
            return self._split_single_long_sentence(chunk, max_length)

        def overlap_tail(previous: List[str]) -> List[str]:
            # Trailing sentences (at most two) of ``previous`` whose joined length is
            # below OVERLAP_SIZE. Taken from the end one at a time, so a long
            # second-to-last sentence does not prevent carrying a short last one.
            tail: List[str] = []
            tail_length = 0
            for candidate in reversed(previous[-2:]):
                added = len(candidate) + (1 if tail else 0)
                if tail_length + added >= Config.OVERLAP_SIZE:
                    break
                tail.insert(0, candidate)
                tail_length += added
            return tail

        split_chunks = []
        current_content: List[str] = []
        current_length = 0  # always equals len(' '.join(current_content))

        for sentence in sentences:
            separator = 1 if current_content else 0
            if current_content and current_length + separator + len(sentence) > max_length:
                sub_chunk = self._create_sub_chunk(chunk, ' '.join(current_content), len(split_chunks) + 1)
                split_chunks.append(sub_chunk)

                # Sliding-window overlap: carry trailing context into the next sub-chunk.
                current_content = overlap_tail(current_content) + [sentence]
                current_length = len(' '.join(current_content))
            else:
                current_content.append(sentence)
                current_length += separator + len(sentence)

        if current_content:
            sub_chunk = self._create_sub_chunk(chunk, ' '.join(current_content), len(split_chunks) + 1)
            split_chunks.append(sub_chunk)

        return split_chunks if split_chunks else [chunk]

    def _split_single_long_sentence(self, chunk: Dict[str, Any], max_length: int) -> List[Dict[str, Any]]:
        """Split a single over-long sentence into sub-chunks of at most ``max_length`` chars.

        The split runs in two stages:
        1. Cut at legal connective phrases (다만, 그러나, 다음과 같다, 다음 각 호,
           이 경우, ...). Each matched phrase stays attached to the text before it,
           and pieces are re-packed up to ``max_length``.
        2. Any piece that is still too long is split on whitespace.
        Returns ``[chunk]`` unchanged if no non-empty piece results.
        """
        content = chunk['content']

        # Semantic split points. None of these may contain a capturing group: each is
        # wrapped in exactly one group below, and the re.split result is walked as
        # (segment, delimiter) pairs. Raw strings must use \s, not \\s (which would
        # match a literal backslash followed by 's').
        split_points = [
            r'(?<=다만)\s*,?\s*',    # proviso
            r'(?<=그러나)\s*,?\s*',  # exception
            r'(?<=다음과\s같다)\s*:?\s*',  # start of an enumeration
            r'다음\s각\s호(?:의 어느 하나에 해당하는 경우)?\s*:?\s*',  # "다음 각 호" list intro
            r'(?<=이\s경우)\s*,?\s*',     # condition
            r'(?<=[.\)\]])\s+(?=한편|또한|그리고)',  # connective adverbs
            r';\s+',  # semicolons often separate clauses
        ]

        # Stage 1: semantic splitting of pieces that are still too long.
        parts = [content]
        for pattern in split_points:
            splitter = re.compile(f'({pattern})')
            new_parts = []
            for part in parts:
                if len(part) <= max_length:
                    new_parts.append(part)
                    continue

                pieces = splitter.split(part)
                temp_part = ""
                for j in range(0, len(pieces), 2):
                    segment = pieces[j]
                    delimiter = pieces[j + 1] if j + 1 < len(pieces) else ""
                    if temp_part and len(temp_part) + len(segment) + len(delimiter) > max_length:
                        new_parts.append(temp_part)
                        temp_part = segment + delimiter
                    else:
                        temp_part += segment + delimiter
                if temp_part:
                    new_parts.append(temp_part)
            parts = new_parts

        # Stage 2: whitespace splitting for anything still over the limit.
        final_parts = []
        for part in parts:
            if len(part) <= max_length:
                final_parts.append(part)
                continue

            current_part = []
            current_length = 0
            for word in part.split():
                word_length = len(word) + 1  # include the joining space
                if current_length + word_length > max_length and current_part:
                    final_parts.append(' '.join(current_part))
                    current_part = [word]
                    current_length = word_length
                else:
                    current_part.append(word)
                    current_length += word_length

            if current_part:
                final_parts.append(' '.join(current_part))

        # Build sub-chunks from the non-empty pieces.
        split_chunks = []
        for i, part in enumerate(final_parts):
            if part.strip():
                split_chunks.append(self._create_sub_chunk(chunk, part.strip(), i + 1))

        return split_chunks if split_chunks else [chunk]

    def _split_by_paragraphs_advanced(self, chunk: Dict[str, Any], max_length: int) -> List[Dict[str, Any]]:
        """향상된 항 단위 분할"""
        content = chunk['content']
        paragraph_pattern = re.compile(r'(①|②|③|④|⑤|⑥|⑦|⑧|⑨|⑩|⑪|⑫|⑬|⑭|⑮|⑯|⑰|⑱|⑲|⑳)')

        paragraphs = paragraph_pattern.split(content)
        if len(paragraphs) <= 1:
            return self._split_by_sentences_advanced(chunk, max_length)

        split_chunks = []
        current_content = paragraphs[0] if paragraphs[0].strip() else ""

        i = 1
        while i < len(paragraphs):
            if i + 1 < len(paragraphs):
                paragraph_marker = paragraphs[i]
                paragraph_content = paragraphs[i + 1]
                paragraph_text = paragraph_marker + paragraph_content

                if len(current_content) + len(paragraph_text) > max_length and current_content.strip():
                    sub_chunk = self._create_sub_chunk(chunk, current_content, len(split_chunks) + 1)
                    split_chunks.append(sub_chunk)
                    current_content = paragraph_text
                else:
                    current_content += paragraph_text

                i += 2
            else:
                current_content += paragraphs[i]
                i += 1

        if current_content.strip():
            sub_chunk = self._create_sub_chunk(chunk, current_content, len(split_chunks) + 1)
            split_chunks.append(sub_chunk)

        return split_chunks if split_chunks else [chunk]

    def _split_by_items_advanced(self, chunk: Dict[str, Any], max_length: int) -> List[Dict[str, Any]]:
        """향상된 호 단위 분할"""
        content = chunk['content']
        item_pattern = re.compile(r'(\d+\.)')

        items = item_pattern.split(content)
        if len(items) <= 1:
            return self._split_by_sentences_advanced(chunk, max_length)

        split_chunks = []
        current_content = items[0] if items[0].strip() else ""

        i = 1
        while i < len(items):
            if i + 1 < len(items):
                item_marker = items[i]
                item_content = items[i + 1]
                item_text = item_marker + item_content

                if len(current_content) + len(item_text) > max_length and current_content.strip():
                    sub_chunk = self._create_sub_chunk(chunk, current_content, len(split_chunks) + 1)
                    split_chunks.append(sub_chunk)
                    current_content = item_text
                else:
                    current_content += item_text

                i += 2
            else:
                current_content += items[i]
                i += 1

        if current_content.strip():
            sub_chunk = self._create_sub_chunk(chunk, current_content, len(split_chunks) + 1)
            split_chunks.append(sub_chunk)

        return split_chunks if split_chunks else [chunk]

    def _create_sub_chunk(self, original_chunk: Dict[str, Any], content: str, sub_index: int) -> Dict[str, Any]:
        """향상된 하위 청크 생성"""
        sub_chunk_id = f"{original_chunk['chunk_id']}_sub_{sub_index}"
        parent_chunk_id = original_chunk['chunk_id']

        # 부모-자식 관계 추적
        self.parent_chunk_map[sub_chunk_id] = parent_chunk_id

        new_chunk = original_chunk.copy()

        contextual_title = original_chunk.get('title', '관련 조항')
        final_content = f"[{contextual_title} (부분)]\\n{content}"

        new_chunk.update({
            'chunk_id': sub_chunk_id,
            'title': f"{original_chunk['title']} (부분 {sub_index})",
            'content': final_content,
            'char_count': len(content),
            'word_count': len(content.split()),
            'is_sub_chunk': True,
            'sub_index': sub_index,
            'parent_chunk_id': parent_chunk_id
        })
        return new_chunk

In [None]:
class EnhancedVectorDBManager:
    """ChromaDB-backed store for structured legal-document chunks.

    Wraps a persistent Chroma collection and a ``FileHashManager`` (used for
    incremental processing). Provides ingestion, filtered / context-aware
    search, and summary statistics.
    """

    # Per-query (list-of-lists) keys of a Chroma query result that may be
    # truncated. Other keys, e.g. 'included' (a flat list of field names),
    # must be left untouched.
    _QUERY_RESULT_LIST_KEYS = ('ids', 'embeddings', 'documents', 'uris', 'data', 'metadatas', 'distances')

    def __init__(self, config: 'Config'):
        """Load an embedding model, open the persistent client and bind the collection.

        Args:
            config: Project configuration. Reads METADATA_DB_PATH, DB_PATH,
                COLLECTION_NAME, METADATA and EMBEDDING_MODELS here; BATCH_SIZE
                and DOCX_DIRECTORY are read later during ingestion.

        Raises:
            Exception: Any initialization failure is logged and re-raised.
        """
        self.config = config
        self.hash_manager = FileHashManager(config.METADATA_DB_PATH)

        logger.info("개선된 임베딩 모델을 로드하는 중입니다...")
        try:
            self.primary_embedding_func = self._load_best_embedding_model(config)

            self.client = chromadb.PersistentClient(path=config.DB_PATH)
            self._initialize_collection_safe(config.COLLECTION_NAME, config.METADATA)

            logger.info("개선된 Vector DB 초기화가 완료되었습니다.")

        except Exception as e:
            logger.error(f"Vector DB 초기화 실패: {e}")
            raise

    def _load_best_embedding_model(self, config: 'Config'):
        """Return the first SentenceTransformer embedding function that loads and embeds.

        Tries ``config.EMBEDDING_MODELS`` in order and validates each with a
        one-string test embedding.

        Raises:
            RuntimeError: If no candidate model could be loaded.
        """
        logger.info("최적 임베딩 모델 선택을 시작합니다. (현재: 가용성 기반 선택)")
        logger.info("권장 사항: 프로덕션 환경에서는 Q&A 평가셋 기반의 실증적 성능 측정(MRR, Hit Rate)을 통해 모델을 선택해야 합니다.")

        # A legal-domain model (config.LEGAL_SPECIALIZED_MODEL) could be
        # prepended to this list once it has been validated on an eval set.
        models_to_try = config.EMBEDDING_MODELS

        for model in models_to_try:
            try:
                embedding_func = embedding_functions.SentenceTransformerEmbeddingFunction(
                    model_name=model
                )
                # Smoke-test the model so a broken download fails here, not at ingestion.
                embedding_func(["테스트"])
                logger.info(f"임베딩 모델 로드 성공: {model}")
                return embedding_func
            except Exception as e:
                logger.warning(f"모델 {model} 로드 실패: {e}")

        raise RuntimeError("사용 가능한 임베딩 모델이 없습니다.")

    def _initialize_collection_safe(self, collection_name: str, metadata: Dict[str, str]):
        """Bind ``self.collection``, reusing an existing collection so stored data is preserved.

        Raises:
            Exception: Any client error is logged and re-raised.
        """
        try:
            # list_collections() yields Collection objects in chromadb 1.x but
            # plain names in 0.6.x; accept either form.
            existing_collections = {
                getattr(col, 'name', col) for col in self.client.list_collections()
            }

            if collection_name in existing_collections:
                self.collection = self.client.get_collection(
                    name=collection_name,
                    embedding_function=self.primary_embedding_func
                )
                logger.info(f"기존 컬렉션 '{collection_name}'을 사용합니다.")
            else:
                self.collection = self.client.create_collection(
                    name=collection_name,
                    embedding_function=self.primary_embedding_func,
                    metadata=metadata
                )
                logger.info(f"새 컬렉션 '{collection_name}'을 생성했습니다.")

        except Exception as e:
            logger.error(f"컬렉션 초기화 실패: {e}")
            raise

    def should_process_file(self, file_path: str) -> bool:
        """Return whether ``file_path`` needs (re)processing, as decided by the hash manager."""
        return self.hash_manager.should_process_file(file_path)

    def upsert_chunks_incremental(self, file_name: str, chunks: List[Dict[str, Any]],
                                 parent_chunk_map: Dict[str, str]):
        """Replace all stored chunks of ``file_name`` with ``chunks``.

        Existing chunks whose ``source_file`` equals ``file_name`` are deleted
        first. The new chunks are then upserted in batches of
        ``config.BATCH_SIZE``.

        The file is recorded as processed in the hash manager only if every
        batch succeeded. After a partial failure the file is left unrecorded,
        so the next run reprocesses it; that run's delete step also removes
        the partially written chunks.

        Args:
            file_name: Base name of the source file (stored as ``source_file``).
            chunks: Chunk dicts to store.
            parent_chunk_map: Sub-chunk id -> parent chunk id, forwarded to the
                hash manager.
        """
        if not chunks:
            logger.warning(f"{file_name}: 처리할 청크가 없습니다.")
            return

        logger.info(f"{file_name}: {len(chunks)}개 청크 증분 처리 시작")
        print(f"    -> '{file_name}': 청크 {len(chunks)}개 DB 저장 시작...")

        # Remove the file's previous version before writing the new one.
        self._remove_existing_chunks(file_name)

        batch_size = self.config.BATCH_SIZE
        total_batches = (len(chunks) + batch_size - 1) // batch_size
        successful_batches = 0
        chunk_ids: List[str] = []

        for batch_idx in range(0, len(chunks), batch_size):
            batch_chunks = chunks[batch_idx:batch_idx + batch_size]
            batch_num = batch_idx // batch_size + 1
            print(f"        -> 배치 {batch_num}/{total_batches} 저장 중...")

            try:
                ids, documents, metadatas = self._prepare_enhanced_batch_data(
                    file_name, batch_chunks, batch_idx
                )
                self.collection.upsert(ids=ids, documents=documents, metadatas=metadatas)
                chunk_ids.extend(ids)
                successful_batches += 1

                if successful_batches % 5 == 0:
                    logger.info(f"  -> {successful_batches}/{total_batches} 배치 완료")

            except Exception as e:
                logger.error(f"  -> 배치 {batch_num} 저장 실패: {e}")

        print(f"  -> {file_name}: {successful_batches}/{total_batches} 배치 성공적으로 저장")
        logger.info(f"  -> {file_name}: {successful_batches}/{total_batches} 배치 성공적으로 저장")

        if successful_batches < total_batches:
            # Do not record the file as processed; otherwise the hash check would
            # skip it on the next run and the missing batches would never be retried.
            logger.error(
                f"{file_name}: {total_batches - successful_batches}개 배치 저장 실패 - "
                f"다음 실행 시 재처리되도록 파일 메타데이터를 갱신하지 않습니다."
            )
            return

        file_path = os.path.join(self.config.DOCX_DIRECTORY, file_name)
        self.hash_manager.update_file_metadata(file_path, len(chunks), 0)
        self.hash_manager.register_chunks(file_path, chunk_ids, parent_chunk_map)

    def _remove_existing_chunks(self, file_name: str):
        """Delete every stored chunk whose ``source_file`` metadata equals ``file_name``.

        Best-effort: errors are logged as warnings and otherwise ignored.
        """
        try:
            existing_results = self.collection.get(
                where={"source_file": {"$eq": file_name}},
                include=["metadatas"]
            )

            if existing_results['ids']:
                self.collection.delete(ids=existing_results['ids'])
                logger.info(f"{file_name}: {len(existing_results['ids'])}개 기존 청크 삭제")

        except Exception as e:
            logger.warning(f"{file_name}: 기존 청크 삭제 중 오류 (무시됨): {e}")

    @staticmethod
    def _date_to_timestamp(value: Any) -> int:
        """Convert a 'YYYY.M.D'-style date string to a local-time Unix timestamp.

        Spaces are removed and leading/trailing dots stripped first, so
        "2024. 1. 1." is accepted. Returns 0 when the value is empty or cannot
        be parsed; 0 is the stored "unknown date" sentinel.
        """
        if not value:
            return 0
        try:
            dt_obj = datetime.strptime(value.replace(' ', '').strip('.'), '%Y.%m.%d')
            return int(dt_obj.timestamp())
        except (ValueError, TypeError, AttributeError, OverflowError, OSError):
            return 0

    @staticmethod
    def _meta_value(value: Any, default: Any) -> Any:
        """Return ``default`` when ``value`` is None; Chroma rejects None metadata values."""
        return default if value is None else value

    def _prepare_enhanced_batch_data(self, file_name: str, chunks: List[Dict[str, Any]],
                                   batch_start_idx: int) -> Tuple[List[str], List[str], List[Dict[str, Any]]]:
        """Convert chunk dicts into Chroma ``(ids, documents, metadatas)`` lists.

        Args:
            file_name: Stored as ``source_file`` and used in fallback ids.
            chunks: Chunks of this batch; each must contain 'content'.
            batch_start_idx: Index of the first chunk within the whole file,
                used for ``chunk_index``.

        Returns:
            Three parallel lists, ready for ``collection.upsert``.
        """
        ids, documents, metadatas = [], [], []
        # One timestamp per batch so created_at and last_updated agree.
        now_iso = datetime.now().isoformat()

        for offset, chunk in enumerate(chunks):
            chunk_index = batch_start_idx + offset

            # Prefer the chunk's own id; fall back when it is missing or empty.
            unique_id = chunk.get('chunk_id') or f"{file_name}_{chunk_index:04d}_{int(time.time())}"

            ids.append(unique_id)
            documents.append(chunk['content'])

            structure_info = chunk.get('structure_info') or {}
            level = self._meta_value(structure_info.get('level', 4), 4)

            metadata = {
                "source_file": file_name,
                "file_hash": self._meta_value(chunk.get('file_hash'), ''),
                "chunk_id": unique_id,
                "article_title": str(self._meta_value(chunk.get('title'), 'Unknown'))[:200],
                "level": max(0, min(10, level)),
                "char_count": max(0, self._meta_value(chunk.get('char_count'), 0)),
                "word_count": max(0, self._meta_value(chunk.get('word_count'), 0)),
                "chunk_index": chunk_index,

                # Legal structure information
                "structure_type": self._meta_value(structure_info.get('type'), 'unknown'),
                "structure_number": self._meta_value(structure_info.get('number'), ''),
                "parent_ref": self._meta_value(structure_info.get('parent_ref'), ''),
                "cross_refs": json.dumps(self._meta_value(structure_info.get('cross_refs'), []), ensure_ascii=False),
                "hierarchy_path": json.dumps(self._meta_value(chunk.get('hierarchy_path'), []), ensure_ascii=False),
                "original_chunk_id": self._meta_value(structure_info.get('original_chunk_id'), ''),

                # Sub-chunk information
                "is_sub_chunk": self._meta_value(chunk.get('is_sub_chunk'), False),
                "sub_index": self._meta_value(chunk.get('sub_index'), 0),
                "parent_chunk_id": self._meta_value(chunk.get('parent_chunk_id'), ''),

                # Temporal metadata, stored as int timestamps so Chroma can
                # range-filter them. Each date is parsed independently; a bad
                # effective_date no longer discards publication_date.
                "effective_date": self._date_to_timestamp(chunk.get('effective_date')),
                "publication_date": self._date_to_timestamp(chunk.get('publication_date')),
                "amendment_info": self._meta_value(chunk.get('amendment_info'), ''),

                "created_at": now_iso,
                "last_updated": now_iso
            }
            metadatas.append(metadata)

        return ids, documents, metadatas

    def query_enhanced(self, query_text: str, n_results: int = 5,
                      structure_filter: Optional[str] = None,
                      level_filter: Optional[int] = None,
                      date_filter: Optional[Dict[str, Dict[str, str]]] = None,
                      include_context: bool = True) -> Dict[str, Any]:
        """Semantic search with optional metadata filters and parent/child context.

        Args:
            query_text: Search text; blank input returns an empty result.
            n_results: Maximum number of results returned.
            structure_filter: Exact ``structure_type`` to match.
            level_filter: Exact ``level`` to match (ignored unless 1..10).
            date_filter: E.g. ``{"effective_date": {"$gte": "2025-01-01"}}``;
                dates are given as 'YYYY-MM-DD'.
            include_context: When True, related parent/child chunks are mixed
                into the results.

        Returns:
            A Chroma-style result dict (per-query lists nested one level). On
            an empty collection or any error, an empty result of that shape.
        """
        if not query_text or not query_text.strip():
            logger.warning("빈 검색어입니다.")
            return {'ids': [[]], 'documents': [[]], 'metadatas': [[]], 'distances': [[]]}

        try:
            count = self.collection.count()
            if count == 0:
                logger.info("검색할 문서가 없습니다.")
                return {'ids': [[]], 'documents': [[]], 'metadatas': [[]], 'distances': [[]]}

            where_clause = self._build_where_clause(structure_filter, level_filter, date_filter)

            # Over-fetch candidates (2x) so context expansion and dedup have room.
            results = self.collection.query(
                query_texts=[query_text.strip()],
                n_results=min(max(1, n_results * 2), count),
                where=where_clause
            )

            if not results['ids'][0]:
                return results

            if include_context:
                return self._enhance_results_with_context(results, n_results)

            # Truncate only the per-query result lists. Blindly slicing every
            # list key would corrupt e.g. results['included'].
            for key in self._QUERY_RESULT_LIST_KEYS:
                value = results.get(key)
                if isinstance(value, list) and value and value[0] is not None:
                    results[key] = [value[0][:n_results]]
            return results

        except Exception as e:
            logger.error(f"검색 중 오류 발생: {e}")
            return {'ids': [[]], 'documents': [[]], 'metadatas': [[]], 'distances': [[]]}

    def _build_where_clause(self, structure_filter: Optional[str], level_filter: Optional[int],
                            date_filter: Optional[Dict[str, Dict[str, str]]]):
        """Build a Chroma ``where`` filter from the search options.

        Returns:
            None (no filter), a single condition, or ``{"$and": [...]}``.
        """
        filters: List[Dict[str, Any]] = []

        if structure_filter:
            filters.append({"structure_type": {"$eq": structure_filter}})

        if level_filter and 1 <= level_filter <= 10:
            filters.append({"level": {"$eq": level_filter}})

        if date_filter:
            # Dates are stored as int timestamps, because Chroma's comparison
            # operators ($gt/$gte/$lt/$lte) only work on numbers. Query dates
            # ('YYYY-MM-DD') are converted the same way.
            # Chroma requires exactly ONE operator per field expression, so a
            # range such as {"$gte": a, "$lte": b} becomes one condition per
            # operator, combined with $and below.
            # NOTE: unparsable stored dates are 0, so $lt/$lte also match them.
            for key, conditions in date_filter.items():
                if key not in ("effective_date", "publication_date") or not isinstance(conditions, dict):
                    continue
                for op, date_str in conditions.items():
                    try:
                        timestamp = int(datetime.strptime(date_str, '%Y-%m-%d').timestamp())
                    except (ValueError, TypeError):
                        continue  # ignore malformed dates
                    filters.append({key: {op: timestamp}})

        if not filters:
            return None
        if len(filters) == 1:
            return filters[0]
        return {"$and": filters}

    def _enhance_results_with_context(self, results: Dict[str, Any], n_results: int) -> Dict[str, Any]:
        """Interleave each hit with its related parent/child chunks, capped at ``n_results``.

        Related chunks get a pseudo-distance of their fixed relevance score
        from ``_find_related_chunks`` plus 0.1. Results are not re-sorted.
        """
        enhanced_ids = []
        enhanced_docs = []
        enhanced_metas = []
        enhanced_distances = []

        added_chunks = set()

        for chunk_id, doc, meta, distance in zip(
            results['ids'][0], results['documents'][0],
            results['metadatas'][0], results['distances'][0]
        ):
            if len(enhanced_ids) >= n_results:
                break

            if chunk_id in added_chunks:
                continue

            # The hit itself
            enhanced_ids.append(chunk_id)
            enhanced_docs.append(doc)
            enhanced_metas.append(meta)
            enhanced_distances.append(distance)
            added_chunks.add(chunk_id)

            # Related chunks (parent/children) while there is still room
            if len(enhanced_ids) < n_results:
                related_chunks = self._find_related_chunks(meta, added_chunks, n_results - len(enhanced_ids))

                for rel_id, rel_doc, rel_meta, rel_dist in related_chunks:
                    if len(enhanced_ids) >= n_results:
                        break
                    enhanced_ids.append(rel_id)
                    enhanced_docs.append(rel_doc)
                    enhanced_metas.append(rel_meta)
                    enhanced_distances.append(rel_dist + 0.1)  # rank slightly below direct hits
                    added_chunks.add(rel_id)

        return {
            'ids': [enhanced_ids],
            'documents': [enhanced_docs],
            'metadatas': [enhanced_metas],
            'distances': [enhanced_distances]
        }

    def _find_related_chunks(self, main_meta: Dict[str, Any], added_chunks: Set[str], max_related: int) -> List[Tuple]:
        """Return up to ``max_related`` ``(id, document, metadata, score)`` tuples.

        For a sub-chunk, the related chunk is its parent (score 0.05). For a
        non-sub-chunk, it is the chunks whose ``parent_chunk_id`` points to it
        (score 0.08). Ids already in ``added_chunks`` are skipped. Errors are
        logged and yield whatever was collected so far.
        """
        related = []

        try:
            # Parent of a sub-chunk
            if main_meta.get('is_sub_chunk') and main_meta.get('parent_chunk_id'):
                parent_id = main_meta['parent_chunk_id']
                if parent_id not in added_chunks:
                    parent_result = self.collection.get(
                        ids=[parent_id],
                        include=['documents', 'metadatas']
                    )
                    if parent_result['ids']:
                        related.append((
                            parent_result['ids'][0],
                            parent_result['documents'][0],
                            parent_result['metadatas'][0],
                            0.05  # the parent is highly relevant
                        ))

            # Children of a whole chunk
            if not main_meta.get('is_sub_chunk'):
                chunk_id = main_meta.get('chunk_id')
                if chunk_id:
                    children_result = self.collection.get(
                        where={"parent_chunk_id": {"$eq": chunk_id}},
                        include=['documents', 'metadatas'],
                        limit=max_related
                    )
                    for child_id, child_doc, child_meta in zip(
                        children_result['ids'], children_result['documents'], children_result['metadatas']
                    ):
                        if child_id not in added_chunks and len(related) < max_related:
                            related.append((child_id, child_doc, child_meta, 0.08))

        except Exception as e:
            logger.warning(f"관련 청크 검색 중 오류: {e}")

        return related[:max_related]

    def get_comprehensive_statistics(self) -> Dict[str, Any]:
        """Collection statistics computed on up to 2000 sampled chunks, plus file-level stats.

        Returns:
            A stats dict. On error, ``{"total_chunks": 0, "error": ...}``.
        """
        try:
            count = self.collection.count()
            if count == 0:
                return {"total_chunks": 0, "message": "데이터가 없습니다."}

            sample_size = min(2000, count)
            sample = self.collection.get(
                limit=sample_size,
                include=['metadatas']
            )

            if not sample['metadatas']:
                return {"total_chunks": count, "message": "메타데이터가 없습니다."}

            stats = self._calculate_detailed_stats(sample['metadatas'], count)
            stats.update(self._get_file_statistics())
            return stats

        except Exception as e:
            logger.error(f"통계 생성 중 오류: {e}")
            return {"total_chunks": 0, "error": str(e)}

    def _calculate_detailed_stats(self, metadatas: List[Dict], total_count: int) -> Dict[str, Any]:
        """Aggregate per-chunk metadata into distribution and count statistics.

        Args:
            metadatas: Sampled chunk metadata dicts.
            total_count: Total number of chunks in the collection.
        """
        char_counts = []
        structure_counts = defaultdict(int)
        level_counts = defaultdict(int)
        files = set()
        sub_chunk_count = 0
        cross_ref_count = 0
        temporal_data_count = defaultdict(int)

        for meta in metadatas:
            if meta.get("source_file"):
                files.add(meta["source_file"])

            char_count = meta.get("char_count", 0)
            if isinstance(char_count, (int, float)):
                char_counts.append(char_count)

            structure_counts[meta.get("structure_type", "unknown")] += 1

            level = meta.get("level")
            if isinstance(level, (int, float)):
                level_counts[int(level)] += 1

            if meta.get("is_sub_chunk", False):
                sub_chunk_count += 1

            # cross_refs is stored as a JSON string; malformed values count as "no refs".
            cross_refs = meta.get("cross_refs", "[]")
            try:
                refs = json.loads(cross_refs) if isinstance(cross_refs, str) else cross_refs
            except (json.JSONDecodeError, TypeError):
                refs = None
            if refs:
                cross_ref_count += 1

            # Temporal metadata coverage (0 / empty means "unknown").
            if meta.get("effective_date"):
                temporal_data_count["effective_date"] += 1
            if meta.get("publication_date"):
                temporal_data_count["publication_date"] += 1
            if meta.get("amendment_info"):
                temporal_data_count["amendment_info"] += 1

        return {
            "total_chunks": total_count,
            "sampled_chunks": len(metadatas),
            "files": len(files),
            "avg_char_count": sum(char_counts) / len(char_counts) if char_counts else 0,
            "min_char_count": min(char_counts) if char_counts else 0,
            "max_char_count": max(char_counts) if char_counts else 0,
            "structure_distribution": dict(structure_counts),
            "level_distribution": dict(level_counts),
            "sub_chunks": sub_chunk_count,
            "complete_structures": len(metadatas) - sub_chunk_count,
            "chunks_with_cross_refs": cross_ref_count,
            "cross_ref_percentage": (cross_ref_count / max(1, len(metadatas))) * 100,
            "temporal_data_counts": dict(temporal_data_count),
        }

    def _get_file_statistics(self) -> Dict[str, Any]:
        """Aggregate per-file statistics from the hash manager's sqlite ``file_metadata`` table.

        Returns ``{"processed_files": 0}`` when the query fails.
        """
        conn = None
        try:
            # sqlite3's connection context manager only commits/rolls back; it
            # does not close the connection, so close explicitly.
            conn = sqlite3.connect(self.hash_manager.db_path)
            result = conn.execute('''
                SELECT COUNT(*) as file_count,
                       SUM(chunk_count) as total_chunks,
                       AVG(chunk_count) as avg_chunks_per_file,
                       AVG(processing_time) as avg_processing_time
                FROM file_metadata
            ''').fetchone()

            if result:
                return {
                    "processed_files": result[0] or 0,
                    "total_chunks_from_files": result[1] or 0,
                    "avg_chunks_per_file": result[2] or 0,
                    "avg_processing_time": result[3] or 0
                }
        except Exception as e:
            logger.warning(f"파일 통계 조회 실패: {e}")
        finally:
            if conn is not None:
                conn.close()

        return {"processed_files": 0}

In [None]:
def process_file_parallel(args: Tuple[str, 'Config']) -> Tuple[str, bool, int, float, Optional[str]]:
    """Worker entry point: extract chunks from one file and stage them in a temp pickle.

    Intended to run in a ProcessPoolExecutor worker, so it builds its own
    FileHashManager. Database ingestion is left to the main process, which
    reads the pickle back and deletes it.

    Args:
        args: ``(file_path, config)`` as a single tuple, so it can be passed
            directly to ``executor.submit``.

    Returns:
        ``(file_name, success, num_chunks, processing_time, temp_file_path)``.
        An unchanged file returns success with 0 chunks and no temp file. A
        failure (including "no chunks extracted") returns ``success=False``
        and no temp file.
    """
    file_path, config = args
    file_name = os.path.basename(file_path)
    start_time = time.time()
    logger = logging.getLogger(__name__)  # the worker process uses its own logger handle

    try:
        hash_manager = FileHashManager(config.METADATA_DB_PATH)

        if not hash_manager.should_process_file(file_path):
            logger.info(f"{file_name}: No changes detected, skipping.")
            return file_name, True, 0, 0, None

        processor = EnhancedDocumentProcessor(file_path, hash_manager)
        chunks, parent_chunk_map = processor.extract_structured_chunks()

        if not chunks:
            logger.warning(f"{file_name}: No chunks were extracted.")
            return file_name, False, 0, time.time() - start_time, None

        processing_time = time.time() - start_time

        # Stage results on disk for the main process.
        temp_dir = os.path.join(os.getcwd(), "temp_chunks")
        os.makedirs(temp_dir, exist_ok=True)
        temp_file_path = os.path.join(temp_dir, f"chunks_{uuid.uuid4()}.pkl")

        payload = {
            'chunks': chunks,
            'parent_chunk_map': parent_chunk_map,
            'file_name': file_name,
            'file_path': file_path,
            'processing_time': processing_time
        }
        try:
            with open(temp_file_path, 'wb') as f:
                pickle.dump(payload, f)
        except Exception:
            # The caller only removes temp files whose path it receives, so a
            # truncated pickle left here would never be cleaned up.
            try:
                os.remove(temp_file_path)
            except OSError:
                pass
            raise

        return file_name, True, len(chunks), processing_time, temp_file_path

    except Exception as e:
        logger.error(f"Error processing {file_name} in parallel: {e}", exc_info=True)
        return file_name, False, 0, time.time() - start_time, None

def main():

    try:
        config = Config()
        logger.info(f"Data directory: {config.DOCX_DIRECTORY}")
        logger.info(f"Max workers: {config.MAX_WORKERS}")
        logger.info(f"Parallel processing enabled: {config.ENABLE_PARALLEL}")

        # config.COLLECTION_NAME = "legal_docs"

        print(config.COLLECTION_NAME)

        data_path = Path(config.DOCX_DIRECTORY)
        if not data_path.exists():
            data_path.mkdir(parents=True, exist_ok=True)
            logger.info(f"Created directory '{config.DOCX_DIRECTORY}'. Please add document files and run again.")
            return

        try:
            # 지원하는 모든 파일 확장자 포함
            supported_extensions = (".docx", ".pdf", ".pptx")
            files_to_process = [
                os.path.join(config.DOCX_DIRECTORY, f)
                for f in os.listdir(config.DOCX_DIRECTORY)
                if f.lower().endswith(supported_extensions) and not f.startswith("~")
            ]
        except PermissionError:
            logger.error(f"Permission denied for directory '{config.DOCX_DIRECTORY}'.")
            return

        if not files_to_process:
            logger.info(f"No '.docx', '.pdf', or '.pptx' files found in '{config.DOCX_DIRECTORY}'.")
            return

        logger.info(f"Found {len(files_to_process)} files to process.")

        db_manager = EnhancedVectorDBManager(config)
        total_chunks_processed = 0
        successful_files = 0
        start_time = time.time()

        if config.ENABLE_PARALLEL and len(files_to_process) > 1:
            logger.info("Running in parallel processing mode.")
            temp_result_files = []

            with ProcessPoolExecutor(max_workers=config.MAX_WORKERS) as executor:
                file_args = [(file_path, config) for file_path in files_to_process]
                futures = {executor.submit(process_file_parallel, args): args[0] for args in file_args}

                for i, future in enumerate(as_completed(futures)):
                    file_name, success, num_chunks, p_time, temp_file = future.result()
                    if success:
                        print(f"--- [{i+1}/{len(files_to_process)}] 파일 처리 완료: {file_name} ({p_time:.2f}s, {num_chunks} chunks) ---")
                        if temp_file:
                            temp_result_files.append(temp_file)
                        logger.info(f"[{i+1}/{len(files_to_process)}] File processed: {file_name} ({p_time:.2f}s, {num_chunks} chunks)")
                    else:
                        print(f"--- [{i+1}/{len(files_to_process)}] 파일 처리 실패: {file_name} ---")
                        logger.error(f"[{i+1}/{len(files_to_process)}] Failed to process file: {file_name}")

            logger.info("Chunk extraction complete. Starting database ingestion...")
            for temp_file in temp_result_files:
                try:
                    with open(temp_file, 'rb') as f:
                        data = pickle.load(f)

                    db_manager.upsert_chunks_incremental(data['file_name'], data['chunks'], data['parent_chunk_map'])
                    total_chunks_processed += len(data['chunks'])
                    successful_files += 1
                except Exception as e:
                    logger.error(f"Failed to ingest data from {temp_file}: {e}")
                finally:
                    os.remove(temp_file)

            temp_dir = os.path.join(os.getcwd(), "temp_chunks")
            if os.path.exists(temp_dir):
                try:
                    os.rmdir(temp_dir)
                except OSError:
                    pass # Directory might not be empty if an error occurred

        else: # Sequential processing
            logger.info("Running in sequential processing mode.")
            for i, file_path in enumerate(files_to_process):
                file_name = os.path.basename(file_path)
                print(f"\n--- [{i+1}/{len(files_to_process)}] 파일 처리 시작: {file_name} ---")
                logger.info(f"[{i+1}/{len(files_to_process)}] Processing: {file_name}")
                p_start_time = time.time()

                if not db_manager.should_process_file(file_path):
                    logger.info(f"  -> No changes detected, skipping.")
                    continue

                try:
                    processor = EnhancedDocumentProcessor(file_path, db_manager.hash_manager)
                    chunks, parent_chunk_map = processor.extract_structured_chunks()

                    if chunks:
                        db_manager.upsert_chunks_incremental(file_name, chunks, parent_chunk_map)
                        p_time = time.time() - p_start_time
                        total_chunks_processed += len(chunks)
                        successful_files += 1
                        logger.info(f"  -> Completed in {p_time:.2f}s, found {len(chunks)} chunks.")

                    else:
                        logger.warning(f"  -> No chunks were extracted from {file_name}.")
                except Exception as e:
                    logger.error(f"  -> An error occurred while processing {file_name}: {e}", exc_info=True)

        # --- Final Report ---
        total_time = time.time() - start_time
        logger.info("\n" + "="*20 + " Processing Complete " + "="*20)
        logger.info(f"Total execution time: {total_time:.2f} seconds")
        logger.info(f"Successfully processed files: {successful_files}/{len(files_to_process)}")
        logger.info(f"Total chunks ingested/updated: {total_chunks_processed}")

        print("\n" + "="*20 + " Processing Complete " + "="*20)
        print(f"Total execution time: {total_time:.2f} seconds")
        print(f"Successfully processed files: {successful_files}/{len(files_to_process)}")
        print(f"Total chunks ingested/updated: {total_chunks_processed}")

        stats = db_manager.get_comprehensive_statistics()
        logger.info(f"Database Statistics: {json.dumps(stats, ensure_ascii=False, indent=2)}")

        # --- Enhanced Search Test ---
        if stats.get("total_chunks", 0) > 0:
            logger.info("\n" + "="*20 + " Enhanced Search Test " + "="*20)
            test_queries = [
                {"query": "사장 임명", "structure_filter": "article"},
                {"query": "항만시설 사용을 위한 허가"},
                {"query": "준공확인", "level_filter": 3},
                {"query": "사용료", "include_context": False},
                {"query": "사용료", "date_filter": {"effective_date": {"$gte": "2025-01-01"}}}
            ]

            for test_case in test_queries:
                query = test_case["query"]
                options = {k:v for k,v in test_case.items() if k != 'query'}
                logger.info(f"\n[Query]: '{query}' (Options: {options})")
                print(f"\n[Query]: '{query}' (Options: {options})")

                try:
                    results = db_manager.query_enhanced(
                        query_text=query,
                        n_results=3,
                        structure_filter=test_case.get("structure_filter"),
                        level_filter=test_case.get("level_filter"),
                        date_filter=test_case.get("date_filter"),
                        include_context=test_case.get("include_context", True)
                    )

                    if results and results['ids'] and results['ids'][0]:
                        for i, (meta, doc, dist) in enumerate(zip(results['metadatas'][0], results['documents'][0], results['distances'][0])):
                            context_info = ""
                            if meta.get('is_sub_chunk'):
                                context_info = f"(Sub-chunk of: ...{meta.get('parent_chunk_id', 'N/A')[-15:]})"

                            temporal_info = f"시행일: {meta.get('effective_date', 'N/A')}"

                            logger.info(f"  [{i+1}] Title: {meta.get('article_title', 'N/A')} {context_info}")
                            print(f"  [{i+1}] Title: {meta.get('article_title', 'N/A')} {context_info}")
                            logger.info(f"      - Similarity: {1 - dist:.4f} | Structure: {meta.get('structure_type')}:{meta.get('structure_number')} | {temporal_info}")
                            logger.info(f"      - Source: {meta.get('source_file')}")
                            logger.info(f"      - Content: {doc[:120].replace(chr(10), ' ')}...")
                            print(f"      - Similarity: {1 - dist:.4f} | Structure: {meta.get('structure_type')}:{meta.get('structure_number')} | {temporal_info}")
                            print(f"      - Source: {meta.get('source_file')}")
                            print(f"      - Content: {doc[:120].replace(chr(10), ' ')}...")

                    else:
                        logger.info("  -> No results found.")

                except Exception as e:
                    logger.error(f"  -> An error occurred during search test: {e}", exc_info=True)
        else:
            logger.warning("No chunks in DB, skipping search test.")

    except KeyboardInterrupt:
        logger.info("\nProcess interrupted by user.")
    except Exception as e:
        logger.error(f"An unexpected error occurred in the main process: {e}", exc_info=True)
        sys.exit(1)

In [None]:
def _run_main_entry_point() -> None:
    """Entry point for this cell: prepare multiprocessing, then run the pipeline's ``main()``."""
    # freeze_support() only has an effect in a frozen Windows executable that
    # uses multiprocessing; in a normal interpreter or a notebook it is a no-op.
    mp.freeze_support()
    main()


if __name__ == "__main__":
    _run_main_entry_point()

legal_manuals
    -> '항만구역 내 출입통제구역 지정·운영 매뉴얼(해양수산부, 2024.1).pdf': 청크 추출 및 분석 시작...
    -> '항만구역 내 출입통제구역 지정·운영 매뉴얼(해양수산부, 2024.1).pdf': 청크 96개 추출 완료.
--- [1/2] 파일 처리 완료: 항만구역 내 출입통제구역 지정·운영 매뉴얼(해양수산부, 2024.1).pdf (10.07s, 96 chunks) ---
    -> '해양수산부_항만건설 안전사고 예방 매뉴얼_20231201.pptx': 청크 추출 및 분석 시작...
    -> '해양수산부_항만건설 안전사고 예방 매뉴얼_20231201.pptx': 청크 426개 추출 완료.
--- [2/2] 파일 처리 완료: 해양수산부_항만건설 안전사고 예방 매뉴얼_20231201.pptx (16.71s, 426 chunks) ---
    -> '항만구역 내 출입통제구역 지정·운영 매뉴얼(해양수산부, 2024.1).pdf': 청크 96개 DB 저장 시작...
        -> 배치 1/1 저장 중...
  -> 항만구역 내 출입통제구역 지정·운영 매뉴얼(해양수산부, 2024.1).pdf: 1/1 배치 성공적으로 저장
    -> '해양수산부_항만건설 안전사고 예방 매뉴얼_20231201.pptx': 청크 426개 DB 저장 시작...
        -> 배치 1/5 저장 중...
      

In [None]:
import shutil
import os
import chromadb
import logging

# Prerequisite: earlier notebook cells must already have defined the `Config`
# class before this verification cell is run.
# Fallback logging setup: only applied when no `logger` global exists yet.
# (logging.basicConfig() itself does nothing if the root logger already has handlers.)
if 'logger' not in globals():
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )
    logger = logging.getLogger("VerificationTest")


def verify_main_pipeline_results(
    expected_collections: Optional[List[str]] = None,
    db_path: Optional[str] = None,
    client: Any = None,
) -> bool:
    """Verify the ChromaDB collections produced by the main ingestion pipeline.

    Opens the persistent ChromaDB at ``db_path`` (by default ``Config.DB_PATH``
    from earlier notebook cells). Checks that every expected collection exists
    and prints each collection's chunk count. A collection that exists but is
    empty is reported as a warning, not a failure.

    Args:
        expected_collections: Collection names that must exist. Defaults to
            ``["legal_manuals", "legal_docs"]``, which assumes both pipeline
            runs were executed.
        db_path: Directory of the persistent ChromaDB. Defaults to
            ``Config.DB_PATH``.
        client: Optional already-open Chroma client exposing
            ``list_collections()`` and ``get_collection(name=...)``. When
            omitted, a ``chromadb.PersistentClient`` is opened on ``db_path``.

    Returns:
        True if every expected collection was found and no error occurred.
        False otherwise, including when ``Config``/``Config.DB_PATH`` or the
        DB path itself is missing.
    """
    print("="*60)
    print("  메인 파이프라인 실행 결과(DB 컬렉션) 검증을 시작합니다.")
    print("="*60)

    # Build the default list per call to avoid a shared mutable default.
    if expected_collections is None:
        expected_collections = ["legal_manuals", "legal_docs"]

    # 1. Take the DB path from the pipeline's Config unless one was given, so a
    #    config change does not require editing this check.
    if db_path is None:
        try:
            db_path = Config.DB_PATH
        except NameError:
            print("❌ 검증 실패: 'Config' 클래스가 정의되지 않았습니다.")
            print("이전 셀들을 먼저 실행하여 Config 클래스를 메모리에 로드해주세요.")
            return False
        except AttributeError:
            # Config exists but has no DB_PATH attribute.
            print("❌ 검증 실패: 'Config.DB_PATH'가 정의되지 않았습니다.")
            return False

    all_tests_passed = True
    print(f"검증 대상 DB 경로: {db_path}")

    # 2. Check the path before opening a client. Otherwise the client may
    #    create a fresh, empty DB there, and every check would only report
    #    "not found".
    if not os.path.exists(db_path):
        print(f"❌ 검증 실패: DB 경로 '{db_path}'를 찾을 수 없습니다.")
        print("메인 파이프라인을 먼저 실행하여 DB를 생성해주세요.")
        return False

    try:
        # 3. Connect (unless a client was supplied) and list all collections.
        if client is None:
            client = chromadb.PersistentClient(path=db_path)
        # Older chromadb releases (0.6.x) return plain name strings from
        # list_collections(), while 1.x returns Collection objects. Accept both
        # without touching attributes on the string form.
        existing_collections = [
            col if isinstance(col, str) else col.name
            for col in client.list_collections()
        ]
        print(f"현재 DB에 존재하는 컬렉션: {existing_collections}")

        # 4. Check each expected collection and report its chunk count.
        for collection_name in expected_collections:
            print(f"\n--- '{collection_name}' 컬렉션 존재 여부 확인 ---")
            if collection_name in existing_collections:
                print(f"✅ 성공: '{collection_name}' 컬렉션이 DB에 존재합니다.")
                collection = client.get_collection(name=collection_name)
                count = collection.count()
                print(f"  -> '{collection_name}' 컬렉션의 데이터(청크) 수: {count}")
                if count == 0:
                    # Empty is only a warning; it does not fail the check.
                    print(f"  ⚠️ 경고: 컬렉션은 존재하지만 데이터가 없습니다.")
            else:
                print(f"❌ 실패: '{collection_name}' 컬렉션을 찾을 수 없습니다.")
                all_tests_passed = False

    except Exception as e:
        print(f"❌ 오류: DB 검증 중 예외가 발생했습니다: {e}")
        logger.error("DB 검증 실패", exc_info=True)
        all_tests_passed = False

    # 5. Final summary.
    print("\n" + "="*25 + " 검증 결과 요약 " + "="*25)
    if all_tests_passed:
        print("🎉 모든 필수 컬렉션이 성공적으로 검증되었습니다! 🎉")
    else:
        print("🔥 일부 컬렉션을 찾지 못했습니다. 위 로그를 확인해주세요. 🔥")
    print("="*61)
    return all_tests_passed


# --- Run the verification ---
# The trailing semicolon stops Jupyter from echoing the call's return value.
verify_main_pipeline_results();


  메인 파이프라인 실행 결과(DB 컬렉션) 검증을 시작합니다.
검증 대상 DB 경로: /content/chroma_db
현재 DB에 존재하는 컬렉션: ['legal_manuals', 'legal_docs']

--- 'legal_manuals' 컬렉션 존재 여부 확인 ---
✅ 성공: 'legal_manuals' 컬렉션이 DB에 존재합니다.
  -> 'legal_manuals' 컬렉션의 데이터(청크) 수: 522

--- 'legal_docs' 컬렉션 존재 여부 확인 ---
✅ 성공: 'legal_docs' 컬렉션이 DB에 존재합니다.
  -> 'legal_docs' 컬렉션의 데이터(청크) 수: 32193

🎉 모든 필수 컬렉션이 성공적으로 검증되었습니다! 🎉
