# 모듈 불러오기

In [8]:
from dotenv import load_dotenv

from langchain_community.tools import QuerySQLDatabaseTool
from langchain_community.utilities import SQLDatabase
from langchain.chains import create_sql_query_chain
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnableLambda, RunnablePassthrough
from langchain_community.tools.tavily_search import TavilyAnswer, TavilySearchResults
from langsmith import Client

from BK.db import DB
import re

from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
from operator import itemgetter
from langchain.schema import Document
from typing import List, Literal, Any

from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.pydantic_v1 import BaseModel, Field

from langchain.agents import create_react_agent,AgentExecutor

from typing import Annotated
from datetime import datetime 
from operator import add

  from .autonotebook import tqdm as notebook_tqdm

For example, replace imports like: `from langchain_core.pydantic_v1 import BaseModel`
with: `from pydantic import BaseModel`
or the v1 compatibility namespace if you are working in a code base that has not been fully upgraded to pydantic 2 yet. 	from pydantic.v1 import BaseModel

  exec(code_obj, self.user_global_ns, self.user_ns)


# 필요 환경정의 

In [9]:
load_dotenv('.env')

# llm
llm = ChatGoogleGenerativeAI(model='gemini-2.5-flash', temperature=0)
llm_chat = ChatGoogleGenerativeAI(model='gemini-2.5-flash', temperature=0.2)

# 랭스미스
client = Client()
# DB
db = SQLDatabase.from_uri('postgresql://postgres:postgres@10.10.50.155:1108/postgres', schema="doosan")

# 벡터 DB
from langchain_community.embeddings import HuggingFaceEmbeddings
embedding_model_name = "nlpai-lab/KoE5"
embedding = HuggingFaceEmbeddings(
    model_name=embedding_model_name,
    model_kwargs={
        "device": "cpu",
        # "trust_remote_code": True,  # 모델에 따라 필요할 수 있음
    },
    # encode_kwargs={"normalize_embeddings": True},  # 코사인 유사도 안정화 (버전에 따라 지원)
)

vectorstore = Chroma(
    persist_directory='./chroma_reports_db',
    embedding_function=embedding,
    collection_name="reports_ko1" 
)

  embedding = HuggingFaceEmbeddings(
  vectorstore = Chroma(


# Tools

## DataLoader

In [10]:
# DataLoader
class DataLoader(object):
    def __init__(self, db:SQLDatabase = db):
        self.llm = ChatGoogleGenerativeAI(model='gemini-2.5-flash', temperature=0)
        self.db = db
        self.write_query = create_sql_query_chain(llm, db)
        self.execute_query = QuerySQLDatabaseTool(db=db)
        self.chain = RunnableLambda(self.__create_prompt) | self.write_query | RunnableLambda(self.__clean_answer) |RunnableLambda(self._execute_query)

    def __create_prompt(self, question:str):
        question = question['question']
        prompt = f"""
        당신은 SQL 쿼리 전문가입니다. 사용자의 요청을 SQL 쿼리로 변환하는 임무를 맡습니다. 
        주어진 input을 활용하여 툴을 호출한 후 나오는 DB정보를 활용해 SELECT문을 써서 PostgreSQL로 정의하시오.

        쿼리 작성 규칙:
        - 주어진 정보를 깊게 생각하여 쿼리를 생성하여라.
        - tool을 있는 그대로(축약금지, 꾸며내기 금지) 활용해 참조한 DB정보를 바탕으로 불러온 DB정보안에 들어있는 컬럼명을 반드시 참조해서 SQL문 작성.
        - 모든 컬럼명에는 ""로 감싸주며, as를 통해서 alias 하더라도 ""를 사용할 것.
        - Select외 나머지 DDL 사용 금지.
        - 최종 출력은 SQL 쿼리만 출력.
        - 어떤 경우에도 "SQLQuery:", "Answer:", "Output:" 등의 접두어를 포함하지 않는다.
        - 반드시 SQL문만 순수하게 출력한다. (SELECT로 시작해야 함)
        - 지표별 리그 내 순위 생성이 명시된 콘텐츠에서만 순위 생성 시 반드시 아래와 같은 규칙을 따라줘:
            * 논리적 순서: 먼저 해당 시즌 데이터만 필터링하고, 그 안에서 모든 팀의 각 지표에 대한 순위를 계산해줘. 마지막으로 내가 원하는 특정 팀의 데이터만 필터링해서 보여줘.
            * 포함할 컬럼: "팀 명", "시즌"을 제외한 지표들의 값.
        - tool을 호출하지 않고는 절대 판단하지 말 것.
        - Schema정보를 반드시 참조하여 테이블명을 완성시킬 것.
        - 소수점이 긴 경우, numeric 타입으로 변환 후 둘째 자리까지 반올림한다.
        - 어떤 경우에도 쿼리는 ```을 사용하지 않고 작성한다.
        - LIMIT에 대한 요청이 없는 경우 전체 데이터 조회

        사용자 요청:
        {question}
        """
        
        return {'question':prompt}

    def _execute_query(self, query):
        rdb = DB('pg', 'postgres')
        data = rdb.read_table(query)
        return data

    def invoke(self, query:dict):
        return self.chain.invoke({'question':query})

    def ainoke(self, query:dict):
        return self.chain.ainvoke({'question':query})
    
    def __clean_answer(self, query):
        clean_query = re.sub('SQLQuery: {0,1}', '', query)
        clean_query = re.sub('^sql|sql', '', clean_query)
        clean_query = re.sub('```', "'''", clean_query)
        return clean_query.strip()
        

    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

## DataAnalyst

In [11]:
class DataAnalyst(object):
    def __init__(self, db:SQLDatabase = db):
        self.llm = ChatGoogleGenerativeAI(model='gemini-2.5-flash')
        self.prompt = self.__create_prompt()
        self.data_loader_chain = DataLoader(db).chain
        self.chain = RunnablePassthrough.assign(data= self.data_loader_chain) | self.prompt | llm | StrOutputParser()

    def __create_prompt(self):
        template = """
                    당신은 데이터 분석 전문가입니다. 사용자에게 요청에 따라 데이터를 해석하고 분석하는 임무를 맡습니다. 
                
                    데이터 해석 규칙:
                    - 사용자 요청과 주어진 데이터만 활용해서 해석을 합니다.
                    - 다만 주어진 데이터로 새로운 변수를 만들어 해석할 여지가 있으면 새로운 변수를 만들어서 해석을 해야합니다.
                    - 데이터는 사용자 요청과 관련있는 데이터입니다.
                    
                    사용자 요청: 
                    {question}
                    
                    데이터:
                    {data}
                """ 
        prompt = ChatPromptTemplate.from_template(template)
        return prompt

    def invoke(self, query:dict):
        return self.chain.invoke({'question':query})

    def ainoke(self, query:dict):
        return self.chain.ainvoke({'question':query})
    
    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

## BI

In [12]:
class BI(object):
    def __init__(self, db:SQLDatabase = db):
        self.llm = ChatGoogleGenerativeAI(model='gemini-2.5-flash')
        self.prompt = self.__create_prompt()
        self.data_loader_chain = DataLoader(db).chain
        self.chain = RunnablePassthrough.assign(data= self.data_loader_chain) | self.prompt | llm | StrOutputParser() | RunnableLambda(self.__clean_answer)

    def __create_prompt(self):
        template = """
        당신은 Python Matplotlib 및 Seaborn 시각화 전문가입니다.
        당신의 임무는 제공된 데이터(DataFrame)와 사용자 요청을 바탕으로 가장 적합한 시각화 유형을 자동으로 선택하고, 실행 가능한 코드를 생성하는 것입니다.
        제공된 데이터(DataFrame)를 분석하고, 컬럼의 데이터 타입(num / category / datetime)을 직접 정의해서 가장 적합한 시각화 유형을 자동으로 선택해야 합니다.

        1. 데이터 분석 단계
        - 컬럼 데이터 타입 판별: num, category, datetime
        - 고유값 개수(unique count) 및 결측치 비율 확인
        - 단일 핵심 지표 여부 확인
        - 누적값 여부 확인 (시계열/누적 구조 판단)
        - 데이터를 정의할 경우 반드시 DB에서 조회된 데이터만 사용한다. (임의의 값 생성 금지)
        - 모든 컬럼 길이가 동일하도록 반드시 확인

        2. 후보 그래프 선택 로직 (조건 기반)
        - 분포(Distribution):
            * x=num → Histogram
            * x=category, y=num → Histogram Four
            * x & y=num → Histo Split / Density Split / KDE 2D
            * x=category, y=num → Violin
            * x=category, y=num, hue=category → Violin Split
        - 관계(Relationship):
            * x & y=num → Scatter / Jointplot / Connected Scatter
            * x & y=num, size=num, category=category → Bubble
            * num 컬럼 여러 개 → Splom
        - 비교(Comparison):
            * x=category, y=num → Bar
            * x=category, y=num, 비율 강조 필요 → Pie
            * Box(num) + Line(num) → Mixed
            * min_ num, max_ num, target category → Min to Max
        - 상관 / 행렬:
            * x=category, y=category, target=num → Heatmap
        - 구조적 / 복합:
            * target, value=num, group=category → Circular
            * 시계열/누적값 구조 → Area Stack

        3. 판단 기준 예시 (우선순위)
        - x=category, y 값 편차 큼 → BarChart
        - x=category, y 비율 강조 필요 → PieChart
        - datetime + 데이터 포인트 많음 → LineChart
        - datetime + 누적 값 → AreaChart
        - 단일 핵심 지표 → BigNumber
        - num 컬럼 2개 이상 → ScatterPlot / BubbleChart
        - 변수 분포 비교 → BoxPlot / ViolinPlot
        - 변수 상관관계 → Heatmap
        - 여러 변수 관계 → PairPlot / Splom
        - 여러 항목 균형 비교 → RadarChart
        - 카테고리별 누적 구조 → StackedBar / AreaStack

        4. 코드 생성 규칙
        - 반드시 실행 가능한 Python 코드만 작성
        - 한글이 깨지지 않도록 Python 코드 작성
        - 코드는 절대 마크다운(`````, ''') 블록 안에 넣지 말고, 실행 가능한 Python 코드만 출력하라.
        - 주석(#)은 허용하되, 불필요한 설명, 문자열, 마크다운(`````, `'''`)은 포함하지 않음
        - exec() 함수로 바로 실행 가능한 형태여야 함
        - 폰트는 라이브러리에 내장된 기본 폰트를 반드시 사용한다. (Windows 환경)
        - plt.show()는 생략
        - print()로 요청에 대한 부연 설명 생략
        - 그래프 제목, X/Y축 라벨, 주요 범례 및 색상 강조 필수
        - 모든 컬럼 길이가 동일해야 함 (데이터 정합성 확인 필수)
        - 시각화에 사용된 컬럼명을 코드 주석으로 명시
        - 버전과 상관없는 오류가 가장 적은 기본적인 코드로만 구성
        - 결과를 저장할 필요 없음 (코드만 생성)

        입력 데이터:
        {data}

        사용자 요청:
        {question}
        """ 
        
        prompt = ChatPromptTemplate.from_template(template)
        return prompt

    def __clean_answer(self, answer:str):
        answer = answer.replace('```', "'''")
        answer = answer.replace('python', '')
        return answer

    def invoke(self, query:dict):
        return self.chain.invoke({'question':query})

    def ainoke(self, query:dict):
        return self.chain.ainvoke({'question':query})

    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

In [13]:
# Tools
from langchain_core.tools import tool
from langchain_community.tools.tavily_search import TavilyAnswer, TavilySearchResults

@tool(name_or_callable="DataLoaderTool", description='사용자 요청에 따라 DB에서 적절한 데이터를 불러와서 데이터 프레임으로 반환합니다.' ,return_direct=False)
def load_data(query:str) -> str:
    loader = DataLoader(db)
    return loader.invoke(query)

@tool(name_or_callable="DataAnalystTool", description='사용자 요청에 따라 DB에서 적절한 데이터를 불러온 후 분석하고 해석합니다.' ,return_direct=False)
def analysis_data(query:str) -> str:
    analyst = DataAnalyst(db)
    return analyst.invoke(query)

@tool(name_or_callable="BITool", description='사용자 요청에 따라 DB에서 적절한 데이터를 불러와서 시각화할 수 있는 코드를 작성합니다.' ,return_direct=False)
def visualize_data(question: str) -> str:
    bi = BI(db)
    return bi.invoke(question)

tools = [TavilySearchResults(max_results=3), load_data, visualize_data]

  tools = [TavilySearchResults(max_results=3), load_data, visualize_data]


# Output Class

In [33]:
from pydantic import BaseModel, Field
from typing import Literal, Optional



class TaskInfo(BaseModel):
    contents_id: str = Field(..., description="콘텐츠 목차 + 콘텐츠 명")
    type: Literal["Chart", "Table"] = Field(..., description="콘텐츠 결과 유형")
    contents: str = Field(..., description="태스크 설명")

class Output(object):
    class GetMode(BaseModel):
        mode: Literal['task', 'report', 'ect'] = Field(..., description='사용자 요청을 분석하여 요청 유형을 task, report, ect로 분류')
        reason: str = Field(default='', description='사용자 요청을 분류한 근거를 2~3로 작성하세요.')
        
    class PassiveGoalCreator(BaseModel):
        report_title: str = Field(..., description='보고서 제목')
        description: str = Field(..., description='목표 설명')
        
        
        @property
        def text(self) -> str:
            return f'{self.description}'
            
    class GoalOptimizer(BaseModel):
        description:str = Field(..., description='목표 설명')
    
        @property
        def text(self) -> str:
            return f'{self.description}'

    
    class TaskDecomposer(BaseModel):
        tasks: list[TaskInfo] = Field(
            default_factory=list,
            min_item=3,
            max_item=8,
            description='3~8개로 분해된 테스크')
    
    class GetReportInfo(BaseModel):
        doc_val_response: Literal["YES","NO"] = Field(default='YES', description='문서 검증 결과')
        doc_val_reason:str= Field(default='', description='문서 검증 증거')
        documents:str = Field(default='', description='검증에 참조한 문서')

# LLM&Agent

## GetMode | get_mode

In [15]:
class GetMode(object):
    def __init__(self):
        self.llm = ChatGoogleGenerativeAI(model='gemini-2.5-flash', temperature=0)
        self.prompt = self.__create_prompt()
        self.chain = self.prompt | self.llm.with_structured_output(Output.GetMode)

    def __create_prompt(self):
        template = """사용자 입력을 분석하여 mode를 task, report, ect로 구분한 후, mode를 그렇게 판단한 이유는 reason에 2~3줄로 간략하게 작성하세요.
        task:
        - 야구 관련 데이터를 DB에서 추출하는 요청
        - 야구 관련 데이터를 분석하는 요청
        - 야구 관련 데이터 시각화하는 요청

        report:
        - 야구 관련 리포트를 작성하는 요청

        ect:
        - task와 report에 속하지 않는 요청
        - 야구와 관련되지 않은 요청
        
        사용자 요청:
        {query} 
        """
        prompt = ChatPromptTemplate.from_template(template)
        return prompt
        
    def invoke(self, query:str):
        answer = self.chain.invoke({'query':query})
        return answer

    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

## PassiveGoalCreator | get_goal

In [62]:
class PassiveGoalCreator(object):
    def __init__(self, llm=llm):
        self.llm = llm
        self.prompt = self.__create_prompt()
        self.chain = self.prompt | self.llm.with_structured_output(Output.PassiveGoalCreator)

    def __create_prompt(self):
        template = """너는 야구전문가야. 사용자 입력을 분석하여 명확한 목표와 그에 해당하는 문서 제목을 생성해주세요.
            요건
            1. 사용자의 입력을 바탕으로 꾸밈없이 명확한 톤으로 다음 LLM이 처리할 수 있도록 문장을 생성하시오.
            2. 기간에 대한 범위가 없다면 가장 최신 시즌을 고려하여 작성하십시오. 
            3. 사용자의 요청과 생성된 목적들을 기반으로 문서 제목을 생성해주십시오. 
            다만 기간에 대한 사용자의 정확한 요청이 없을 경우 제목을 작성할 때 기간을 명시하는 텍스트는 제외하십시오.
            사용자 입력: {query}
            """
            
            # 사용자의 부정확한 입력을 다음 LLM이 처리할 수 있도록 명확하게 한문장의로 재작성하는 작업 담당
        prompt = ChatPromptTemplate.from_template(template)
        return prompt

    def invoke(self, query):
        answer = self.chain.invoke({'query':query})
        return {
            "goal": answer.description,
            "report_title": answer.report_title,
            }

    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

In [63]:
a = PassiveGoalCreator()
ab = a.invoke("ARI팀의 전력분석보고서써줘")

In [64]:
ab

{'goal': 'ARI 팀의 최신 시즌 전력 분석 보고서를 작성합니다.', 'report_title': 'ARI 팀 전력 분석 보고서'}

## GetReportInfo | get_report_info

In [65]:
class GetReportInfo(object):
    def __init__(self, vectorstore: Chroma):
        self.llm = ChatGoogleGenerativeAI(model='gemini-2.5-flash')
        self.prompt = self.__create_prompt()
        self.retriever = vectorstore.as_retriever(search_kwargs={"k": 1})
        
        def format_docs(docs: List[Document]) -> str:
            return "\n\n".join(d.page_content for d in docs) if docs else "(no context)"
        
        self.chain = (
        RunnablePassthrough.assign(
            # 입력 dict -> "query"만 뽑아 retriever에 넣고 -> 문자열로 포맷
            context = RunnableLambda(itemgetter("query")) | self.retriever | RunnableLambda(format_docs)
        )
        | self.prompt
        | self.llm.with_structured_output(Output.GetReportInfo)
        )
    def __create_prompt(self):
        template = """
                당신은 엄격하고 객관적인 문서 유효성 검사관입니다. 아래에 제시된 **[사용자 목표/질문]**과 **[검색된 문서]**를 철저하게 비교하여, 문서가 목표를 설명하는 데 충분한 정보를 제공하는지 여부를 판단하세요.

                [판단 기준]
                YES:
                문서의 내용이 **[사용자 목표/질문]**의 핵심 키워드나 주제에 대해 직접적이고 구체적인 정보를 포함하고 있는 경우.
                제시된 문서만으로도 목표에 대한 초안 설명이나 답변을 구성하는 데 충분하다고 판단되는 경우.

                NO:
                문서의 내용이 **[사용자 목표/질문]**과 완전히 다른 주제를 다루고 있거나, 매우 일반적인 정보만 제공하여 목표에 대한 실질적인 설명을 할 수 없는 경우.
                문서가 목표와 관련된 키워드를 포함하고 있더라도, 그 내용이 목표에 대한 의도를 충족시키지 못하는 경우.
                
                마지막으로 증거를 남기기위해 들고온 문서에 대해서 그대로 들고와주십시오.
                 
                목표:
                {query}
                
                들고온 문서:
                {context} 
                """ 
                # 키워드 위주로 비교하며 정해진 목표가 문서로서 완성될 수 있는지 판단
        prompt = ChatPromptTemplate.from_template(template)
        return prompt

    def invoke(self, query:dict):
        answer = self.chain.invoke({'query':query})
        return {
            "doc_val_response": answer.doc_val_response,
            "doc_val_reason": answer.doc_val_reason,
            "documents": answer.documents
            }
        
    def ainvoke(self, query:dict):
        answer = self.chain.ainvoke({'query':query})
        return {
            "doc_val_response": answer.doc_val_response,
            "doc_val_reason": answer.doc_val_reason,
            "documents": answer.documents
            }
    
   
    
    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

In [66]:
cc  =GetReportInfo(vectorstore=vectorstore)
ccc = cc.invoke(ab['goal'])
ccc

{'doc_val_response': 'YES',
 'doc_val_reason': "문서의 제목, 범위, 목적 및 목차 내용이 'ARI 팀의 최신 시즌 전력 분석 보고서를 작성합니다.'라는 사용자 목표에 직접적으로 부합합니다. 특히, '시즌 팀 종합 성적', '직전년도 성적과 비교', '투수/타자/수비 능력' 등 구체적인 분석 항목들이 보고서 작성에 필요한 정보를 충분히 제공합니다.",
 'documents': '문서 제목: [시즌](팀 명) 전력 분석 보고서 -문서 타입: 보고서 -문서 범위: 특정 팀, 특정 시즌 -문서 목적: 팀의 전력을 분석하기 위해 해당 팀의 시즌 종합 성적을 강함을 평가하고 투수, 타자, 수비진의 시즌 지표를 활용해 구체적인 전력을 분석하기 위함 - -주요 목차: 1. 시즌 팀 종합 성적 2. 시즌 팀 직전년도 성적과 비교 3. 시즌 팀 경 기력 분석 -목차 콘텐츠: 1. 시즌 팀 종합 성적 - 목적: 해당 팀의 특정 시즌의 종합 성적을 분석하는 콘텐츠 - 콘텐츠 타입: Table - 쿼리 플랜: 특정 팀의 특정 시즌의 순위, 경기 수, 승리 수, 패배 수, 승률, 득점, 실점, 평균 경기당 득/실을 불러오기 2. 시즌 팀 직전년도 성적과 비교 - 목적: 차트를 활용해 직전년도의 성적과 비교해 무엇이 개선됐고 후퇴했는지 확인하는 콘텐츠 - 콘텐츠 타입: Chart - 쿼리 플랜: 특정 팀의 특정 시즌의 순위, 승률, 평균 경기당 득/실과 직전년도의 순위, 승률, 평균 경기당 득/실을 불러오기 3. 시즌 팀 경기력 분석 - 목적: 해당 팀의 각 포지션별 경기력을 분석하는 콘텐츠 - 콘텐츠 타입: Multi-Table - 하위 콘텐츠: 3.1. 투수 능력: - 쿼리 플랜: 특정 팀의 특정 시즌의 투수 대체선수 대비 승리 기여도를 가져온다. - 상세 분석 쿼리 플랜: 상세 분석의 요청이 있는 경우 기존 쿼리 플랜에서 평균 자책점, 이닝당 출루 허용률, 수비 무관 투구 지표도 불러온다. 3.2. 타자 능력: - 쿼리 플랜: 특정 팀의 특정

## SingleGetInfo | single_get_info

In [67]:
class SingleGettInfo(object):
    def __init__(self, vectorstore: Chroma):
        self.llm = ChatGoogleGenerativeAI(model='gemini-2.5-flash')
        self.prompt = self.__create_prompt()
        
        self.chain = (
        RunnablePassthrough.assign(query=itemgetter("query"))
        | self.prompt
        | self.llm.with_structured_output(Output.GetReportInfo)
        )
    def __create_prompt(self):
        template = """
        너는 "요구사항 명확성 심사관"이다.
        사용자의 질문이 다음 단계를 실행하기에 충분히 구체적이고 실행 가능한지 판단해야 한다.
        다음 기준을 따른다:
        1. 질문이 **명확한 목표**(무엇을, 왜, 어떻게)를 포함하면 YES
        2. 질문이 **모호하거나**, **추상적이거나**, **행동이 정의되지 않으면** NO
        3. YES/NO 판단 외에는 불필요한 설명 없이 결과만 JSON 형식으로 출력한다.

        예시:
        - "ARI팀의 2024년 득점과 실점을 DB에서 조회해줘" → YES
        - "ARI팀에 대해 알려줘" → NO
        - "시즌 데이터를 분석해줘" → NO
        - "ARI팀의 2024년 투수 WHIP과 OBP 상관관계 분석해줘" → YES

        사용자 질문:
        {query}

        JSON 출력 형식:
        {{
          "doc_val_response": "YES" | "NO"
        }}
        """
                
        prompt = ChatPromptTemplate.from_template(template)
        return prompt

    def invoke(self, query:dict):
        answer = self.chain.invoke({'query':query})
        return {
            "doc_val_response": answer.doc_val_response,
            }
        
    def ainvoke(self, query:dict):
        answer = self.chain.ainvoke({'query':query})
        return {
            "doc_val_response": answer.doc_val_response,
            }
    
    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

## GoalOptimizer | optimize_goal

In [68]:
class GoalOptimizer(object):
    def __init__(self, llm=llm):
        self.llm = llm
        self.prompt = self.__create_prompt()
        self.chain = self.prompt | self.llm.with_structured_output(Output.GoalOptimizer)

    def __create_prompt(self):
        template = """당신은 목표 설정 전문가입니다.주어진 원래목표와 주어진 문서(계획서)를 기반으로 콘텐츠별로 달성 가능한 세부적인 목표를 생성하십시오.
        [원래목표]
        {query}

        [지시사항]
        1. 주어진 문서에서의 목차번호와 해당 콘텐츠의 제목을 반드시 세부적인 목표와 같이 명시해주십시오.
        2. 주어진 문서의 콘텐츠별로 최종적으로 달성해야할 목표를 아래와 같이 매핑하여 명시해주십시오.
            - 표, Table : 데이터 반환
            - Chart : 파이썬 코드 반환
            
        3. 원래 목표의 범위를 기준으로 주어진 문서를 기반으로 콘텐츠별로 달성해야될 목표를 상세하게 작성하십시오.
        4. 상세한 분석 요청이 있을 경우만 상세분석쿼리를 참조하여 목표를 작성해주십시오.
        
        [주어진 문서]
        {docs}
        """
        prompt = ChatPromptTemplate.from_template(template)
        return prompt

    def invoke(self, query, docs):
        answer = self.chain.invoke({'query':query, 'docs':docs})
        return answer.text

    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

In [69]:
go = GoalOptimizer()
gogo = go.invoke(ab['goal'],ccc['documents'])
gogo

'1. 시즌 팀 종합 성적: ARI 팀의 최신 시즌 순위, 경기 수, 승리 수, 패배 수, 승률, 득점, 실점, 평균 경기당 득/실 데이터를 반환합니다.\n2. 시즌 팀 직전년도 성적과 비교: ARI 팀의 최신 시즌과 직전년도의 순위, 승률, 평균 경기당 득/실을 비교하는 파이썬 코드를 반환합니다.\n3.1. 투수 능력: ARI 팀의 최신 시즌 투수 대체선수 대비 승리 기여도 데이터를 반환합니다.\n3.2. 타자 능력: ARI 팀의 최신 시즌 타자 대체선수 대비 승리 기여도 데이터를 반환합니다.\n3.3. 수비 능력: ARI 팀의 최신 시즌 수비로 막아낸 득점 데이터를 반환합니다.'

## SingleGoalOptimizer | single_optimize_goal

In [70]:
class SingleGoalOptimizer(object):
    def __init__(self, llm=llm):
        self.llm = llm
        self.prompt = self.__create_prompt()
        self.chain = self.prompt | self.llm.with_structured_output(Output.GoalOptimizer)

    def __create_prompt(self):
        template = """
        당신은 목표 설정 전문가입니다. 
        원래목표
        {query}
        지시사항
        1. 원래 목표를 해석하지 말고, 그대로 정확히 수행하세요.
        2. 수행 가능한 행동은 아래 중 하나입니다.
        - 사용자의 요청에 따라 DB에서 적절한 데이터를 조회하고 마크다운 형식의 데이터를 제공합니다
        - 사용자의 요청에 따라 DB에서 조회한 데이터를 분석하고 해석합니다.
        - 사용자의 요청에 따라 DB에서 조회한 데이터를 기반으로 Python 시각화 코드를 작성합니다. (plt.show()는 주석처리)
        3. 모든 데이터는 'DB에서 조회된 실제 데이터'만 사용합니다. 임의의 랜덤/가상/추정 데이터 생성을 엄격 금지합니다.
        4. 주의: 2번 행동 중 하나를 수행할 수 있으며, 절대 2번 행동 이외에는 수행하지 마세요.
        """
        prompt = ChatPromptTemplate.from_template(template)
        return prompt

    def invoke(self, query):
        answer = self.chain.invoke({'query':query})
        return answer.text

    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

## ResponseOptimizer | optimize_response

In [71]:
# class ResponseOptimizer(object):
#     def __init__(self, llm=llm):
#         self.llm = llm
#         self.prompt = self.__create_prompt()
#         self.chain = self.prompt | self.llm | StrOutputParser()

#     def __create_prompt(self):
#         human_message = """다음 절차에 따라 응답 최적화 프롬프트를 작성해 주세요.
#         1. 목표분석
#         제시된 목표를 분석하고 주요 요소와 의도를 파악해 주세요.
#         2. 응답 사양 수립
#         목표 달성을 위한 최적의 응답 사양을 고안해 주세요. 톤, 구조, 내용의 초점 등을 고려해 주세요.
#         3. 구체적인 지침 작성
#         사전에 수집된 정보에서 사용자의 기대에 부합하는 응답을 위해 필요한, AI 에이전트에 대한 명확하고 실행 가능한 지침을 작성해 주세요. 귀하의 지침으로 AI 에이전트가 수행할 수 있는 있는 것은 이미 조사된 결과를 정리하는 것뿐입니다. 인터넷에 접근할 수 없습니다.
#         4. 예시 제공
#         가능하다면 목표에 맞는 응답의 예시를 하나 이상 포함해주세요.
#         5. 평가 기준 설정
#         응답의 효과를 측정하기 위한 기준을 정의해 주세요.
#         다음 구조로 응답 최적화 프롬프트를 출력해 주세요.
#         목표 분석:
#         [여기에 목표 분석 결과를 기입]
#         응답 사양:
#         [여기에 수립된 응답 사영을 기입]
#         AI 에이전트에 대한 지침
#         [여기에 AI 에이전트에 대한 구체적인 지침을 기입]
#         응답 예시
#         [여기에 응답 예시를 기입]
#         평가 기준
#         [여기에 평가 기준을 기입]
#         그럼, 다음 목표에 대한 응답 최적화 프롬프트를 작성해 주세요.
#         {query}
#         """
#         prompt = ChatPromptTemplate.from_messages(
#             [
#                 ('system', '당신은 AI 에이전트 시스템의 응답 최적화 전문가입니다. 주어진 목표에 대해 에이전트가 목표에 맞는 응답을 반환하기 위한 응답 사양을 수립해 주세요.'),
#                 ('human', human_message),
#             ]
#         )
#         return prompt

#     def invoke(self, query):
#         answer = self.chain.invoke({'query':query})
#         return answer

#     def __str__(self):
#         return f'{self.__class__.__name__}'

#     def __repr__(self):
#         return f'{self.__class__.__name__}'

## TaskDecomposer | decompose_tasks

In [72]:
class TaskDecomposer(object):
    def __init__(self, llm=llm):
        self.llm = llm
        self.prompt = self.__create_prompt()
        self.chain = self.prompt | self.llm.with_structured_output(Output.TaskDecomposer)

    def __create_prompt(self):
        template = """태스크: 주어진 목표를 콘텐츠만을 기준으로 실행 가능한 태스크로 분해해 주세요.
        요건
        1. 다음 행동만으로 목표를 달성할 것. 절대 지정된 이외의 행동을 취하지 말 것.
        - 사용자의 요청에 따라 적절한 데이터를 불러와 데이터프레임으로 변환합니다.
        - 사용자 요청에 따라 DB에서 적절한 데이터를 불러와서 시각화할 수 있는 코드를 작성합니다.
        - DB외에 추가로 필요한 정보가 필요할 경우만 판단해서 인터넷 검색을 통해서 정보를 추가합니다.
        2. 각 태스크는 구체적으로 상세하게 기재하며, 단독으로 실행 및 검증 가능한 정보를 포함할 것. 추상적인 표현을 일절 포함하지 말것
        3. 목표에 작성된 콘텐츠 수를 기준으로 태스크를 최소한으로 생성할 것.
        4. 아래와 같이 구성할 것
        - contents_id: 콘텐츠 번호. 기존 콘텐츠 명 , type: 결과유형 [Chart, Table], contents: 태스크 내용
        5. 태스크는 실행 가능한 순서로 리스트화 할 것
        목표: {query}
        """
        prompt = ChatPromptTemplate.from_template(template)
        return prompt

    def invoke(self, query):
        answer = self.chain.invoke({'query':query})
        return answer.tasks

    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

In [73]:
td=TaskDecomposer()

ttd = td.invoke(gogo)

In [74]:
ttd

[TaskInfo(contents_id='1. 시즌 팀 종합 성적', type='Table', contents='DB에서 ARI 팀의 최신 시즌 순위, 경기 수, 승리 수, 패배 수, 승률, 득점, 실점, 평균 경기당 득/실 데이터를 불러와 데이터프레임으로 변환합니다.'),
 TaskInfo(contents_id='2. 시즌 팀 직전년도 성적과 비교', type='Chart', contents='DB에서 ARI 팀의 최신 시즌과 직전년도의 순위, 승률, 평균 경기당 득/실 데이터를 불러와, 두 시즌의 순위, 승률, 평균 경기당 득/실을 비교하는 시각화 파이썬 코드를 작성합니다.'),
 TaskInfo(contents_id='3.1. 투수 능력', type='Table', contents='DB에서 ARI 팀의 최신 시즌 투수 대체선수 대비 승리 기여도 데이터를 불러와 데이터프레임으로 변환합니다.'),
 TaskInfo(contents_id='3.2. 타자 능력', type='Table', contents='DB에서 ARI 팀의 최신 시즌 타자 대체선수 대비 승리 기여도 데이터를 불러와 데이터프레임으로 변환합니다.'),
 TaskInfo(contents_id='3.3. 수비 능력', type='Table', contents='DB에서 ARI 팀의 최신 시즌 수비로 막아낸 득점 데이터를 불러와 데이터프레임으로 변환합니다.')]

## ExecuteTask | report_execute_task, task_execute_task

In [75]:
"""다음 태스크를 실행하고 상세한 답변을 제공해주세요. 당신은 다음 도구에 접근할 수 있습니다:
    
        {tools}
        
        다음 형식을 사용하세요:
        
        Question: 답변해야 하는 입력 질문
        Thought: 무엇을 할지 항상 생각하세요.
        Action: 취해야 할 행동, [{tool_names}] 중 하나여야 합니다. 리스트에 있는 도구 중 1개를 택하십시오.
        Action Input: 행동에 대한 입력값
        Observation: 행동의 결과
        ... (이 Thought/Action/Action Input/Observation의 과정이 N번 반복될 수 있습니다.)
        Thought: 이제 최종 답변을 알겠습니다.
        Final Answer: 원래 입력된 질문에 대한 최종 답변
        
        ## 추가적인 주의사항
        - 반드시 [Thought/Action/Action Input format] 이 사이클의 순서를 준수하십시오. 항상 Action 전에는 Thought가 먼저 나와야 합니다.
        - 최종 답변은 Question의 지시사항을 준수한 답변만 제공하십시오
        - 각각 표(테이블) 형태를 Question에서 요구하면 DataLoader, 차트(시각화) 형태를 Question에서 요구하면 BITool, 요약 또는 분석 DataAnalyst 활용
        - Question의 지시사항의 최종적인 요구의 산출문만 제공해주세요
        - DataLoader툴을 사용했다면 Json형식의 데이터를 제공하고, BITool을 사용했다면 순수한 Python 코드만을 제공하고 마지막으로 DataAnalyst를 활용했다면 700이내의 요약을 부가적인 타이틀 없이 줄글로만 제공해주세요
        - 정보가 취합되었다면 불필요하게 사이클을 반복하지 마십시오.
        - 묻지 않은 정보를 찾으려고 도구를 사용하지 마십시오.
        - 실행은 철저하고 포괄적으로 수행하세요.
        
        시작하세요!
        
        Question: {task}
        Thought: {agent_scratchpad}
        """

'다음 태스크를 실행하고 상세한 답변을 제공해주세요. 당신은 다음 도구에 접근할 수 있습니다:\n\n        {tools}\n\n        다음 형식을 사용하세요:\n\n        Question: 답변해야 하는 입력 질문\n        Thought: 무엇을 할지 항상 생각하세요.\n        Action: 취해야 할 행동, [{tool_names}] 중 하나여야 합니다. 리스트에 있는 도구 중 1개를 택하십시오.\n        Action Input: 행동에 대한 입력값\n        Observation: 행동의 결과\n        ... (이 Thought/Action/Action Input/Observation의 과정이 N번 반복될 수 있습니다.)\n        Thought: 이제 최종 답변을 알겠습니다.\n        Final Answer: 원래 입력된 질문에 대한 최종 답변\n\n        ## 추가적인 주의사항\n        - 반드시 [Thought/Action/Action Input format] 이 사이클의 순서를 준수하십시오. 항상 Action 전에는 Thought가 먼저 나와야 합니다.\n        - 최종 답변은 Question의 지시사항을 준수한 답변만 제공하십시오\n        - 각각 표(테이블) 형태를 Question에서 요구하면 DataLoader, 차트(시각화) 형태를 Question에서 요구하면 BITool, 요약 또는 분석 DataAnalyst 활용\n        - Question의 지시사항의 최종적인 요구의 산출문만 제공해주세요\n        - DataLoader툴을 사용했다면 Json형식의 데이터를 제공하고, BITool을 사용했다면 순수한 Python 코드만을 제공하고 마지막으로 DataAnalyst를 활용했다면 700이내의 요약을 부가적인 타이틀 없이 줄글로만 제공해주세요\n        - 정보가 취합되었다면 불필요하게 사이클을 반복하지 마십시오.\n        

In [76]:
class ExecuteTask(object):
    def __init__(self, tools:list=tools, llm=llm):
        self.llm = llm
        self.tools = tools
        self.prompt = self.__create_prompt()
        self.agent = self.__create_agent()

    def __create_prompt(self):
    
        template = """다음 태스크를 실행하고 상세한 답변을 제공해주세요. 당신은 다음 도구에 접근할 수 있습니다:
    
        {tools}
        
        다음 형식을 사용하세요:
        
        Question: 답변해야 하는 입력 질문
        Thought: 무엇을 할지 항상 생각하세요.
        Action: 취해야 할 행동, [{tool_names}] 중 하나여야 합니다.
        Action Input: 행동에 대한 입력값
        Observation: 행동의 결과
        ... (이 Thought/Action/Action Input/Observation의 과정이 N번 반복될 수 있습니다.)
        Thought: 이제 최종 답변을 알겠습니다.
        Final Answer: 원래 입력된 질문에 대한 최종 답변
        
        ## 추가적인 주의사항
        - 반드시 [Thought/Action/Action Input format] 이 사이클의 순서를 준수하십시오. 항상 Action 전에는 Thought가 먼저 나와야 합니다.
        - 한 번의 검색으로 해결되지 않을 것 같다면 문제를 분할하여 푸는 것이 중요합니다.
        - 정보가 취합되었다면 불필요하게 사이클을 반복하지 마십시오.
        - 묻지 않은 정보를 찾으려고 도구를 사용하지 마십시오.
        - 실행은 철저하고 포괄적으로 수행하세요.
        - 가능한 구체적인 사실이나 데이터를 제공하세요.
        - 차트를 생성하는 코드를 작성하는 요청이 들어오면, 차트를 생성하는 순수한 파이썬 코드만 답변하세요.
        
        시작하세요!
        
        Question: {task}
        Thought: {agent_scratchpad}
        """
        prompt = ChatPromptTemplate.from_template(template)
        return prompt

    def __create_agent(self):
        agent = create_react_agent(llm=self.llm, tools=self.tools, prompt=self.prompt)
        executor = AgentExecutor(agent=agent, tools=self.tools, verbose=True, handle_parsing_errors=True)
        return executor
        
    def invoke(self, task):
        answer = self.agent.invoke({'task':task})
        return answer['output']

    def ainvoke(self, task):
        answer = self.agent.ainvoke({'task':task})
        return answer['output']

    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

In [77]:
et = ExecuteTask()

etts = []
for i in ttd: 
    ett = et.invoke(i)
    etts.append(ett)




[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mAction: DataLoaderTool
Action Input: query="contents_id='1. 시즌 팀 종합 성적' type='Table' contents='DB에서 ARI 팀의 최신 시즌 순위, 경기 수, 승리 수, 패배 수, 승률, 득점, 실점, 평균 경기당 득/실 데이터를 불러와 데이터프레임으로 변환합니다.'"[0m[33;1m[1;3m   Rank    G   W   L       W-L    R   RA  평균 경기당 득점  평균 경기당 실점
0     8  162  89  73  0.549383  886  788       5.47       4.86[0m[32;1m[1;3mFinal Answer: DB에서 ARI 팀의 최신 시즌 순위, 경기 수, 승리 수, 패배 수, 승률, 득점, 실점, 평균 경기당 득/실 데이터를 불러와 데이터프레임으로 변환한 결과입니다:

```
   Rank    G   W   L       W-L    R   RA  평균 경기당 득점  평균 경기당 실점
0     8  162  89  73  0.549383  886  788       5.47       4.86
```[0m

[1m> Finished chain.[0m


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3mAction: BITool
Action Input: DB에서 ARI 팀의 최신 시즌과 직전년도의 순위, 승률, 평균 경기당 득/실 데이터를 불러와, 두 시즌의 순위, 승률, 평균 경기당 득/실을 비교하는 시각화 파이썬 코드를 작성합니다.[0m[38;5;200m[1;3mimport pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# 데이터 정의 (DB에서 조회된 데이터만 사용)
data = 

In [86]:
etts[0]

'DB에서 ARI 팀의 최신 시즌 순위, 경기 수, 승리 수, 패배 수, 승률, 득점, 실점, 평균 경기당 득/실 데이터를 불러와 데이터프레임으로 변환한 결과입니다:\n\n```\n   Rank    G   W   L       W-L    R   RA  평균 경기당 득점  평균 경기당 실점\n0     8  162  89  73  0.549383  886  788       5.47       4.86\n```'

## ResultAggregator | aggregate_result

In [None]:
class ResultAggregator(object):
    def __init__(self, llm=llm):
        self.llm = llm
        self.prompt = self.__create_prompt()
        self.chain = self.prompt | self.llm | StrOutputParser()

    def __create_prompt(self):
        template = """주어진 목표:
        {optimized_goal}
        
        조사결과:
        {results}
        
        주어진 목표에 대해서 조사 결과를 활용하여 다음 지시에 기반한 응답을 생성해 주세요.
        """
        prompt = ChatPromptTemplate.from_template(template)
        return prompt
        
    def invoke(self, optimized_goal:str, results:str):
        answer = self.chain.invoke({'optimized_goal':optimized_goal, 'results':results})
        return answer

    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

## SingleResultAggregator | single_aggregate_result

In [None]:
class SingleResultAggreagor(object):
    def __init__(self, llm=llm):
        self.llm = llm
        self.prompt = self.__create_prompt()
        self.chain = self.prompt | self.llm | StrOutputParser() | RunnableLambda(self.__clean_answer)

    def __create_prompt(self):
        template = """
        주어진 목표:
        {optimized_goal}
        
        조사결과:
        {results}
        
        주어진 목표에 대해서 조사 결과를 활용하여 다음 지시에 기반한 응답을 무조건 하나만 생성해 주세요.
        - 오직 조사결과만 ouput으로 지정한다. (수행 과정, 로그, 코멘트 등 작성 금지)
        """
        prompt = ChatPromptTemplate.from_template(template)
        return prompt

    def __clean_answer(self, answer:str):
        answer = answer.replace('```', '')
        answer = answer.replace('python', '')
        answer = answer.replace('plt.show()', '')
        return answer
    
    def invoke(self, optimized_goal:str, results:str):
        answer = self.chain.invoke({'optimized_goal':optimized_goal, 'results':results})
        return answer

    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'



## ResponseCustomer | response_customer

In [None]:
class ResponseCustomer(object):
    def __init__(self):
        self.llm = ChatGoogleGenerativeAI(model='gemini-2.5-flash', temperature=0.2)
        self.prompt = self.__create_prompt()
        
        self.chain = self.prompt | self.llm | StrOutputParser()
        
    def __create_prompt(self):
        template = """
                당신은 챗봇입니다.
                아래 '결정 사유'를 참고해, 사용자가 무엇을 원했는지 확인하고 아래의 조건을 선택해서 답변을 생성하십시오.
                이전 질문이 있다면 이것도 기억하고 답변을 생성하십시오.
                
                조건 1: 결정 사유가 라우팅 실패에 관련된 내용일 경우
                - 형식 가이드(3~5문장, 한국어, 공손하지만 간결하게):
                1) 확인: 사용자의 의도를 한 문장으로 재진술 (예: "~정보를 요청하셨군요.")
                2) 한계: 현재 참조 가능한 자료에 무엇이 없는지 명확히 (예: "현재 제가 참고할 수 있는 정보에는 ~가 포함되어 있지 않습니다.")
                3) 대안: a) 가능한 주제로 이어가기

                조건 2: 결정 사유가 모드에 대한 선택에 대한 이유일 경우
                - 형식 가이드(3~5문장, 한국어, 공손하게 간결하게):
                1) 확인: 사용자의 의도를 한 문장으로 재진술 (예: "~정보를 요청하셨군요.")
                2) 답변: 사용자의 의도에 대해서 답변을 한다.
                3) 한계: 최대한 꾸며내거나 부풀리지 않고 정확하게 답변하려고 노력하여야한다. 그리고 답변이 정확하지 않을 수 있다는 사실을 분명히 한다.

                [사용자 질문]
                {question}

                [결정 사유]
                {doc_val_reason}
                
                [이전 질문]
                {prev_user_question}
                """ 
        prompt = ChatPromptTemplate.from_template(template)
        return prompt
    
    def invoke(self, query:dict, doc_val_reason:str, prev_user_query:str):
        return self.chain.invoke({'question':query,
                                  'doc_val_reason': doc_val_reason,
                                  'prev_user_question': prev_user_query})

    def ainoke(self, query:dict, doc_val_reason:str, prev_user_query:str):
        return self.chain.ainvoke({'question':query,
                                  'doc_val_reason': doc_val_reason,
                                  'prev_user_question': prev_user_query})
        
    
    def __format_docs(docs: List[Document]) -> str:
            return "\n\n".join(d.page_content for d in docs) if docs else "(no context)"
    
    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

## GetReportInfo

In [None]:
class GetReportInfo(object):
    def __init__(self, vectorstore: Chroma):
        self.llm = ChatGoogleGenerativeAI(model='gemini-2.5-flash')
        self.prompt = self.__create_prompt()
        self.retriever = vectorstore.as_retriever(search_kwargs={"k": 1})
        
        def format_docs(docs: List[Document]) -> str:
            return "\n\n".join(d.page_content for d in docs) if docs else "(no context)"
        
        self.chain = (
        RunnablePassthrough.assign(
            # 입력 dict -> "query"만 뽑아 retriever에 넣고 -> 문자열로 포맷
            context = RunnableLambda(itemgetter("query")) | self.retriever | RunnableLambda(format_docs)
        )
        | self.prompt
        | self.llm.with_structured_output(Output.GetReportInfo)
        )
    def __create_prompt(self):
        template = """
                너는 "요구사항-계획서 정합성(Alignment) 심사관"이다.
                입력으로 사용자 질문과 벡터DB에서 가져온 계획서 요약/본문이 주어진다.
                너의 임무는: 
                (a) 질문의 구체적 목표(의도/스펙)를 추출하고,
                (b) 들고온 계획서가 그 목표를 충족하는지 평가하고, 데이터의 수집 방법론에 대해서는 평가하지않는다.
                (c) 사용자의 목표에 구체적인 보고서 작성 대상이 있는지 판단한다.
                (d) 최종 라우팅 결정을 JSON으로 반환하는 것이다.
                 - JSON형태 : "doc_val_response":"YES|NO","doc_val_reason":"한 줄 요약","documents":"들고온 문서(그대로)"
                 
                사용자 요구사항:
                {query}
                
                들고온 문서:
                {context} 
                """ 
                
        prompt = ChatPromptTemplate.from_template(template)
        return prompt

    def invoke(self, query:dict):
        answer = self.chain.invoke({'query':query})
        return {
            "doc_val_response": answer.doc_val_response,
            "doc_val_reason": answer.doc_val_reason,
            "documents": answer.documents
            }
        
    def ainvoke(self, query:dict):
        answer = self.chain.ainvoke({'query':query})
        return {
            "doc_val_response": answer.doc_val_response,
            "doc_val_reason": answer.doc_val_reason,
            "documents": answer.documents
            }
    
   
    
    def __str__(self):
        return f'{self.__class__.__name__}'

    def __repr__(self):
        return f'{self.__class__.__name__}'

# LangGraph

## State

In [None]:
class Turn(BaseModel):
    user_query: str=Field(default_factory=list, description='유저 쿼리')      
    mode: Literal['task', 'report', 'ect'] = Field(default='', description="모드")
    ai_answer: str=Field(default_factory=list, description='답변')                           
    ts: datetime=Field(default_factory=datetime.now())

class State(BaseModel):
    query: str = Field(..., description='사용자가 입력한 쿼리')
    history: Annotated[list[Turn], add] = Field(default_factory= list, description=' 이전 기록')
    goal: str = Field(default='', description='사용자가 입력한 쿼리에서 목표 추출')
    report_title: str(default='', description='사용자요청을 기반으로 생성된 보고서 제목')
    mode: Literal['task', 'report', 'ect'] = Field(default='', description='사용자 요청 유형')
    doc_val_response: Literal["YES","NO"] = Field(default='YES', description='문서 검증 결과')
    doc_val_reason:str= Field(default='', description='문서 검증 증거')
    documents:str = Field(default='', description='검증에 참조한 문서')
    optimized_goal: str = Field(default='', description='최적화된 목표') 
    #optimized_response: str = Field(default='', description='최적화된 응답 정의')
    tasks: list = Field(default_factory=list, description='실행할 테스크 리스트')
    current_task_index: int = Field(default=0, description='현재 실행 중인 테스크 변호')
    results: list = Field(default_factory=list, description='실행 완료된 테스크 결과 리스트')
    final_output: str = Field(default='', description='최종 출력 결과')

## Node

In [None]:
from langgraph.graph import START, END, StateGraph
from functools import wraps

def node_logging(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print(f"{func.__name__} 시작")
        result = func(*args, **kwargs)
        print(f"{func.__name__} 완료")
        return result
    return wrapper

### Node 정의

In [None]:
@node_logging
def get_mode(state:State):
    state = state.dict()
    query = state['query']
    answer = GetMode().invoke(query)
    return {'mode':answer.mode, 'doc_val_reason':answer.reason}


# 리포트
@node_logging
def get_goal(state:State):
    state = state.dict()
    query = state['query']
    answer = PassiveGoalCreator().invoke(query)
    return {'goal':answer['goal'], 'report_title': answer['report_title']}

@node_logging
def get_report_info(state:State):
    state = state.dict()
    query = state['goal']
    answer = GetReportInfo(vectorstore=vectorstore).invoke(query)
    return {'doc_val_response': answer['doc_val_response'],
            "doc_val_reason": answer['doc_val_reason'],
            "documents": answer['documents']}

@node_logging
def optimize_goal(state:State):
    state = state.dict()
    query = state['goal']
    docs = state['documents']
    answer = GoalOptimizer().invoke(query, docs)
    
    # tasks, index 초기화

    return {
            'optimized_goal':answer,
            "tasks": [],
            'results': [],
            "current_task_index": 0
            }
 
# @node_logging
# def optimize_response(state:State):
#     state = state.dict()
#     query = state['optimized_goal']
#     answer = ResponseOptimizer().invoke(query)
#     return {'optimized_response':answer}

@node_logging
def decompose_tasks(state:State):
    state = state.dict()
    query = state['optimized_goal']
    answer = TaskDecomposer().invoke(query)
    return {'tasks': answer}

@node_logging
def execute_task(state:State):
    state = state.dict()
    complete_tasks = state['results']
    current_task_index = state['current_task_index']
    task = state['tasks'][current_task_index]
    answer = ExecuteTask().invoke(task)
    current_task_index += 1
    return {'results': complete_tasks + [answer], 'current_task_index':current_task_index}

@node_logging
def aggregate_result(state:State):
    state = state.dict()
    results = state['results']
    optimized_goal = state['optimized_goal']
    #optimized_response = state['optimized_response']
    answer = ResultAggregator().invoke(optimized_goal, results)
    
    history = {
        "user_query": state['query'],
        "ai_answer": answer,
        "ts" : datetime.now()
    }
    
    return {'final_output': answer, "history" : [history]}



# 단일 태스크


@node_logging
def single_get_info(state:State):
    state = state.dict()
    query = state['goal']
    answer = SingleGettInfo(vectorstore=vectorstore).invoke(query)
    return {'doc_val_response': answer['doc_val_response']}

@node_logging
def single_optimize_goal(state:State):
    state = state.dict()
    query = state['goal']
    answer = SingleGoalOptimizer().invoke(query)
    return {'optimized_goal':answer}

@node_logging
def single_execute_task(state:State):
    state = state.dict()
    task = state['optimized_goal']
    answer = ExecuteTask().invoke(task)
    return {'results': [answer]}

@node_logging
def single_aggregate_result(state:State):
    state = state.dict()
    results = state['results']
    optimized_goal = state['optimized_goal']
    answer = SingleResultAggreagor().invoke(optimized_goal, results)
    
    history = {
        "user_query": state['query'],
        "ai_answer": answer,
        "ts" : datetime.now()
    }
    
    return {'final_output': answer, 'history': [history]}


# 챗봇
@node_logging
def response_customer(state:State):
    state = state.dict()
    query = state['query']
    doc_val_reason = state['doc_val_reason']
    
    history = state.get('history', [])
    if history:
        prev_user_query = history[-1]['user_query']
    else: 
        prev_user_query="이전 질문 없음"
        
    answer = ResponseCustomer().invoke(query = query, doc_val_reason = doc_val_reason, prev_user_query=prev_user_query)
    
    history = {
        "user_query": state['query'],
        "ai_answer": answer,
        "ts" : datetime.now()
    }
    
    return {'final_output': answer, "history" : [history]}

In [None]:
workflow = StateGraph(State)

workflow.add_node('get_mode', get_mode)

# 리포트
workflow.add_node('get_goal', get_goal)
workflow.add_node('get_report_info', get_report_info)
workflow.add_node('optimize_goal', optimize_goal)
#workflow.add_node('optimize_response', optimize_response)
workflow.add_node('decompose_tasks', decompose_tasks)
workflow.add_node('report_execute_task', execute_task)
workflow.add_node('aggregate_result', aggregate_result)

# 단일 태스크
workflow.add_node('single_get_goal', get_goal)
workflow.add_node('single_get_info', single_get_info)
workflow.add_node('single_optimize_goal', single_optimize_goal) 
workflow.add_node('single_execute_task', single_execute_task)
workflow.add_node('single_aggregate_result', single_aggregate_result)



# 챗봇
workflow.add_node('response_customer', response_customer)

## Edge

In [None]:
workflow.add_edge(START, 'get_mode')
workflow.add_conditional_edges('get_mode', lambda state: state.dict()['mode'], {'task':'single_get_goal', 'report':'get_goal', 'ect':'response_customer'})
workflow.add_edge('single_get_goal', 'single_get_info')
workflow.add_conditional_edges('single_get_info', lambda state: state.dict()['doc_val_response'], {'YES':'single_optimize_goal', 'NO': END})
workflow.add_edge('single_optimize_goal', 'single_execute_task')
workflow.add_edge('single_execute_task', 'single_aggregate_result')

workflow.add_edge('response_customer', END)

workflow.add_edge('get_goal', 'get_report_info')
workflow.add_conditional_edges('get_report_info', lambda state: state.dict()['doc_val_response'], {'YES':'optimize_goal', 'NO':'response_customer'})
workflow.add_edge('optimize_goal', 'decompose_tasks')
workflow.add_edge('decompose_tasks', 'report_execute_task')
workflow.add_conditional_edges('report_execute_task', lambda state: state.dict()['current_task_index'] < len(state.dict()['tasks']), {True:'report_execute_task', False:'aggregate_result'})
workflow.add_edge('aggregate_result', END)

## Graph

In [None]:
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
graph = workflow.compile(checkpointer=memory)
conversation_id = "my-test-session-001" 
config = {"configurable": {"thread_id": conversation_id}}

# 테스트

In [None]:
result_ect = graph.invoke({'query':'야구에서 WHIP이라는 지표에 대해서 알려줘'}, config)
result_ect['final_output']

In [None]:
result_ect2 = graph.invoke({'query':'그럼 내가 전에 물어봤던 지표랑 비슷한 지표는 뭐가 있을까'}, config)
result_ect2['final_output']

In [None]:
result_task = graph.invoke({'query':'ARI팀의 2024년 시즌 득점과 실점 데이터를 보여줘'}, config)
result_task['final_output']

In [None]:
result_task2 = graph.invoke({'query':'2024년 시즌 1위 팀의 승률을 차트로 그려줘.'}, config)
print(result_task2['final_output'])

In [None]:
result_task3 = graph.invoke({'query':'2024년 ARI팀의 전력분석보고서 작성해줘.'}, config)
result_task3['final_output']