# Lab. 1-1 Schema Preparation-1

이 노트북에서는 아래 그림의 1 / 3 과정을 수행합니다. (2는 불필요하여 생략합니다)

복잡한 데이터베이스에서 Text2SQL의 가장 어려운 작업은 쿼리 생성에 필요한 스키마를 선별하는 과정, 즉 Schema Linking 입니다.

현실의 기업 환경에서는 테이블/컬럼 이름이 의미를 축약하고 있어서 LLM이 이를 파악하기 힘들거나, 테이블/컬럼이 너무 많아서 모든 목록을 프롬프트에 담아 전달하는 것이 불가능한 경우가 많습니다.

이를 해결하기 위해, 우리 DB에 맞춰 스키마 설명 문서를 정제하고, LLM에 필요한 컨텍스트를 선별하여 제공하는 작업이 필요합니다. 이 노트북에서는 스키마 준비 과정을 시뮬레이션 하기 위해, Chinook DB 설명 문서를 활용하겠습니다. 전체 작업 흐름은 아래와 같이 이어갈 예정입니다.

![Intro](../images/text2sql/schema-prep-1.png)


## Step 0: OpenSearch 환경 설정

In [1]:
# !pip install -q opensearch-py
# !pip install langchain-aws
# !pip install langchain-community



In [2]:
import sys
from libs.ssm import parameter_store

pm = parameter_store('us-east-1')
# pm = parameter_store('us-west-2')
domain_endpoint = pm.get_params(key="chatbot-opensearch_domain_endpoint", enc=False)
opensearch_domain_endpoint = f"https://{domain_endpoint}"
opensearch_user_id = pm.get_params(key="chatbot-opensearch_user_id", enc=False)
opensearch_user_password = pm.get_params(key="chatbot-opensearch_user_password", enc=True)
print(opensearch_domain_endpoint)


https://search-text2sql-kh-ujitj6xavl2cfvpkafvolvjjlm.us-east-1.es.amazonaws.com


## Step 1: Schema Description 문서 로드 (위 그림의 `1. Schema Loader`)

각 기업에는 Excel / CSV 등으로 스키마 설명 문서가 정의되어 있을 수 있습니다. 이를 Parsing하여 아래의 Schema Description 포맷으로 변경한다고 가정하겠습니다.

```
{
    "table_name": {
        "table_desc": "Description of the table",
        "cols": [
            {
                "col": "Column Name 1",
                "col_desc": "Description of the column including PK info"
            },
            {
                "col": "Column Name 2",
                "col_desc": "Description of the column"
            }
        ]
    }
}
```

초기 설명 문서에는 테이블의 이름과 테이블에 대한 기본 설명, 컬럼 이름과 컬럼에 대한 설명이 포함되어야 합니다. 기업에 잘 정리된 스키마 설명 문서가 없다면, 아주 기본적인 정보만 제공하고 LLM이 이를 증강하여 초기 설명문서 자체를 생성하도록 할 수도 있습니다. 이를 위한 LLM 호출 스크립트는 다음 [링크](https://github.com/kevmyung/db-schema-loader/blob/main/schema_loader.py)를 참고합니다.

In [3]:
import json

file_path = 'database/chinook_schema_kr.json'

with open(file_path, 'r') as file:
    schema_description = json.load(file)

# 한글나오게 해줘
print(json.dumps(schema_description, indent=4, ensure_ascii=False))


[
    {
        "Album": {
            "table_desc": "고유 ID, 제목 및 아티스트 ID를 통한 아티스트 링크가 있는 앨범 데이터를 저장합니다.",
            "cols": [
                {
                    "col": "AlbumId",
                    "col_desc": "기본 키, 앨범의 고유 식별자입니다."
                },
                {
                    "col": "Title",
                    "col_desc": "앨범의 제목입니다."
                },
                {
                    "col": "ArtistId",
                    "col_desc": "앨범의 아티스트를 참조하는 외래 키입니다."
                }
            ]
        },
        "Artist": {
            "table_desc": "ID와 이름이 있는 아티스트 정보를 보유합니다.",
            "cols": [
                {
                    "col": "ArtistId",
                    "col_desc": "기본 키, 아티스트의 고유 식별자입니다."
                },
                {
                    "col": "Name",
                    "col_desc": "아티스트의 이름입니다."
                }
            ]
        },
        "Customer": {
            "table_desc": "고객 세부 정보를 포함하고 지원 담당자에게 연결합니다.",
         

### 이제 Schema Description 문서를 활용해 후속 작업을 이어가겠습니다

## Step 2: SQL2Text 샘플 쿼리 변환 (위 그림의 `3. Query Translator`)

Lab 1 / Lab 2에서 언급했듯이, 좋은 샘플 쿼리를 LLM에게 제공하는 것은 쿼리 작성 뿐만 아니라 Schema Linking에도 도움이 됩니다.

그러나, 대부분의 기업 환경에서 자주 사용되는 쿼리를 로그로 관리하고 있는 반면, (기존에 Text2SQL을 사용하지 않았기 때문에) 쿼리에 매칭되는 자연어 질문은 없습니다. 

Step 2에서는 자주 사용하는 쿼리들을 자연어 질문으로 변환하는 SQL2Text 과정을 진행합니다.

In [6]:
sql_file = './database/chinook_sample_queries_augment.sql'

with open(sql_file, 'r') as file:
    data = file.read()

queries = [query.strip() for query in data.split(';') if query.strip()]

for i, query in enumerate(queries, start=1):
    print(f"Query {i}:\n{query}\n{'-'*80}\n")

Query 1:
SELECT * FROM Artist
--------------------------------------------------------------------------------

Query 2:
SELECT * FROM Album WHERE ArtistId = (SELECT ArtistId FROM Artist WHERE Name = 'AC/DC')
--------------------------------------------------------------------------------

Query 3:
SELECT * FROM Track WHERE GenreId = (SELECT GenreId FROM Genre WHERE Name = 'Rock')
--------------------------------------------------------------------------------

Query 4:
SELECT SUM(Milliseconds) FROM Track
--------------------------------------------------------------------------------

Query 5:
SELECT * FROM Customer WHERE Country = 'Canada'
--------------------------------------------------------------------------------

Query 6:
SELECT COUNT(*) FROM Track WHERE AlbumId = 5
--------------------------------------------------------------------------------

Query 7:
SELECT COUNT(*) FROM Invoice
--------------------------------------------------------------------------------

Query 8:
SEL

쿼리를 해석하기 위해, 각 쿼리에 사용된 테이블/컬럼의 의미를 파악해야 합니다.
따라서, 각 쿼리에 사용된 테이블/컬럼 정보를 아래와 같이 추출합니다.
```
{
  "table": ["table1", "table2", ...],
  "column": ["col1", "col2", ...]
}
```
다음은 SQL 쿼리에 활용된 스키마 목록을 추출하는 LLM 요청 구문입니다.

In [7]:
SYS_PROMPT_TEMPLATE1 = """ 
You are an expert in extracting table names and column names from SQL queries. 
From the provided SQL query, extract all table names and column names used for SELECT, WHERE, and JOIN clauses, excluding asterisks ("*"). 
Ensure that the response is in a valid JSON format that can be used directly with json.load(). 
Skip the preamble and only provide the answer in a JSON document:

{
  "table": ["table1", "table2", ...],
  "column": ["col1", "col2", ...]
}

<example>
SQL:
SELECT * from LOGIS_ADMIN.IAWD_TB_DCBSCD_BASISLC_M 
where basis_lclsf_cd_nm like '%예약구분%'
LIMIT 200;

{
  "table": ["IAWD_TB_DCBSCD_BASISLC_M"],
  "column": ["basis_lclsf_cd_nm"]
}
</example>
"""

USR_PROMPT_TEMPLATE1="""
SQL: {sql}
"""

In [8]:
from langchain_aws import ChatBedrock
from langchain_core.prompts.chat import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

In [9]:
model_kwargs =  { 
    "max_tokens": 200000,
    "temperature": 0.0,
    "top_k": 250,
    "top_p": 1
}

In [10]:
model_kwargs["system"] = SYS_PROMPT_TEMPLATE1
model1 = ChatBedrock(model_id="anthropic.claude-3-sonnet-20240229-v1:0", region_name='us-east-1', model_kwargs=model_kwargs)
# model1 = ChatBedrock(model_id="anthropic.claude-3-sonnet-20240229-v1:0", region_name='us-west-2', model_kwargs=model_kwargs)
prompt1 = ChatPromptTemplate.from_template(USR_PROMPT_TEMPLATE1)

chain1 = prompt1 | model1 | StrOutputParser()

예를 들어 아래 쿼리에 사용된 스키마를 추출해보겠습니다.

```SELECT CustomerId, SUM(Total) AS TotalPurchase FROM Invoice GROUP BY CustomerId ORDER BY TotalPurchase DESC LIMIT 5``` 

In [11]:
sql = queries[8].strip()
response = chain1.invoke({"sql": sql})
used_schema = json.loads(response)
print(used_schema)

{'table': ['Invoice'], 'column': ['CustomerId', 'Total']}


#### 이제 이 쿼리에 사용된 스키마 설명을 조회합니다.

In [12]:
def extract_descriptions(table_info, tables, columns):
    tables_lower = {table.lower() for table in tables}
    columns_lower = {column.lower() for column in columns}
    
    description = {
        "table": {},
        "column": {}
    }
    
    for table_schema in table_info:
        for table_name, table_info in table_schema.items():
            if table_name.lower() in tables_lower:
                description["table"][table_name] = table_info["table_desc"]
                for col in table_info["cols"]:
                    col_name = col["col"]
                    if col_name.lower() in columns_lower:
                        description["column"][col_name] = col["col_desc"]
    return description

In [13]:
extracted_description = extract_descriptions(schema_description, used_schema['table'], used_schema['column'])
print(extracted_description)

{'table': {'Invoice': '고객과 연결된 거래 세부 정보를 기록합니다.'}, 'column': {'CustomerId': '이 인보이스와 관련된 고객을 참조하는 외래 키입니다.', 'Total': '인보이스의 총 금액입니다.'}}


#### 이제 쿼리에 대한 자연어 변환을 요청합니다.

In [14]:
SYS_PROMPT_TEMPLATE2 = """ 
You are an SQL expert who can understand the intent behind a given SQL query. 
Translate the SQL query into a natural language request in Korean that a real user might make. 

- Keep your translation concise and conversational, mimicking how an actual user would ask for the information sought by the query. 
- Do not reference the <description> section directly and do not use a question form. 
- Ensure to include all conditions specified in the SQL query in the request.
- Write possible business and functional purposes of the query.
- Write very detailed purposes and motives of the query in detail.
- Skip the preamble and phrase only the natural language request using a concise and straightforward tone without a verb ending. 
"""

USR_PROMPT_TEMPLATE2="""
<description>
{description}
</description>

SQL: {sql}
"""

In [15]:
model_kwargs["system"] = SYS_PROMPT_TEMPLATE2
model2 = ChatBedrock(model_id="anthropic.claude-3-sonnet-20240229-v1:0", region_name='us-west-2', model_kwargs=model_kwargs)
prompt2 = ChatPromptTemplate.from_template(USR_PROMPT_TEMPLATE2)
chain2 = prompt2 | model2 | StrOutputParser()

#### 자연어 질문을 생성하는 프롬프트는 아래 형식으로 LLM에 전달됩니다.

In [16]:
print(SYS_PROMPT_TEMPLATE2)
print(prompt2.format(description=extracted_description, sql=queries[8]))

 
You are an SQL expert who can understand the intent behind a given SQL query. 
Translate the SQL query into a natural language request in Korean that a real user might make. 

- Keep your translation concise and conversational, mimicking how an actual user would ask for the information sought by the query. 
- Do not reference the <description> section directly and do not use a question form. 
- Ensure to include all conditions specified in the SQL query in the request.
- Write possible business and functional purposes of the query.
- Write very detailed purposes and motives of the query in detail.
- Skip the preamble and phrase only the natural language request using a concise and straightforward tone without a verb ending. 

Human: 
<description>
{'table': {'Invoice': '고객과 연결된 거래 세부 정보를 기록합니다.'}, 'column': {'CustomerId': '이 인보이스와 관련된 고객을 참조하는 외래 키입니다.', 'Total': '인보이스의 총 금액입니다.'}}
</description>

SQL: SELECT CustomerId, SUM(Total) AS TotalPurchase FROM Invoice GROUP BY CustomerId OR

In [17]:
response = chain2.invoke({"sql": queries[8], "description": extracted_description})
print(response)

고객별 총 구매 금액을 내림차순으로 정렬하여 상위 5명의 고객 ID와 총 구매 금액 조회. 고객 관계 관리 및 마케팅 전략 수립을 위해 최대 지출 고객 파악이 필요할 수 있음. 또한 VIP 고객 관리 프로그램 운영 시 대상 고객 선정에 활용 가능함.


#### 다음 쿼리에 대한 자연어 설명은 LLM에 의해 위와 같이 정의되었습니다.

```SELECT CustomerId, SUM(Total) AS TotalPurchase FROM Invoice GROUP BY CustomerId ORDER BY TotalPurchase DESC LIMIT 5```

#### 아래는 위 과정을 모든 SQL 쿼리에 대해 반복하는 스크립트입니다. (약 1~2분 소요됩니다)

In [30]:
import os

FILE_PATH_1 = 'database/example_queries_temp.jsonl'
def query_translation(table_info, queries, chain1, chain2):
    if os.path.exists(FILE_PATH_1):
        os.remove(FILE_PATH_1)

    with open(FILE_PATH_1, 'a') as output_file:
        for query in queries:
            sql = query.strip()
            
            try:
                response = chain1.invoke({"sql": sql})
                schema = json.loads(response)
            except json.JSONDecodeError:
                print(response)
                time.sleep(1)  

            description = extract_descriptions(table_info, schema["table"], schema["column"])
            
            input = chain2.invoke({"sql": sql, "description": description})
            # Write input and query to the file in JSON format
            data = {"input": input, "query": sql}
            output_file.write(json.dumps(data, ensure_ascii=False) + "\n")
            
query_translation(schema_description, queries, chain1, chain2)

#### 쿼리 변환이 완료된 결과는 `./lab3_text2sql_schema_preparation/example_queries_temp.jsonl` 파일에 저장되어 있습니다. 

In [26]:
with open(FILE_PATH_1, 'r') as file:
    for line in file:
        data = json.loads(line)
        print(data)

{'input': '모든 아티스트의 ID와 이름 정보 조회. 신규 아티스트 데이터베이스 구축, 아티스트 목록 생성, 아티스트 프로필 관리 등의 업무에 활용될 수 있습니다.', 'query': 'SELECT * FROM Artist'}
{'input': "아티스트 이름이 'AC/DC'인 아티스트의 모든 앨범 정보 요청\n\n이 쿼리는 특정 아티스트의 모든 앨범 정보를 검색하는 데 사용됩니다. 아티스트 테이블에서 'AC/DC'라는 이름을 가진 아티스트의 ArtistId를 찾은 다음, 해당 ArtistId를 Album 테이블에서 검색하여 관련 앨범 정보를 가져옵니다. 이를 통해 음악 스트리밍 서비스나 온라인 음악 상점에서 특정 아티스트의 전체 앨범 목록을 표시하거나 아티스트 프로필 페이지에 앨범 정보를 포함시킬 수 있습니다.", 'query': "SELECT * FROM Album WHERE ArtistId = (SELECT ArtistId FROM Artist WHERE Name = 'AC/DC')"}
{'input': '록 장르에 속하는 모든 트랙 정보 조회. 장르 이름을 기준으로 장르 ID를 찾아 해당 장르 ID를 가진 트랙들의 전체 정보를 가져옵니다. 사용자가 특정 장르의 음악을 검색하거나 재생 목록을 만들고자 할 때 유용합니다. 장르별로 트랙을 분류하여 관리하고 원하는 장르의 트랙만 필터링할 수 있습니다.', 'query': "SELECT * FROM Track WHERE GenreId = (SELECT GenreId FROM Genre WHERE Name = 'Rock')"}
{'input': '모든 트랙의 총 지속 시간 밀리초 합계 제공. 음악 스트리밍 서비스에서 전체 콘텐츠 길이를 파악하여 사용자 경험 최적화 및 데이터 사용량 예측에 활용 가능합니다.', 'query': 'SELECT SUM(Milliseconds) FROM Track'}
{'input': '캐나다에 거주하는 고객들의 전체 세부 정보 조회. 캐나다 시장 분석 및 맞춤형 마케팅 전략 수

## Step 3: 샘플 쿼리 벡터 임베딩 및 OpenSearch 저장

이제 <자연어 질문 & SQL 쿼리> 조합의 자연어 질문을 벡터로 임베딩하여, 사용자 질문과 유사한 SQL 쿼리를 찾아내기 용이하도록 저장해야 합니다.

아래 구문은 OpenSearch 환경을 초기화합니다. (연결 생성 및 Index 초기화)

In [27]:
endpoint = opensearch_domain_endpoint
host = [{'host': endpoint.replace("https://", ""),'port': 443}]
host

[{'host': 'search-text2sql-kh-ujitj6xavl2cfvpkafvolvjjlm.us-east-1.es.amazonaws.com',
  'port': 443}]

In [28]:
import yaml
from opensearchpy import OpenSearch, RequestsHttpConnection
INDEX_NAME = "example_queries"

def load_opensearch_config():
    # with open("./libs/opensearch.yml", 'r', encoding='utf-8') as file:
    #     return yaml.safe_load(file)
    with open("./libs/opensearch_fix.yml", 'r', encoding='utf-8') as file:
        return yaml.safe_load(file)

def init_opensearch(config):
    mapping = {"settings": config['settings'], "mappings": config['mappings-sql']}
    endpoint = opensearch_domain_endpoint
    http_auth = (opensearch_user_id, opensearch_user_password)

    os_client = OpenSearch(
            hosts=[{'host': endpoint.replace("https://", ""),'port': 443}],
            http_auth=http_auth, 
            use_ssl=True,
            verify_certs=True,
            timeout=300,
            connection_class=RequestsHttpConnection
    )

    create_os_index(os_client, mapping)
    return os_client

def create_os_index(os_client, mapping):
    exists = os_client.indices.exists(INDEX_NAME)

    if exists:
        os_client.indices.delete(index=INDEX_NAME)
        print("Existing index has been deleted. Create new one.")
    else:
        print("Index does not exist, Create one.")

    os_client.indices.create(INDEX_NAME, body=mapping)

config = load_opensearch_config()
os_client = init_opensearch(config)

Existing index has been deleted. Create new one.


이제 앞에 만들었던 <자연어 질문 & SQL 쿼리>를 벡터 임베딩으로 변환하고, OpenSearch에 bulk indexing 할 수 있는 Data-Action 포맷으로 구성합니다.

In [31]:
from langchain_aws import BedrockEmbeddings

FILE_PATH_2 = 'database/example_queries.jsonl'
emb_model = BedrockEmbeddings(model_id="amazon.titan-embed-text-v2:0", region_name='us-west-2', model_kwargs={"dimensions":1024}) 

def input_embedding(emb_model):
    num = 0
    if os.path.exists(FILE_PATH_2):
        os.remove(FILE_PATH_2)

    with open(FILE_PATH_1, 'r') as input_file, open(FILE_PATH_2, 'a') as output_file:
        for line in input_file:
            data = json.loads(line)
            input = data['input']
            query = data['query']
            
            # Data part
            body = { "input": input, "query": query, "input_v": emb_model.embed_query(input) }

            # Action part
            action = { "index": { "_index": INDEX_NAME, "_id": str(num) } }

            # Write action and body to the file in correct bulk format
            output_file.write(json.dumps(action, ensure_ascii=False) + "\n")
            output_file.write(json.dumps(body, ensure_ascii=False) + "\n")

            num += 1    

input_embedding(emb_model)

#### 위 코드를 실행한 뒤 `./lab3_text2sql_schema_preparation/example_queries.jsonl` 파일을 열어보면, 변환된 임베딩을 확인할 수 있습니다.

In [32]:
FILE_PATH_2

'database/example_queries.jsonl'

In [33]:
with open(FILE_PATH_2, 'r') as file:
    bulk_data = file.read()
        
response = os_client.bulk(body=bulk_data)
if response["errors"]:
    print("There were errors during bulk indexing:")
    for item in response["items"]:
        if 'index' in item and item['index']['status'] >= 400:
            print(f"Error: {item['index']['error']['reason']}")
else:
    print("Bulk-inserted all items successfully.")

Bulk-inserted all items successfully.


#### 이제 OpenSearch에 저장을 완료했습니다.

In [34]:
response

{'took': 40,
 'errors': False,
 'items': [{'index': {'_index': 'example_queries',
    '_id': '0',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 1, 'failed': 0},
    '_seq_no': 0,
    '_primary_term': 1,
    'status': 201}},
  {'index': {'_index': 'example_queries',
    '_id': '1',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 1, 'failed': 0},
    '_seq_no': 0,
    '_primary_term': 1,
    'status': 201}},
  {'index': {'_index': 'example_queries',
    '_id': '2',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 1, 'failed': 0},
    '_seq_no': 1,
    '_primary_term': 1,
    'status': 201}},
  {'index': {'_index': 'example_queries',
    '_id': '3',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 1, 'failed': 0},
    '_seq_no': 0,
    '_primary_term': 1,
    'status': 201}},
  {'index': {'_index': 'example_queries',
    '_id': '4',
    '_

In [35]:
# 인덱스의 전체 레코드 수 확인
def count_all_records(os_client, index_name):
    try:
        # count API 사용하여 인덱스의 총 문서 수 확인
        count_response = os_client.count(index=index_name)
        total_records = count_response['count']
        print(f"인덱스 '{index_name}'에 총 {total_records}개의 레코드가 있습니다.")
        return total_records
    except Exception as e:
        print(f"레코드 수 확인 중 오류 발생: {str(e)}")
        return None

# 사용 예시
index_name = "example_queries"  # 실제 인덱스 이름으로 변경하세요
total_count = count_all_records(os_client, index_name)

인덱스 'example_queries'에 총 19개의 레코드가 있습니다.


In [36]:
# # 인덱스의 모든 레코드 삭제
# def delete_all_records(os_client, index_name):
#     try:
#         # 방법 1: delete_by_query API 사용 (인덱스 유지, 문서만 삭제)
#         delete_response = os_client.delete_by_query(
#             index=index_name,
#             body={
#                 "query": {
#                     "match_all": {}
#                 }
#             },
#             refresh=True  # 삭제 후 인덱스 리프레시
#         )
        
#         deleted_count = delete_response['deleted']
#         print(f"인덱스 '{index_name}'에서 {deleted_count}개의 레코드가 삭제되었습니다.")
#         return deleted_count
#     except Exception as e:
#         print(f"레코드 삭제 중 오류 발생: {str(e)}")
#         return None

# # 인덱스 자체를 삭제하는 방법 (모든 데이터와 인덱스 구조 제거)
# def delete_index(os_client, index_name):
#     try:
#         delete_response = os_client.indices.delete(index=index_name)
#         if delete_response.get('acknowledged', False):
#             print(f"인덱스 '{index_name}'가 완전히 삭제되었습니다.")
#             return True
#         else:
#             print(f"인덱스 '{index_name}' 삭제 실패.")
#             return False
#     except Exception as e:
#         print(f"인덱스 삭제 중 오류 발생: {str(e)}")
#         return False


# # 레코드만 삭제하고 인덱스 구조는 유지하려면:
# delete_all_records(os_client, index_name)

# # 인덱스 자체를 완전히 삭제하려면:
# # delete_index(os_client, index_name)