In [None]:
import os
from typing import List,Any,Dict,Optional

from deepagents import create_deep_agent
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
import sqlite3
import threading
load_dotenv()

In [None]:
llm=ChatOpenAI(model="gpt-4o-mini",temperature=0)
answer=llm.invoke("hi")

In [None]:
customers = [
        (1, "Acme GmbH", "DE"),
        (2, "Blue Ocean SARL", "FR"),
        (3, "Nordic Trading AB", "SE"),
        (4, "Sunrise Ltd", "UK"),
        (5, "Iberia SA", "ES"),
      ]

products = [
        (1, "SKU-001", "Keyboard", 49.90),
        (2, "SKU-002", "Mouse", 19.90),
        (3, "SKU-003", "Monitor", 199.00),
        (4, "SKU-004", "USB-C Dock", 129.00),
        (5, "SKU-005", "Laptop Stand", 39.00),
        (6, "SKU-006", "Noise Cancelling Headphones", 299.00),
      ]
orders = [
    (1001, 1, "2026-01-01"),
    (1002, 2, "2026-01-02"),
    (1003, 1, "2026-01-03"),
    (1004, 3, "2026-01-04"),
    (1005, 4, "2026-01-05"),
    (1006, 2, "2026-01-06"),
    (1007, 5, "2026-01-07"),
    (1008, 1, "2026-01-08"),
  ]
order_items = [
        (1, 1001, 1, 2, 49.90),
        (2, 1001, 2, 3, 19.90),
        (3, 1002, 3, 1, 199.00),
        (4, 1003, 4, 1, 129.00),
        (5, 1003, 2, 2, 19.90),
        (6, 1004, 1, 1, 49.90),
        (7, 1004, 5, 2, 39.00),
        (8, 1005, 6, 1, 299.00),
        (9, 1006, 2, 10, 19.90),
        (10, 1007, 5, 1, 39.00),
        (11, 1008, 3, 2, 199.00),
      ]

In [None]:
products

In [None]:
def create_demo_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    conn.row_factory = sqlite3.Row
    cur = conn.cursor()

    cur.executescript(
    '''
    CREATE TABLE customers (
      customer_id INTEGER PRIMARY KEY,
      name TEXT NOT NULL,
      country TEXT NOT NULL
    );

    CREATE TABLE products (
      product_id INTEGER PRIMARY KEY,
      sku TEXT NOT NULL,
      name TEXT NOT NULL,
      price_eur REAL NOT NULL
    );

    CREATE TABLE orders (
      order_id INTEGER PRIMARY KEY,
      customer_id INTEGER NOT NULL,
      order_date TEXT NOT NULL,
      FOREIGN KEY (customer_id) REFERENCES customers(customer_id)
    );

    CREATE TABLE order_items (
      order_item_id INTEGER PRIMARY KEY,
      order_id INTEGER NOT NULL,
      product_id INTEGER NOT NULL,
      quantity INTEGER NOT NULL,
      unit_price_eur REAL NOT NULL,
      FOREIGN KEY (order_id) REFERENCES orders(order_id),
      FOREIGN KEY (product_id) REFERENCES products(product_id)
    );
    '''
   )
    cur.executemany("INSERT INTO customers VALUES (?, ?, ?)", customers)
    cur.executemany("INSERT INTO products VALUES (?, ?, ?, ?)", products)
    cur.executemany("INSERT INTO orders VALUES (?, ?, ?)", orders)
    cur.executemany("INSERT INTO order_items VALUES (?, ?, ?, ?, ?)", order_items)

    conn.commit()
    return conn

CONN = create_demo_db()
SQLITE_LOCK = threading.Lock()

In [50]:
def _normalize_sql(sql: str) -> str:
    s = (sql or "").strip()
    while s.endswith(";"):
        s = s[:-1].rstrip()
    return s

def _is_read_only_sql(sql: str) -> bool:
    s = _normalize_sql(sql).lstrip().lower()
    return s.startswith("select") or s.startswith("with")

In [None]:
@tool("get_schema")
def get_schema() -> Dict[str, Any]:
    """Return the current SQLite schema (tables -> columns[name,type])."""
    tables: Dict[str, List[Dict[str, Any]]] = {}

    with SQLITE_LOCK:
        cur = CONN.execute(
            """
            SELECT name
            FROM sqlite_master
            WHERE type='table'
              AND name NOT LIKE 'sqlite_%'
            ORDER BY name
            """
        )
        table_names = [r[0] for r in cur.fetchall()]

        for t in table_names:
            cols: List[Dict[str, Any]] = []
            for row in CONN.execute(f"PRAGMA table_info({t})").fetchall():
                cols.append(
                    {
                        "name": row[1],
                        "type": row[2],
                        "notnull": bool(row[3]),
                        "pk": bool(row[5]),
                    }
                )
            tables[t] = cols

    return {"tables": tables}

In [None]:
@tool("execute_sql")
def execute_sql(
    sql: str,
    params: Optional[Dict[str, Any]] = None,
    hard_limit: int = 200,
) -> Dict[str, Any]:
    """Execute a read-only SQLite query and return rows as JSON-friendly dicts.

    Guardrails:
    - Only SELECT/WITH allowed.
    - Applies an outer LIMIT (hard_limit) so you don't accidentally fetch huge result sets.
    """
    if params is None:
        params = {}

    s = _normalize_sql(sql)
    if not _is_read_only_sql(s):
        raise ValueError("Only read-only SELECT/WITH statements are allowed.")

    wrapped_sql = "SELECT * FROM (" + s + ") AS __q LIMIT :__hard_limit"
    exec_params = dict(params)
    exec_params["__hard_limit"] = int(hard_limit)

    with SQLITE_LOCK:
        cur = CONN.execute(wrapped_sql, exec_params)
        rows = cur.fetchall()
        cols = [d[0] for d in cur.description] if cur.description else []

    return {"columns": cols, "rows": [dict(r) for r in rows]}

In [None]:
MAIN_AGENT_PROMPT = """You are a SQL orchestrator.

You have two subagents:
- sql_author: drafts a parameterized SQLite SELECT/WITH query.
- sql_auditor: reviews (and if needed fixes) the query.

Workflow:
1) Ask sql_author to draft SQL for the user's question.
2) Ask sql_auditor to review/fix the SQL.
3) Execute the final SQL using the execute_sql tool (pass params).
4) Answer the user with the result. Be concise and include the rows.

Important:
- Only read-only SELECT/WITH queries are allowed.
- Do not invent tables/columns; rely on get_schema when needed.
"""

In [None]:
SQL_AUTHOR_PROMPT = """You are sql_author. You write parameterized SQLite SELECT/WITH queries.

Rules:
- You may call the tool get_schema to inspect tables/columns.
- Return ONLY JSON with keys: sql, params, notes.
- sql must be a single SQLite SELECT/WITH statement.
- Use named parameters like :country, :start_date, :end_date when helpful.
- Do not call execute_sql.
"""

In [None]:
SQL_AUDITOR_PROMPT = """You are sql_auditor. You review and fix SQL for correctness, safety, and clarity.

Rules:
- You may call the tool get_schema to verify tables/columns.
- Return ONLY JSON with keys: sql, params, audit_notes.
- Ensure the SQL is a single SELECT/WITH statement (read-only).
- If you change the SQL or params, explain why briefly in audit_notes.
- Prefer explicit column lists (avoid SELECT *).
"""

In [None]:
subagents = [
    {
        "name": "sql_author",
        "description": "Write parameterized SQLite SELECT/WITH queries from user intent.",
        "system_prompt": SQL_AUTHOR_PROMPT,
        "prompt": SQL_AUTHOR_PROMPT,
        "tools": [get_schema],
    },
    {
        "name": "sql_auditor",
        "description": "Review SQL for correctness, safety, and policy compliance.",
        "system_prompt": SQL_AUDITOR_PROMPT,
        "prompt": SQL_AUDITOR_PROMPT,
        "tools": [get_schema],
    },
]

In [54]:
agent = create_deep_agent(
    model=llm,
    tools=[get_schema, execute_sql],
    system_prompt=MAIN_AGENT_PROMPT,
    subagents=subagents,
    debug=False,
    name="sql-orchestrator",
)

In [52]:
def ask(question: str) -> str:
    state = agent.invoke({"messages": [HumanMessage(content=question)]})
    return state["messages"][-1].content

In [53]:
print(ask('Which 3 customers have the highest revenue (sum(quantity * unit_price_eur))?'))

[1m[values][0m {'messages': [HumanMessage(content='Which 3 customers have the highest revenue (sum(quantity * unit_price_eur))?', additional_kwargs={}, response_metadata={}, id='319de0eb-4edb-4db3-9b0d-22218ef5b342')]}
[1m[updates][0m {'PatchToolCallsMiddleware.before_agent': {'messages': Overwrite(value=[HumanMessage(content='Which 3 customers have the highest revenue (sum(quantity * unit_price_eur))?', additional_kwargs={}, response_metadata={}, id='319de0eb-4edb-4db3-9b0d-22218ef5b342')])}}
[1m[values][0m {'messages': [HumanMessage(content='Which 3 customers have the highest revenue (sum(quantity * unit_price_eur))?', additional_kwargs={}, response_metadata={}, id='319de0eb-4edb-4db3-9b0d-22218ef5b342')]}
[1m[updates][0m {'SummarizationMiddleware.before_model': None}
[1m[updates][0m {'model': {'messages': [AIMessage(content='', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 10, 'prompt_tokens': 4644, 'total_tokens': 4654, 'comp

In [46]:
print(ask('Highly Ordered Products'))

Here are the highly ordered products based on the total quantity ordered:

| Product ID | Name                          | Total Quantity Ordered |
|------------|-------------------------------|------------------------|
| 2          | Mouse                         | 15                     |
| 1          | Keyboard                      | 3                      |
| 3          | Monitor                       | 3                      |
| 5          | Laptop Stand                  | 3                      |
| 4          | USB-C Dock                    | 1                      |
| 6          | Noise Cancelling Headphones    | 1                      |

If you need further analysis or details, feel free to ask!
