In [3]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.vectorstores import FAISS
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain.document_loaders import PyMuPDFLoader
from langchain.tools.retriever import create_retriever_tool
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.runnables.history import RunnableWithMessageHistory

# 환경 설정

In [31]:
# API 키를 환경변수로 관리하기 위한 설정 파일
from dotenv import load_dotenv

# API 키 정보 로드
load_dotenv()

True

# Agent Message 함수

In [10]:
from dataclasses import dataclass
from typing import Any, Dict, List, Callable
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain.agents.output_parsers.tools import ToolAgentAction

In [11]:
# 도구 호출 시 실행되는 콜백 함수입니다.
def tool_callback(tool) -> None:
    print("[도구 호출]")
    print(f"Tool: {tool.get('tool')}")  # 사용된 도구의 이름을 출력합니다.
    if tool_input := tool.get("tool_input"):  # 도구에 입력된 값이 있다면
        for k, v in tool_input.items():
            print(f"{k}: {v}")  # 입력값의 키와 값을 출력합니다.
    print(f"Log: {tool.get('log')}")  # 도구 실행 로그를 출력합니다.


# 관찰 결과를 출력하는 콜백 함수입니다.
def observation_callback(observation) -> None:
    print("[관찰 내용]")
    print(f"Observation: {observation.get('observation')}")  # 관찰 내용을 출력합니다.


# 최종 결과를 출력하는 콜백 함수입니다.
def result_callback(result: str) -> None:
    print("[최종 답변]")
    print(result)  # 최종 답변을 출력합니다.

In [12]:
@dataclass
class AgentCallbacks:
    """
    에이전트 콜백 함수들을 포함하는 데이터 클래스입니다.

    Attributes:
        tool_callback (Callable[[Dict[str, Any]], None]): 도구 사용 시 호출되는 콜백 함수
        observation_callback (Callable[[Dict[str, Any]], None]): 관찰 결과 처리 시 호출되는 콜백 함수
        result_callback (Callable[[str], None]): 최종 결과 처리 시 호출되는 콜백 함수
    """

    tool_callback: Callable[[Dict[str, Any]], None] = tool_callback
    observation_callback: Callable[[Dict[str, Any]], None] = observation_callback
    result_callback: Callable[[str], None] = result_callback

In [13]:
class AgentStreamParser:
    """
    에이전트의 스트림 출력을 파싱하고 처리하는 클래스입니다.
    """

    def __init__(self, callbacks: AgentCallbacks = AgentCallbacks()):
        """
        AgentStreamParser 객체를 초기화합니다.

        Args:
            callbacks (AgentCallbacks, optional): 파싱 과정에서 사용할 콜백 함수들. 기본값은 AgentCallbacks()입니다.
        """
        self.callbacks = callbacks
        self.output = None

    def process_agent_steps(self, step: Dict[str, Any]) -> None:
        """
        에이전트의 단계를 처리합니다.

        Args:
            step (Dict[str, Any]): 처리할 에이전트 단계 정보
        """
        if "actions" in step:
            self._process_actions(step["actions"])
        elif "steps" in step:
            self._process_observations(step["steps"])
        elif "output" in step:
            self._process_result(step["output"])

    def _process_actions(self, actions: List[Any]) -> None:
        """
        에이전트의 액션들을 처리합니다.

        Args:
            actions (List[Any]): 처리할 액션 리스트
        """
        for action in actions:
            if isinstance(action, (AgentAction, ToolAgentAction)) and hasattr(
                action, "tool"
            ):
                self._process_tool_call(action)

    def _process_tool_call(self, action: Any) -> None:
        """
        도구 호출을 처리합니다.

        Args:
            action (Any): 처리할 도구 호출 액션
        """
        tool_action = {
            "tool": getattr(action, "tool", None),
            "tool_input": getattr(action, "tool_input", None),
            "log": getattr(action, "log", None),
        }
        self.callbacks.tool_callback(tool_action)

    def _process_observations(self, observations: List[Any]) -> None:
        """
        관찰 결과들을 처리합니다.

        Args:
            observations (List[Any]): 처리할 관찰 결과 리스트
        """
        for observation in observations:
            observation_dict = {}
            if isinstance(observation, AgentStep):
                observation_dict["observation"] = getattr(
                    observation, "observation", None
                )
            self.callbacks.observation_callback(observation_dict)

    def _process_result(self, result: str) -> None:
        """
        최종 결과를 처리합니다.

        Args:
            result (str): 처리할 최종 결과
        """
        self.callbacks.result_callback(result)
        self.output = result

# 도구 정의

In [32]:
### 1-1. Search 도구 ###
# TavilySearchResults 클래스의 인스턴스를 생성합니다
# k=6은 검색 결과를 6개까지 가져오겠다는 의미입니다
search = TavilySearchResults(k=6)

In [17]:
import pickle
import os

def load_documents_from_pkl(filepath):
    """
    Pickle 파일에서 Langchain Document 리스트를 불러오는 함수

    Args:
        filepath: 원본 파일 경로 (예: path/to/filename.pdf)
    Returns:
        Langchain Document 객체 리스트
    """
    # 확장자 제거하고 절대 경로로 변환
    abs_path = os.path.abspath(filepath)
    base_path = os.path.splitext(abs_path)[0]
    pkl_path = f"{base_path}.pkl"

    with open(pkl_path, "rb") as f:
        documents = pickle.load(f)
    return documents

In [18]:
import glob
from pathlib import Path

# extract_path 디렉토리에서 모든 .pkl 파일 찾기
extract_path = "/root/Project/aayn/teddynote-parser-api-client/example/parsing_outputs/84955c6c-9975-47cd-a4af-b9da78d548a5"
pkl_files = glob.glob(str(Path(extract_path) / "*" / "*.pkl"))

if not pkl_files:
    print("❌ extract_path에서 .pkl 파일을 찾을 수 없습니다.")
else:
    # 모든 .pkl 파일에서 문서 로드
    all_documents = []
    for pkl_file in pkl_files:
        print(f"📄 {pkl_file} 파일 로드 중...")  # 한국어 코멘트
        documents = load_documents_from_pkl(pkl_file)
        all_documents.extend(documents)

    print(f"✅ 총 {len(all_documents)}개의 문서가 로드되었습니다.")

📄 /root/Project/aayn/teddynote-parser-api-client/example/parsing_outputs/84955c6c-9975-47cd-a4af-b9da78d548a5/84955c6c-9975-47cd-a4af-b9da78d548a5/84955c6c-9975-47cd-a4af-b9da78d548a5_RetirementPensionSubscriberTraining.pkl 파일 로드 중...
✅ 총 40개의 문서가 로드되었습니다.


In [20]:
### 1-2. PDF 문서 검색 도구 (Retriever) ###
# PDF 파일 로드. 파일의 경로 입력
loader = PyMuPDFLoader("/root/Project/aayn/pdf/RetirementPensionSubscriberTraining.pdf")

# 텍스트 분할기를 사용하여 문서를 분할합니다.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)

# 문서를 로드하고 분할합니다.
split_docs = loader.load_and_split(text_splitter)
print(f"✅ 총 {len(split_docs)}개의 문서가 로드되었습니다.")

✅ 총 15개의 문서가 로드되었습니다.


In [27]:
# VectorStore를 생성합니다.
vector = FAISS.from_documents(all_documents, OpenAIEmbeddings())

# Retriever를 생성합니다.
retriever = vector.as_retriever()

retriever_tool = create_retriever_tool(
    retriever,
    name="pdf_search",  # 도구의 이름을 입력합니다.
    description="use this tool to search information from the PDF document",  # 도구에 대한 설명을 자세히 기입해야 합니다!!
)

In [33]:
### 1-3. tools 리스트에 도구 목록을 추가합니다 ###
# tools 리스트에 search와 retriever_tool을 추가합니다.
tools = [retriever_tool]

########## 2. LLM 을 정의합니다 ##########
# LLM 모델을 생성합니다.
llm = ChatOpenAI(model="gpt-4o", temperature=0)

########## 3. Prompt 를 정의합니다 ##########

# Prompt 를 정의합니다 - 이 부분을 수정할 수 있습니다!
# Prompt 정의
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful assistant. "
            "Make sure to use the `pdf_search` tool for searching information from the PDF document. "
            "If you can't find the information from the PDF document, use the `search` tool for searching information from the web.",
        ),
        ("placeholder", "{chat_history}"),
        ("human", "{input}"),
        ("placeholder", "{agent_scratchpad}"),
    ]
)

# Agent 정의

In [34]:
########## 4. Agent 를 정의합니다 ##########

# 에이전트를 생성합니다.
# llm, tools, prompt를 인자로 사용합니다.
agent = create_tool_calling_agent(llm, tools, prompt)

########## 5. AgentExecutor 를 정의합니다 ##########

# AgentExecutor 클래스를 사용하여 agent와 tools를 설정하고, 상세한 로그를 출력하도록 verbose를 True로 설정합니다.
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=False)

########## 6. 채팅 기록을 수행하는 메모리를 추가합니다. ##########

# session_id 를 저장할 딕셔너리 생성
store = {}


# session_id 를 기반으로 세션 기록을 가져오는 함수
def get_session_history(session_ids):
    if session_ids not in store:  # session_id 가 store에 없는 경우
        # 새로운 ChatMessageHistory 객체를 생성하여 store에 저장
        store[session_ids] = ChatMessageHistory()
    return store[session_ids]  # 해당 세션 ID에 대한 세션 기록 반환


# 채팅 메시지 기록이 추가된 에이전트를 생성합니다.
agent_with_chat_history = RunnableWithMessageHistory(
    agent_executor,
    # 대화 session_id
    get_session_history,
    # 프롬프트의 질문이 입력되는 key: "input"
    input_messages_key="input",
    # 프롬프트의 메시지가 입력되는 key: "chat_history"
    history_messages_key="chat_history",
)


In [35]:
########## 7. Agent 파서를 정의합니다. ##########
agent_stream_parser = AgentStreamParser()

In [36]:
########## 8. 에이전트를 실행하고 결과를 확인합니다. ##########

# 질의에 대한 답변을 출력합니다.
response = agent_with_chat_history.stream(
    {"input": "퇴직연금제도의 개인형퇴직연금 관련 내용을 문서에서 찾아줘"},
    # 세션 ID를 설정합니다.
    # 여기서는 간단한 메모리 내 ChatMessageHistory를 사용하기 때문에 실제로 사용되지 않습니다
    config={"configurable": {"session_id": "abc123"}},
)

for step in response:
    agent_stream_parser.process_agent_steps(step)

[도구 호출]
Tool: pdf_search
query: 개인형퇴직연금
Log: 
Invoking: `pdf_search` with `{'query': '개인형퇴직연금'}`



[관찰 내용]
Observation: 퇴직연금
가입자 교육

# 연금수령 시 일시금 수령보다 세금이 30% 감액됩니다.
# 퇴직급여의 개인형퇴직연금(IRP) 이전 효과
# • IRP의 3가지 효과
참고 개인형퇴직연금제도 연간 납입액 한도와 세액공제
▶ 근로자가 본인 희망 시 자신의 비용 부담으로 연간 1,800만원까지(연금저축, DC, IRP 합산) 추가 납입할 수 있으며,
총 700만원 한도에서 세액공제를 받을 수 있습니다.
※ 50세 이상인 가입자는 연간 900만원(2020년 ~ 2022년 3년간 한시 적용)
총 700만원 한도
연금계좌 세액공제율 (지방소득세 포함)
연 400만원(퇴직연금 + 연금저축)
연 300만원(퇴직연금에 한해 추가)
총 급여액 5천 5백만원 이하 : 16.5%
총 급여액 5천 5백만원 초과 : 13.2%
▶ 세액공제대상 한도

# 개인형퇴직연금(IRP) 적립금 운용
• 퇴직연금사업자가 제공하는 운용상품 중 가입자의 투자 성향에 맞는 상품으로 포트폴리오를 구성하여 적립금을 운용
할 수 있습니다.
참고
# 개인형 퇴직연금(IRP) 운용형태
① 원리금보장형
- 금융회사에서 원금과 이자를 보장하는 상품
- 예금자보호 가능
② 실적배당형
- 국내외 주식 및 채권에 투자하여 운용을 하고, 그 운용수익을 투자자에 배당
- 운용성과에 따라 원금의 손실이 발생할 수 있는 상품
- 예금자 보호 대상 아님
# 퇴직연금 과세체계
• 연금계좌에서 적립금을 수령하는 경우 소득 원천(퇴직급여, 가입자추가부담금 및 운용수익)과 수령 방법(일시금과
연금 수령)에 따라 과세를 다음과 같이 적용하고 있습니다.
※ 지방소득세 포함
주1) 연금수령
·55세 이후에 5년 이상의 기간 동안 받는 금액 중 연금수령한도 이내의 금액

# 참고 기업형IRP(10인 미만 특례제도)
▶ 근로자 10인 미만