<a href="https://colab.research.google.com/github/JawonHwang/SQLAgent/blob/main/SQL_Agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#테스트 데이터베이스 다운로드 후 파일 압축 해제
!wget https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip -O chinook.zip
!unzip chinook.zip

!pip install -q langchain langchain-openai tiktoken
#SQLAlchemy (SQLite 드라이버)
!pip install sqlalchemy
!pip install langchain-community

--2024-11-24 22:58:03--  https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip
Resolving www.sqlitetutorial.net (www.sqlitetutorial.net)... 104.21.30.141, 172.67.172.250, 2606:4700:3037::ac43:acfa, ...
Connecting to www.sqlitetutorial.net (www.sqlitetutorial.net)|104.21.30.141|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 305596 (298K) [application/zip]
Saving to: ‘chinook.zip’


2024-11-24 22:58:03 (9.19 MB/s) - ‘chinook.zip’ saved [305596/305596]

Archive:  chinook.zip
replace chinook.db? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: chinook.db              


In [None]:
import os
import json
from typing import List, Dict
from langchain_openai import ChatOpenAI
from langchain_community.utilities import SQLDatabase
from langchain_community.agent_toolkits import create_sql_agent
from langchain.agents.agent_types import AgentType
#Text-to-SQL 체인 생성
from langchain.chains import create_sql_query_chain
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
#메시지 구조 - 사용자의 입력을 받아 답변을 내기 위함
from langchain.schema import SystemMessage, HumanMessage
from langchain_community.tools.sql_database.tool import QuerySQLDataBaseTool

In [None]:
# API 키를 환경변수로 관리하기 위한 설정 파일
from dotenv import load_dotenv

# API 키 정보 로드
load_dotenv()

print(os.getenv("OPENAI_API_KEY"))

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


In [None]:
# OpenAI API 키 설정
# os.environ["OPENAI_API_KEY"] = ""

# 테스트 데이터베이스 연결
db = SQLDatabase.from_uri("sqlite:///chinook.db")

# ChatGPT 모델 초기화
llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo")
# llm = ChatOpenAI(temperature=0, model="gpt-4o")

In [None]:
# SQL 에이전트 생성
agent_executor = create_sql_agent(
    llm=llm,
    db=db,
    agent_type=AgentType.OPENAI_FUNCTIONS, #에이전트 타입 지정
    verbose=True #로그 상세히 출력
)

# Text-to-SQL 체인 생성
sql_chain = create_sql_query_chain(llm, db)

# SQL 쿼리 실행 도구
execute_query = QuerySQLDataBaseTool(db=db)

# SQL 쿼리 해설 프롬프트
query_explanation_prompt = PromptTemplate(
    input_variables=["query"],
    template="다음 SQL 쿼리를 자세히 설명해주세요:\n\n{query}\n\n설명:"
)

# 쿼리 해설 체인
query_explanation_chain = LLMChain(llm=llm, prompt=query_explanation_prompt)

In [None]:
# 쿼리를 처리하고 결과를 반환하는 함수
def process_query(query: str) -> Dict:
    try:
        # Text-to-SQL 변환 - 자연어 질문을 SQL 쿼리로 변환
        response = sql_chain.invoke({"question": query})
        if not response:  # 비어 있는 응답 처리
            return {"error": "Empty response from Text-to-SQL chain"}

        # 반환된 SQL 쿼리 추출
        if isinstance(response, dict):
            sql_query = response.get("sql_query", "")
        elif isinstance(response, str):
            sql_query = response
        else:
            return {"error": f"Unexpected response type: {type(response)}"}

        if not sql_query: # 생성된 SQL 쿼리가 없을 경우 처리
            return {"error": "No SQL query generated"}

        # SQL 쿼리 실행
        result = execute_query.run(sql_query)
        if not result:  # 실행 결과가 없을 경우 예외 처리
            return {"error": "Empty result from SQL query execution"}

        # SQL 쿼리 해설 생성
        query_explanation = query_explanation_chain.run(sql_query)


        return {
            "original_query": query,
            "sql_query": sql_query,
            "result": result,
            "query_explanation": query_explanation,
        }
    except Exception as e:
        return {"error": str(e)}

In [None]:
# 자연어로 답변 생성하는 프롬프트
answer_prompt = PromptTemplate(
    input_variables=["question", "query", "result", "explanation"],
    template="""
기본적으로 사용자의 질문, SQL 쿼리, 쿼리 결과, 그리고 쿼리 설명을 바탕으로 자연어로 답변해주세요.
하지만 사용자의 질문에서 테이블의 정보에 대해 설명을 부탁한다면 SQL 쿼리는 만들지 말고 쿼리 결과와 설명 또한 없으며
테이블에 대한 자세한 정보만을 제공하세요. 테이블 정보에 대해서는 스키마, 컬럼 등과 같은 정보도 추가로 제공하세요.

질문: {question}
SQL 쿼리: {query}
쿼리 결과: {result}
쿼리 설명: {explanation}

답변:
"""
)

# 자연어 답 생성 체인
answer_chain = LLMChain(llm=llm, prompt=answer_prompt)

# 자연어로 답변 생성 함수
def generate_natural_language_answer(query_result: Dict) -> str:
    return answer_chain.run(
        question=query_result["original_query"],
        query=query_result["sql_query"],
        result=query_result["result"],
        explanation=query_result["query_explanation"]
    )

# 메인 프로그램 루프 (사용자가 입력한 질문에 대한 답변 처리)
if __name__ == "__main__":
    while True:
        user_input = input("데이터베이스에 대한 질문을 입력하세요 (종료하려면 'q' 입력): ")
        if user_input.lower() == 'q': # q 입력시 루프 종료
            break

        query_result = process_query(user_input) # 사용자 입력

        if "error" in query_result:
            print(f"오류 발생: {query_result['error']}")
        else:
            # 테이블 설명에 관한 질문인지 확인
            if "테이블 설명" in query_result["original_query"]:
                # 테이블 설명만 출력
                natural_language_answer = generate_natural_language_answer(query_result)
                print("\n답변:", natural_language_answer)
            else:
                # 일반적인 SQL 쿼리 결과와 설명도 함께 출력
                print("\nSQL 쿼리:", query_result["sql_query"])
                natural_language_answer = generate_natural_language_answer(query_result)
                print("\n답변:", natural_language_answer)
                print("\n쿼리 설명:", query_result["query_explanation"])

            print("\n")



데이터베이스에 대한 질문을 입력하세요 (종료하려면 'q' 입력): employees는 몇명이 있나요?

SQL 쿼리: SELECT COUNT("EmployeeId") AS "EmployeeCount" FROM employees;

답변: employees 테이블은 총 8명의 직원이 있습니다. 이 테이블은 직원들의 정보를 담고 있으며, EmployeeId, LastName, FirstName, Title, 등 다양한 컬럼으로 구성되어 있습니다.

쿼리 설명: 위의 SQL 쿼리는 "employees" 테이블에서 "EmployeeId" 열의 값이 NULL이 아닌 행의 수를 세는 쿼리입니다. 결과는 "EmployeeCount"라는 별칭을 가진 열에 표시됩니다. 따라서 결과는 "EmployeeId" 열이 NULL이 아닌 직원의 수를 나타내는 숫자 값이 됩니다.


데이터베이스에 대한 질문을 입력하세요 (종료하려면 'q' 입력): 국가별 총 판매액을 나열하세요. 어느 국가의 고객이 가장 많은 돈을 지출했나요?

SQL 쿼리: SELECT c."Country", SUM(i."Total") AS "TotalSales"
FROM customers c
JOIN invoices i ON c."CustomerId" = i."CustomerId"
GROUP BY c."Country"
ORDER BY "TotalSales" DESC
LIMIT 5;

답변: 가장 많은 돈을 지출한 국가는 미국(USA)입니다. 미국의 총 매출액은 523.06달러입니다.

테이블 정보:
- customers 테이블: 
  - CustomerId (고객 ID)
  - FirstName (고객의 이름)
  - LastName (고객의 성)
  - Country (고객이 사는 국가)
  - ...
  
- invoices 테이블:
  - InvoiceId (송장 ID)
  - CustomerId (고객 ID)
  - InvoiceDate (송장 발행일)
  - Total (총 매출액)
  - ...

쿼리 설명