In [None]:
from dotenv import load_dotenv
from typing import Dict, Any
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.sql_database import SQLDatabase
from langgraph.graph import Graph, END
import logging

# Configure the root logger so INFO-level records (and above) are emitted.
logging.basicConfig(level=logging.INFO)

# Database connection setup
# load_dotenv() runs before the chat model is constructed below
# (presumably to supply the OpenAI API key from a .env file -- TODO confirm).
load_dotenv()
db_name = "korean_game_dev_project"
# SQLite database addressed as "<db_name>.db" via a relative sqlite:/// URI.
db = SQLDatabase.from_uri(f"sqlite:///{db_name}.db")
# Table names are fetched once here; the prompt-building nodes in this cell
# join them into the text sent to the LLM.
db_usable_table_names = db.get_usable_table_names()

# Single chat model instance shared by every LLM-backed node in this cell.
llm = ChatOpenAI(model="gpt-4", temperature=0)

def query_analyzer(state: Dict[str, Any]) -> Dict[str, Any]:
    """Turn the user's natural-language question into an SQL query.

    The question is read from ``state["human_input"]`` and sent to the
    module-level ``llm`` together with the comma-separated names held in
    ``db_usable_table_names``. The reply text is stored, unmodified, in
    ``state["sql_query"]``.

    Args:
        state: Workflow state; must contain the key ``"human_input"``.

    Returns:
        The same ``state`` dict, mutated in place.

    Raises:
        Exception: Any failure is logged and then re-raised unchanged.
    """
    try:
        question = state["human_input"]

        # The first message carries the task and the table list; the second
        # asks the model to reply with the SQL query only.
        task_message = ("human", """
            프로젝트 관리 데이터베이스에 대한 다음 질문을 SQL 쿼리로 변환해주세요: {question}
            사용할 수 있는 데이터베이스 테이블 목록:
            {db_usable_table_names}
            """)
        format_message = ("human", "SQL 쿼리만 반환하세요.")
        template = ChatPromptTemplate.from_messages([task_message, format_message])

        pipeline = template | llm
        reply = pipeline.invoke({
            "question": question,
            "db_usable_table_names": ", ".join(db_usable_table_names),
        })
    except Exception as exc:
        logging.error(f"Error in query_analyzer: {str(exc)}")
        raise
    else:
        state["sql_query"] = reply.content
        logging.info("SQL query generated successfully")
        return state

def database_querier(state: Dict[str, Any]) -> Dict[str, Any]:
    """Execute the pending SQL query and store its result in the state.

    The query to run is ``state["additional_query"]`` when that key holds a
    non-empty value (it is written by ``additional_data_requester``, whose
    outgoing edge leads back to this node); otherwise ``state["sql_query"]``
    is used. Before execution the text is cleaned up: if it contains a
    Markdown code fence (three backticks, optionally tagged ``sql`` or
    ``sqlite``) only the fenced content is kept, and surrounding whitespace
    is stripped.

    Args:
        state: Workflow state; must contain ``"sql_query"`` unless a
            non-empty ``"additional_query"`` is present.

    Returns:
        The same ``state`` dict with ``"query_result"`` set to whatever
        ``db.run`` returned.

    Raises:
        Exception: Any failure (missing key, database error, ...) is logged
            and then re-raised unchanged.
    """
    import re  # function-scope import keeps this block self-contained

    try:
        # Prefer the follow-up query so a second pass through this node does
        # not just re-run the query that already proved insufficient.
        sql_query = state.get("additional_query") or state["sql_query"]

        # Chat models often wrap code in Markdown fences even when told to
        # return SQL only; the fence markers are not valid SQL.
        fenced = re.search(
            r"```(?:sqlite|sql)?\s*(.*?)```",
            sql_query,
            re.DOTALL | re.IGNORECASE,
        )
        if fenced:
            sql_query = fenced.group(1)
        sql_query = sql_query.strip()

        # NOTE(review): the SQL text originates from the LLM and is executed
        # as-is; consider a read-only connection if writes must be prevented.
        result = db.run(sql_query)
        state["query_result"] = result
        logging.info("Database query executed successfully")
        return state
    except Exception as e:
        logging.error(f"Error in database_querier: {str(e)}")
        raise

def data_analyzer(state: Dict[str, Any]) -> Dict[str, Any]:
    """Ask the LLM to answer the user's question from the query result.

    Reads ``state["query_result"]`` and ``state["human_input"]``, sends both
    to the module-level ``llm`` and records the outcome in ``state``:

    * ``"need_more_data"`` is written on every call: ``True`` when the reply
      contains the marker ``NEED_MORE_DATA``, ``False`` otherwise. Writing it
      unconditionally matters because the graph's conditional edge routes on
      this flag; a stale ``True`` from an earlier pass would otherwise keep
      sending the workflow back for more data after an answer was produced.
    * ``"analysis_result"`` is set to the reply text only when the marker is
      absent.

    Args:
        state: Workflow state; must contain ``"query_result"`` and
            ``"human_input"``.

    Returns:
        The same ``state`` dict, mutated in place.

    Raises:
        Exception: Any failure is logged and then re-raised unchanged.
    """
    try:
        query_result = state["query_result"]
        human_input = state["human_input"]
        prompt = ChatPromptTemplate.from_messages([
            ("human", "다음 데이터를 분석하고 {question}에 대한 답변을 제공해주세요: {data}"),
            ("human", "데이터가 부족하다면 'NEED_MORE_DATA'를 반환하세요.")
        ])
        chain = prompt | llm
        response = chain.invoke({"question": human_input, "data": query_result})

        # Always overwrite the flag so it reflects this pass only.
        needs_more_data = "NEED_MORE_DATA" in response.content
        state["need_more_data"] = needs_more_data
        if not needs_more_data:
            state["analysis_result"] = response.content
        logging.info("Data analysis completed")
        return state
    except Exception as e:
        logging.error(f"Error in data_analyzer: {str(e)}")
        raise

def additional_data_requester(state: Dict[str, Any]) -> Dict[str, Any]:
    """Ask the LLM for a follow-up SQL query when more data is needed.

    The question is read from ``state["human_input"]`` and sent to the
    module-level ``llm`` together with the comma-separated names held in
    ``db_usable_table_names``. The reply text is stored, unmodified, in
    ``state["additional_query"]``.

    Args:
        state: Workflow state; must contain the key ``"human_input"``.

    Returns:
        The same ``state`` dict, mutated in place.

    Raises:
        Exception: Any failure is logged and then re-raised unchanged.
    """
    try:
        question = state["human_input"]

        # The first message describes the request and lists the tables; the
        # second asks the model to reply with the SQL query only.
        request_message = ("human", """
            다음 질문에 답하기 위해 필요한 추가 정보를 SQL 쿼리 형태로 요청해주세요: {question}
            사용할 수 있는 데이터베이스 테이블 목록:
            {db_usable_table_names}
            """)
        format_message = ("human", "SQL 쿼리만 반환하세요.")
        template = ChatPromptTemplate.from_messages([request_message, format_message])

        pipeline = template | llm
        reply = pipeline.invoke({
            "question": question,
            "db_usable_table_names": ", ".join(db_usable_table_names),
        })
    except Exception as exc:
        logging.error(f"Error in additional_data_requester: {str(exc)}")
        raise
    else:
        state["additional_query"] = reply.content
        logging.info("Additional query generated")
        return state

# Graph definition
workflow = Graph()

# Nodes: one per processing step, registered under the function's own name.
workflow.add_node("query_analyzer", query_analyzer)
workflow.add_node("database_querier", database_querier)
workflow.add_node("data_analyzer", data_analyzer)
workflow.add_node("additional_data_requester", additional_data_requester)

# Entry point: every run starts by turning the question into SQL.
workflow.set_entry_point("query_analyzer")

# Edges: the linear path from SQL generation through execution to analysis.
workflow.add_edge("query_analyzer", "database_querier")
workflow.add_edge("database_querier", "data_analyzer")

# After analysis, finish unless the state's "need_more_data" flag is truthy,
# in which case a follow-up query is requested ...
workflow.add_conditional_edges(
    "data_analyzer",
    lambda state: END if not state.get("need_more_data") else "additional_data_requester",
)
# ... and the workflow goes back to the database step.
workflow.add_edge("additional_data_requester", "database_querier")

# Compile the graph into a runnable application.
app = workflow.compile()

# Entry point: runs the compiled workflow for a single question
def run_analysis(question: str) -> Dict[str, Any]:
    """Run the compiled workflow for one question and summarize the outcome.

    The question is passed to ``app.invoke`` as ``{"human_input": question}``.
    This function never raises: every exception is caught, logged and
    reported in the returned dict.

    Args:
        question: The natural-language question to analyze.

    Returns:
        A dict with the keys ``"question"``, ``"answer"`` and ``"status"``.
        ``"status"`` is ``"success"`` when the final state contains
        ``"analysis_result"``, ``"incomplete"`` when it does not, and
        ``"error"`` when an exception occurred; in the error case the dict
        also carries ``"error_message"`` with the exception text.
    """
    try:
        logging.info(f"Starting analysis for question: {question}")
        final_state = app.invoke({"human_input": question})

        # Choose answer/status first, then build the report in one place.
        if "analysis_result" in final_state:
            answer = final_state["analysis_result"]
            status = "success"
        else:
            answer = "분석을 완료할 수 없었습니다. 추가 정보가 필요할 수 있습니다."
            status = "incomplete"
        report = {"question": question, "answer": answer, "status": status}

        logging.info("Analysis completed successfully")
        return report
    except Exception as exc:
        logging.error(f"An error occurred during analysis: {str(exc)}")
        failure_report = {
            "question": question,
            "answer": "분석 중 오류가 발생했습니다.",
            "status": "error",
        }
        failure_report["error_message"] = str(exc)
        return failure_report

# Usage example: the question below asks which project has the most tasks.
result = run_analysis("작업이 가장 많은 프로젝트는 어느 프로젝트인가요?")
print(result)

In [None]:
from dotenv import load_dotenv
from typing import Dict, Any
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.sql_database import SQLDatabase
from langgraph.graph import Graph, END
import logging

# Configure the root logger so INFO-level records (and above) are emitted.
logging.basicConfig(level=logging.INFO)

# Database connection setup
load_dotenv()
db_name = "korean_game_dev_project"
# SQLite database addressed as "<db_name>.db" via a relative sqlite:/// URI.
db = SQLDatabase.from_uri(f"sqlite:///{db_name}.db")
db_usable_table_names = db.get_usable_table_names()
# Commented-out exploration: show the table names, or build a per-table
# schema string from db.get_table_info(table).
# db_usable_table_names
# db_schema = "\n".join([f"{table}: {db.get_table_info(table)}" for table in db_usable_table_names])
# db_schema

# Bare expression whose value is not assigned; as the last line of the cell
# it is presumably there so the notebook displays the table info -- TODO confirm.
db.get_table_info()