In [45]:
import ast
import json
import os
import pickle
import re
import time

import pandas as pd
import tqdm
from dotenv import load_dotenv
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableMap
from langchain_openai import ChatOpenAI
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

In [46]:
# Hand-written seed descriptions, keyed by the table name used in the database.
# Each value is passed to the LLM as the "SQL table description" part of the prompt.
table_description = {

'customer': '''Contains customer information.
Columns:
- customer_id: unique identifier for each customer
- customer_unique_id: another unique identifier for each customer
- customer_zip_code_prefix: postal code prefix of the customer
- customer_city: city of the customer
- customer_state: state of the customer''',

# NOTE(review): this entry previously repeated the order_items column list.
# The columns below follow the public Olist `orders` schema — confirm against
# the actual table in the database.
'orders': '''Contains order information.
Columns:
- order_id: unique identifier for each order
- customer_id: foreign key referencing customer.customer_id
- order_status: current status of the order (e.g. delivered, shipped, canceled)
- order_purchase_timestamp: timestamp when the order was placed
- order_approved_at: timestamp when the payment for the order was approved
- order_delivered_carrier_date: timestamp when the order was handed over to the logistics partner
- order_delivered_customer_date: timestamp when the order was delivered to the customer
- order_estimated_delivery_date: estimated delivery date communicated to the customer at purchase time''',

'order_items': '''Contains item-level information for each order.
Columns:
- order_id: foreign key referencing orders.order_id
- order_item_id: sequential number of the item in the order
- product_id: foreign key referencing products.product_id
- seller_id: foreign key referencing sellers.seller_id
- shipping_limit_date: latest date for the seller to hand over the item to logistics
- price: price of the item
- freight_value: freight cost of the item''',

'order_payments': '''Contains payment details for each order.
Columns:
- order_id: foreign key referencing orders.order_id
- payment_sequential: sequence number of the payment for an order, used if the customer pays with multiple methods
- payment_type: method of payment used
- payment_installments: number of installments chosen for this payment
- payment_value: value of this specific payment transaction''',

'order_reviews': '''Contains customer reviews of orders.
Columns:
- review_id: unique identifier for each review
- order_id: foreign key referencing orders.order_id
- review_score: numeric score given by the customer
- review_comment_title: short title or summary of the review provided by the customer
- review_comment_message: full text comment provided by the customer
- review_creation_date: date when the review was submitted by the customer
- review_answer_timestamp: date when the seller or platform responded to the review (if any)''',

# The "lenght" spelling in the column names below is intentional: it must match
# the column names exactly as written here, since the prompt demands exact names.
'products': '''Contains product information.
Columns:
- product_id: unique identifier for each product
- product_category_name: category of the product in Brazilian Portuguese
- product_name_lenght: length of the product name in characters
- product_description_lenght: length of the product description in characters
- product_photos_qty: number of photos available for the product
- product_weight_g: weight of the product in grams
- product_length_cm: product length in centimeters
- product_height_cm: product height in centimeters
- product_width_cm: product width in centimeters''',

'product_category_translation': '''Maps product categories from Brazilian Portuguese to English.
Columns:
- product_category_name: category name in Portuguese
- product_category_name_english: translated category name''',

'sellers': '''Contains seller information.
Columns:
- seller_id: unique identifier for each seller
- seller_zip_code_prefix: postal code prefix of the seller's location
- seller_city: city of the seller
- seller_state: state of the seller''',

}

In [47]:
# Load environment variables from .env file
load_dotenv()

# Read credentials from environment variables
host = os.getenv("DB_HOST")
port = os.getenv("DB_PORT")
user = os.getenv("DB_USER")
password = os.getenv("DB_PASSWORD")
database = os.getenv("DB_NAME")

# Build the connection URL from its components instead of formatting a string:
# URL.create escapes special characters (e.g. '@', '/', ':') in the user name
# and password, which would otherwise corrupt the URL when interpolated raw.
db_url = URL.create(
    drivername="mysql+mysqlconnector",
    username=user,
    password=password,
    host=host,
    port=int(port) if port else None,  # env vars are strings; URL.create expects an int
    database=database,
)

# Create SQLAlchemy engine (module-level; used by read_sql_sample)
engine = create_engine(db_url)

# Smoke-test the connection; a failure is reported but does not stop the cell.
try:
    with engine.connect():
        print("MySQL connection successful!")
except Exception as e:
    print("Connection failed:", e)


def read_sql_sample(table, n=5):
    """
    Fetch a random sample of rows from a table through the module-level `engine`.

    Args:
    - table: table name, optionally schema-qualified as "schema.table". Only
      letters, digits, "_" and "$" are accepted, because identifiers cannot be
      bound as query parameters and the name is placed in the SQL text.
    - n: number of random rows to fetch (non-negative integer).

    Returns:
    - df_sample: Pandas DataFrame with n random rows
    - sample_json: list of dicts (JSON style)
    - column_types: dict {column_name: datatype}

    Raises:
    - ValueError: if `table` is not a plain identifier or `n` is negative /
      not convertible to an integer.
    """
    # Validate before touching the database so malformed names never reach SQL.
    if not isinstance(table, str) or not re.fullmatch(
        r"[0-9A-Za-z_$]+(\.[0-9A-Za-z_$]+)?", table
    ):
        raise ValueError(f"Invalid table name: {table!r}")

    n = int(n)
    if n < 0:
        raise ValueError(f"n must be non-negative, got {n}")

    # Backtick-quote each part so reserved words also work as table names.
    quoted_table = ".".join(f"`{part}`" for part in table.split("."))
    query = f"SELECT * FROM {quoted_table} ORDER BY RAND() LIMIT {n};"
    df_sample = pd.read_sql(query, con=engine)

    sample_json = df_sample.to_dict(orient='records')
    column_types = {col: str(dtype) for col, dtype in df_sample.dtypes.items()}

    return df_sample, sample_json, column_types

MySQL connection successful!


In [48]:
# System message: fixes the annotator role and the exact JSON shape expected back.
# Doubled braces are literal braces in the prompt template.
SYSTEM_PROMPT = """
You are an intelligent data annotator. Annotate SQL tables with detailed column-level descriptions.
Use the sample data and column types provided to generate JSON output.
Do not invent columns. Use exact column names.
Output must be valid JSON in the following format:

{{
  "table_description": "<table description>",
  "columns": [
    {{
      "column_name": "<column name>",
      "description": "<detailed description>",
      "datatype": "<datatype>",
      "sample_values": ["v1", "v2"]
    }}
  ]
}}

Always include 1-2 sample values per column.
Do not add any extra text.
"""

# Human message: filled per table with `description`, `data_sample` and `column_types`.
HUMAN_PROMPT = """
SQL table description:
{description}

Sample rows from the table:
{data_sample}

Column data types:
{column_types}
"""

# Chat model used to generate the annotations.
model = ChatOpenAI(model="gpt-5.2", temperature=0.2, max_tokens=1000, timeout=30)

template = ChatPromptTemplate.from_messages(
    [
        ("system", SYSTEM_PROMPT),
        ("human", HUMAN_PROMPT),
    ]
)

# Pipeline: fill the prompt, call the model, return the reply as a plain string.
chain = template | model | StrOutputParser()

In [50]:
def _extract_json_object(text):
    """Return the outermost ``{...}`` span of `text`, or `text` stripped if none.

    LLM replies are often wrapped in a Markdown fence such as ```json ... ```.
    Deleting only the backticks leaves the ``json`` language tag in front of the
    payload and makes ``json.loads`` fail, so slice from the first ``{`` to the
    last ``}`` instead.
    """
    start = text.find('{')
    end = text.rfind('}')
    if start == -1 or end < start:
        return text.strip()
    return text[start:end + 1]


# Knowledge base: table name -> parsed LLM annotation (None when parsing failed).
kb_final = {}

for table_name, description in tqdm.tqdm(table_description.items()):
    # Only the JSON records and dtypes are needed; the DataFrame itself is unused.
    _, sample_json, column_types = read_sql_sample(table_name, n=5)

    # Convert to JSON string for LLM
    # default=str keeps values such as timestamps or decimals from raising TypeError.
    sample_str = json.dumps(sample_json[:2], ensure_ascii=False, default=str)  # only 1-2 rows
    types_str = json.dumps(column_types, ensure_ascii=False)

    raw_response = chain.invoke({
        "description": description,
        "data_sample": sample_str,
        "column_types": types_str
    })
    response = _extract_json_object(raw_response)

    try:
        kb_final[table_name] = json.loads(response)
    except json.JSONDecodeError:
        # Keep going: record the failure and show the unmodified reply for debugging.
        print(f"JSON parse error for table: {table_name}")
        print("LLM response:", raw_response)
        kb_final[table_name] = None

100%|█████████████████████████████████████████████| 8/8 [01:13<00:00,  9.17s/it]


In [51]:
# Display the knowledge base built by the annotation loop: a dict keyed by table
# name whose value is the parsed JSON annotation, or None if parsing failed.
kb_final

{'customer': {'table_description': 'Stores customer location and identifier information, including customer IDs, postal code prefix, city, and state for each customer record.',
  'columns': [{'column_name': 'customer_id',
    'description': 'Primary unique identifier for a customer record in the dataset; typically a system-generated alphanumeric hash used to reference the customer in related tables (e.g., orders).',
    'datatype': 'object',
    'sample_values': ['60c9c9568fdcbbccfc7ddc387f76e200',
     '8877f3c1264d413a5033eca3d7510da2']},
   {'column_name': 'customer_unique_id',
    'description': 'Secondary unique identifier representing the underlying customer identity; may link multiple customer records that belong to the same person across different transactions or registrations.',
    'datatype': 'object',
    'sample_values': ['5cdbff27ae9bb0812d23e36f517feb23',
     '1c78cc523e89a9c1d0beef9598566562']},
   {'column_name': 'customer_zip_code_prefix',
    'description': "Numeric

In [52]:
# Serialize the knowledge base dict to kb.pkl in the current working directory.
with open('kb.pkl', mode='wb') as kb_file:
    pickle.dump(kb_final, kb_file)