# VectorDB

| DB | Description |
|----|-------------|
| Regulation DB | 회사 내규 또는 법률에 대한 VectorDB |
| Space DB | 사옥 내 정보, 인사 조직도 및 연락처 등 정보에 대한 VectorDB |

In [None]:
# ---------------------------------------------------------
# 📌 VectorDB 파이프라인 (Jupyter Notebook 테스트용)
# - PDF 텍스트 + 이미지 Embedding
# - FAISS 기반 VectorDB 구축
# - 문서 목록 조회 / 삭제 기능 포함
# ---------------------------------------------------------

import os
import shutil
from pathlib import Path
from typing import List, Tuple
from io import BytesIO

from PIL import Image
import numpy as np
from tqdm import tqdm
import fitz  # PyMuPDF
import faiss

from langchain.vectorstores import FAISS as LangChainFAISS
from langchain.schema import Document
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.in_memory import InMemoryDocstore

from sentence_transformers import SentenceTransformer

# -----------------------------------------
# ✅ 설정
# -----------------------------------------

DATA_DIRS = {
    "regulation": {
        "raw": "./data/raw/regulation",
        "processed": "./data/processed/regulation",
        "embeddings": "./data/embeddings/regulation"
    },
    "space": {
        "raw": "./data/raw/space",
        "processed": "./data/processed/space",
        "embeddings": "./data/embeddings/space"
    }
}

VECTORDB_DIRS = {
    "regulation": "./vectordb/regulation_db",
    "space": "./vectordb/space_db"
}

EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

# 문장 임베딩용 모델 (빠른 사용을 위해 미리 로드)
model = SentenceTransformer(EMBEDDING_MODEL)

# -----------------------------------------
# ✅ PDF 텍스트 추출
# -----------------------------------------

def extract_text_from_pdf(pdf_path: str) -> str:
    doc = fitz.open(pdf_path)
    texts = []
    for page in doc:
        texts.append(page.get_text())
    return "\n".join(texts)

# -----------------------------------------
# ✅ PDF 이미지 추출
# -----------------------------------------

def extract_images_from_pdf(pdf_path: str) -> List[Image.Image]:
    doc = fitz.open(pdf_path)
    images = []
    for page in doc:
        for img_index, img in enumerate(page.get_images(full=True)):
            xref = img[0]
            base_image = doc.extract_image(xref)
            image = Image.open(BytesIO(base_image["image"]))
            images.append(image)
    return images

# -----------------------------------------
# ✅ 이미지 → 임베딩 변환
# -----------------------------------------

def image_to_embedding(image: Image.Image) -> np.ndarray:
    image = image.resize((64, 64)).convert("RGB")
    arr = np.array(image).flatten() / 255.0
    return arr[:384] if arr.size >= 384 else np.pad(arr, (0, 384 - arr.size))

# -----------------------------------------
# ✅ VectorDB 구축
# -----------------------------------------

def build_vectordb(target: str):
    data_info = DATA_DIRS[target]
    vectordb_dir = VECTORDB_DIRS[target]

    os.makedirs(data_info["processed"], exist_ok=True)
    os.makedirs(data_info["embeddings"], exist_ok=True)
    os.makedirs(vectordb_dir, exist_ok=True)

    index = faiss.IndexFlatL2(384)
    docs = []

    embedding_model = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)

    files = list(Path(data_info["raw"]).glob("*.pdf"))

    if len(files) == 0:
        print(f"[경고] {data_info['raw']} 경로에 PDF 파일이 없습니다.")
        return

    for file in tqdm(files, desc=f"Processing {target} PDFs"):

        text = extract_text_from_pdf(str(file))
        text_file = Path(data_info["processed"]) / (file.stem + ".txt")
        text_file.write_text(text, encoding="utf-8")

        text_embedding = embedding_model.embed_query(text)
        emb_path = Path(data_info["embeddings"]) / (file.stem + "_text.npy")
        np.save(emb_path, text_embedding)

        index.add(np.array([text_embedding], dtype=np.float32))
        docs.append(Document(page_content=text, metadata={"source": str(file), "type": "text"}))

        images = extract_images_from_pdf(str(file))
        for idx, img in enumerate(tqdm(images, desc=f"{file.stem} 이미지 처리")):
            img_embedding = image_to_embedding(img)
            emb_path = Path(data_info["embeddings"]) / f"{file.stem}_img{idx}.npy"
            np.save(emb_path, img_embedding)

            index.add(np.array([img_embedding], dtype=np.float32))
            docs.append(Document(page_content=f"Image from {file.stem} (index {idx})", metadata={"source": str(file), "type": f"image_{idx}"}))

    faiss.write_index(index, os.path.join(vectordb_dir, "index.faiss"))
    index_to_docstore_id = {i: str(i) for i in range(len(docs))}
    docstore = InMemoryDocstore({str(i): doc for i, doc in enumerate(docs)})

    lc_faiss = LangChainFAISS(embedding_model, index, docstore, index_to_docstore_id)
    lc_faiss.save_local(vectordb_dir)

    print(f"✅ Saved VectorDB to {vectordb_dir}")

# -----------------------------------------
# ✅ 문서 추가
# -----------------------------------------

def add_document_to_vectordb(pdf_path: str, target: str, security_level: str = "중"):
    vectordb_dir = VECTORDB_DIRS[target]
    embedding_model = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
    vectordb = LangChainFAISS.load_local(vectordb_dir, embedding_model, allow_dangerous_deserialization=True)

    text = extract_text_from_pdf(pdf_path)
    text_embedding = embedding_model.embed_query(text)

    text_doc = Document(
        page_content=text,
        metadata={"source": pdf_path, "type": "text", "security_level": security_level}
    )

    vectordb.index.add(np.array([text_embedding], dtype=np.float32))
    new_docstore_id = str(len(vectordb.docstore._dict))
    vectordb.docstore._dict[new_docstore_id] = text_doc
    vectordb.index_to_docstore_id[vectordb.index.ntotal - 1] = new_docstore_id

    images = extract_images_from_pdf(pdf_path)
    for idx, img in enumerate(tqdm(images, desc=f"{Path(pdf_path).stem} 이미지 추가")):
        img_embedding = image_to_embedding(img)
        img_doc = Document(
            page_content=f"Image from {Path(pdf_path).stem} (index {idx})",
            metadata={"source": pdf_path, "type": f"image_{idx}", "security_level": security_level}
        )

        vectordb.index.add(np.array([img_embedding], dtype=np.float32))
        new_docstore_id = str(len(vectordb.docstore._dict))
        vectordb.docstore._dict[new_docstore_id] = img_doc
        vectordb.index_to_docstore_id[vectordb.index.ntotal - 1] = new_docstore_id

    vectordb.save_local(vectordb_dir)

    print(f"✅ {pdf_path} 추가 완료 (보안등급: {security_level})")

# -----------------------------------------
# ✅ 문서 목록 조회
# -----------------------------------------

def list_vectordb(vectordb_dir: str) -> List[Tuple[str, str]]:
    """
    VectorDB 내 문서 목록을 조회합니다. (파일명 + 보안등급만 반환, 중복 제거)

    Returns:
        List of (파일명, 보안등급)
    """
    embedding_model = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
    vectordb = LangChainFAISS.load_local(vectordb_dir, embedding_model, allow_dangerous_deserialization=True)

    docs = vectordb.docstore._dict.values()

    summary = {}
    for doc in docs:
        source = doc.metadata.get("source", "unknown")
        security_level = doc.metadata.get("security_level", "중")

        file_name = Path(source).name
        summary[file_name] = security_level  # 같은 파일명이면 덮어씀 (보안등급은 동일함)

    return list(summary.items())

# -----------------------------------------
# ✅ 특정 문서 삭제
# -----------------------------------------

def delete_document_from_vectordb(target: str, doc_source_name: str):
    vectordb_dir = VECTORDB_DIRS[target]
    embedding_model = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
    vectordb = LangChainFAISS.load_local(vectordb_dir, embedding_model, allow_dangerous_deserialization=True)

    remaining_docs = []
    delete_docs = []

    for doc in vectordb.docstore._dict.values():
        source = doc.metadata.get("source", "")
        if doc_source_name in source:
            delete_docs.append(doc)
        else:
            remaining_docs.append(doc)

    if not delete_docs:
        print(f"❗ 삭제 대상 문서를 찾을 수 없습니다: {doc_source_name}")
        return

    print(f"✅ {len(delete_docs)}개 문서 삭제")

    new_index = faiss.IndexFlatL2(384)
    new_docstore = {}
    new_index_to_docstore_id = {}

    for i, doc in enumerate(remaining_docs):
        content = doc.page_content
        doc_type = doc.metadata.get("type", "unknown")

        if doc_type.startswith("image"):
            arr = image_to_embedding(Image.new("RGB", (64, 64)))
        else:
            arr = embedding_model.embed_query(content)

        new_index.add(np.array([arr], dtype=np.float32))
        new_docstore[str(i)] = doc
        new_index_to_docstore_id[new_index.ntotal - 1] = str(i)

    new_vectordb = LangChainFAISS(embedding_model, new_index, InMemoryDocstore(new_docstore), new_index_to_docstore_id)
    new_vectordb.save_local(vectordb_dir)

    print(f"✅ VectorDB 재구성 완료 ({vectordb_dir})")

# -----------------------------------------
# ✅ VectorDB 전체 삭제
# -----------------------------------------

def delete_vectordb(vectordb_dir: str):
    if os.path.exists(vectordb_dir):
        shutil.rmtree(vectordb_dir)
        print(f"✅ {vectordb_dir} 전체 삭제 완료")
    else:
        print("❗ 삭제할 VectorDB가 없습니다.")

In [11]:
def test_vectordb_flow():
    target = "regulation"  # 테스트할 target 설정
    vectordb_dir = VECTORDB_DIRS.get(target)
    index_file = os.path.join(vectordb_dir, "index.faiss")

    if vectordb_dir is None:
        print(f"❗ 대상 '{target}' 이(가) VECTORDB_DIRS에 등록되어 있지 않습니다.")
        return

    print("\n[Step 1] VectorDB 구축 테스트")
    if not os.path.exists(index_file):
        print("VectorDB가 없거나 index.faiss가 없으므로 새로 구축합니다.")
        build_vectordb(target)
    else:
        print("✅ 이미 구축된 VectorDB가 존재합니다.")

    # print("\n[Step 2] 문서 추가 테스트 (보안등급 상으로 추가)")
    # test_pdf_path = "./data/raw/regulation/test_document.pdf"

    # if not os.path.exists(test_pdf_path):
    #     print("❗ 테스트용 PDF 파일이 존재하지 않습니다. 경로: './data/raw/regulation/test_document.pdf'")
    #     return

    # add_document_to_vectordb(test_pdf_path, target, security_level="상")

    # print("\n[Step 3] VectorDB 문서 목록 조회 (보안등급 확인)")
    # entries = list_vectordb(vectordb_dir)
    # if not entries:
    #     print("❗ VectorDB에 문서가 없습니다.")
    # else:
    #     for file_name, security_level in entries:
    #         print(f"{file_name} | 보안등급: {security_level}")

    # print("\n[Step 4] 특정 문서 삭제 테스트 (test_document.pdf)")
    # delete_document_from_vectordb(target, "test_document.pdf")

    # print("\n[Step 5] VectorDB 문서 목록 재조회 (삭제 결과 확인)")
    # entries_after_delete = list_vectordb(vectordb_dir)
    # if not entries_after_delete:
    #     print("✅ 모든 문서가 삭제되어 VectorDB가 비어있습니다.")
    # else:
    #     for file_name, security_level in entries_after_delete:
    #         print(f"{file_name} | 보안등급: {security_level}")

    # print("\n[Step 6] VectorDB 전체 삭제 테스트")
    # delete_vectordb(vectordb_dir)

    # if not os.path.exists(vectordb_dir):
    #     print("✅ VectorDB 전체 삭제 완료")
    # else:
    #     print("❗ VectorDB 전체 삭제 실패")
        
test_vectordb_flow()


[Step 1] VectorDB 구축 테스트
VectorDB가 없거나 index.faiss가 없으므로 새로 구축합니다.


test_document 이미지 처리: 100%|██████████| 10842/10842 [00:16<00:00, 650.91it/s]
근로기준법(법률)(제20520호)(20250223) 이미지 처리: 100%|██████████| 1/1 [00:00<00:00, 142.89it/s]
Processing regulation PDFs: 100%|██████████| 2/2 [00:27<00:00, 13.85s/it]


✅ Saved VectorDB to ./vectordb/regulation_db


# Agents

| Agent | Description | DB |
|-------|-------------|----|
| Regulation Agent | 회사 내규 또는 법률에 대해 답변하는 Agent로, Regulation DB 기반 RAG 형식으로 구축 | Regulation DB |
| Space Agent | 사옥 내 정보, 인사 조직도 및 연락처 등 질문에 대해 답변하는 Agent로, Space DB 기반 RAG 형식으로 구축 | Space DB |
| Worker Agent | 사용자의 질문에 대하여 이전 Reference 자료 또는 양식(Template)을 찾아주는 Agent로, 사내 DB API를 호출하여 답변 | 사내 DB API |
| General Agent | 위 내용 외의 일반적인 질문(ex. 날씨 등)에 대해 답변하는 Agent | 외부 일반 데이터 |

In [6]:
client = OpenAI(api_key=OPENAI_API_KEY)

try:
    models = client.models.list()
    print("✅ API Key 인증 성공! 사용 가능한 모델:")
    for model in models.data[:5]:
        print("-", model.id)
except Exception as e:
    print("❌ API Key 인증 실패:", e)

✅ API Key 인증 성공! 사용 가능한 모델:
- gpt-4o-audio-preview-2024-12-17
- dall-e-3
- dall-e-2
- gpt-4o-audio-preview-2024-10-01
- gpt-4-turbo-preview


In [13]:
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langgraph.graph import StateGraph, END, START
from typing import TypedDict, Optional, Literal
from openai import OpenAI
import os

# Load environment
from dotenv import load_dotenv
load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# print("OpenAI API Key:", OPENAI_API_KEY)

# -----------------------
# VectorDB 로드 (FAISS)
# -----------------------

EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)

regulation_vectordb = FAISS.load_local("./vectordb/regulation_db", embeddings, index_name="index", allow_dangerous_deserialization=True)
space_vectordb = FAISS.load_local("./vectordb/space_db", embeddings, index_name="index", allow_dangerous_deserialization=True)

# -----------------------
# Company API Client
# -----------------------

class CompanyAPIClient:
    def __init__(self):
        self.templates = {
            "회의록": "회의록 양식: - 회의 일시 - 장소 - 참석자 - 주요 안건 - 논의 내용 - 결론 및 조치 사항",
            "출장 보고서": "출장 보고서 양식: - 출장지 - 출장 기간 - 출장 목적 - 주요 활동 - 결과 및 향후 계획",
            "경조사 신청": "경조사 신청서 양식: - 신청인 - 관계 - 경조사 종류 - 일시 및 장소 - 기타 사항",
        }

    def search_templates(self, query: str) -> Optional[str]:
        for keyword, template in self.templates.items():
            if keyword in query:
                return template
        return None

api_client = CompanyAPIClient()

# -----------------------
# LLM 초기화
# -----------------------

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# -----------------------
# Agents 정의
# -----------------------

def regulation_agent(user_input: str) -> str:
    docs = regulation_vectordb.similarity_search(user_input, k=3)
    context = "\n".join([d.page_content for d in docs])
    prompt = f"내규 문서:\n{context}\n\n질문: {user_input}"
    return llm.invoke(prompt).content

def space_agent(user_input: str) -> str:
    docs = space_vectordb.similarity_search(user_input, k=3)
    context = "\n".join([d.page_content for d in docs])
    prompt = f"사옥 문서:\n{context}\n\n질문: {user_input}"
    return llm.invoke(prompt).content

def worker_agent(user_input: str) -> str:
    template = api_client.search_templates(user_input)
    if template:
        return f"Template 추천:\n\n{template}"
    else:
        return llm.invoke(f"적절한 양식을 찾을 수 없습니다. 대신 일반 가이드를 작성하세요.\n\n{user_input}").content

def general_agent(user_input: str) -> str:
    return llm.invoke(user_input).content

# -----------------------
# Agent Mapping
# -----------------------

AGENT_MAP = {
    "regulation": regulation_agent,
    "space": space_agent,
    "worker": worker_agent,
    "general": general_agent,
}

# -----------------------
# LangGraph 상태 정의
# -----------------------

class AgentState(TypedDict):
    user_input: str
    intent: Optional[Literal["regulation", "space", "worker", "general"]]
    response: Optional[str]
    next_node: Optional[str]

# -----------------------
# Hybrid Supervisor (VectorDB + LLM)
# -----------------------

def supervisor_agent(state: AgentState) -> AgentState:
    user_input = state["user_input"]

    system_prompt = """
                    너는 사용자의 질문을 다음 4개 Intent 중 하나로 분류하는 에이전트이다.

                    Intent 종류:

                    - regulation: 회사 내규, 법률, 인사, 근태, 연차, 휴가, 경조사 등 규정이나 정책에 관한 질문
                    - space: 사옥, 조직도, 연락처, 좌석, 위치, 시설, 회의실 등 공간과 관련된 질문
                    - worker: 업무 처리 방법, 업무 양식, 템플릿 등 형식이나 절차와 관련된 질문
                    - general: 위에 해당하지 않는 일반적인 질문 (예: 날씨, 일상 대화, 회사와 관련 없는 질문 등)

                    규칙:

                    1. 반드시 위 4개 중 하나만 소문자로 정확히 골라서 답변하라.
                    2. 질문이 규정이나 인사 관련이면 regulation 으로 분류하라.
                    3. 질문이 사옥이나 공간 정보 관련이면 space 으로 분류하라.
                    4. 질문이 업무 양식이나 절차 관련이면 worker 로 분류하라.
                    5. 어떤 것도 아니거나 일상/일반적 질문이면 general 로 분류하라.
                    6. 그 외는 절대 다른 단어를 사용하지 말고 위 4개 중 하나로만 답하라.

                    질문: {input}
                    답변 (intent 중 하나만): 
                    """

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "{input}")
    ])

    chain = prompt | llm | (lambda x: x.content.strip().lower())
    intent = chain.invoke({"input": user_input})

    print(f"[Supervisor 최종 Intent]: {intent}")

    return {
        **state,
        "intent": intent,
        "next_node": intent
    }


# -----------------------
# 공통 Agent Executor
# -----------------------

def common_agent_executor(state: AgentState) -> AgentState:
    agent_name = state["next_node"]
    user_input = state["user_input"]

    agent = AGENT_MAP.get(agent_name)

    if agent is None:
        result = "적절한 Agent가 없습니다."
    else:
        result = agent(user_input)

    return {**state, "response": result}

# -----------------------
# Graph 구성 (START → Supervisor → Executor → END)
# -----------------------

graph = StateGraph(AgentState)

graph.add_node("supervisor", supervisor_agent)
graph.add_node("agent_executor", common_agent_executor)

graph.add_edge(START, "supervisor")
graph.add_conditional_edges("supervisor", lambda state: state["next_node"], {
    "regulation": "agent_executor",
    "space": "agent_executor",
    "worker": "agent_executor",
    "general": "agent_executor",
})
graph.add_edge("agent_executor", END)

runnable = graph.compile()

# -----------------------
# 테스트 실행
# -----------------------

inputs = {"user_input": "사내 카페 몇 층에 있나요?"}

result = runnable.invoke(inputs)

print("\n=== 최종 응답 ===")
print(result["response"])

[Supervisor 최종 Intent]: space

=== 최종 응답 ===
사내 카페는 지상 1층에 위치해 있습니다.
