In [8]:
# Exploratory cell: list the attribute names exposed by the `run` object of the
# check_bank_balance tool module. The import uses the full dotted path, so the
# name bound in this namespace is `src`.
import src.tools.check_bank_balance
# dir() returns the attribute names only; nothing on `run` is called here.
print(dir(src.tools.check_bank_balance.run))

['__annotate__', '__annotations__', '__builtins__', '__call__', '__class__', '__closure__', '__code__', '__defaults__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__get__', '__getattribute__', '__getstate__', '__globals__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__kwdefaults__', '__le__', '__lt__', '__module__', '__name__', '__ne__', '__new__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__type_params__']


In [None]:
import json
from typing import Any, Union, List, Dict, Tuple

In [None]:
def render_response(
    data: Union[dict, list, str],
    max_total_chars: int = 25000,
    max_rows: int = 200,
    max_cell_width: int = 120,
    debug: bool = False
) -> str:
    """
    Convert raw API responses (dict/list/JSON-string) into grouped Markdown.

    - Accepts dict/list directly, or {"text": "...json..."} or {"text": <dict/list>}
    - Truncates large values and very long outputs
    - Renders lists of dicts as Markdown tables
    - Recursively groups content at all nesting depths
    - Optional debug logging to trace make_table usage

    Args:
        data: The payload to render. A dict containing a "text" key is
            unwrapped to that value (other keys of the wrapper are ignored);
            strings are parsed as JSON when possible and otherwise shown
            as a single ``text`` leaf.
        max_total_chars: Maximum length of the rendered Markdown before a
            truncation notice is appended.
        max_rows: Maximum number of output lines before a truncation notice
            is appended.
        max_cell_width: Maximum length of any single rendered value.
        debug: When True, append a fenced block of table-building logs.

    Returns:
        The Markdown rendering of ``data``.
    """

    debug_logs: List[str] = []

    # -------------------------------
    # 1) Normalize input into a Python object
    # -------------------------------
    def _parse_json_or_leaf(text: str) -> Any:
        # Strings that are not valid JSON are presented as a single leaf.
        try:
            return json.loads(text)
        except (ValueError, RecursionError):
            return {"text": text}

    def _to_obj(x: Any) -> Any:
        # Case A: {"text": ...} wrapper
        if isinstance(x, dict) and "text" in x:
            inner = x["text"]
            if isinstance(inner, (dict, list)):
                return inner
            if isinstance(inner, str):
                return _parse_json_or_leaf(inner)
            return {"text": str(inner)}

        # Case B: raw dict/list directly
        if isinstance(x, (dict, list)):
            return x

        # Case C: raw JSON string
        if isinstance(x, str):
            return _parse_json_or_leaf(x)

        # Anything else -> stringify
        return {"text": str(x)}

    obj = _to_obj(data)

    # -------------------------------
    # 2) Truncation / escaping helpers
    # -------------------------------
    def _truncate_cell(v: Any) -> str:
        s = "" if v is None else str(v)
        if len(s) > max_cell_width:
            return s[:max_cell_width] + "...(truncated)"
        return s

    def _escape_table_cell(s: str) -> str:
        # A raw newline ends the table row and a raw pipe starts a new
        # column, so both must be neutralised inside table cells.
        s = s.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")
        return s.replace("|", "\\|")

    # -------------------------------
    # 3) Table builder for list-of-dicts
    # -------------------------------
    def _is_list_of_dicts(records: Any) -> bool:
        return (
            isinstance(records, list) and
            len(records) > 0 and
            all(isinstance(r, dict) for r in records)
        )

    def _collect_columns(records: List[Dict[Any, Any]]) -> List[Any]:
        # A dict preserves first-seen order and gives O(1) duplicate checks.
        seen_cols: Dict[Any, None] = {}
        for r in records:
            for k in r:
                seen_cols.setdefault(k, None)
        return list(seen_cols)

    def _make_table(records: list, path: str = "") -> Union[str, None]:
        if not _is_list_of_dicts(records):
            if debug:
                debug_logs.append(f"[SKIP TABLE] Path={path} reason=not_list_of_dicts len={len(records) if isinstance(records, list) else 'N/A'}")
            return None

        cols = _collect_columns(records)
        if debug:
            debug_logs.append(f"[MAKE TABLE] Path={path} cols={cols} rows={len(records)}")

        # Column keys are stringified so non-string keys do not break join().
        header = "| " + " | ".join(_escape_table_cell(str(c)) for c in cols) + " |\n"
        sep = "| " + " | ".join(["---"] * len(cols)) + " |\n"
        rows = []
        for r in records:
            # Truncate first, then escape, so an escape sequence is never cut in half.
            row = [_escape_table_cell(_truncate_cell(r.get(c, ""))) for c in cols]
            rows.append("| " + " | ".join(row) + " |")
        return header + sep + "\n".join(rows) + "\n"

    # -------------------------------
    # 4) Recursive renderer to Markdown
    # -------------------------------
    def _is_primitive(x: Any) -> bool:
        return not isinstance(x, (dict, list))

    def _is_flat_dict(d: dict) -> bool:
        # flat dict: all values primitives (no dicts/lists)
        return all(_is_primitive(v) for v in d.values())

    def _walk(node: Any, level: int = 0, name: Union[str, None] = None, path: str = "root") -> str:
        md: List[str] = []
        h = max(1, min(6, level + 1))  # clamp H1..H6
        current_path = f"{path}.{name}" if name is not None else path

        if isinstance(node, dict):
            # Heading for this object if it has a name
            if name:
                md.append(f"\n{'#' * h} {name}")

            if _is_flat_dict(node):
                # For flat dicts, render as bullet list for readability
                for k, v in node.items():
                    md.append(f"- **{k}:** {_truncate_cell(v)}")
                return "\n".join(md)

            # Otherwise walk children
            for k, v in node.items():
                md.append(_walk(v, level + 1, k, current_path))
            return "\n".join(md)

        elif isinstance(node, list):
            # Try to render as a table if it's list of dicts
            table = _make_table(node, current_path)
            if table is not None:
                if name:
                    md.append(f"\n{'#' * h} {name}")
                md.append(table)
                return "\n".join(md)

            # Otherwise render items individually
            if name:
                md.append(f"\n{'#' * h} {name}")
            for i, item in enumerate(node):
                md.append(_walk(item, level + 1, f"{name}[{i}]" if name else f"[{i}]", current_path))
            return "\n".join(md)

        else:
            # primitive leaf
            val = _truncate_cell(node)
            if name is None:
                md.append(f"\n{'#' * h} value\n{val}")
            else:
                md.append(f"\n{'#' * h} {name}: {val}")
            return "\n".join(md)

    markdown = _walk(obj)

    # -------------------------------
    # 5) Global output truncation
    # -------------------------------
    if len(markdown) > max_total_chars:
        markdown = markdown[:max_total_chars] + "\n\n...(OUTPUT TRUNCATED: size limit)..."

    # Limit lines
    lines = markdown.splitlines()
    if len(lines) > max_rows:
        markdown = "\n".join(lines[:max_rows]) + "\n...(OUTPUT TRUNCATED: row limit)..."

    # Attach debug logs (optional); appended after truncation so they are never cut
    if debug and debug_logs:
        markdown += "\n\n---\n**Debug logs**\n\n```\n" + "\n".join(debug_logs) + "\n```"

    return markdown

In [11]:
def extract_sql_schema(table_meta):
    """Build a ``CREATE TABLE`` statement from a table-metadata dict.

    Args:
        table_meta: Mapping with a required ``"name"``, an optional
            ``"schema"`` and an optional ``"fields"`` list. Each field is a
            mapping with ``"name"``, ``"database_type"`` and optionally
            ``"semantic_type"``; a field whose semantic type is
            ``"type/PK"`` is treated as part of the primary key.

    Returns:
        The DDL text. A single primary-key column is marked inline with
        ``PRIMARY KEY``; when several columns are primary keys, one
        table-level ``PRIMARY KEY (a, b)`` constraint is emitted instead,
        because repeating the inline form on several columns is not valid SQL.

    Raises:
        KeyError: If the table name, a field name or a field database type
            is missing.
    """
    table = table_meta["name"]
    schema = table_meta.get("schema")
    # Treat an explicit ``"fields": None`` the same as a missing key.
    fields = table_meta.get("fields") or []

    pk_names = [f["name"] for f in fields if f.get("semantic_type") == "type/PK"]
    single_pk = len(pk_names) == 1

    cols = []
    for f in fields:
        col = f"{f['name']} {f['database_type'].upper()}"
        if single_pk and f.get("semantic_type") == "type/PK":
            col += " PRIMARY KEY"
        cols.append(col)

    if len(pk_names) > 1:
        # Composite key: must be declared once at table level.
        cols.append(f"PRIMARY KEY ({', '.join(pk_names)})")

    # Only qualify the table with a schema when one is present.
    table_name = f"{schema}.{table}" if schema else table
    body = ", ".join(cols)

    return f"CREATE TABLE {table_name} (\n  {body}\n);"

In [None]:
# System prompt for the natural-language-to-SQL assistant. This is an f-string,
# so `SCHEMA` is interpolated once, when this cell runs; changing SCHEMA later
# requires re-running the cell.
# NOTE(review): SCHEMA is not defined in the cells visible here — presumably it
# holds the CREATE TABLE text produced by extract_sql_schema; confirm it is
# assigned before this cell is executed, otherwise this raises NameError.
SYSTEM_PROMPT = f""" 
You are an AI assistant that converts natural language questions into SQL queries. A database schema will be provided to you. Follow these rules strictly:
- Always return a single SQL query as your final output. Do not include explanations, descriptions, comments, or any additional text. Only return SQL.
- Use only the tables and columns explicitly provided in the schema. If the user asks for information that does not exist in the schema, return a SQL query that uses only the available fields in the most reasonable way.
- Do not invent new tables, columns, or relationships. Use only what is defined.
- When joins are required, infer the relationship based on foreign-key naming conventions or explicit schema instructions. If ambiguity exists, choose the relationship that is most consistent with typical relational database design.
- Fully qualify columns when necessary to avoid ambiguity.
- Never return placeholders. For example, do not return "table_name" or "column_name". Always return actual schema elements.
- If multiple SQL interpretations are possible, choose the simplest valid SQL that answers the user's question.
- Use standard ANSI SQL unless the schema or user explicitly requests a specific dialect.
- Do not include LIMIT clauses unless explicitly requested.
- Ensure your SQL is syntactically correct and ready to execute.
- DONT USE UNNECESSARILY COMPLEX COMMANDS. RESPOND WITH THE MOST UNDERSTANABLE AND USER READABLE COMMANDS.
Use the schema provided below for your query output:
{SCHEMA}
"""
# Example natural-language question, sent as the user message alongside SYSTEM_PROMPT.
QUERY = "Retrieve all virtual account transaction records between Feb 1 and Feb 19, 2026 ordered by corporate ID and latest transaction date."


In [8]:
from openai import OpenAI
import os
from dotenv import load_dotenv
# Load variables from a local .env file into the process environment so the
# API key can be read with os.getenv below.
load_dotenv()

# The key is read from the OPEN_AI_API_KEY variable; os.getenv returns None
# when it is unset.
# NOTE(review): the variable name differs from the SDK's conventional
# OPENAI_API_KEY — confirm the .env file uses this exact spelling.
client = OpenAI(api_key=os.getenv("OPEN_AI_API_KEY"))
# Send the system prompt (rules + schema) and the user's question in a single
# request. The result is only stored in `response`; nothing is printed here.
response = client.responses.create(
    model="gpt-5",
    input=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": QUERY}
    ]
)