In [6]:
!pip install langgraph
!pip install langchain_community
!pip install langchain_openai
!pip install --upgrade langgraph langchain langchain-core langchain-community
!pip install --upgrade openai pydantic typing_extensions



Collecting typing_extensions
  Using cached typing_extensions-4.14.0-py3-none-any.whl.metadata (3.0 kB)
Using cached typing_extensions-4.14.0-py3-none-any.whl (43 kB)
Installing collected packages: typing_extensions
  Attempting uninstall: typing_extensions
    Found existing installation: typing_extensions 4.13.2
    Uninstalling typing_extensions-4.13.2:
      Successfully uninstalled typing_extensions-4.13.2
Successfully installed typing_extensions-4.14.0


In [None]:
import os
from typing import Any, Annotated, Literal
from langchain_core.messages import ToolMessage, AIMessage, HumanMessage
from langchain_core.runnables import RunnableLambda, RunnableWithFallbacks
from langgraph.prebuilt import ToolNode
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain_openai import ChatOpenAI
from langchain_community.utilities import SQLDatabase
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from pydantic import BaseModel, Field
from typing_extensions import TypedDict
from langgraph.graph import END, StateGraph, START
from langgraph.graph.message import AnyMessage, add_messages
# from flask import Flask, request, jsonify
# from flask_cors import CORS
import json
import logging

# Configure Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Environment setup
# The key is read from OPEN_AI_API_KEY (this project's variable name) and, if that
# is unset, from the conventional OPENAI_API_KEY variable.
OPENAI_API_KEY = os.getenv("OPEN_AI_API_KEY") or os.getenv("OPENAI_API_KEY")
# NOTE(review): LLM_MODEL has no default, and reasoning_effort="high" is only
# accepted by reasoning-capable OpenAI models — the configured model must be one.
LLM_MODEL = os.getenv("LLM_MODEL")
llm = ChatOpenAI(model=LLM_MODEL, api_key=OPENAI_API_KEY, reasoning_effort="high")

# Database and tools
# Open the SQLite file read-only (SQLAlchemy pysqlite "file:...?mode=ro&uri=true"
# URI form). SQLite then rejects any INSERT/UPDATE/DELETE/DROP the LLM generates,
# enforcing the query prompt's "no DML" rule instead of merely requesting it.
# A missing coffee_co.db now raises here instead of being silently created empty.
DB_URI = "sqlite:///file:coffee_co.db?mode=ro&uri=true"
db = SQLDatabase.from_uri(DB_URI)
toolkit = SQLDatabaseToolkit(db=db, llm=llm)
tools = toolkit.get_tools()

# Fetch the two bootstrap tools by name. A dict lookup gives a clear error if the
# toolkit ever renames them, and avoids reusing `tool` (the @tool decorator
# imported above) as an iteration variable.
_tools_by_name = {t.name: t for t in tools}
try:
    list_tables_tool = _tools_by_name["sql_db_list_tables"]
    get_schema_tool = _tools_by_name["sql_db_schema"]
except KeyError as missing:
    raise RuntimeError(
        f"SQLDatabaseToolkit did not provide tool {missing}; "
        f"available tools: {sorted(_tools_by_name)}"
    ) from None

# Utility functions
def create_tool_node_with_fallback(tools: list) -> RunnableWithFallbacks[Any, dict]:
    """Wrap ``tools`` in a ToolNode whose failures are reported back to the agent.

    If the ToolNode raises, LangChain's fallback mechanism stores the exception
    under the ``"error"`` key of the node input and runs ``handle_tool_error``
    instead. The agent therefore receives the error text as ToolMessages rather
    than the graph aborting.
    """
    error_handler = RunnableLambda(handle_tool_error)
    primary = ToolNode(tools)
    return primary.with_fallbacks([error_handler], exception_key="error")

def handle_tool_error(state) -> dict:
    """Fallback for a failed ToolNode: answer every pending tool call with the error.

    ``state`` is the node input with the raised exception added under ``"error"``
    (the ``exception_key`` used by ``create_tool_node_with_fallback``). One
    ToolMessage is emitted per tool call on the last message, so every call id
    receives a response.
    """
    failure = state.get("error")
    pending_calls = state["messages"][-1].tool_calls
    error_text = f"Error: {repr(failure)}\n please fix your mistakes."
    replies = []
    for call in pending_calls:
        replies.append(ToolMessage(content=error_text, tool_call_id=call["id"]))
    return {"messages": replies}

@tool
def db_query_tool(query: str) -> str:
    """
    Execute a SQL query against the database and get back the result.
    If the query is not correct, an error message will be returned.
    If an error is returned, rewrite the query, check the query, and try again.
    """
    # NOTE: the docstring above is the tool description the LLM sees; keep it stable.
    # run_no_throw does not raise: a failing statement comes back as an
    # "Error: ..." string, and a query that matches no rows comes back as "".
    result = db.run_no_throw(query)
    if not result:
        # The query ran fine but returned nothing. Say so accurately (instead of
        # "Query failed") so the model doesn't "fix" a correct query. The "Error:"
        # prefix is kept because route_after_execute relies on it to send the flow
        # back to query_gen, whose prompt asks for a rewrite on empty results.
        # NOTE(review): if "no rows" is the true answer, this keeps retrying until
        # the graph's recursion limit is reached.
        return "Error: Query returned no rows. Please rewrite your query and try again."
    return result

# Query check: an LLM pass that reviews the generated SQL and must answer with a
# db_query_tool call (tool_choice="required"), which execute_query then runs.
query_check_system = """You are a SQL expert with a strong attention to detail.
Double check the SQLite query for common mistakes, including:
- Using NOT IN with NULL values
- Using UNION when UNION ALL should have been used
- Using BETWEEN for exclusive ranges
- Data type mismatch in predicates
- Properly quoting identifiers
- Using the correct number of arguments for functions
- Casting to the correct data type
- Using the proper columns for joins

If there are any mistakes, rewrite the query. If not, reproduce the original query.
You will call the appropriate tool to execute the query after this check."""
_query_check_llm = llm.bind_tools([db_query_tool], tool_choice="required")
query_check_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", query_check_system),
        ("placeholder", "{messages}"),
    ]
)
query_check = query_check_prompt | _query_check_llm

# State definition
class State(TypedDict):
    # Message history shared by every node. The add_messages reducer merges the
    # "messages" list each node returns into this one instead of replacing it.
    messages: Annotated[list[AnyMessage], add_messages]

# Workflow
# Graph builder over State; nodes and edges are registered below and the graph
# is compiled into `sql_agent` at the end of this cell.
workflow = StateGraph(State)

# Add a node for the first tool call
def first_tool_call(state: State) -> dict[str, list[AIMessage]]:
    """Seed the run with a hard-coded ``sql_db_list_tables`` tool call.

    No LLM call is made: listing the tables is always the first step, so the
    AIMessage requesting it is built directly and executed by the
    ``list_tables_tool`` node that follows.

    Args:
        state: Current graph state (only logged).

    Returns:
        A state update containing a single AIMessage with one tool call.
    """
    # A %s placeholder is required: logging formats `msg % args` lazily. Without
    # a placeholder, a dict argument is treated as a mapping and silently dropped.
    logger.info("State===> %s", state)
    return {
        "messages": [
            AIMessage(
                content="",
                tool_calls=[
                    {
                        "name": "sql_db_list_tables",
                        "args": {},
                        # Fixed id: this is the only tool call in flight at this
                        # point, and the resulting ToolMessage echoes it back.
                        "id": "tool_abcd123",
                    }
                ],
            )
        ]
    }

# Nodes
# Schema-lookup model, bound once (like query_check / final_answer_gen) instead of
# on every invocation. tool_choice="required" forces a sql_db_schema call: the
# graph routes model_get_schema unconditionally into the get_schema_tool ToolNode,
# which has nothing to execute if the model replies with plain text.
get_schema_llm = llm.bind_tools([get_schema_tool], tool_choice="required")

workflow.add_node("first_tool_call", first_tool_call)
workflow.add_node("list_tables_tool", create_tool_node_with_fallback([list_tables_tool]))
workflow.add_node("get_schema_tool", create_tool_node_with_fallback([get_schema_tool]))
workflow.add_node(
    "model_get_schema",
    lambda state: {"messages": [get_schema_llm.invoke(state["messages"])]},
)

# Describe a tool to represent the end state
# bind_tools derives the tool's JSON schema from this class. LangChain uses the
# class docstring and Field descriptions as the tool/argument descriptions, so
# editing that text changes what the model is shown.
class SubmitFinalAnswer(BaseModel):
    """Submit the final answer to the user based on the query results."""

    # Required (`...`). The example cells read it back from
    # tool_calls[0]["args"]["final_answer"] on the last message.
    final_answer: str = Field(..., description="The final answer to the user")


query_gen_system = """You are a SQL expert with a strong attention to detail.
Based on the user's question and the database schema, generate a syntactically correct SQLite query to answer the question.
Do not call any tools; just output the SQL query.
Unless the user specifies a number of examples, limit your query to at most 5 results.
Order results by a relevant column to return the most interesting examples.
Never query all columns; select only relevant ones.
If a previous query resulted in an error, rewrite it to fix the mistake.
If you get an empty result set, rewrite the query to get results.
NEVER make up data if information is insufficient; say you don’t have enough info.
DO NOT make DML statements (INSERT, UPDATE, DELETE, DROP etc.).
"""


query_gen_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", query_gen_system),
        ("placeholder", "{messages}"),
    ]
)
# No tools are bound here: this step only drafts SQL as plain text.
query_gen = query_gen_prompt | llm


def query_gen_node(state: State) -> dict:
    """Draft the next SQL query from the full message history."""
    drafted = query_gen.invoke(state)
    return {"messages": [drafted]}


workflow.add_node("query_gen", query_gen_node)

# Checker node: re-validates the drafted SQL and emits a db_query_tool call.
def correct_query_node(state: State) -> dict:
    """Run the query checker over the history; it is forced to return a tool call."""
    checked = query_check.invoke(state)
    return {"messages": [checked]}


workflow.add_node("correct_query", correct_query_node)

# Runs the checker's db_query_tool call; failures come back as ToolMessages.
workflow.add_node("execute_query", create_tool_node_with_fallback([db_query_tool]))

# Final-answer node: turns the query result into a forced SubmitFinalAnswer call.
final_answer_system = """Based on the query result, formulate the final answer to the user's question and call SubmitFinalAnswer with the final answer."""
_final_answer_llm = llm.bind_tools([SubmitFinalAnswer], tool_choice="required")
final_answer_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", final_answer_system),
        ("placeholder", "{messages}"),
    ]
)
final_answer_gen = final_answer_prompt | _final_answer_llm


def formulate_final_answer_node(state: State) -> dict:
    """Produce the run's last message: a SubmitFinalAnswer tool call."""
    answer_message = final_answer_gen.invoke(state)
    return {"messages": [answer_message]}


workflow.add_node("formulate_final_answer", formulate_final_answer_node)

# Edges: the fixed, linear part of the pipeline, wired pairwise in order.
_linear_path = [
    START,
    "first_tool_call",
    "list_tables_tool",
    "model_get_schema",
    "get_schema_tool",
    "query_gen",
    "correct_query",
    "execute_query",
]
for _src, _dst in zip(_linear_path, _linear_path[1:]):
    workflow.add_edge(_src, _dst)

def route_after_execute(state: State) -> Literal["formulate_final_answer", "query_gen"]:
    """Pick the node that follows ``execute_query``.

    A ToolMessage whose content does not start with ``"Error:"`` is treated as
    a usable query result and goes to ``formulate_final_answer``. Anything
    else — including the ``"Error:"`` messages produced by ``db_query_tool``
    and ``handle_tool_error`` — loops back to ``query_gen`` for a rewrite.
    """
    # NOTE(review): there is no retry cap here; repeated failures keep looping
    # until the graph's recursion limit is hit.
    latest = state["messages"][-1]
    succeeded = isinstance(latest, ToolMessage) and not latest.content.startswith("Error:")
    return "formulate_final_answer" if succeeded else "query_gen"

# Branch after execution: the router's return value is used as the target node name.
_execute_routes = {
    "formulate_final_answer": "formulate_final_answer",
    "query_gen": "query_gen",
}
workflow.add_conditional_edges("execute_query", route_after_execute, _execute_routes)
workflow.add_edge("formulate_final_answer", END)

# Compile
sql_agent = workflow.compile()

# === Example Python Prompt ===
def extract_final_answer(messages) -> str:
    """Return the agent's answer from the final message of a run.

    formulate_final_answer forces a SubmitFinalAnswer tool call, so the answer
    normally sits in ``tool_calls[0]["args"]["final_answer"]``. If the model's
    call could not be parsed (no tool_calls, or the argument is missing), fall
    back to the message's text content instead of raising IndexError/KeyError.
    """
    last = messages[-1]
    calls = getattr(last, "tool_calls", None) or []
    if calls and "final_answer" in calls[0].get("args", {}):
        return calls[0]["args"]["final_answer"]
    return last.content


query = "What is the latest date in orders"
state = {"messages": [HumanMessage(content=query)]}
output = sql_agent.invoke(state)
result = extract_final_answer(output["messages"])
print("\nFinal Answer:\n", result)



INFO:__main__:State===>
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"



Final Answer:
 The latest date in the orders table is 2025-07-03 00:00:00.
[HumanMessage(content='What is the latest date in orders', additional_kwargs={}, response_metadata={}, id='4acdc17b-ae58-432d-bffe-5f6795b65b32'), AIMessage(content='', additional_kwargs={}, response_metadata={}, id='ce90ecb4-be18-4639-a00a-0097cb745bda', tool_calls=[{'name': 'sql_db_list_tables', 'args': {}, 'id': 'tool_abcd123', 'type': 'tool_call'}]), ToolMessage(content='all_customers, conversions, orders, purchases', name='sql_db_list_tables', id='eb20d447-01e1-479d-8ff2-3273c0223619', tool_call_id='tool_abcd123'), AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_EyRsCSpOGeownvaLffg54Xha', 'function': {'arguments': '{"table_names": "orders"}', 'name': 'sql_db_schema'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 223, 'prompt_tokens': 156, 'total_tokens': 379, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens'

In [4]:
# Second example: order counts by type for the last two full weeks.
query = "can you get orders by type for the last week full week an the one before using full weeks"
state = dict(messages=[HumanMessage(content=query)])
output = sql_agent.invoke(state)
submit_call = output["messages"][-1].tool_calls[0]
result = submit_call["args"]["final_answer"]
print("\nFinal Answer:\n", result)

INFO:__main__:State===>
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"



Final Answer:
 Below is an example query that returns the order counts and the sums of each 'order type' (represented here as the number of 1-star, 2-star, etc. orders) for two consecutive full weeks – the last full week and the week prior to that. In this solution, we define two full weeks using Monday as the start of the week and Sunday as the end of the week. For example:

────────────────────
WITH weeks AS (
  SELECT 
    'last_full_week' AS week_type,
    date('now', '-7 days', 'weekday 0', '-6 days') AS week_start,
    date('now', '-7 days', 'weekday 0') AS week_end
  UNION ALL
  SELECT 
    'week_before_last' AS week_type,
    date('now', '-7 days', 'weekday 0', '-6 days', '-7 days') AS week_start,
    date('now', '-7 days', 'weekday 0', '-7 days') AS week_end
)
SELECT 
  w.week_type,
  w.week_start,
  w.week_end,
  COUNT(o.purchase_date) AS order_count,
  COALESCE(SUM(o."1_star"), 0) AS one_star,
  COALESCE(SUM(o."2_star"), 0) AS two_star,
  COALESCE(SUM(o."3_star"), 0) AS thr

In [None]:
# TODO: Add revenue targets for year-to-date (YTD) and the full year (FY).
# TODO: Compute time is long — consider precomputing results and/or adding more intermediate tables.
