In [None]:
import os, re, zipfile, subprocess, time, unicodedata
from pathlib import Path
from difflib import get_close_matches
from typing import List, Dict, Any
import pandas as pd
from lxml import etree
import fitz
import tiktoken
from dotenv import load_dotenv, find_dotenv
from langchain_chroma import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from sentence_transformers import SentenceTransformer

In [None]:
# ====== Paths / settings ======
DATA_DIR = Path("data")
FILES_DIR = DATA_DIR / "files"  # folder that holds the original source files
LIST_XLSX = DATA_DIR / "data_list.xlsx"
LIST_CSV = DATA_DIR / "data_list.csv"

# Absolute location of the Chroma store; the directory is created right away.
CHROMA_DIR = Path("store/chroma").resolve()
CHROMA_DIR.mkdir(parents=True, exist_ok=True)

EMBED_BACKEND = "openai"  # "openai" | "bge"
OPENAI_EMBED_MODEL = "text-embedding-3-small"
BGE_MODEL = "BAAI/bge-m3"

# Reasonable defaults for Korean documents: chunks that are too short lose
# context, chunks that are too long hurt retrieval precision and speed.
CHUNK_CHARS = 1200
CHUNK_OVERLAP = 300

In [None]:
# Locate .env whether this runs as a script (next to this file) or inside a
# notebook kernel (where __file__ is undefined and the CWD is used instead).
HERE = Path(__file__).resolve().parent if "__file__" in globals() else Path.cwd()
env_path = HERE / ".env"
if env_path.exists():
    load_dotenv(dotenv_path=env_path, override=False)
else:
    # Fall back to a search that starts from the current working directory.
    load_dotenv(find_dotenv(usecwd=True), override=False)

# Fail fast when the key is actually needed. The key is only passed to the
# OpenAI embedder, so the local "bge" backend must not be blocked by it.
api_key = os.getenv("OPENAI_API_KEY")
if EMBED_BACKEND == "openai":
    if not api_key or not api_key.startswith("sk-"):
        raise RuntimeError("[ENV] OPENAI_API_KEY가 런타임에 설정되지 않았습니다. .env 위치/CWD/커널 확인 필요.")
    # Never echo key material: notebook output is routinely saved and shared.
    print("[ENV] OPENAI_API_KEY loaded (value hidden)")


In [None]:
# ====== 파일명 정규화/매칭 ======
def canon_filename(name: str | None) -> str:
    """파일명 매칭 표준화: NFC, 숨은공백 제거, 하이픈/괄호 통일, 공백축약, 확장자 소문자"""
    if not name:
        return ""
    s = unicodedata.normalize("NFC", str(name))
    s = (s.replace("\uFEFF","")
           .replace("\u200B","")
           .replace("\u00A0"," "))
    # 각종 대쉬/괄호 통일
    s = (s.replace("–","-").replace("—","-").replace("−","-")
           .replace("（","(").replace("）",")"))
    s = re.sub(r"\s+"," ", s.strip())
    p = Path(s)
    return p.stem + p.suffix.lower()

def build_files_index(root: Path) -> dict[str, Path]:
    """Map canonical file names to paths for every file under *root*.

    Sub-directories are traversed recursively. When two files share the same
    canonical name, the one visited last wins.
    """
    return {
        canon_filename(entry.name): entry
        for entry in root.rglob("*")
        if entry.is_file()
    }

def find_path_by_filename(files_index: dict[str, Path], raw: str | None) -> Path | None:
    if not raw:
        return None
    key = canon_filename(raw)
    # 1) 완전 일치
    if key in files_index:
        return files_index[key]
    # 2) 확장자 제외 스템 일치
    stem = Path(key).stem
    cand = [k for k in files_index if Path(k).stem == stem]
    if cand:
        return files_index[cand[0]]
    # 3) 근사 매칭
    close = get_close_matches(key, list(files_index), n=1, cutoff=0.88)
    return files_index[close[0]] if close else None

In [None]:
# ====== 텍스트 유틸 ======
def clean_text(s: str) -> str:
    """Coerce *s* to a tidy string.

    Non-string input becomes "" when pandas considers it missing, otherwise
    its ``str()`` form. NUL characters are removed, surrounding whitespace is
    stripped, and runs of three or more newlines collapse to a blank line.
    """
    if isinstance(s, str):
        raw = s
    elif pd.isna(s):
        raw = ""
    else:
        raw = str(s)
    without_nul = raw.replace("\x00", "")
    return re.sub(r"\n{3,}", "\n\n", without_nul.strip())

def maybe_ocr_needed(text: str) -> bool:
    """Return True when *text* has fewer than 100 Latin or Hangul letters.

    A falsy *text* is treated as empty, so it always reports True.
    """
    letter_count = sum(1 for _ in re.finditer(r"[A-Za-z가-힣]", text or ""))
    return letter_count < 100

In [None]:
# ====== 파일별 텍스트 추출 ======
def _pdf_page_text(pdf_path: Path) -> str:
    """Return the text layer of every page in *pdf_path*, joined by blank lines."""
    # The context manager closes the document (and its file handle) promptly.
    with fitz.open(pdf_path) as doc:
        return "\n\n".join(page.get_text("text") for page in doc)


def pdf_to_text(pdf_path: Path) -> str:
    """Extract text from a PDF, falling back to OCR for scanned documents.

    The embedded text layer is read first. If it contains too few letters
    (see ``maybe_ocr_needed``), ``ocrmypdf`` is run to produce a sibling
    ``*.ocr.pdf`` file and the text is re-read from that. OCR is best-effort:
    any failure is logged and the original (sparse) text is kept.

    Raises whatever ``fitz.open`` raises when the input PDF cannot be opened.
    """
    txt = _pdf_page_text(pdf_path)
    if maybe_ocr_needed(txt):
        ocrd = pdf_path.with_suffix(".ocr.pdf")
        try:
            # ocrmypdf rejects --force-ocr combined with --skip-text (they are
            # mutually exclusive), so only --force-ocr is passed here.
            # NOTE(review): no `-l` language option is passed; Korean scans
            # presumably need e.g. `-l kor+eng` with that Tesseract language
            # pack installed — confirm before enabling.
            subprocess.run(
                ["ocrmypdf", "--force-ocr", str(pdf_path), str(ocrd)],
                check=True,
            )
            txt = _pdf_page_text(ocrd)
        except Exception as e:
            print(f"[WARN] OCR 실패: {pdf_path.name} - {e}")
    return clean_text(txt)

def _hwpx_local_name(node) -> str | None:
    """Return the namespace-free tag name of *node*, or None for non-elements.

    Comments and processing instructions expose a non-string ``tag``.
    """
    tag = node.tag
    if not isinstance(tag, str):
        return None
    return tag.rsplit("}", 1)[-1]


def _hwpx_paragraph_text(paragraph) -> str:
    """Return the text owned by *paragraph*, excluding nested paragraphs."""
    parts: list[str] = []

    def collect(element) -> None:
        if element.text:
            parts.append(element.text)
        for child in element:
            child_name = _hwpx_local_name(child)
            # Skip comment/PI content, and skip nested paragraphs: the caller
            # visits those separately, so descending here would duplicate them.
            if child_name is not None and child_name != "p":
                collect(child)
            if child.tail:
                parts.append(child.tail)

    collect(paragraph)
    return "".join(parts).strip()


def hwpx_to_text(hwpx_path: Path) -> str:
    """Extract paragraph text from an HWPX (zipped XML) document.

    Every ``Contents/*.xml`` member is parsed and each element whose local
    name is exactly ``p`` contributes one line, made of its own text without
    the text of paragraphs nested inside it. Extraction is best-effort: an XML
    part that fails to parse is logged and skipped, and an unreadable archive
    is logged and yields whatever was collected so far (usually "").
    """
    texts: list[str] = []
    try:
        with zipfile.ZipFile(hwpx_path) as zf:
            for name in zf.namelist():
                if not (name.startswith("Contents/") and name.endswith(".xml")):
                    continue
                try:
                    root = etree.fromstring(zf.read(name))
                except Exception as e:
                    print(f"[WARN] HWPX XML 파싱 실패: {name} - {e}")
                    continue
                for node in root.iter():
                    if _hwpx_local_name(node) != "p":
                        continue
                    t = _hwpx_paragraph_text(node)
                    if t:
                        texts.append(t)
    except (zipfile.BadZipFile, OSError) as e:
        # Match the sibling extractors: warn instead of aborting the whole run.
        print(f"[WARN] HWPX 열기 실패: {hwpx_path.name} - {e}")
    return clean_text("\n".join(texts))

def hwp_to_text(hwp_path: Path) -> str:
    """Extract text from a binary HWP file via the ``hwp5txt`` command.

    Best-effort: if the command is missing, exits non-zero, or anything else
    goes wrong, a warning is printed and "" is returned.
    """
    try:
        # Decode explicitly instead of relying on the locale encoding
        # (text=True), which breaks on non-UTF-8 locales.
        # NOTE(review): assumes hwp5txt writes UTF-8 to stdout — confirm
        # against the installed pyhwp version.
        out = subprocess.check_output(
            ["hwp5txt", str(hwp_path)],
            encoding="utf-8",
            errors="replace",
        )
        return clean_text(out)
    except Exception as e:
        print(f"[WARN] hwp5txt 실패: {hwp_path.name} - {e}")
        return ""

def extract_text_by_format(path: Path, fmt: str) -> str:
    """Dispatch text extraction on the file format.

    *fmt* is used when truthy, otherwise the suffix of *path*; either way it
    is lower-cased and stripped of dots. Supported formats are ``pdf``,
    ``hwpx`` and ``hwp``; anything else yields "".
    """
    kind = (fmt or path.suffix).lower().strip(".")
    # Lambdas keep the lookup lazy: only the selected extractor is touched.
    extractors = {
        "pdf": lambda: pdf_to_text(path),
        "hwpx": lambda: hwpx_to_text(path),
        "hwp": lambda: hwp_to_text(path),
    }
    extract = extractors.get(kind)
    return extract() if extract is not None else ""

In [None]:
# ====== 임베딩 백엔드 ======
def get_embedder():
    """Build the embedding backend selected by ``EMBED_BACKEND``.

    "openai" returns an ``OpenAIEmbeddings`` client configured with
    ``max_retries=8``. Any other value returns a small SentenceTransformer
    adapter exposing ``embed_documents`` / ``embed_query`` with normalized
    vectors returned as plain lists.
    """
    if EMBED_BACKEND != "openai":
        class STEmb:
            """Adapter giving a SentenceTransformer the embedder methods used here."""

            def __init__(self, model_name):
                self.model = SentenceTransformer(model_name)

            def embed_documents(self, texts: List[str]):
                vectors = self.model.encode(texts, batch_size=16, normalize_embeddings=True)
                return vectors.tolist()

            def embed_query(self, text: str):
                # Encode as a one-item batch and unwrap the single vector.
                vectors = self.model.encode([text], normalize_embeddings=True)
                return vectors.tolist()[0]

        return STEmb(BGE_MODEL)

    return OpenAIEmbeddings(model=OPENAI_EMBED_MODEL, max_retries=8, api_key=api_key)


In [None]:
# ====== 메타데이터 적재 ======
def load_data_list() -> pd.DataFrame:
    """Load the document listing and normalize it to English column names.

    Reads ``LIST_XLSX`` and/or ``LIST_CSV`` when they exist. If both contain
    rows and both have a "파일명" column, they are merged on it with CSV values
    taking precedence; without that shared column the frames are concatenated
    and de-duplicated. Known Korean headers are renamed, every expected column
    is passed through ``clean_text``, and missing ones are added as None.
    """
    df_x = pd.read_excel(LIST_XLSX) if LIST_XLSX.exists() else pd.DataFrame()
    df_c = pd.read_csv(LIST_CSV) if LIST_CSV.exists() else pd.DataFrame()

    if df_x.empty or df_c.empty:
        # At most one source has data: use whichever is non-empty.
        df = df_c if df_x.empty else df_x
    elif "파일명" in df_x.columns and "파일명" in df_c.columns:
        merged = df_c.set_index("파일명").combine_first(df_x.set_index("파일명"))
        df = merged.reset_index()
    else:
        df = pd.concat([df_x, df_c]).drop_duplicates()

    cols = {
        "공고 번호": "notice_id",
        "공고 차수": "round",
        "사업명": "project_name",
        "사업 금액": "budget",
        "발주 기관": "agency",
        "공개 일자": "published_at",
        "입찰 참여 시작일": "bid_start",
        "입찰 참여 마감일": "bid_end",
        "사업 요약": "summary",
        "파일형식": "file_format",
        "파일명": "filename",
        "텍스트": "text",
    }
    present = {src: dst for src, dst in cols.items() if src in df.columns}
    df = df.rename(columns=present)

    # The expected output columns are exactly the rename targets, in order.
    for column in cols.values():
        df[column] = df[column].apply(clean_text) if column in df.columns else None
    return df

In [None]:
# ====== Document 생성 ======
def row_to_document(row: pd.Series, files_index: dict[str, Path]) -> Document:
    """Turn one listing row into a ``Document`` with bid metadata.

    The row's own ``text`` is used when present. Otherwise the file is looked
    up in *files_index* and its text extracted; a file that cannot be found is
    reported and yields empty content. The format comes from ``file_format``
    or, failing that, from the file name's extension.
    """
    filename = row["filename"]

    fmt = row["file_format"]
    if not fmt:
        fmt = Path(filename).suffix.strip(".") if filename else None

    text = row["text"]
    if not text:
        path = find_path_by_filename(files_index, filename)
        if not path:
            print(f"[MISS] files/에서 못 찾음 -> {filename!r}")
            text = ""
        else:
            text = extract_text_by_format(path, fmt or path.suffix.strip("."))

    copied_fields = (
        "notice_id",
        "round",
        "project_name",
        "budget",
        "agency",
        "published_at",
        "bid_start",
        "bid_end",
    )
    meta = {"filename": filename}
    meta.update({field: row[field] for field in copied_fields})
    meta["file_format"] = fmt
    return Document(page_content=text, metadata=meta)

In [None]:
def main():
    """Run the ingestion pipeline: listing -> documents -> chunks -> Chroma.

    Steps:
      1. Load the listing and index the files under ``FILES_DIR``.
      2. Report listed files that cannot be found (also written to
         ``missing_files_report.csv`` with the closest candidate).
      3. Build one Document per row, dropping rows without text.
      4. Split into chunks and embed them in token-budgeted batches with
         exponential backoff, skipping IDs already present in the collection.

    Returns None. Prints progress, and a warning when some batches could not
    be indexed after all retries.
    """
    df = load_data_list()
    if df.empty:
        print("[ERROR] data_list가 비어있습니다.")
        return

    # Index of files on disk (sub-folders included).
    files_index = build_files_index(FILES_DIR)

    # Report listed files that cannot be resolved on disk.
    miss = [
        fn for fn in df["filename"]
        if fn and find_path_by_filename(files_index, fn) is None
    ]
    if miss:
        print(f"[WARN] files/에 없는 파일 {len(miss)}건 (예: {miss[:5]})")
        index_keys = list(files_index)
        rows = []
        for fn in miss:
            key = canon_filename(fn)
            # Looser cutoff than the real matcher: this is only a hint for humans.
            cand = get_close_matches(key, index_keys, n=1, cutoff=0.7)
            rows.append({"원본파일명": fn, "정규화후": key, "근사후보": cand[0] if cand else ""})
        pd.DataFrame(rows).to_csv("missing_files_report.csv", index=False, encoding="utf-8-sig")
        print("[REPORT] missing_files_report.csv 생성")

    # Build the Document list, keeping only rows that produced text.
    docs: List[Document] = []
    for _, row in df.iterrows():
        doc = row_to_document(row, files_index)
        if doc.page_content:
            docs.append(doc)

    if not docs:
        print("[ERROR] 텍스트가 비어 문서가 없습니다. 추출 파이프라인/파일명 확인 필요.")
        return

    # Chunking
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_CHARS,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n제 ", "\n## ", "\n### ", "\n", " ", ""],
    )
    chunks = splitter.split_documents(docs)
    print(f"[INFO] total docs: {len(docs)}, chunks: {len(chunks)}")

    # Embeddings & Chroma index
    embeddings = get_embedder()
    vectordb = Chroma(
        collection_name="bidmate_rag",
        embedding_function=embeddings,
        persist_directory=str(CHROMA_DIR),
    )

    # IDs already stored, so a re-run resumes instead of re-embedding.
    # No `limit` is passed: limit=0 is rejected by some drivers and is
    # presumably what kept this lookup from returning any IDs.
    existing_ids: set = set()
    try:
        got = vectordb._collection.get(include=[])
        existing_ids = set(got.get("ids") or [])
    except Exception as e:
        # Best-effort optimisation only; continue without skipping.
        print(f"[WARN] could not read existing ids, nothing will be skipped: {e}")

    # Token counter used to size batches.
    enc = tiktoken.get_encoding("cl100k_base")

    def tok_len(s: str) -> int:
        return len(enc.encode(s or ""))

    # Safe batching: token budget + item cap + retry with backoff.
    TOKEN_BUDGET = 150_000
    MAX_ITEMS = 8
    MAX_ATTEMPTS = 8

    texts = [c.page_content for c in chunks]
    metas = [c.metadata for c in chunks]
    # Unique ID from file name + chunk position.
    # NOTE(review): the position is the index over ALL chunks, so an ID only
    # refers to the same chunk across runs if the listing and chunking are
    # unchanged. Kept as-is for compatibility with existing stores.
    ids = [f"{canon_filename(m.get('filename') or 'no-file')}::{i:08d}" for i, m in enumerate(metas)]

    # Drop chunks whose ID is already stored.
    if existing_ids:
        filtered = [(t, m, i) for t, m, i in zip(texts, metas, ids) if i not in existing_ids]
        texts, metas, ids = (list(x) for x in zip(*filtered)) if filtered else ([], [], [])
        print(f"[INFO] skipping existing ids, to_add: {len(texts)}")

    added = 0
    failed = 0
    i, n = 0, len(texts)
    while i < n:
        budget, j = 0, i
        while j < n and (j - i) < MAX_ITEMS:
            tlen = tok_len(texts[j])
            if tlen > 20000:
                # Defensive truncation (the real fix is a smaller chunk size).
                texts[j] = texts[j][:8000]
                tlen = tok_len(texts[j])
            if budget + tlen > TOKEN_BUDGET:
                break
            budget += tlen
            j += 1

        if j == i:
            # A single item exceeding the whole budget must still advance the
            # cursor, otherwise the outer loop would never terminate.
            budget = tok_len(texts[i])
            j = i + 1

        t_batch, m_batch, id_batch = texts[i:j], metas[i:j], ids[i:j]

        for attempt in range(1, MAX_ATTEMPTS + 1):
            try:
                vecs = embeddings.embed_documents(t_batch)
                vectordb._collection.add(
                    embeddings=vecs,
                    documents=t_batch,
                    metadatas=m_batch,
                    ids=id_batch,
                )
            except Exception as e:
                wait = min(30, 2 ** attempt)
                print(f"[WARN] embed batch failed (try {attempt}): {e} -> sleep {wait}s")
                if attempt < MAX_ATTEMPTS:  # no point sleeping after the last try
                    time.sleep(wait)
            else:
                added += len(t_batch)
                print(f"  -> indexed {j}/{n} (batch tokens~{budget})")
                break
        else:
            # Every attempt failed: record it instead of dropping it silently.
            failed += len(t_batch)
            print(f"[ERROR] giving up on chunks {i}..{j - 1} after {MAX_ATTEMPTS} attempts")

        i = j

    # Older Chroma wrappers/clients need an explicit persist; newer ones do not.
    if hasattr(vectordb, "persist"):
        vectordb.persist()
    elif hasattr(vectordb._client, "persist"):
        vectordb._client.persist()

    if failed:
        print(f"[WARN] Indexed {added}/{n} chunks into {CHROMA_DIR}; {failed} chunks failed")
    else:
        print(f"✅ Indexed {n} chunks into {CHROMA_DIR}")

if __name__ == "__main__":
    main()