# NL→SQL Demo Notebook (Azure OpenAI + Azure SQL)

This notebook demonstrates the NL→Intent→SQL pipeline step by step, aligned with the project's flowchart.
Each step includes explanation cells for demo purposes (e.g., for a TERADATA partner presentation).

Prereqs: ensure you have a valid .env configured for Azure OpenAI and Azure SQL (see README).

In [1]:
# 1) Imports and environment bootstrap
import os, re, sys
from typing import List, Dict, Any
from dotenv import load_dotenv
load_dotenv()
print('Environment loaded. Project root:', os.getcwd())

Environment loaded. Project root: /Users/arturoquiroga/GITHUB/AQ-NEW-NL2SQL/docs


In [2]:
# Run & verbosity gate setup (placed early so checkpoints can call it)
import time
if 'RUN_START_TS' not in globals():
    RUN_START_TS = time.time()  # capture start as early as possible

# Toggle to reduce verbosity of intermediate checkpoint prints
if 'SHOW_COSTS' not in globals():
    SHOW_COSTS = True

def maybe_print_checkpoint(header: str):
    try:
        if SHOW_COSTS:
            print_usage_and_cost(DEPLOYMENT_NAME, header=header)
    except NameError:
        # Helpers not yet loaded; safe no-op
        pass

In [3]:
# Enable global token/cost tracking early in the notebook
import os, json, re
from typing import Any, Dict, Optional, Tuple
from pathlib import Path
import types

# Accumulator
TOKEN_USAGE = globals().get("TOKEN_USAGE", {"prompt": 0, "completion": 0, "total": 0})

def accumulate_usage(usage: Optional[Dict[str, Any]]):
    if not usage:
        return
    TOKEN_USAGE["prompt"] += int(usage.get("prompt_tokens", 0) or 0)
    TOKEN_USAGE["completion"] += int(usage.get("completion_tokens", 0) or 0)
    if usage.get("total_tokens") is not None:
        TOKEN_USAGE["total"] += int(usage.get("total_tokens") or 0)
    else:
        TOKEN_USAGE["total"] = TOKEN_USAGE["prompt"] + TOKEN_USAGE["completion"]

def extract_usage_from_rest(resp_json: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    return resp_json.get("usage") if isinstance(resp_json, dict) else None

def extract_usage_from_langchain(ai_msg: Any) -> Optional[Dict[str, Any]]:
    try:
        meta = getattr(ai_msg, "response_metadata", None) or {}
        tu = meta.get("token_usage")
        if isinstance(tu, dict):
            return {
                "prompt_tokens": tu.get("prompt_tokens") or tu.get("input_tokens") or 0,
                "completion_tokens": tu.get("completion_tokens") or tu.get("output_tokens") or 0,
                "total_tokens": tu.get("total_tokens") or (tu.get("prompt_tokens", 0) or 0) + (tu.get("completion_tokens", 0) or 0),
            }
        um = getattr(ai_msg, "usage_metadata", None)
        if isinstance(um, dict):
            return {
                "prompt_tokens": um.get("input_tokens") or um.get("prompt_tokens") or 0,
                "completion_tokens": um.get("output_tokens") or um.get("completion_tokens") or 0,
                "total_tokens": um.get("total_tokens") or (um.get("input_tokens", 0) or 0) + (um.get("output_tokens", 0) or 0),
            }
    except Exception:
        pass
    return None

def normalize_dep(name: str) -> str:
    return re.sub(r"[^A-Za-z0-9]+", "_", (name or "")).upper().strip("_")

def load_pricing() -> Dict[str, Dict[str, Any]]:
    # search repo root for azure_openai_pricing.json
    here = Path.cwd()
    candidates = [here / "azure_openai_pricing.json", here.parent / "azure_openai_pricing.json"] if here.name == "docs" else [here / "azure_openai_pricing.json"]
    for p in candidates:
        if p.exists():
            try:
                return json.loads(p.read_text())
            except Exception:
                return {}
    return {}

def get_target_currency() -> str:
    cur = (os.getenv("AZURE_OPENAI_PRICE_CURRENCY", "USD") or "USD").upper()
    return cur if cur in ("USD", "CAD") else "USD"

def convert_currency(amount: float, from_cur: str, to_cur: str) -> Optional[float]:
    if from_cur == to_cur:
        return amount
    if from_cur == "USD" and to_cur == "CAD":
        rate = os.getenv("AZURE_OPENAI_PRICE_USD_TO_CAD")
        return amount * float(rate) if rate else None
    if from_cur == "CAD" and to_cur == "USD":
        rate = os.getenv("AZURE_OPENAI_PRICE_CAD_TO_USD")
        return amount * float(rate) if rate else None
    return None

def get_pricing_for_deployment(dep_name: Optional[str]) -> Tuple[Optional[float], Optional[float], str, str]:
    dep = dep_name or ""
    dep_key = normalize_dep(dep)
    target = get_target_currency()

    # 1) Deployment-specific env
    in_env = os.getenv(f"AZURE_OPENAI_PRICE_{dep_key}_INPUT_PER_1K")
    out_env = os.getenv(f"AZURE_OPENAI_PRICE_{dep_key}_OUTPUT_PER_1K")
    if in_env and out_env:
        try:
            return float(in_env), float(out_env), f"env:{dep_key}", target
        except ValueError:
            pass

    # 2) Global env
    in_glob = os.getenv("AZURE_OPENAI_PRICE_INPUT_PER_1K")
    out_glob = os.getenv("AZURE_OPENAI_PRICE_OUTPUT_PER_1K")
    if in_glob and out_glob:
        try:
            return float(in_glob), float(out_glob), "env:global", target
        except ValueError:
            pass

    # 3) JSON file
    pm = load_pricing()
    entry = pm.get(dep.lower()) or pm.get(dep)
    if isinstance(entry, dict):
        # nested per-currency
        cur_block = entry.get(target)
        if isinstance(cur_block, dict) and "input_per_1k" in cur_block and "output_per_1k" in cur_block:
            try:
                return float(cur_block["input_per_1k"]), float(cur_block["output_per_1k"]), "file:azure_openai_pricing.json", target
            except Exception:
                pass
        # flat (assumed USD)
        if "input_per_1k" in entry and "output_per_1k" in entry:
            try:
                in_usd = float(entry["input_per_1k"])
                out_usd = float(entry["output_per_1k"])
                if target == "USD":
                    return in_usd, out_usd, "file:azure_openai_pricing.json", "USD"
                i = convert_currency(in_usd, "USD", target)
                o = convert_currency(out_usd, "USD", target)
                if i is not None and o is not None:
                    return i, o, "file:azure_openai_pricing.json (converted)", target
                return in_usd, out_usd, "file:azure_openai_pricing.json (USD; no conversion)", "USD"
            except Exception:
                pass
    return None, None, "unset", target

def print_usage_and_cost(dep_name: Optional[str], header: Optional[str] = None):
    in_p, out_p, src, cur = get_pricing_for_deployment(dep_name)
    pt = TOKEN_USAGE["prompt"]
    ct = TOKEN_USAGE["completion"]
    tt = TOKEN_USAGE["total"] or (pt + ct)
    if header:
        print(f"\n==== {header} ====")
    print("\n==== TOKEN USAGE (cumulative) ====")
    print(f"Input tokens: {pt}")
    print(f"Completion tokens: {ct}")
    print(f"Total tokens: {tt}")
    if in_p is not None and out_p is not None:
        ic = (pt/1000.0) * in_p
        oc = (ct/1000.0) * out_p
        tc = ic + oc
        print("==== COST ESTIMATE ====")
        print(f"Currency: {cur}")
        print(f"Per-1K prices (input={in_p}, output={out_p}) [source={src}]")
        print(f"Estimated cost: {tc:.6f}  [input={ic:.6f}, output={oc:.6f}]")
    else:
        print("[INFO] Pricing not configured. Set AZURE_OPENAI_PRICE_* env vars or azure_openai_pricing.json.")

# Monkey-patch requests.post to auto-capture usage for REST calls
try:
    import requests
    if not hasattr(requests, "_orig_post_for_tokens"):
        requests._orig_post_for_tokens = requests.post
        def _post_wrap(*args, **kwargs):
            resp = requests._orig_post_for_tokens(*args, **kwargs)
            try:
                url = args[0] if args else kwargs.get("url", "")
                if isinstance(url, str) and "/chat/completions" in url:
                    # non-blocking attempt to capture usage
                    data = resp.json()
                    u = extract_usage_from_rest(data)
                    accumulate_usage(u)
            except Exception:
                pass
            return resp
        requests.post = _post_wrap
except Exception:
    pass

# Patch LangChain llm.invoke (if present) to capture usage automatically
try:
    if 'llm' in globals() and llm is not None and not hasattr(llm, "_invoke_patched_for_tokens"):
        _orig_invoke = llm.invoke
        def _invoke_wrap(*args, **kwargs):
            res = _orig_invoke(*args, **kwargs)
            try:
                accumulate_usage(extract_usage_from_langchain(res))
            except Exception:
                pass
            return res
        llm.invoke = types.MethodType(_invoke_wrap, llm)
        llm._invoke_patched_for_tokens = True
except Exception:
    pass

print("[Token tracking] Enabled global interceptors for REST and LangChain.")

[Token tracking] Enabled global interceptors for REST and LangChain.


## Demo settings (global)

- Control printing and execution across the notebook from one place.
- You can still override the query text in runner cells if needed.

In [4]:
# Demo settings (global)
# These control printing and execution across the notebook.
SHOW_INTENT = True
SHOW_REASONING = True
NO_EXEC = False
REFRESH_SCHEMA = False

# Default query used by runner cells unless overridden
default_test_query = 'Weighted average interest rate by region'
print('Demo settings -> SHOW_INTENT:', SHOW_INTENT, '| SHOW_REASONING:', SHOW_REASONING, '| NO_EXEC:', NO_EXEC, '| REFRESH_SCHEMA:', REFRESH_SCHEMA)

Demo settings -> SHOW_INTENT: True | SHOW_REASONING: True | NO_EXEC: False | REFRESH_SCHEMA: False


In [5]:
# After this step, print cumulative token usage/cost
maybe_print_checkpoint("After environment/bootstrap")

### Display Helpers

In [6]:
# Display helpers (ANSI colors for sectioned output)
RESET = "\033[0m"
YELLOW = "\033[33m"           # intent
LIGHT_BLUE = "\033[96m"       # reasoning (bright cyan)
WHITE = "\033[97m"            # raw sql
GRAY = "\033[90m"             # legacy dark gray (not used for sanitized anymore)
LIGHT_GRAY = "\033[38;5;250m" # sanitized sql (lighter and more readable)
GOLD = "\033[38;5;220m"      # results (approx gold)

def colorize(text: str, color: str) -> str:
    return f"{color}{text}{RESET}"

def print_colored_block(label: str, content: str, color: str, sep: str = "\n") -> None:
    block = f"{label}{sep}{content}" if content is not None else label
    print(colorize(block, color))
    print()  # extra blank line for spacing between output blocks

In [7]:
# After schema preview step
maybe_print_checkpoint("After schema preview")

## Azure OpenAI setup and model routing
We detect reasoning-style deployments (o-series/GPT-5) and use direct REST Chat Completions without unsupported params.
For other models, we use the LangChain AzureChatOpenAI wrapper.

In [8]:
# 2) Azure OpenAI configuration & helpers
import json, requests
from langchain_openai import AzureChatOpenAI
from langchain.prompts import ChatPromptTemplate

API_KEY = os.getenv('AZURE_OPENAI_API_KEY')
ENDPOINT = os.getenv('AZURE_OPENAI_ENDPOINT')
DEPLOYMENT_NAME = os.getenv('AZURE_OPENAI_DEPLOYMENT_NAME')
API_VERSION = os.getenv('AZURE_OPENAI_API_VERSION', '2025-04-01-preview')

def _is_reasoning_like_model(deployment_name: str | None) -> bool:
    name = (deployment_name or '').lower()
    return name.startswith('o') or name.startswith('gpt-5')

USING_REASONING = _is_reasoning_like_model(DEPLOYMENT_NAME)
print('Reasoning-style deployment:', USING_REASONING, '| Deployment:', DEPLOYMENT_NAME)

def _azure_chat_completions(messages: List[dict], max_completion_tokens: int | None = None) -> str:
    url = f"{ENDPOINT.rstrip('/')}/openai/deployments/{DEPLOYMENT_NAME}/chat/completions?api-version={API_VERSION}"
    payload: Dict[str, Any] = {'messages': messages}
    if max_completion_tokens is not None:
        payload['max_completion_tokens'] = max_completion_tokens
    headers = {'api-key': API_KEY or '', 'Content-Type': 'application/json'}
    resp = requests.post(url, headers=headers, json=payload, timeout=60)
    resp.raise_for_status()
    data = resp.json()
    return data['choices'][0]['message']['content']

def _make_llm():
    if USING_REASONING:
        return None
    return AzureChatOpenAI(
        openai_api_key=API_KEY,
        azure_endpoint=ENDPOINT,
        deployment_name=DEPLOYMENT_NAME,
        api_version=API_VERSION,
        max_tokens=8192,
    )

llm = _make_llm()
print('LangChain LLM initialized:', llm is not None)

Reasoning-style deployment: True | Deployment: gpt-5
LangChain LLM initialized: False


In [9]:
# After intent extraction
maybe_print_checkpoint("After intent & entities")


==== After intent & entities ====

==== TOKEN USAGE (cumulative) ====
Input tokens: 0
Completion tokens: 0
Total tokens: 0
==== COST ESTIMATE ====
Currency: USD
Per-1K prices (input=0.00125, output=0.01) [source=file:azure_openai_pricing.json]
Estimated cost: 0.000000  [input=0.000000, output=0.000000]


## Prompts & function: Parse NL into intent/entities
We keep prompt strings inline to mirror the main script.

In [10]:
# 3) Prompts and parsing function
INTENT_PROMPT_TEXT = (
    "You are an expert in translating natural language to database queries.\n"
    "Extract the intent and entities from the following user input:\n"
    "{input}"
)
SQL_PROMPT_TEXT = (
    "You are an expert in SQL and Azure SQL Database. Given the following database schema and the intent/entities,\n"
    "generate a valid T-SQL query for querying the database.\n\n"
    "IMPORTANT:\n"
    "- Do NOT use USE statements (USE [database] is not supported in Azure SQL Database)\n"
    "- Generate only the SELECT query without USE or GO statements\n"
    "- Return only executable T-SQL code without markdown formatting\n"
    "- The database connection is already established to the correct database\n\n"
    "Schema:\n"
    "{schema}\n"
    "Intent and Entities:\n"
    "{intent_entities}\n\n"
    "Generate a T-SQL query that can be executed directly:\n"
)
REASONING_PROMPT_TEXT = (
    "Before writing SQL, produce a short, high-level reasoning summary for how you will construct the query,\n"
    "based on the schema and the intent/entities.\n\n"
    "Rules:\n"
    "- Do NOT include any SQL code.\n"
    "- Keep it concise (<= 150 words) and actionable.\n"
    "- Use simple bullets covering: Entities mapping, Tables/Joins, Aggregations (if any), Filters, Order/Limit, Assumptions.\n\n"
    "Schema:\n"
    "{schema}\n"
    "Intent and Entities:\n"
    "{intent_entities}\n\n"
    "Provide the reasoning summary now:\n"
)

intent_prompt = ChatPromptTemplate.from_template(INTENT_PROMPT_TEXT)
sql_prompt = ChatPromptTemplate.from_template(SQL_PROMPT_TEXT)
reasoning_prompt = ChatPromptTemplate.from_template(REASONING_PROMPT_TEXT)

def parse_nl_query(user_input: str) -> str:
    if USING_REASONING:
        prompt_text = INTENT_PROMPT_TEXT.format(input=user_input)
        messages = [{ 'role': 'user', 'content': prompt_text.strip() }]
        return _azure_chat_completions(messages, max_completion_tokens=8192)
    chain = intent_prompt | llm
    res = chain.invoke({'input': user_input})
    return res.content

print('parse_nl_query ready.')

parse_nl_query ready.


In [11]:
# After reasoning summary (if executed)
maybe_print_checkpoint("After reasoning summary")


==== After reasoning summary ====

==== TOKEN USAGE (cumulative) ====
Input tokens: 0
Completion tokens: 0
Total tokens: 0
==== COST ESTIMATE ====
Currency: USD
Per-1K prices (input=0.00125, output=0.01) [source=file:azure_openai_pricing.json]
Estimated cost: 0.000000  [input=0.000000, output=0.000000]


## Schema context (with optional refresh)
We use the local cache-backed context from `schema_reader.get_sql_database_schema_context`.

In [12]:
# 4) Schema context helpers

def _safe_import_schema_reader():
    # add project root to path for relative imports while running from docs/
    repo_root = os.path.dirname(os.getcwd()) if os.path.basename(os.getcwd()) == 'docs' else os.getcwd()
    if repo_root not in sys.path:
        sys.path.insert(0, repo_root)
    try:
        from schema_reader import get_sql_database_schema_context, refresh_schema_cache  # type: ignore
        return get_sql_database_schema_context, refresh_schema_cache
    except Exception as e:
        raise ImportError('Unable to import schema helpers from schema_reader.py') from e

get_schema_ctx, refresh_cache = _safe_import_schema_reader()
if REFRESH_SCHEMA:
    try:
        path = refresh_cache()
        print('[INFO] Schema cache refreshed at', path)
    except Exception as e:
        print('[WARN] Schema refresh failed:', e)

schema_preview = get_schema_ctx()
print(schema_preview.split('\n')[0])  # first line only

DATABASE: CONTOSO-FI (Azure SQL)


In [13]:
# After SQL generation
maybe_print_checkpoint("After SQL generation")


==== After SQL generation ====

==== TOKEN USAGE (cumulative) ====
Input tokens: 0
Completion tokens: 0
Total tokens: 0
==== COST ESTIMATE ====
Currency: USD
Per-1K prices (input=0.00125, output=0.01) [source=file:azure_openai_pricing.json]
Estimated cost: 0.000000  [input=0.000000, output=0.000000]


## Optional reasoning step
Ask the model to outline a brief plan before SQL generation. Useful for transparency in demos.

In [14]:
# 5) Reasoning function

def generate_reasoning(intent_entities: str) -> str:
    schema = get_schema_ctx()
    if USING_REASONING:
        prompt_text = REASONING_PROMPT_TEXT.format(schema=schema, intent_entities=intent_entities)
        messages = [{ 'role': 'user', 'content': prompt_text.strip() }]
        return _azure_chat_completions(messages, max_completion_tokens=8192)
    chain = reasoning_prompt | llm
    res = chain.invoke({'schema': schema, 'intent_entities': intent_entities})
    return res.content

In [15]:
# After SQL sanitization
maybe_print_checkpoint("After SQL sanitization")


==== After SQL sanitization ====

==== TOKEN USAGE (cumulative) ====
Input tokens: 0
Completion tokens: 0
Total tokens: 0
==== COST ESTIMATE ====
Currency: USD
Per-1K prices (input=0.00125, output=0.01) [source=file:azure_openai_pricing.json]
Estimated cost: 0.000000  [input=0.000000, output=0.000000]


## SQL generation
Generate T-SQL for the given intent/entities and schema context.

In [16]:
# 6) SQL generation function

def generate_sql(intent_entities: str) -> str:
    schema = get_schema_ctx()
    if USING_REASONING:
        prompt_text = SQL_PROMPT_TEXT.format(schema=schema, intent_entities=intent_entities)
        messages = [{ 'role': 'user', 'content': prompt_text.strip() }]
        return _azure_chat_completions(messages, max_completion_tokens=8192)
    chain = sql_prompt | llm
    result = chain.invoke({'schema': schema, 'intent_entities': intent_entities})
    return result.content

In [17]:
# After execution or skip
maybe_print_checkpoint("After execution/skip")


==== After execution/skip ====

==== TOKEN USAGE (cumulative) ====
Input tokens: 0
Completion tokens: 0
Total tokens: 0
==== COST ESTIMATE ====
Currency: USD
Per-1K prices (input=0.00125, output=0.01) [source=file:azure_openai_pricing.json]
Estimated cost: 0.000000  [input=0.000000, output=0.000000]


## SQL sanitization
Extract only the executable SQL portion and normalize quotes for safety.

In [18]:
# 7) SQL sanitization

def extract_and_sanitize_sql(raw_sql: str) -> str:
    sql_code = raw_sql
    # Prefer fenced code blocks if present (captures full content, e.g., WITH ...)
    m = re.search(r"```sql\s*([\s\S]+?)```", raw_sql, re.IGNORECASE)
    if not m:
        m = re.search(r"```([\s\S]+?)```", raw_sql)
    if m:
        sql_code = m.group(1).strip()
    else:
        # If a CTE exists, start from WITH to capture the entire statement
        with_m = re.search(r"(?is)\bWITH\b\s+[A-Za-z0-9_\[\]]+\s+AS\s*\(", raw_sql)
        if with_m:
            sql_code = raw_sql[with_m.start():].strip()
        else:
            # Fallback to first SELECT
            m2 = re.search(r"(?is)\bSELECT\b[\s\S]+", raw_sql)
            if m2:
                sql_code = m2.group(0).strip()
            else:
                sql_code = raw_sql.strip()
    return (sql_code
            .replace('’', "'")
            .replace('‘', "'")
            .replace('“', '"')
            .replace('”', '"'))

## Execution (optional)
Use `sql_executor.execute_sql_query` to run against Azure SQL. Keep NO_EXEC=True for dry runs during demos.

In [19]:
# 8) SQL execution helpers (definitions only)

def _safe_import_sql_executor():
    repo_root = os.path.dirname(os.getcwd()) if os.path.basename(os.getcwd()) == 'docs' else os.getcwd()
    if repo_root not in sys.path:
        sys.path.insert(0, repo_root)
    try:
        from sql_executor import execute_sql_query  # type: ignore
        return execute_sql_query
    except Exception as e:
        raise ImportError('Unable to import execute_sql_query from sql_executor.py') from e


def _format_table(rows):
    if not rows:
        return 'No results returned.\n', []
    columns = list(rows[0].keys())
    col_widths = {c: max(len(c), max(len(str(r[c])) for r in rows)) for c in columns}
    header = ' | '.join(c.ljust(col_widths[c]) for c in columns)
    sep = '-+-'.join('-' * col_widths[c] for c in columns)
    lines = [header, sep]
    for r in rows:
        lines.append(' | '.join(str(r[c]).ljust(col_widths[c]) for c in columns))
    return ('\n'.join(lines) + '\n', [header, sep] + [' | '.join(str(r[c]).ljust(col_widths[c]) for c in columns) for r in rows])

# Note: Execution is controlled by the global NO_EXEC flag in the Demo settings cell.
# This cell only defines helpers; the end-to-end runner performs the actual execution.

## End-to-end demo cell
Update toggles (`REFRESH_SCHEMA`, `SHOW_REASONING`, `NO_EXEC`) and `test_query` above as needed.
Run this cell to repeat the flow with a different question.

In [20]:
# 9) Rerun flow with another question (single, hardened output)

# Global and per-cell guards to prevent duplicate section prints
PRINTED_SECTIONS = globals().get('PRINTED_SECTIONS', {})  # persists across cells/runs in this kernel
_printed_labels = set()  # per-execution guard

def print_once(label: str, content: str, color: str):
    # If this label already printed in this cell execution, skip
    if label in _printed_labels:
        return
    # If we've already printed the exact same content for this label in this kernel session, skip
    if PRINTED_SECTIONS.get(label) == (content or ''):
        return
    _printed_labels.add(label)
    PRINTED_SECTIONS[label] = (content or '')
    print_colored_block(label, content, color)

# Pick your test query (uses global default unless you set a new one here)
user_query = default_test_query

# 1) Intent/entities
intent_entities = parse_nl_query(user_query)
if SHOW_INTENT:
    trimmed = intent_entities[:200] + ('...' if len(intent_entities) > 200 else '') if intent_entities else '[empty]'
    print_once('INTENT ENTITIES:', trimmed, YELLOW)

# 2) Optional reasoning
if SHOW_REASONING:
    try:
        r = generate_reasoning(intent_entities)
        r_clean = (r.strip() if (r and isinstance(r, str)) else '')
        print_once('REASONING:', (r_clean if r_clean else '[no reasoning returned]'), LIGHT_BLUE)
    except Exception as e:
        print('[WARN] Reasoning step failed:', e)

# 3) SQL generation with guard
try:
    raw_sql = generate_sql(intent_entities)
except Exception as e:
    raw_sql = ''
    print('[ERROR] SQL generation failed:', e)

if not raw_sql or not raw_sql.strip():
    print_once('RAW SQL (truncated):', '[empty]', WHITE)
    sql_to_run = ''
    print_once('SANITIZED SQL:', '[empty]', LIGHT_GRAY)
else:
    raw_trimmed = (raw_sql[:500] + '...') if len(raw_sql) > 500 else raw_sql
    print_once('RAW SQL (truncated):', raw_trimmed, WHITE)
    sql_to_run = extract_and_sanitize_sql(raw_sql)
    print_once('SANITIZED SQL:', sql_to_run, LIGHT_GRAY)

# 4) Execution with safety checks
if not NO_EXEC:
    sql_final = sql_to_run if 'sql_to_run' in locals() else ''
    if not sql_final or not sql_final.strip():
        print('[INFO] Skipping execution: SQL is empty.')
    elif not re.search(r'\bselect\b', sql_final, re.IGNORECASE):
        print('[INFO] Skipping execution: No SELECT statement detected.')
    else:
        try:
            execute_sql_query = _safe_import_sql_executor()
            rows = execute_sql_query(sql_final)
            table_text, _ = _format_table(rows)
            print_once('RESULTS:', table_text, GOLD)
        except Exception as e:
            print('[ERROR] Query failed:', e)
else:
    print('[INFO] Execution skipped (NO_EXEC=True)')

[33mINTENT ENTITIES:
{
  "intent": "compute_aggregation",
  "entities": {
    "metric": "interest_rate",
    "aggregation": "weighted_average",
    "weight_field": null,
    "group_by": ["region"],
    "filters": [],
    ...[0m

[96mREASONING:
- Entities mapping: Metric = InterestRatePct; Weight = PrincipalAmount; Group by = RegionName.
- Tables/Joins: Use dbo.vw_LoanPortfolio (preferred for portfolio metrics). No joins needed since RegionName, InterestRatePct, and PrincipalAmount exist in the view.
- Aggregations: Compute weighted average interest rate per region as sum(InterestRatePct * PrincipalAmount) / sum(PrincipalAmount). Guard against divide-by-zero by excluding groups with zero principal.
- Filters: None specified; include all loans in the view. Optionally exclude loans with NULL InterestRatePct or PrincipalAmount.
- Order/Limit: None specified; default to ordering by RegionName for readability; no limit.
- Assumptions: Use RegionName from the view to represent “region.” In

## Token usage and pricing setup
This section adds lightweight helpers to capture token usage from Azure OpenAI responses and estimate cost in USD or CAD using `azure_openai_pricing.json` or environment variables.

In [21]:
# Helpers to track token usage and compute cost in the notebook
import os, json
from typing import Any, Dict, Optional, Tuple
from pathlib import Path

# Accumulator
TOKEN_USAGE = {"prompt": 0, "completion": 0, "total": 0}

def accumulate_usage(usage: Optional[Dict[str, Any]]):
    if not usage:
        return
    TOKEN_USAGE["prompt"] += int(usage.get("prompt_tokens", 0) or 0)
    TOKEN_USAGE["completion"] += int(usage.get("completion_tokens", 0) or 0)
    if usage.get("total_tokens") is not None:
        TOKEN_USAGE["total"] += int(usage.get("total_tokens") or 0)
    else:
        TOKEN_USAGE["total"] = TOKEN_USAGE["prompt"] + TOKEN_USAGE["completion"]

def normalize_dep(name: str) -> str:
    import re
    return re.sub(r"[^A-Za-z0-9]+", "_", (name or "")).upper().strip("_")

def load_pricing() -> Dict[str, Dict[str, Any]]:
    cfg_path = Path.cwd().parent / "azure_openai_pricing.json" if Path.cwd().name == "docs" else Path.cwd() / "azure_openai_pricing.json"
    if cfg_path.exists():
        try:
            return json.loads(cfg_path.read_text())
        except Exception:
            return {}
    return {}

def get_target_currency() -> str:
    cur = (os.getenv("AZURE_OPENAI_PRICE_CURRENCY", "USD") or "USD").upper()
    return cur if cur in ("USD", "CAD") else "USD"

def convert_currency(amount: float, from_cur: str, to_cur: str) -> Optional[float]:
    if from_cur == to_cur:
        return amount
    if from_cur == "USD" and to_cur == "CAD":
        rate = os.getenv("AZURE_OPENAI_PRICE_USD_TO_CAD")
        return amount * float(rate) if rate else None
    if from_cur == "CAD" and to_cur == "USD":
        rate = os.getenv("AZURE_OPENAI_PRICE_CAD_TO_USD")
        return amount * float(rate) if rate else None
    return None

def get_pricing_for_deployment(dep_name: Optional[str]) -> Tuple[Optional[float], Optional[float], str, str]:
    dep = dep_name or ""
    dep_key = normalize_dep(dep)
    target = get_target_currency()

    # 1) Deployment-specific env
    in_env = os.getenv(f"AZURE_OPENAI_PRICE_{dep_key}_INPUT_PER_1K")
    out_env = os.getenv(f"AZURE_OPENAI_PRICE_{dep_key}_OUTPUT_PER_1K")
    if in_env and out_env:
        try:
            return float(in_env), float(out_env), f"env:{dep_key}", target
        except ValueError:
            pass

    # 2) Global env
    in_glob = os.getenv("AZURE_OPENAI_PRICE_INPUT_PER_1K")
    out_glob = os.getenv("AZURE_OPENAI_PRICE_OUTPUT_PER_1K")
    if in_glob and out_glob:
        try:
            return float(in_glob), float(out_glob), "env:global", target
        except ValueError:
            pass

    # 3) JSON
    pm = load_pricing()
    entry = pm.get(dep.lower()) or pm.get(dep)
    if isinstance(entry, dict):
        # nested per-currency first
        cur_block = entry.get(target) if target in ("USD","CAD") else None
        if isinstance(cur_block, dict) and "input_per_1k" in cur_block and "output_per_1k" in cur_block:
            try:
                return float(cur_block["input_per_1k"]), float(cur_block["output_per_1k"]), "file:azure_openai_pricing.json", target
            except Exception:
                pass
        # flat legacy (assume USD)
        if "input_per_1k" in entry and "output_per_1k" in entry:
            try:
                in_usd = float(entry["input_per_1k"])
                out_usd = float(entry["output_per_1k"])
                if target == "USD":
                    return in_usd, out_usd, "file:azure_openai_pricing.json", "USD"
                i = convert_currency(in_usd, "USD", target)
                o = convert_currency(out_usd, "USD", target)
                if i is not None and o is not None:
                    return i, o, "file:azure_openai_pricing.json (converted)", target
                return in_usd, out_usd, "file:azure_openai_pricing.json (USD; no conversion)", "USD"
            except Exception:
                pass
    return None, None, "unset", target

def print_usage_and_cost(dep_name: Optional[str], header: Optional[str] = None):
    in_p, out_p, src, cur = get_pricing_for_deployment(dep_name)
    pt = TOKEN_USAGE["prompt"]
    ct = TOKEN_USAGE["completion"]
    tt = TOKEN_USAGE["total"] or (pt + ct)
    if header:
        print(f"\n==== {header} ====")
    print("\n==== TOKEN USAGE ====")
    print(f"Input tokens: {pt}")
    print(f"Completion tokens: {ct}")
    print(f"Total tokens: {tt}")
    if in_p is not None and out_p is not None:
        ic = (pt/1000.0) * in_p
        oc = (ct/1000.0) * out_p
        tc = ic + oc
        print("==== COST ESTIMATE ====")
        print(f"Currency: {cur}")
        print(f"Per-1K prices (input={in_p}, output={out_p}) [source={src}]")
        print(f"Estimated cost: {tc:.6f}  [input={ic:.6f}, output={oc:.6f}]")
    else:
        print("[INFO] Pricing not configured. Set AZURE_OPENAI_PRICE_* env vars or azure_openai_pricing.json.")

### Capture usage from responses
Below we add small wrappers to capture usage from the REST responses (reasoning/o-series path) and from LangChain responses (non-reasoning path).

In [22]:
def extract_usage_from_rest(resp_json: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    return resp_json.get("usage") if isinstance(resp_json, dict) else None

def extract_usage_from_langchain(ai_msg: Any) -> Optional[Dict[str, Any]]:
    try:
        meta = getattr(ai_msg, "response_metadata", None) or {}
        tu = meta.get("token_usage")
        if isinstance(tu, dict):
            return {
                "prompt_tokens": tu.get("prompt_tokens") or tu.get("input_tokens") or 0,
                "completion_tokens": tu.get("completion_tokens") or tu.get("output_tokens") or 0,
                "total_tokens": tu.get("total_tokens") or (tu.get("prompt_tokens", 0) or 0) + (tu.get("completion_tokens", 0) or 0),
            }
        um = getattr(ai_msg, "usage_metadata", None)
        if isinstance(um, dict):
            return {
                "prompt_tokens": um.get("input_tokens") or um.get("prompt_tokens") or 0,
                "completion_tokens": um.get("output_tokens") or um.get("completion_tokens") or 0,
                "total_tokens": um.get("total_tokens") or (um.get("input_tokens", 0) or 0) + (um.get("output_tokens", 0) or 0),
            }
    except Exception:
        pass
    return None

### Example: run a single NL→SQL and then print usage and cost
Use the same DEPLOYMENT_NAME already set in the notebook for the model.

In [23]:
# Demo: prompt → reasoning (optional) → sql, capturing usage (gated)
RUN_DEMO = globals().get('RUN_DEMO', False)
if not RUN_DEMO:
    print("[demo] Skipped (set RUN_DEMO=True to enable this demo cell).")
else:
    demo_question = "Show the 10 most recent loans"
    print(f"Question: {demo_question}")

    # Reasoning-like deployments (o*/gpt-5) use REST in the notebook as well, if llm is None
    if llm is None and USING_REASONING:
        import requests
        prompt_text = INTENT_PROMPT_TEXT.format(input=demo_question)
        messages = [{"role": "user", "content": prompt_text.strip()}]
        url = f"{ENDPOINT.rstrip('/')}/openai/deployments/{DEPLOYMENT_NAME}/chat/completions?api-version={API_VERSION}"
        payload = {"messages": messages}
        headers = {"api-key": API_KEY, "Content-Type": "application/json"}
        resp = requests.post(url, headers=headers, json=payload, timeout=60)
        resp.raise_for_status()
        data = resp.json()
        content = data["choices"][0]["message"]["content"]
        accumulate_usage(extract_usage_from_rest(data))
    else:
        # Non-reasoning path via LangChain already set up in this notebook
        res = intent_prompt | llm
        msg = res.invoke({"input": demo_question})
        content = msg.content
        accumulate_usage(extract_usage_from_langchain(msg))

    print("Intent & Entities:\n", content, "\n")

    # Generate SQL using the notebook's flow variables
    schema_preview = schema_preview  # assumed from earlier cells
    if llm is None and USING_REASONING:
        import requests
        prompt_text = SQL_PROMPT_TEXT.format(schema=schema_preview, intent_entities=content)
        messages = [{"role":"user","content": prompt_text.strip()}]
        url = f"{ENDPOINT.rstrip('/')}/openai/deployments/{DEPLOYMENT_NAME}/chat/completions?api-version={API_VERSION}"
        payload = {"messages": messages}
        headers = {"api-key": API_KEY, "Content-Type": "application/json"}
        resp = requests.post(url, headers=headers, json=payload, timeout=60)
        resp.raise_for_status()
        data = resp.json()
        raw_sql = data["choices"][0]["message"]["content"]
        accumulate_usage(extract_usage_from_rest(data))
    else:
        res = (sql_prompt | llm).invoke({"schema": schema_preview, "intent_entities": content})
        raw_sql = res.content
        accumulate_usage(extract_usage_from_langchain(res))

    print("Generated SQL (raw):\n", raw_sql)

    # Print token usage and estimated cost (uses DEPLOYMENT_NAME from earlier setup)
    print_usage_and_cost(DEPLOYMENT_NAME)

[demo] Skipped (set RUN_DEMO=True to enable this demo cell).


## Final summary: token usage, cost, and run duration
Toggle `SHOW_COSTS` to control checkpoint verbosity; the final summary always prints.

In [24]:
# Gate verbosity for checkpoints; final summary always prints
SHOW_COSTS = True  # set to False to suppress intermediate checkpoint prints

import time
if 'RUN_START_TS' not in globals():
    RUN_START_TS = time.time()

def maybe_print_checkpoint(header: str):
    if SHOW_COSTS:
        print_usage_and_cost(DEPLOYMENT_NAME, header=header)

# You can also call maybe_print_checkpoint("After some step") manually in earlier cells if desired
print("[Summary setup] SHOW_COSTS=", SHOW_COSTS)

[Summary setup] SHOW_COSTS= True


In [25]:
# Final summary cell: always prints cumulative usage/cost and run duration
import time
duration = time.time() - RUN_START_TS if 'RUN_START_TS' in globals() else None
print_usage_and_cost(DEPLOYMENT_NAME, header="Final cumulative totals")
if duration is not None:
    print("==== RUN DURATION ====")
    print(f"Run duration: {duration:.2f} seconds")
else:
    print("[INFO] RUN_START_TS not set; duration unavailable.")


==== Final cumulative totals ====

==== TOKEN USAGE ====
Input tokens: 0
Completion tokens: 0
Total tokens: 0
==== COST ESTIMATE ====
Currency: USD
Per-1K prices (input=0.00125, output=0.01) [source=file:azure_openai_pricing.json]
Estimated cost: 0.000000  [input=0.000000, output=0.000000]
==== RUN DURATION ====
Run duration: 32.39 seconds
