In [25]:
import pandas as pd
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough, RunnableMap, RunnableLambda
from sqlalchemy import create_engine
import tqdm
import time
import pickle
import os


from dotenv import load_dotenv

# Load variables from a local .env file into the process environment before any
# client is constructed. The ChatOpenAI client below is created without an explicit
# api_key, so it relies on environment variables being populated at this point.
load_dotenv()


True

In [26]:
# Hand-written seed descriptions, keyed by SQL table name.
# Each key is used verbatim as the table name when sampling rows, and each value is
# passed to the prompt as the "SQL table description" that the model expands into
# per-column documentation.
table_description = {

    'distributor_closing_stock': '''It contains data related to the product stock available with each distributor at the end of a specific date.
sku_code, sku_name, and sku_short_description describe the product.
brand_name and brand_code indicate product branding.
distributor_code identifies the distributor.
quantity gives the amount of product in stock.
closing_stock_date denotes when the stock count was recorded.''',

    'retailer_order_summary': '''Contains summary-level information about each retailer's order.
Includes retailer_code and order_number for identification.
shipping_details and city_code represent delivery info.
subtotal, total_discount, shipping_charge, and total_amount capture the cost breakdown.
order_status represents the order state.
created_at and updated_at show the order creation and update timestamps.''',

    'retailer_order_product_details': '''Contains item-level information for each order placed by a retailer.
order_number and sku_code identify which products belong to which order.
sku_name describes the product.
order_quantity shows how many units were ordered.
price and subtotal give financial data.
created_at indicates when the record was logged.''',

    'product_master': '''It contains master-level information about products including hierarchy and pricing.
Includes product segmentation (segment_code, segment_name_category), brand_code, and brand_name.
sku_code and sku_name identify products.
sku_short_description gives product description.
mrp is the maximum retail price.
price_to_retailer indicates the purchase price for the retailer.
active indicates whether the product is currently sold.''',

    'retailer_master': '''Contains master information about retailers and their associations.
Includes regional and sales hierarchy (region_code, rsm_code, so, salesman_code).
retailer_code and retailer_name identify the retailer.
distributor_code and distributor_name link retailers to distributors.
distributor_type and distributor_channel_type define channel info.
distributor_city and distributor_state indicate the location.
warehouse_id links to logistics warehouse.''',

    'scheme_details': '''Contains information about promotional schemes applied to products.
Includes scheme_name, discount_percent, and is_active to define the scheme.
sku_code indicates which product the scheme applies to.
start_date_time and end_date_time represent validity.
scheme_type, apply_type, and scheme_group_id show application details.
level_1 and level_2 are metadata fields for scheme segmentation.'''

}


In [27]:
# SQLAlchemy engine for the txt2sql MySQL database.
#
# The connection URL (which embeds the database password) is read from the
# environment instead of being written into the notebook, so the credential never
# ends up in version control or in a shared copy of this file. Because
# load_dotenv() runs in the first cell, a local .env file is a convenient place
# for it, e.g.:
#
#   TXT2SQL_DB_URL=mysql+pymysql://<user>:<password>@127.0.0.1:3306/txt2sql
#
# Special characters in the password must be percent-encoded ("@" becomes "%40").
_db_url = os.getenv("TXT2SQL_DB_URL")
if not _db_url:
    raise RuntimeError(
        "TXT2SQL_DB_URL is not set. Define it in the environment or in a .env file, "
        "e.g. mysql+pymysql://<user>:<password>@127.0.0.1:3306/txt2sql"
    )

engine = create_engine(_db_url)

def read_sql(table, limit=5):
    """Return a random sample of rows from ``table`` as a DataFrame.

    Parameters
    ----------
    table : str
        Name of the table to sample. May be schema-qualified (``"db.table"``).
        Each dot-separated part must be a plain identifier (letters, digits and
        underscores, not starting with a digit).
    limit : int, default 5
        Maximum number of rows to return.

    Returns
    -------
    pandas.DataFrame
        Up to ``limit`` rows picked in random order (``ORDER BY RAND()``).

    Raises
    ------
    ValueError
        If ``table`` is not a valid identifier or ``limit`` is negative.
    """
    # A table name cannot be sent as a bound query parameter, so it has to be
    # interpolated into the SQL text. Restricting it to identifier characters
    # keeps arbitrary SQL from being smuggled in through the name.
    if not isinstance(table, str) or not all(
        part.isidentifier() for part in table.split(".")
    ):
        raise ValueError(f"Invalid table name: {table!r}")

    limit = int(limit)
    if limit < 0:
        raise ValueError(f"limit must be non-negative, got {limit}")

    query = "SELECT * FROM {} ORDER BY RAND() LIMIT {};".format(table, limit)

    # Execute and load the result into a DataFrame.
    return pd.read_sql(query, con=engine)

In [28]:
# Chat model shared by every annotation request in this notebook.
# No api_key / base_url / organization is passed here, so the client has to pick
# its credentials up from environment variables (presumably OPENAI_API_KEY - verify
# against the .env file). Pass api_key=... explicitly if you prefer not to rely on
# the environment.
llm = ChatOpenAI(
    model="gpt-4o",
    max_retries=2,
    temperature=0,
    timeout=None,
    max_tokens=None,
)

# Prompt for the annotation step.
# Template variables:
#   {description} - the hand-written description of the table
#   {data_sample} - a few sample rows from the table, rendered as text
# The model is asked to answer with a nested list literal:
#   ["<table description>", [["<column 1> : ..."], ["<column 2> : ..."], ...]]
template = ChatPromptTemplate.from_messages([
    ("system", """
You are an intelligent data annotator. Please annotate data as mentioned by human and give output without any verbose and without any additional explanation.
You will be given sql table description and sample rows from the sql table. The description that you generate will be given as input to text to sql automated system.
Output of project depends on how you generate description. Make sure your description has all possible nuances.

"""),

    ("human", '''

- Based on the column data, please generate description of entire table along with description for each column and sample values(1 or 2) for each column.
- While generating column descriptions, please look at sql table description given to you and try to include them in column description.
- DONT write generic description like "It provides a comprehensive view of the order lifecycle from purchase to delivery". Just write description based on what you see in columns.


Context regarding the tables:
These tables are provided by a supply chain food-based company. Retailers buy the different products(skus) from the distributors if the distributor has the available stock.


Output should look like below in form of list of strings and lists properly. MAKE SURE YOU CLOSE THE QUOTES in list of strings properly always.
["<table description based on all column values>" , [["<column 1> : Detail description of column along with datatype, <sample values:v1,v2 etc(indicate there are more values)>"],
["<column 2> : Detail description of column 2 along with datatype, <sample values:v1,v2 etc(indicate there are more values)>"]]
]

SQL table description:
{description}

Sample rows from the table:
{data_sample}

     ''')
])

# Annotation pipeline: select the two prompt inputs from the incoming payload,
# render the prompt, call the chat model, and reduce the reply to a plain string.
chain = RunnableMap(
    {
        "description": lambda payload: payload["description"],
        "data_sample": lambda payload: payload["data_sample"],
    }
).pipe(template, llm, StrOutputParser())

In [29]:
def get_annotated_description(table_name: str, description: str) -> "str | None":
    """Ask the LLM for an annotated description of one table.

    Samples a few random rows from ``table_name`` through ``read_sql``, renders
    them as a markdown table, and sends them together with ``description``
    through ``chain``.

    Parameters
    ----------
    table_name : str
        Name of the SQL table to sample.
    description : str
        Hand-written description of the table, passed to the prompt as-is.

    Returns
    -------
    str or None
        The model's reply, or ``None`` if sampling or the LLM call failed.
        Failures are printed rather than raised, so one bad table does not
        abort a run over many tables.
    """
    try:
        # Reuse the shared sampling helper instead of repeating the SQL here.
        sample_rows = read_sql(table_name)
        sample_text = sample_rows.to_markdown(index=False)
        return chain.invoke({
            "description": description,
            "data_sample": sample_text,
        })
    except Exception as e:
        # Deliberate best-effort: report the problem and let the caller skip this table.
        print(f"Error for table {table_name}: {e}")
        return None

In [30]:
# Annotate every table, then persist the results in two forms:
#   annotated_schema.md - human-readable markdown, one section per table
#   kb.pkl              - pickled dict mapping table name -> markdown section
all_outputs = []
dict_knowledge = {}
failed_tables = []  # tables for which no annotation came back

for table_name, desc in table_description.items():
    annotated_text = get_annotated_description(table_name, desc)
    if not annotated_text:
        failed_tables.append(table_name)
        continue
    markdown_block = f"### **{table_name}**\n```json\n{annotated_text}\n```\n"
    dict_knowledge[table_name] = markdown_block
    all_outputs.append(markdown_block)

if dict_knowledge:
    with open("annotated_schema.md", "w", encoding="utf-8") as f:
        f.write("\n\n".join(all_outputs))
    with open('kb.pkl', 'wb') as f:
        pickle.dump(dict_knowledge, f)
    print("✅ Annotated markdown saved to 'annotated_schema.md'")
else:
    # Nothing was annotated: leave output files from an earlier run untouched
    # rather than overwriting them with empty content.
    print("❌ No table could be annotated; output files were not written")

if failed_tables:
    print(f"⚠️ No annotation produced for: {', '.join(failed_tables)}")

✅ Annotated markdown saved to 'annotated_schema.md'
