In [1]:
from sqlalchemy import create_engine
import os
from langchain.chat_models import init_chat_model
from dotenv import load_dotenv
from urllib.parse import quote_plus

def _require_env(name: str) -> str:
    """Return environment variable ``name``, failing loudly if it is unset.

    Raises:
        RuntimeError: if the variable is not present in the environment.
    """
    value = os.getenv(name)
    if value is None:
        raise RuntimeError(
            f"Missing required environment variable {name!r}; "
            "define it in the environment or in the .env file."
        )
    return value


# Merge variables from a local .env file into the process environment.
load_dotenv()

# Assigning None into os.environ raises an opaque TypeError, so validate first.
# Presumably the google_genai provider picks the key up from GOOGLE_API_KEY.
os.environ["GOOGLE_API_KEY"] = _require_env("gemini_api")
model = init_chat_model(model="google_genai:gemini-2.0-flash", temperature=0)

# Without validation a missing variable would be formatted as the literal
# text "None" in the connection string and only fail later, at connect time.
server = _require_env("SQLSERVER")
database = _require_env("DATABASE")
username = _require_env("SERVER_USERNAME")
password = _require_env("SERVER_PASSWORD")

# NOTE(review): values containing ';' or braces may need ODBC brace-escaping
# before being placed in the connection string -- confirm against the driver docs.
odbc_connection_string = (
    "DRIVER=ODBC Driver 17 for SQL Server;"
    f"SERVER={server};"
    f"DATABASE={database};"
    f"UID={username};"
    f"PWD={password};"
)
# URL-encode the whole ODBC string so it can be passed via ?odbc_connect=.
params = quote_plus(odbc_connection_string)
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={params}")

In [2]:
from langchain_community.utilities.sql_database import SQLDatabase
from langchain_community.tools.sql_database.prompt import QUERY_CHECKER
# Wrap the SQLAlchemy engine in LangChain's SQLDatabase helper; `db` is the
# object the tool functions in this notebook call to list tables, read schemas
# and run queries.
db = SQLDatabase(engine)

In [3]:
# Show the stock query-checker prompt template for reference; it carries
# {query} and {dialect} placeholders that are filled in before use.
print(QUERY_CHECKER)


{query}
Double check the {dialect} query above for common mistakes, including:
- Using NOT IN with NULL values
- Using UNION when UNION ALL should have been used
- Using BETWEEN for exclusive ranges
- Data type mismatch in predicates
- Properly quoting identifiers
- Using the correct number of arguments for functions
- Casting to the correct data type
- Using the proper columns for joins

If there are any of the above mistakes, rewrite the query. If there are no mistakes, just reproduce the original query.

Output the final SQL query only.

SQL Query: 


In [4]:
from langchain.tools import tool
from pydantic import BaseModel,Field
from langchain.messages import HumanMessage
import time
from langchain.agents import create_agent

class StructuredQuery(BaseModel):
    """Structured Query format extracted from the final query"""
    # No sleep here: a statement in a class body runs once, when the class is
    # defined, so it only delayed the cell and never throttled anything.

    # The single field holding the final SQL text.
    query: str = Field(description="Contains the MS SQL Query")

@tool
def get_table_names(question: str) -> str:
    """Input is question string, Output is a comma-separated list of tables in the database."""
    # `question` is not used in the body; it only gives the tool an argument.
    # Fixed 5-second pause before each call -- presumably to stay under an API
    # rate limit; TODO confirm and lift into a named constant.
    time.sleep(5)
    # Join the names so the result matches the declared `str` return type and
    # the "comma-separated" contract in the description above.
    return ", ".join(db.get_usable_table_names())

@tool
def get_schema(table_names: list[str]) -> str:
    """Input to this tool is a list of table names, output is the schema and sample rows for those tables. Be sure that the tables actually exist by calling get_table_names first! Example Input: ["table1", "table2", "table3"]"""
    # The description above now matches the `list[str]` signature; it used to
    # ask for a comma-separated string, contradicting the argument type.
    # Fixed 5-second pause before each call -- presumably to stay under an API
    # rate limit; TODO confirm.
    time.sleep(5)
    return db.get_table_info_no_throw(table_names)


@tool
def query_checker(query: str) -> str:
    """Use this tool to double check if your query is correct before executing it. Always use this tool before executing a query with run_sql_query!"""
    # Fixed pause ahead of the model call.
    time.sleep(5)
    # Fill the checker template with the candidate query and the database
    # dialect, then send it to the chat model as a single human message.
    review_request = HumanMessage(
        content=QUERY_CHECKER.format(query=query, dialect=db.dialect)
    )
    # Only the content of the model's reply is returned to the agent.
    return model.invoke([review_request]).content

@tool
def run_sql_query(correct_query: str) -> str:
    """Input to this tool is a detailed and correct SQL query, output is a result from the database. If the query is not correct, an error message will be returned. If an error is returned, rewrite the query, check the query, and try again. If you encounter an issue with Unknown column 'xxxx' in 'field list', use get_schema to query the correct table fields."""
    # `correct_query` is annotated as `str`, like the arguments of the sibling
    # tools, so the argument is typed instead of left untyped.
    # Fixed 5-second pause before each call -- presumably to stay under an API
    # rate limit; TODO confirm.
    time.sleep(5)
    # NOTE(review): the SQL text is passed to the database as-is; consider
    # connecting with a read-only login so a generated statement cannot modify data.
    return db.run_no_throw(correct_query)

# Instructions given to the agent. The {dialect} and {top_k} placeholders are
# filled in by the .format() call at the end of the literal. The apostrophe in
# "user’s" was previously mis-decoded text ("â€™") and has been restored.
system_prompt = """You are an AI agent designed to interact with a SQL database. 
Your task is to answer the user’s question by generating a syntactically correct SQL query in {dialect}. 
Begin by discovering the database structure: always use get_table_names() to list all tables,
 and then use get_schema(table_name) to examine the columns and relationships of relevant tables.
   Do not guess any table or column names. Next, carefully generate the SQL query, considering filters,
     joins, and aggregations as needed. If the user does not specify a limit, restrict the query to {top_k} rows. 
     Before submitting, you must verify your work: first use query_checker(query) to check syntax, 
     and then strictly use run_sql_query(query) to ensure the query executes against the database without error. 
     If run_sql_query returns an error, you must rewrite and re-test the query. 
     Once verified, submit only the final executable SQL query as your answer,
       not the query results.""".format(dialect=db.dialect, top_k=5)

# Build the agent from the chat model, the four database tools, the system
# prompt, and the structured response format for the final answer.
agent = create_agent(
    model,
    tools=[get_table_names, get_schema, query_checker, run_sql_query],
    system_prompt=system_prompt,
    response_format=StructuredQuery,
)

# Natural-language request the agent is asked to turn into SQL.
question = """Analyze the sales performance of our physical retail footprint by calculating total 
revenue per offline store. Please rank the stores from highest to lowest performing locations to identify our 
top contributors and underperforming branches."""

# Stream the run and, for every chunk, pretty-print the most recent message
# followed by a separator line.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": question}]},
    stream_mode="values",
):
    message_object = chunk["messages"][-1]
    message_object.pretty_print()
    print("-" * 20)


Analyze the sales performance of our physical retail footprint by calculating total 
revenue per offline store. Please rank the stores from highest to lowest performing locations to identify our 
top contributors and underperforming branches.
--------------------
Tool Calls:
  get_table_names (dd176bd2-e013-4f3f-8eda-5c7745f4bf6b)
 Call ID: dd176bd2-e013-4f3f-8eda-5c7745f4bf6b
  Args:
    question: tables with store information and sales data
--------------------
Name: get_table_names

["campaign", "customer", "interaction", "transaction"]
--------------------
Tool Calls:
  get_schema (212fe4cc-c96c-439f-a0da-29d565ae2ade)
 Call ID: 212fe4cc-c96c-439f-a0da-29d565ae2ade
  Args:
    table_names: ['transaction']
--------------------
Name: get_schema


CREATE TABLE [transaction] (
	[index] BIGINT NULL, 
	transaction_id VARCHAR(max) COLLATE SQL_Latin1_General_CP1_CI_AS NULL, 
	customer_id VARCHAR(max) COLLATE SQL_Latin1_General_CP1_CI_AS NULL, 
	product_name VARCHAR(max) COLLATE SQL_Latin1