# RAG 시스템 구현

## 개요

- 사용 데이터: 2024년 연말정산 신고 안내 문서

- 설계
| 태스크 | 기술 |
|----------------|-----------------------------|
| 데이터 전처리 | 데이터 로드, 전처리 |
| 사용 LLM | yanolja/YanoljaNEXT-EEVE-Instruct-7B-v2-Previewe  |
| 사용 임베딩 | dragonkue/BGE-m3-ko  |
| 사용 DB | FAISS |
| Cross-Encoder | BAAI/bge-reranker-v2-m3 |
| 사용 웹검색 엔진 | DuckDuckGoSearch |
| 사용 에이전트 프레임워크 | Langraph |

# 1. 환경 설정

In [None]:
!pip install langchain-huggingface

Collecting langchain-huggingface
  Downloading langchain_huggingface-1.2.0-py3-none-any.whl.metadata (2.8 kB)
Downloading langchain_huggingface-1.2.0-py3-none-any.whl (30 kB)
Installing collected packages: langchain-huggingface
Successfully installed langchain-huggingface-1.2.0


In [None]:
!pip install -U ddgs

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Collecting ddgs
  Downloading ddgs-9.10.0-py3-none-any.whl.metadata (12 kB)
Collecting primp>=0.15.0 (from ddgs)
  Downloading primp-0.15.0-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (13 kB)
Collecting fake-useragent>=2.2.0 (from ddgs)
  Downloading fake_useragent-2.2.0-py3-none-any.whl.metadata (17 kB)
Collecting h2<5,>=3 (from httpx[brotli,http2,socks]>=0.28.1->ddgs)
  Downloading h2-4.3.0-py3-none-any.whl.metadata (5.1 kB)
Collecting socksio==1.* (from httpx[brotli,http2,socks]>=0.28.1->ddgs)
  Downloading socksio-1.0.0-py3-none-any.whl.metadata (6.1 kB)
Collecting hyperframe<7,>=6.1 (from h2<5,>=3->httpx[brotli,http2,socks]>=0.28.1->ddgs)
  Downloading hyperframe-6.1.0-py3-none-any.whl.metadata (4.3 kB)
Collecting hpack<5,>=4.1 (from h2<5,>=3->httpx[brotli,http2,socks]>=0.28.1->ddgs)
  Downloading hpack-4.1.0-py3-none-any.whl.metadata (4.6 kB)
Downloading ddgs-9.10.0-py3-none-any.whl (40 kB)
Downloading fake_useragent-2.2.0-py3-none-any.whl (161 kB)
Downloadi

In [1]:
# 데이터, 경로, 상태용
import os
import re
import math
import hashlib
from dataclasses import dataclass
from pathlib import Path
import pickle
from typing import List, Dict, Any, TypedDict, Tuple, Optional

# 허깅페이스 트랜스포머
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch

# Cross-Encoder Reranker
from sentence_transformers import CrossEncoder

# 랭체인
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.llms import HuggingFacePipeline
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS

from langchain_community.retrievers import BM25Retriever
from langchain_community.tools import DuckDuckGoSearchRun

# 랭그래프
from langgraph.graph import StateGraph, END

# langfuse 환경변수용
from langfuse.langchain import CallbackHandler
from dotenv import load_dotenv  # .env 파일에 정의된 환경 변수(API 키 등)를 현재 실행 환경에 로드
load_dotenv()  # .env 파일의 내용을 읽어 os.environ에 반영

  from .autonotebook import tqdm as notebook_tqdm


True

In [2]:
"""
Config 클래스 정의

- 상수 변수 정의
- @dataclass 데코레이터 추가
"""
@dataclass
class Config:
    # 데이터 폴더 생성
    root = Path(".")
    raw_dir = root / "data"
    data_dir = raw_dir / "2024년_원천징수의무자를위한연말정산신간고안내.pdf"
    model_dir = root / "models"

    # 청킹
    chunk_size = 1100
    chunk_overlap = 180

    # Retrieval
    dense_k: int = 30     # 임베딩 벡터에 쓰는 k
    sparse_k: int = 30    # bm25에 쓰는 k
    rrf_k: int = 60       # RRF에서 쓰는 k
    rrf_pool: int = 60    # RRF 결과 중 rerank 대상으로 올릴 후보 수

    # Rerank
    rerank_top_n: int = 24  # Cross-Encoder에게 넘길 후보 수(너무 크면 느림)
    final_k: int = 6        # 최종 컨텍스트에 포함할 top-k

    # Web fallback gate
    # Cross-Encoder 점수는 모델마다 스케일이 다를 수 있으므로 절대값 임계치 + 상대격차
    rerank_min_score: float = 0.15   # top1이 이보다 낮으면 "근거 부족"으로 판단
    rerank_min_margin: float = 0.03  # top1-top2가 너무 작으면 애매하다고 보고 fallback 고려

    max_context_chars: int = 5000     # LLM 컨텍스트 최대 길이

    # 생성용 파라미터
    max_new_tokens: int = 512
    temperature: float = 0.0

    # 디바이스 설정
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # 후처리 함수 설정
    def __post_init__(self):
        self.raw_dir.mkdir(parents=True, exist_ok=True)
        self.model_dir.mkdir(parents=True, exist_ok=True)

In [3]:
# 변수 생성 및 폴더 생성
cfg = Config()

# 2. 모델 로드

In [4]:
# 모델명 변수
model_id = "yanolja/YanoljaNEXT-EEVE-Instruct-7B-v2-Preview"

# 모델 로컬 저장 경로 지정
eeve_dir = cfg.model_dir / "eeve"

# 토크나이저 및 모델 생성
tokenizer = AutoTokenizer.from_pretrained(model_id, cache_dir=eeve_dir)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    cache_dir=eeve_dir,
    device_map="auto",
    # 4bit로 불러오기
    load_in_4bit=True,
)

The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.
Loading checkpoint shards: 100%|██████████| 4/4 [00:08<00:00,  2.22s/it]


In [5]:
# 랭체인용 파이프라인 생성
text_gen_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=cfg.max_new_tokens,
    do_sample=False,
    temperature=None,
    top_p=None,
    top_k=None,
    repetition_penalty=1.1,
    return_full_text=False,
)

Device set to use cuda:0


In [6]:
# 랭체인에 사용할 LLM 래퍼
llm = HuggingFacePipeline(pipeline=text_gen_pipeline)

  llm = HuggingFacePipeline(pipeline=text_gen_pipeline)


In [7]:
# 임베딩 모델 생성
# normalize_embeddings로 코사인 유사도 안정화
embeddings = HuggingFaceEmbeddings(
    model_name="dragonkue/BGE-m3-ko",
    encode_kwargs={"normalize_embeddings": True}
)

  embeddings = HuggingFaceEmbeddings(


In [8]:
# 랭퓨즈 초기화
langfuse_handler = CallbackHandler()

# 3. PDF 로드 및 처리

In [9]:
# PDF 로드
loader = PyPDFLoader(str(cfg.data_dir))
raw_docs = loader.load()

In [10]:
# 데이터 청킹
splitter = RecursiveCharacterTextSplitter(
    chunk_size=cfg.chunk_size,
    chunk_overlap=cfg.chunk_overlap,
    separators=["\n\n", "\n", "•", "○", "①", "②", "③", ".", " ", ""],
)

docs = splitter.split_documents(raw_docs)

In [11]:
# 각 문서 청크(Document)에 고유한 chunk_id를 부여
# 목적
# 1. RRF(Dense + Sparse 결과 병합) 시 동일 청크 식별
# 2. Reranking, 중복 제거 시 추적
# 3. 답변 근거 표시용

for i, d in enumerate(docs):
    # PDF loader가 자동으로 넣어주는 메타데이터 형태:
    # d.metadata = {"source":, "page":}

    # 문서 출처(source)를 문자열로 가져옴
    src = str(d.metadata.get("source", "pdf"))

    # PDF 페이지 번호를 문자열로 가져옴
    page = str(d.metadata.get("page", "NA"))

    # 고유 해시 생성
    # - source + page + 청크 인덱스를 결합
    # - 같은 PDF의 같은 페이지라도, 다른 청크면 다른 ID가 되도록 설계

    # 1. 고유 문자열 생성
    raw_key = src + "|" + page + "|" + str(i)

    # 2. 문자열을 바이트로 변환 (hash 함수 입력용)
    raw_bytes = raw_key.encode("utf-8")

    # 3. MD5 해시 계산: 32자리 16진수 문자열
    # 4. 앞 10자리만 사용
    h = hashlib.md5(raw_bytes).hexdigest()[:10]

    # 최종 chunk_id 구성
    # 형식: "{페이지}_{청크인덱스}_{해시}"
    d.metadata["chunk_id"] = f"{page}_{i}_{h}"

    # 결과적으로 d.metadata에는 다음이 추가됨:
    # {"source":, "page":, "chunk_id":}

In [12]:
# 청크 결과물 저장 경로
chunks_path = cfg.raw_dir / "chunks.pkl"

# 청크 불러오기
with open(chunks_path, "wb") as f:
    pickle.dump(docs, f)

In [13]:
# 저장한 청크 불러오기
if Path.exists(chunks_path):
    with open(chunks_path, "rb") as f:
        docs = pickle.load(f)

In [14]:
# 청크 샘플 확인
docs[1]

Document(metadata={'producer': 'ezPDF Builder Supreme', 'creator': 'PyPDF', 'creationdate': '2024-12-22T23:44:00+09:00', 'moddate': '2025-01-09T17:28:20+09:00', 'source': 'data/2024년_원천징수의무자를위한연말정산신간고안내.pdf', 'total_pages': 426, 'page': 1, 'page_label': '2', 'chunk_id': '1_1_32862a711b'}, page_content='머 리 말\n어려운 경제 상황 속에서도 항상 성실하게 원천징수의무를 이행하여 국세행정에  \n협력해주신 원천징수의무자(회사)와 성실하게 세금을 납부하신 근로자 여러분께 진심\n으로 감사드립니다.\n그동안 국세청은 「연말정산 미리보기」, 「편리한 연말정산」 시스템과 「간소화자료  \n일괄제공」 서비스를 시행하는 등 디지털 납세서비스를 제공하고 「맞춤형안내」를 통해 \n성실 납세를 지원해 왔습니다.\n더불어, 2024년 귀속부터는 소득기준 초과 부양가족에 대한 자료 조회 및 다운로드를  \n제한하여 납세자의 실수를 최소화하고 정확한 연말정산을 지원하기 위해 「간소화  \n서비스」를 전면 개편합니다.\n이 책자에는 세법 개정사항과 최신 예규, 소득·세액공제신고서 작성 방법, 홈택스를 \n이용한 지급명세서 제출 방법 등을 분야별로 담아 납세자에게 실질적인 도움이 될 수 \n있도록 구성하였습니다.\n책자에 수록된 내용 외에도 연말정산 주요정보, 동영상 자료, 계산 사례, 체크리스트 등 \n다양한 자료를 국세청 누리집(www.nts.go.kr) 연말정산 종합안내에서 제공하고 있습니다.\n본 책자가 연말정산에 관한 납세자 여러분의 궁금증을 해소하고, 원천징수의무 이행에 \n유용한 참고자료로 활용되어 성실신고·납부에 도움이 되기를 바랍니다.\n2024년 12월\n법인납세국장     \n연말정산\n신고안내\n2024 원천징수의무자를 위한')

# 4. Retriever 처리

In [15]:
# FAISS 벡터 DB 생성
# docs는 split된 Document 리스트
vectorstore = FAISS.from_documents(
    docs,
    embeddings
)

In [16]:
# Faiss 벡터 DB 저장 경로
faiss_path = cfg.raw_dir / "faiss"

# Faiss 벡터 저장
vectorstore.save_local(faiss_path)

In [17]:
# Faiss 불러오기
if Path.exists(faiss_path):
    vectorstore = FAISS.load_local(
        faiss_path,
        embeddings,
        allow_dangerous_deserialization=True
    )

In [18]:
# 임베딩 Retriever 생성
dense_retriever = vectorstore.as_retriever(
    search_kwargs={"k": cfg.dense_k}
)

In [19]:
# bm25 Retriever 생성
bm25_retriever = BM25Retriever.from_documents(docs)

# bm25 저장 경로
bm25_path = cfg.raw_dir / "bm25.pkl"

# 저장
with bm25_path.open("wb") as f:
    pickle.dump(bm25_retriever, f)

In [20]:
# 저장한 bm25 불러오기
with bm25_path.open("rb") as f:
    bm25_retriever = pickle.load(f)

In [21]:
# bm25 Retriever 설정
bm25_retriever.k = cfg.sparse_k

In [22]:
# RRF (Reciprocal Rank Fusion)
def rrf_fuse(
    dense_docs: List[Document],   # Dense(임베딩 기반) 검색 결과 (순서 있음)
    sparse_docs: List[Document],  # Sparse(BM25 등 키워드 기반) 검색 결과 (순서 있음)
    k: int,
) -> List[Tuple[Document, float]]:
    """
    RRF 공식:
        score(d) = Σ 1 / (k + rank)

    - 각 검색기의 순위를 점수로 변환하여 누적
    - 여러 검색기에서 동시에 상위권인 문서를 우선시
    - 반환: (Document, RRF 점수) 리스트
    """

    # scores:
    # - key   : 문서 고유 ID (chunk_id)
    # - value : RRF 누적 점수
    scores: Dict[str, float] = {}

    # pool:
    # - key   : 문서 고유 ID (chunk_id)
    # - value : 실제 Document 객체
    # - 정렬은 scores로 하고, 반환 시 Document를 꺼내기 위해 사용
    pool: Dict[str, Document] = {}

    # 하나의 검색 결과 리스트(dense 또는 sparse)를 RRF 점수로 변환하여 누적하는 내부 함수
    def add(rank_list: List[Document]):
        # enumerate(start=1):
        # RRF에서 rank는 1부터 시작하는 것이 표준
        for r, d in enumerate(rank_list, start=1):

            # 문서를 유일하게 식별하기 위한 ID
            # - chunk_id가 있으면 사용
            # - 없을 경우, 임시로 텍스트 앞부분을 사용 (fallback)
            doc_id = d.metadata.get("chunk_id") or d.page_content[:80]

            # 문서 ID에 해당하는 Document 객체를 저장
            # - 나중에 정렬된 ID를 Document로 되돌리기 위함
            pool[doc_id] = d

            # RRF 점수 누적:
            # - 이미 점수가 있으면 더하고
            # - 없으면 0에서 시작
            # - r이 작을수록(상위 순위일수록) 더 큰 점수 기여
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + r)

    # Dense 검색 결과를 RRF 점수에 반영
    add(dense_docs)

    # Sparse 검색 결과를 RRF 점수에 반영
    add(sparse_docs)

    # 누적된 RRF 점수를 기준으로 문서 ID를 내림차순 정렬
    # scores.items(): (doc_id, score) 형태
    ranked = sorted(
        scores.items(),
        key=lambda x: x[1],  # score 기준 정렬
        reverse=True         # 점수 높은 순
    )

    # 정렬된 doc_id를 실제 Document 객체로 변환하여 반환
    # 반환 형태: [(Document, score), ...]
    return [(pool[doc_id], score) for doc_id, score in ranked]

In [23]:
# Cross-Encoder Reranker 설정
# RRF 이후 후보 문서들을 query-문서 쌍 단위로 평가

# Cross-Encoder reranker 모델 ID
RERANK_MODEL_ID = "BAAI/bge-reranker-v2-m3"

# reranker 모델을 저장할 로컬 디렉터리
rerank_dir = cfg.model_dir / "rerank"

# CrossEncoder 인스턴스 생성
reranker = CrossEncoder(
    RERANK_MODEL_ID,
    cache_dir=rerank_dir,
    device="cuda" if torch.cuda.is_available() else "cpu"
)

# Cross-Encoder 기반 재정렬 함수
def cross_encoder_rerank(
    query: str,
    candidates: List[Document],
    top_n: int,
) -> List[Tuple[Document, float]]:
    """
    Cross-Encoder를 사용해 (query, document) 쌍마다 점수를 계산한 뒤
    점수 기준으로 내림차순 정렬하여 반환한다.

    반환 형태:
        [(Document, score), ...]
    """
    # 상위 top_n개만 사용
    candidates = candidates[:top_n]

    # Cross-Encoder 입력 형태 생성
    # 각 문서에 대해 (query, document_text) 쌍
    pairs = [(query, d.page_content) for d in candidates]

    scores = reranker.predict(pairs)

    # 문서와 점수를 묶어서 정렬
    # zip으로 (Document, score) 형태로 묶음
    reranked = sorted(
        list(zip(candidates, [float(s) for s in scores])),
        key=lambda x: x[1],   # score 기준
        reverse=True
    )

    return reranked


The CrossEncoder `cache_dir` argument was renamed and is now deprecated, please use `cache_folder` instead.


In [24]:
# 최종 context 생성 함수
def build_context_from_docs(
    ranked_docs: List[Tuple[Document, float]],
    final_k: int,
    max_context_chars: int,
) -> str:
    parts = []
    total = 0

    # 최종 k기준으로 처리(max_context_chars까지)
    for doc, score in ranked_docs[:final_k]:
        page = doc.metadata.get("page", "NA")
        cid = doc.metadata.get("chunk_id", "NA")

        block = f"[p.{page} | {cid} | rerank={score:.3f}]\n{doc.page_content}"
        if total + len(block) > max_context_chars:
            break

        parts.append(block)
        total += len(block)

    return "\n\n".join(parts).strip()

# 5. Query 처리

In [25]:
# Query Rewriter
rewrite_prompt = ChatPromptTemplate.from_template(
"""
너는 '국세청 연말정산 안내서(PDF)'에서 검색이 잘 되도록 질문을 재작성하는 Query Rewriter다.

목표:
- 문서에서 찾기 쉬운 "검색용 질문 1개"로 바꾼다.

규칙:
1. 원래 의미는 유지한다.
2. 연말정산/원천징수/간소화/공제/세액공제/제출기한/홈택스 경로 같은 문서 용어를 우선 사용한다.
3. 기간/연도(2024, 2025), 조건(소득요건, 한도)을 가능하면 포함한다.
4. 출력은 반드시 한 줄 한국어. 불릿/설명/따옴표/추가 문장 금지.

[원문 질문]
{q}

[출력: 검색용 질문 1줄]
"""
)

# Query rewriter 함수 작성
def rewrite_query(q: str) -> str:
    out = (rewrite_prompt | llm).invoke(
        {"q": q},
        config={"callbacks": [langfuse_handler]},
    )
    text = str(out).strip()

    # 출력 라인만 뽑기
    m = re.search(r"(?:출력\s*[:：]|검색용 질문\s*[:：])\s*(.*)", text, flags=re.IGNORECASE)
    if m:
        text = m.group(1).strip()
    else:
        # fallback: 첫 줄
        text = text.splitlines()[0].strip()

    # 같은 줄에 뒤로 더 붙으면 첫 줄만 유지
    text = text.splitlines()[0].strip()

    # 라벨 제거
    text = re.sub(r"^(Human:|Assistant:)\s*", "", text, flags=re.IGNORECASE).strip()

    # 따옴표/특수따옴표 제거, 공백 정리
    text = text.replace('"', "").replace("“", "").replace("”", "")
    text = re.sub(r"\s+", " ", text).strip()

    # rewritten이 너무 짧거나 깨지면 원문 사용
    if len(text) < 5:
        return q.strip()

    return text

# 6. 1차 답변 처리

In [26]:
# 답변 프롬프트
answer_prompt = ChatPromptTemplate.from_template(
"""
당신은 '국세청 연말정산 안내서(PDF)'의 **문구를 재진술하는 역할만** 수행한다.

규칙 (매우 중요):
1. [문서 근거]에 **명시적으로 존재하는 내용만** bullet로 옮긴다.
2. 문서에 없는 개념, 요약, 일반화, 재해석, 사례 추가를 절대 하지 않는다.
3. 질문에 정확히 대응되는 문장이 문서에 없으면
   반드시 "문서에 없는 정보입니다." 한 줄만 출력한다.
4. 질문을 쪼개거나, 새로운 질문을 만들지 않는다.
5. 답변은 bullet(-)로만 구성한다.

[문서 근거]
{context}

[질문]
{q}

[답변]
"""
)

# pdf 기반 답변 함수 작성
def answer_from_pdf_context(q: str, context: str) -> str:
    out = (answer_prompt | llm).invoke(
        {"q": q, "context": context},
        config={"callbacks": [langfuse_handler]},
    )
    text = str(out)

    # '[답변]' 이후만 가져오도록 고정
    if "[답변]" in text:
        text = text.split("[답변]", 1)[1]

    # Human/Assistant 라벨 제거
    text = re.sub(r"^\s*(Human:|Assistant:)\s*", "", text, flags=re.IGNORECASE | re.MULTILINE).strip()

    # 라인 단위 정리
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]

    # 문서 명사 토큰
    doc_tokens = set(re.findall(r"[가-힣]{2,}", context))

    bullets = []
    for ln in lines:
        # bullet만 수집
        if not ln.startswith(("-", "•", "○")):
            continue

        # 근거성 체크(너무 빡세면 답이 다 날아가니까 2개 교집합만 요구)
        nouns = set(re.findall(r"[가-힣]{2,}", ln))
        if doc_tokens and len(nouns & doc_tokens) < 2:
            continue

        bullets.append(ln)

    if not bullets:
        return "문서에 없는 정보입니다."

    # 중복 bullet 제거(동일 문장 반복 방지)
    uniq = []
    seen = set()
    for b in bullets:
        key = re.sub(r"\s+", " ", b)
        if key not in seen:
            uniq.append(b)
            seen.add(key)

    return "\n".join(uniq)

# 7. 웹검색 기반 2차 답변 처리

In [27]:
# 문서를 우선 탐색하되, 웹 검색으로 넘어갈지 판단하는 게이트(gate) 함수 생성
#
# 목적:
# - RAG의 1차 근거는 "PDF 문서"로 유지
# - 문서 검색 결과가 부실/애매할 때만 웹검색(DuckDuckGo)으로 보조
def need_web_fallback(
    reranked: List[Tuple[Document, float]],
    context: str,
    min_score: float,
    min_margin: float,
) -> bool:
    """
    웹검색 fallback 조건

    1. reranked 없음 -> True
    2. top1이 매우 낮음 -> True
    3. top1이 낮은 편인데(top1 < strong_enough) top1-top2도 작으면 애매 -> True
    4. context가 거의 비었으면 -> True
    """
    if not reranked:
        return True

    top1 = float(reranked[0][1])

    # 컨텍스트가 사실상 비었으면 웹 고려
    if not context or len(context.strip()) < 200:
        return True

    # top1이 일정 점수보다 높으면 문서 근거 충분으로 보고 margin 체크 자체를 생략
    strong_enough = min_score + 0.10  # (기본 min_score=0.15라면 strong_enough=0.25)

    if top1 < min_score:
        return True

    # top1이 충분히 높으면 애매해도 웹으로 안 감
    if top1 >= strong_enough:
        return False

    # 여기부터는 top1이 높진 않은 구간 -> margin으로 애매함 판단
    if len(reranked) >= 2:
        top2 = float(reranked[1][1])
        if (top1 - top2) < min_margin:
            return True

    return False

In [28]:
# duckduckgo 검색 엔진 생성
ddg = DuckDuckGoSearchRun()

web_answer_prompt = ChatPromptTemplate.from_template(
"""
너는 검색 결과를 **사실 그대로 요약만 하는 도구**다.

절대 금지:
- 검증, 평가, 수정, 요청, 재질문
- 가정, 추론, 보완 설명
- 메타 발언 ("검증 결과", "수정 요청" 등)

규칙:
1. 검색 결과에 실제로 포함된 문장만 사용한다.
2. 각 항목은 불릿(-) 하나의 문장으로 작성한다.
3. 최대 3개 항목까지만 출력한다.
4. 불릿 외의 텍스트는 절대 출력하지 않는다.

[질문]
{q}

[검색 결과]
{search_results}

[출력]
"""
)

# 웹기반 답변 생성 함수
def web_fallback_answer(q: str) -> str:
    # DDG 검색
    search_results = ddg.run(q)

    # 프롬프트 주입
    out = (web_answer_prompt | llm).invoke(
        {"q": q, "search_results": search_results},
        config={"callbacks": [langfuse_handler]},
    )
    text = str(out).strip()

    # 후처리: '-' 불릿만 남기고 최대 3줄로 컷 (모델이 딴소리하면 강제로 잘라냄)
    lines = []
    for ln in text.splitlines():
        ln = ln.strip()
        if ln.startswith("- "):
            lines.append(ln)
        if len(lines) >= 3:
            break

    # 불릿이 없으면 최소한 형태 강제
    if not lines:
        return "- (추가 참고) 검색 결과 요약을 생성하지 못했습니다. ※ 국세청/홈택스 공식 공지 확인 권장"

    return "\n".join(lines)

# 8. 랭그래프 에이전트

In [29]:
# 상태 정의
class RAGState(TypedDict):
    question: str   # query
    rewritten: str  # rewrite 처리된 query

    # retrieval 파트
    rrf_ranked: List[Tuple[Document, float]] # rrf 처리된 결과물
    reranked: List[Tuple[Document, float]]   # Cross-Encoder로 rerank된 결과물
    context: str

    # outputs
    answer: str     # 기본 답변
    need_web: bool  # 웹 검색 게이트
    web_answer: str # 웬기반 답변

In [30]:
# rewrite 노드
def node_rewrite(state: RAGState) -> RAGState:
    q = state["question"]
    rq = rewrite_query(q)
    # 각 노드는 상태를 수정하는 것이 아닌 새로운 상태를 반환
    return {**state, "rewritten": rq}

In [31]:
# pre-retrieve 노드
def node_retrieve_rrf(state: RAGState) -> RAGState:
    q = state["rewritten"]

    dense_docs = dense_retriever.invoke(q)
    sparse_docs = bm25_retriever.invoke(q)

    rrf_ranked = rrf_fuse(dense_docs, sparse_docs, k=cfg.rrf_k)
    rrf_ranked = rrf_ranked[:cfg.rrf_pool]

    return {**state, "rrf_ranked": rrf_ranked}

In [32]:
# post-retrieve 노드
def node_rerank(state: RAGState) -> RAGState:
    q = state["rewritten"]

    # RRF 결과에서 doc만 뽑아 reranker 후보로 사용
    candidates = [d for d, _ in state["rrf_ranked"]]
    reranked = cross_encoder_rerank(q, candidates, top_n=cfg.rerank_top_n)

    return {**state, "reranked": reranked}

In [33]:
# 웹검색 게이트
def node_gate(state: RAGState) -> RAGState:
    reranked = state["reranked"]

    context = build_context_from_docs(
        ranked_docs=reranked,
        final_k=cfg.final_k,
        max_context_chars=cfg.max_context_chars,
    )

    need_web = need_web_fallback(
        reranked=reranked,
        context=context,
        min_score=cfg.rerank_min_score,
        min_margin=cfg.rerank_min_margin,
    )

    return {**state, "context": context, "need_web": need_web}

In [None]:
# Answer 노드
def node_answer(state: RAGState) -> RAGState:
    q = state["question"]
    context = state["context"]

    ans = answer_from_pdf_context(q, context)

    # 답변 자체가 "문서에 없는 정보"면 web fallback 강제
    if ans.strip() == "문서에 없는 정보입니다.":
        return {
            **state,
            "answer": ans,
            "need_web": True,
        }

    return {**state, "answer": ans}

In [41]:
# 웹 노드
def node_web(state: RAGState) -> RAGState:
    q = state["question"]
    wab = web_fallback_answer(q)
    return {**state, "web_answer": wab}

In [42]:
# pdf 기반 답변 + 웹 답변 병합 노드
def node_merge(state: RAGState) -> RAGState:
    """
    문서 답변 + (추가 참고) 웹 검색을 분리해서 붙인다.
    """
    if state.get("need_web", False):
        merged = state["answer"].strip() + "\n\n(추가 참고: DuckDuckGo 웹 검색)\n" + state.get("web_answer", "").strip()
        return {**state, "answer": merged}
    return state

In [43]:
# 조건 분기 함수 정의
# answer 노드 실행 후, state를 보고 다음 노드를 결정
def route_after_answer(state: RAGState):
    # state["need_web"]가 True이면 웹검색 노드로 이동
    # False이거나 없으면 바로 merge 단계로 이동
    return "web" if state.get("need_web", False) else "merge"

In [44]:
# LangGraph 구성: RAG 전체 파이프라인 정의

# StateGraph 생성
g = StateGraph(RAGState)

# 노드 등록

# 1. 사용자 질문을 검색 친화적으로 재작성
g.add_node("rewrite", node_rewrite)

# 2. Dense + Sparse 검색 후 RRF로 통합 retrieval
g.add_node("retrieve_rrf", node_retrieve_rrf)

# 3. Cross-Encoder 기반 재정렬
g.add_node("rerank", node_rerank)

# 4. 웹 노드 게이트
g.add_node("gate", node_gate)

# 4. 문서 기반 답변 생성 (1차 답변 시도)
g.add_node("answer", node_answer)

# 5. 외부 웹검색 노드
g.add_node("web", node_web)

# 6. 문서 답변 + 웹 답변 최종 병합
g.add_node("merge", node_merge)

# Entry point 설정
g.set_entry_point("rewrite")

# 기본적인 직선형(edge) 흐름 정의
g.add_edge("rewrite", "retrieve_rrf")
g.add_edge("retrieve_rrf", "rerank")
g.add_edge("rerank", "gate")
g.add_edge("gate", "answer")

# 조건부 edge 등록
# answer 노드 이후에만 조건 분기 발생
g.add_conditional_edges(
    "answer",
    route_after_answer,
    {
        "web": "web",         # "web" 반환 시 web 노드
        "merge": "merge",     # "merge" 반환 시 merge 노드
    }
)

# web 노드를 거쳤을 경우의 흐름
# - 웹검색 결과를 얻은 뒤, merge 단계로 합류
g.add_edge("web", "merge")

# merge 이후 그래프 종료
g.add_edge("merge", END)

# 그래프 컴파일
app = g.compile()


In [47]:
# 실행 예시
tests = [
    "연말정산 때 비거주자는 세액공제신고서를 어떻게 작성해야 해?",
    "2024년 개정 세법 중에 월세와 관련한 내용이 있을까?",
    "외국인 근로자의 경우에는 어떤 점에 주의해야 할까?",
    "연말정산 간소화자료 일괄제공 서비스의 동의 기간이 언제야?", # 문서에 없음: web fallback 확인용
    "해외주식 양도소득도 공제 대상이야?",                     # 문서에 없음: web fallback 확인용
    "프리랜서는 세액공제신고서 작성을 어떻게 해야 해?"          # 문서에 없음: web fallback 확인용
]

for q in tests:
    print("\n" + "="*80)
    print("Q:", q)
    out = app.invoke({"question": q})
    print("\nA:\n", out["answer"])
    # 디버깅용
    print("rewritten:", out["rewritten"])
    if out.get("reranked"):
        print("top1 rerank:", out["reranked"][0][1])
    print("need_web:", out.get("need_web"))


Q: 연말정산 때 비거주자는 세액공제신고서를 어떻게 작성해야 해?

A:
 • 비거주자의 국내원천소득에 대해서는 「소득세법」 제122조에 따라 거주자의 계산규정을 준용합니다.
• 「소득세법」 제51조 제3항에 따른 인적공제 중 비거주자 본인 외의 자에 대한 공제는 적용되지 않습니다.
• 「소득세법」 제59조의2에 따른 자녀세액공제 역시 적용되지 않습니다.
• 「소득세법」 제59조의4에 따른 특별세액공제 역시 적용되지 않습니다.
• 「소득세법」 제59조에 따른 근로소득세액공제만 적용됩니다.
• 「소득세법」 제59조에서 규정하는 근로소득세액공제만을 적용합니다.
rewritten: 비거주자의 연말정산 세액공제신고서 작성 방법
top1 rerank: 0.9649685025215149
need_web: False

Q: 2024년 개정 세법 중에 월세와 관련한 내용이 있을까?

A:
 • 총급여액 8천만원 이하인 근로자 및 성실사업자 (p.215)
• 국민주택규모(85㎡ 이하) 또는 기준시가 4억원 이하 주택 임차자 (p.215)
• 월세액 연간 1,000만원 한도 (p.216)
rewritten: 월세 관련 소득 공제 혜택 변경 사항 확인 (2024년 기준)
top1 rerank: 0.9686283469200134
need_web: False

Q: 외국인 근로자의 경우에는 어떤 점에 주의해야 할까?

A:
 - ’24년 귀속 급여 중 미지급 급여는 해당 과세기간의 12월 31일에 지급한 것으로 보임
- 일용근로자는 건설공사 종사자 제외 3개월 이상 연속 근무 시 일반급여자로 처리
- 인정상여는 주택자금 대여 이자, 퇴직금 인정 이자, 회사 제품 무상 지급 현물 급여 등 포함
- 단, 할인판매가격이 법인 취득가액 이상이고 일반 소비자 판매가 대비 현저히 낮지 않은 경우 제외
- 국내원천소득에는 근로소득, 거주자 외국인 임원의 급여, 내국법인 임원의 급여, 법인세 상여금 포함
- 연말정산 시 거주자와 동일한 계산규정을 적용하되, 비거주자 본인 외 인적공제, 특별소득공