## 의학 데이터에서 단일 테이블 Text-to-SQL 최적화하기
---

https://github.com/aws-samples/text-to-sql-bedrock-workshop/blob/main/module_1/01_single-table-optimized-for-latency.ipynb

### 무엇을 배우게 될까요?

이 튜토리얼에서는 **자연어 질문을 SQL 쿼리로 변환하는 Text-to-SQL 시스템**을 구축해보겠습니다. 

**Text-to-SQL이란?** 
- 사용자가 "30세 이상 환자가 몇 명인가요?"라고 물으면
- 시스템이 자동으로 `SELECT COUNT(*) FROM patients WHERE age >= 30` 같은 SQL을 생성하는 기술입니다

### 왜 이 방법이 특별한가요?

일반적인 Text-to-SQL 시스템은 여러 번의 AI 모델 호출을 통해:
1. 데이터베이스 테이블 목록 조회
2. 각 테이블의 구조(스키마) 파악
3. SQL 쿼리 생성 및 검증
4. 최종 쿼리 실행

이 과정을 거치는데, **우리는 단일 테이블만 다루므로** 이런 단계들을 건너뛰어 **응답 속도를 크게 향상**시킬 수 있습니다.

### 사용할 데이터

실습에서는 **당뇨병 환자 데이터**를 사용합니다. 이 데이터는 환자의 나이, BMI, 혈당 수치 등의 정보를 포함하며 `diabetes.csv` 파일로 제공됩니다.

```
@article{Machado2024,
    author = "Angela Machado",
    title = "{diabetes.csv}",
    year = "2024",
    month = "3",
    url = "https://figshare.com/articles/dataset/diabetes_csv/25421347",
    doi = "10.6084/m9.figshare.25421347.v1"
}
```

### 시작하기 전에

- [LangChain](https://www.langchain.com)의 [SQLDatabaseToolkit](https://python.langchain.com/v0.2/docs/integrations/toolkits/sql_database/)을 활용합니다
- AWS Bedrock의 Claude 모델을 사용합니다
- 아래 `pip install` 명령 실행 시 경고 메시지가 나타날 수 있으나 무시하셔도 됩니다

In [ ]:
# 필요한 라이브러리 설치
# 경고 메시지가 나타나도 정상이니 걱정하지 마세요!
%pip install -qU openpyxl langchain boto3
%pip install -qU langchain-community langchain-aws

In [ ]:
# 필요한 라이브러리 불러오기
# 이 셀에서는 Text-to-SQL 시스템 구축에 필요한 모든 도구들을 가져옵니다
import os
import sys
from typing import List, Tuple
import itertools
from time import time

import jinja2  # 프롬프트 템플릿 생성용
from langchain_community.utilities import SQLDatabase  # SQLite 데이터베이스 연결
import sqlite3  # SQLite 데이터베이스 조작
import boto3  # AWS 서비스 연결
import pandas as pd  # 데이터 처리
from langchain_aws import ChatBedrock  # AWS Bedrock의 Claude 모델 사용
from langchain_community.agent_toolkits.sql.base import create_sql_agent  # SQL 에이전트 생성
from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit  # SQL 도구 모음
from langchain.agents.agent_types import AgentType  # 에이전트 타입 지정
from langchain.chains import create_sql_query_chain  # SQL 쿼리 체인 생성
from langchain_core.prompts import PromptTemplate  # 프롬프트 템플릿
from langchain.callbacks.base import BaseCallbackHandler  # 콜백 핸들러

sys.path.append('../')
import utilities as u

In [ ]:
# 시스템 설정 및 초기화
# 여기서 AI 모델, 데이터베이스 연결, 각종 옵션들을 설정합니다

# 사용할 AI 모델 지정 (Claude 3 Sonnet 사용)
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
# model_id = "anthropic.claude-3-haiku-20240307-v1:0"  # 더 빠른 모델을 원할 경우

# SQLite 데이터베이스 연결 (test.db 파일에 저장)
con = sqlite3.connect("test.db")

# Jinja2 템플릿 엔진 설정 (프롬프트 생성용)
jenv = jinja2.Environment(trim_blocks=True, lstrip_blocks=True)

# LangChain 추적 설정 (선택사항 - 디버깅용)
#os.environ["LANGCHAIN_TRACING_V2"] = "true"
#os.environ["LANGCHAIN_API_KEY"] = "..."

# AWS 리전 설정
os.environ["AWS_DEFAULT_REGION"] = "us-west-2"

# 시스템 동작 옵션들
is_conversational = True    # 대화형 모드 활성화 (이전 질문 맥락 고려)
force_setup_db = False     # 데이터베이스 강제 재생성 여부
do_few_shot_prompting = False  # Few-shot 프롬프팅 사용 여부
show_SQL = True            # 생성된 SQL 쿼리 표시 여부

# AI 모델 초기화 (AWS Bedrock의 Claude 사용)
llm = ChatBedrock(model_id=model_id, region_name="us-west-2")

# 데이터베이스 연결 객체 생성
db = SQLDatabase.from_uri("sqlite:///test.db")
context = db.get_context()

# SQL 쿼리 생성 체인 생성
chain = create_sql_query_chain(llm, db)

### 데이터베이스 구성하기

이제 당뇨병 환자 데이터를 데이터베이스에 저장해보겠습니다. 먼저 CSV 파일을 불러와서 어떤 데이터가 있는지 살펴보겠습니다.

In [ ]:
# CSV 파일에서 데이터 불러오기
df = pd.read_csv("diabetes.csv")

# 데이터의 첫 5행을 보여줍니다 - 어떤 컬럼들이 있는지 확인할 수 있습니다
df.head()

이제 이 데이터를 SQLite 데이터베이스의 'patients' 테이블에 저장합니다:

In [ ]:
def setup_db():
    """
    데이터베이스에 환자 데이터를 저장하는 함수
    DataFrame을 SQLite의 'patients' 테이블로 변환합니다
    """
    print("데이터베이스 설정 중...")
    # pandas의 to_sql 메서드로 DataFrame을 SQLite 테이블로 저장
    # if_exists="replace": 기존 테이블이 있으면 덮어쓰기
    # index=True: DataFrame의 인덱스도 함께 저장
    df.to_sql(name="patients", con=con, if_exists="replace", index=True)
    con.commit()  # 변경사항을 데이터베이스에 확실히 저장

In [None]:
def maybe_setup_db():
    if force_setup_db:
        print("Forcing DB setup")
        setup_db()
    else:
        try:
            cur = con.cursor()
            cur.execute("SELECT count(*) FROM patient")
            print(f"Table exists ({cur.fetchone()[0]}), no need to recreate DB")
        except Exception as ex:
            # print(f"Caught: {ex}")
            cur.close()
            if "no such table: patient" in str(ex):
                print(f"Table not there, need to recreate DB")
                setup_db()
            else:
                raise ex

In [None]:
maybe_setup_db()

### 대화형 챗봇을 위한 질문 컨텍스트 처리

**대화형 시스템의 핵심 문제점**

일반적인 대화에서는 이전 질문을 참조하는 후속 질문들이 자주 나옵니다:

**예시:**
1. 사용자: "30세 이상 환자가 몇 명인가요?"
2. 사용자: "그 중 BMI가 30 이상인 사람은 몇 명인가요?"

두 번째 질문의 "그 중"은 첫 번째 질문의 "30세 이상 환자"를 가리킵니다. 하지만 AI 모델이 두 번째 질문만 보면 "그 중"이 무엇을 의미하는지 알 수 없습니다.

**해결 방법**

따라서 우리는 **질문 맥락 해소(Question Decontextualization)**를 수행해야 합니다:
- "그 중 BMI가 30 이상인 사람은 몇 명인가요?"
- → "30세 이상이면서 BMI가 30 이상인 환자는 몇 명인가요?"

이렇게 변환하면 각 질문이 독립적으로 이해될 수 있습니다.

In [ ]:
def decontextualize_question(question: str, messages: List[List[str]]) -> str:
    """
    대화 맥락을 고려하여 질문을 독립적으로 이해할 수 있도록 다시 작성하는 함수
    
    매개변수:
    - question: 현재 사용자 질문
    - messages: 이전 대화 기록 [[질문1, 답변1], [질문2, 답변2], ...]
    
    반환값:
    - 맥락이 해소된 독립적인 질문
    """
    print(f"질문 맥락 해소 중: {question}")
    print(f"이전 대화 기록: {len(messages)}개")
    
    # AI 모델에게 전달할 프롬프트 템플릿
    prompt_template = """
질문과 답변의 기록과 새로운 질문을 제공하겠습니다.
새로운 질문을 이전 대화 맥락 없이도 독립적으로 이해할 수 있도록 다시 작성해주세요.

<이전_대화_기록>
{% for x in history %}
  <질문>{{ x[0] }}</질문>
  <답변>{{ x[1] }}</답변>
{% endfor %}
</이전_대화_기록>

새로운 질문:
<새_질문>
{{question}}
</새_질문>

의미를 명확하게 하기 위해 **최소한의 변경**만 하세요. 다른 변경은 하지 마세요.

다시 작성된 독립적인 질문을 <r></r> 태그 안에 반환하세요.
"""
    
    # Jinja2 템플릿으로 프롬프트 생성
    prompt = jenv.from_string(prompt_template).render(history=messages, question=question)
    
    # AI 모델에게 질문 전송
    response = llm.invoke(prompt)
    
    # 응답에서 <r> 태그 안의 내용 추출
    answer = u.extract_tag(response.content, "result")[0]
    
    return answer

**데이터베이스 스키마 정보 확인하기**

AI가 올바른 SQL을 생성하려면 데이터베이스의 구조를 알아야 합니다. SQLite에서 테이블 구조 정보는 `CREATE TABLE` 문으로 확인할 수 있습니다.

In [ ]:
# 데이터베이스 스키마 정보를 가져옵니다
# sqlite_master 테이블에는 데이터베이스의 모든 테이블/인덱스 정보가 들어있습니다
cur = con.cursor()
cur.execute("SELECT * FROM sqlite_master")

# CREATE TABLE 문을 추출합니다 (5번째 컬럼에 저장됨)
DDL = cur.fetchone()[4]
print("데이터베이스 테이블 구조:")
print(DDL)

LLM 호출 과정을 모니터링하기 위해 `BaseCallbackHandler`를 활용합니다. 이를 통해 생성된 SQL 쿼리와 도구 호출 횟수 등의 정보를 수집할 수 있습니다.

In [None]:
class SQLHandler(BaseCallbackHandler):
    def __init__(self):
        self._sql_result = []
        self._num_tool_actions = 0

    def on_agent_action(self, action, **kwargs):
        """Runs on agent action. if the tool being used is sql_db_query,
         it means we're submitting the sql and we can 
         record it as the final sql
        """
        self._num_tool_actions += 1
        if action.tool in ["sql_db_query_checker", "sql_db_query"]:
            self._sql_result.append(action.tool_input)

    def sql_results(self) -> List[str]:
        return self._sql_result

    def num_tool_actions(self) -> int:
        return self._num_tool_actions

SQL 생성 정확도 향상을 위해 스키마 관련 힌트나 주석을 추가할 수 있습니다. 현재 스키마가 단순하여 별도 주석을 추가하지 않았지만, 필요에 따라 여기에 추가할 수 있습니다.

In [None]:
notes: List[str] = []

다음은 [ReAct](https://arxiv.org/pdf/2210.03629) 워크플로우를 제어하는 핵심 프롬프트입니다. 기본적으로 에이전트는 sql_db_schema와 sql_db_list_tables 도구를 사용하여 데이터베이스 메타데이터를 조회하는데, 이는 추가적인 LLM 호출을 발생시켜 응답 시간을 늘립니다. 여기서는 테이블명과 `CREATE TABLE` 문을 직접 제공하고, 해당 도구들을 사용하지 않도록 지시하여 응답 속도를 최적화합니다.

In [None]:
prompt_template = '''
Answer the following questions as best you can.

You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

You might find the following tips useful:
{% for tip in tips %}
  - {{ tip }}
{% endfor %}

The database has the following single table:

{{ table_info }}

You should NEVER have to use either the sql_db_schema tool or the sql_db_list_tables tool
as you know the only table is the "patients" table and you know its schema.

You NEVER can product SELECT statement with no LIMIT clause. You should always have an ORDER BY
clause and a "LIMIT 20" to avoid returning too many useless results.

When describing the final result you don't have to describe HOW the SQL statement worked,
just describe the results.

Begin!

Question: {input}
Thought: {agent_scratchpad}'''

In [None]:
def create_prompt(notes, DDL, question: str):
    prompt_0 = jenv.from_string(prompt_template).render(tips=notes,
                                                        table_info=DDL)
    prompt = PromptTemplate.from_template(prompt_0)
    return prompt

## 질문 처리 시스템

챗봇 구동을 위한 핵심 함수들을 구현합니다.
- `answer_standalone_question`(단일 질문 처리)
- `answer_multiple_questions`(연속 질문 처리) 

현재는 기본적인 형태이지만, [gradio의 ChatBot](https://www.gradio.app/docs/gradio/chatbot) 등의 프레임워크와 연동하여 더 완성도 높은 사용자 인터페이스를 구현할 수 있습니다.

In [None]:
def answer_standalone_question(question: str,
                               messages: List[List[str]]) -> str:
    start_time: float = time()
    if is_conversational and messages:
        question = decontextualize_question(question, messages)
    handler = SQLHandler()
    try:
        agent_executor = create_sql_agent(
            llm=llm,
            toolkit=SQLDatabaseToolkit(db=db, llm=llm),
            verbose=True,
            prompt=create_prompt(notes, DDL, question),
            agent_type=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            callbacks=[handler],
            handle_parsing_errors=True)
        for iteration in itertools.count(0):
            try:
                answer = agent_executor.invoke(input={"input": question},
                                               config={"callbacks": [handler]})
                duration = time() - start_time
                iter_str = f", {iteration} iterations" if iteration > 1 else ""
                history_str = f", history {len(messages):,}" if len(messages) > 0 else ""
                sql_result = handler.sql_results()[-1].strip() if len(handler.sql_results()) > 0\
                             else None
                print(f"sql_result: {sql_result}")
                SQL_str = f"\n ```{sql_result}```" if show_SQL and sql_result else ""
                return answer['output'],\
                       f"{duration:.1f} secs, {handler.num_tool_actions():,} actions{iter_str}{history_str} {SQL_str}"
            except ValueError as ex:
                if iteration < 10:
                    print(f"iteration #{iteration}: caught {ex}")
                    print("retrying")
                else:
                    raise ex
    except Exception as ex:
        print(f"Caught: {ex}")
        raise ex

In [None]:
def answer_multiple_questions(questions: List[str]) -> List[Tuple[str, str]]:
    messages: List[Tuple[str, str]] = []
    answers: List[str] = []
    for question in questions:
        answer, extra_info = answer_standalone_question(question, messages)
        answers.append(answer)
        messages.append([question, answer])
    return list(zip(questions, answers))

다음 코드 실행 시 아래와 같은 오류가 발생하면:

![model access error](content/model-access-error.png)

AWS Bedrock 콘솔에서 해당 모델에 대한 액세스 권한을 요청해야 합니다.

In [None]:
answer_standalone_question("How many patients have a BMI over 20 and are older than 30?",
                           [])

In [None]:
answer_multiple_questions(
    ["How many patients have a BMI over 20 and are older than 30?",
     "How many are over 50?"])