Architecture
This page describes the internal architecture of SQL Query Engine — how the components fit together, how data flows through the system, and the design decisions behind each layer.
SQL Query Engine is a FastAPI application that orchestrates a two-stage pipeline: query generation and query evaluation. It connects to three external services: an LLM server (any OpenAI-compatible endpoint), a PostgreSQL database (read-only), and Redis (session cache + real-time streaming).
graph TD
REQ["HTTP Request<br/>(Native or OpenAI-compat)"] --> FA
subgraph FA["FastAPI Application"]
ROUTES["main.py (/inference/*)<br/>openaiCompat.py (/v1/*)"]
ROUTES --> ENGINE["SQLQueryEngine<br/>(engine.py)"]
ENGINE --> GEN["QueryGenerator"]
ENGINE --> EVAL["QueryEvaluator"]
end
GEN --> LLM["LLM Server<br/>(vLLM, Ollama, OpenAI)"]
EVAL --> LLM
GEN --> PG["PostgreSQL<br/>(read-only via psycopg3)"]
EVAL --> PG
GEN --> RD["Redis<br/>(cache + Pub/Sub)"]
EVAL --> RD
The generator converts a natural language prompt into a SQL query. It follows this sequence:
flowchart TD
A["1. Check Redis for cached<br/>schema description"] --> B{"Cache miss?"}
B -->|Yes| C["2a. Introspect PostgreSQL<br/>schema via dbHandler<br/><i>tables, columns, sample rows</i>"]
B -->|No| F
C --> D["2b. LLM describes schema<br/>(streamed, cached)<br/><i>comprehensive markdown doc</i>"]
D --> F["3. LLM generates SQL query<br/>from prompt + schema<br/><i>structured output: description, query</i>"]
F --> G["Generated SQL + Description"]
The schema description is cached in Redis under {chatID}:SQLQueryEngine → dbSchemaDescription, so subsequent queries in the same session skip steps 2a and 2b entirely.
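The caching behaviour described above is a cache-aside lookup. A minimal sketch follows, assuming a plain dictionary as a stand-in for the Redis hash; `get_schema_description` and `build_description` are hypothetical names, not the project's API, and only the key layout comes from the text.

```python
from typing import Callable, Dict

SCHEMA_FIELD = "dbSchemaDescription"  # field name quoted in the text


def session_key(chat_id: str) -> str:
    # Key layout taken from the text: {chatID}:SQLQueryEngine
    return f"{chat_id}:SQLQueryEngine"


def get_schema_description(
    store: Dict[str, Dict[str, str]],      # stand-in for Redis hashes
    chat_id: str,
    build_description: Callable[[], str],  # stand-in for steps 2a + 2b
) -> str:
    """Return the cached description, building and caching it on a miss."""
    session = store.setdefault(session_key(chat_id), {})
    cached = session.get(SCHEMA_FIELD)
    if cached is not None:
        return cached  # cache hit: introspection and the LLM call are skipped
    description = build_description()
    session[SCHEMA_FIELD] = description
    return description


# Usage: the expensive builder runs once per session.
calls = []


def fake_build() -> str:
    calls.append(1)
    return "users(id, name)"


store: Dict[str, Dict[str, str]] = {}
first = get_schema_description(store, "chat-1", fake_build)
second = get_schema_description(store, "chat-1", fake_build)
```

A different `chatID` maps to a different key, so each session pays the introspection cost once.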
The evaluator executes the generated query and, if it raises an error or returns no rows, enters a repair loop. The loop has two key safety features: early-accept (queries returning rows are accepted immediately without an LLM call) and best-result tracking (the best result across all attempts is returned if retries are exhausted).
flowchart TD
A["1. Resolve schema context<br/>(3-tier fallback:<br/>payload → Redis → fresh)"] --> B["2. Execute query<br/>against PostgreSQL"]
B --> C{"3. Returned rows?"}
C -->|"Yes (early-accept)"| D["Accept immediately ✓<br/>No LLM call needed"]
C -->|"No rows or error"| E["4. Track best result<br/>(if rows > best so far)"]
E --> F["5. Feed error + context<br/>to LLM evaluator"]
F --> G["6. LLM returns fix<br/>(multi-strategy parsing)"]
G --> H{"7. Retries left?"}
H -->|Yes| B
H -->|No| I["Return best result<br/>seen across all attempts"]
Schema context resolution follows a three-tier fallback:
- From payload — If the generator already ran, its context is passed directly
- From Redis — Load cached description from a previous session turn
- From scratch — Full schema introspection + LLM description (slowest path)
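The fallback order can be sketched as a short function. This is an illustration only: `resolve_schema_context`, `load_cached`, and `build_fresh` are hypothetical names, and the callables stand in for the Redis lookup and the full introspection path.

```python
from typing import Callable, Optional, Tuple


def resolve_schema_context(
    payload_context: Optional[str],
    load_cached: Callable[[], Optional[str]],  # stand-in for the Redis lookup
    build_fresh: Callable[[], str],            # stand-in for introspection + LLM
) -> Tuple[str, str]:
    """Return (context, source), trying payload, then cache, then a fresh build."""
    if payload_context:
        return payload_context, "payload"
    cached = load_cached()
    if cached:
        return cached, "redis"
    return build_fresh(), "fresh"


# Usage: each tier is only consulted when the previous one is empty.
from_payload = resolve_schema_context("ctx", lambda: "cached", lambda: "new")
from_cache = resolve_schema_context(None, lambda: "cached", lambda: "new")
from_scratch = resolve_schema_context(None, lambda: None, lambda: "new")
```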
Key design decisions:
- Early-accept: If a query executes successfully and returns rows, the evaluator accepts it immediately without invoking the LLM. This prevents regressions where the LLM rewrites a working query into a broken one.
- Best-result tracking: The evaluator tracks the best result (most rows returned) across all retry attempts. If retries are exhausted without a successful accept, the best result is returned instead of nothing.
- isValid is always False: The QueryEvaluationSchema sets isValid=False by default. The system never trusts the LLM's self-assessment; it verifies by executing the query.
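The repair loop with both safety features might look roughly like the sketch below. It is not the project's code: `evaluate`, `Outcome`, `execute`, and `propose_fix` are hypothetical names. One labelled assumption goes beyond the text: attempts are ranked by (executed cleanly, row count), so a query that ran but returned nothing outranks one that raised an error.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Outcome:
    query: str
    rows: List[tuple] = field(default_factory=list)
    accepted: bool = False
    attempts: int = 0


def evaluate(
    query: str,
    execute: Callable[[str], List[tuple]],   # raises on SQL errors
    propose_fix: Callable[[str, str], str],  # (query, feedback) -> new query
    retry_count: int = 3,
) -> Outcome:
    best, best_score = Outcome(query), (-1, 0)
    current, attempts_made = query, 0
    for attempt in range(1, retry_count + 1):
        attempts_made = attempt
        try:
            rows, ok, feedback = execute(current), True, "query returned no rows"
        except Exception as exc:
            rows, ok, feedback = [], False, f"execution error: {exc}"
        if rows:
            # Early-accept: a query that returns rows never reaches the LLM.
            return Outcome(current, rows, accepted=True, attempts=attempt)
        # Best-result tracking (assumed ranking: clean run first, then rows).
        score = (int(ok), len(rows))
        if score > best_score:
            best, best_score = Outcome(current, rows), score
        if attempt < retry_count:
            current = propose_fix(current, feedback)
    best.attempts = attempts_made
    return best


# Usage with fake collaborators in place of PostgreSQL and the LLM.
def fake_execute(sql: str) -> List[tuple]:
    if "missing_table" in sql:
        raise RuntimeError("relation does not exist")
    return [(1, "ada")] if "users" in sql else []


fixes: List[str] = []


def fake_fix(sql: str, feedback: str) -> str:
    fixes.append(feedback)
    return "SELECT id, name FROM users"


outcome = evaluate("SELECT * FROM missing_table", fake_execute, fake_fix)
```

Here the first attempt fails, the fake fixer is called once, and the second attempt is accepted without a further LLM round trip.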
graph LR
subgraph API["API Layer"]
MAIN["main.py<br/>Native /inference/* routes"]
OAI["openaiCompat.py<br/>/v1/* routes + SSE"]
end
subgraph Core["Core Pipeline"]
ENG["engine.py<br/>Orchestrator"]
GEN["queryGenerator.py<br/>Stage 1: NL → SQL"]
EVL["queryEvaluator.py<br/>Stage 2: Execute + Repair"]
end
subgraph Services["Services"]
DB["dbHandler.py<br/>PostgresDB (read-only)"]
SM["sessionManager.py<br/>Redis hash + Pub/Sub"]
end
subgraph Config["Configuration & Prompts"]
CC["connConfig.py<br/>Env vars + dependency injection"]
PT["promptTemplates.py<br/>LangChain templates"]
SG["sqlGuidelines.py<br/>PostgreSQL best-practices"]
end
MAIN --> ENG
OAI --> ENG
ENG --> GEN
ENG --> EVL
GEN --> DB
GEN --> SM
EVL --> DB
EVL --> SM
GEN --> PT
EVL --> PT
GEN --> SG
EVL --> SG
Here is the complete data flow for a POST /inference/sqlQueryEngine/{chatID} request:
sequenceDiagram
participant Client
participant main.py
participant Engine as SQLQueryEngine
participant Gen as QueryGenerator
participant Eval as QueryEvaluator
participant DB as PostgresDB
participant LLM as LLM Server
participant Redis
Client->>main.py: POST /inference/sqlQueryEngine/{chatID}
main.py->>Engine: run(chatID, basePrompt, ...)
Engine->>Gen: process(chatID, schemaExamples, basePrompt)
Gen->>Redis: getUserChatContext("dbSchemaDescription")
alt Cache MISS
Gen->>DB: getParsedSchemaDump(schemaExamples)
DB-->>Gen: schema dict + formatted string
Gen->>LLM: stream(postgreSchemaDescriptionPrompt)
LLM-->>Gen: schema description chunks
Gen->>Redis: publish(chatID, progress)
Gen->>Redis: postUserChatContext("dbSchemaDescription")
end
Gen->>LLM: invoke(queryGeneratorPrompt)
LLM-->>Gen: raw text response
Gen->>Gen: _parseResponse() — multi-strategy extraction
Gen->>Redis: postUserChatContext("dbQueryGenerator")
Gen-->>Engine: {description, query}
Engine->>Eval: process(chatID, basePrompt, query, ...)
Eval->>Eval: Resolve schema context (payload → Redis → scratch)
loop Repair Loop (up to retryCount)
Eval->>DB: queryExecutor(currentQuery)
alt Returned rows (early-accept)
Eval->>Redis: publish(chatID, "accepted")
Note over Eval: Break — no LLM call needed
else Error or empty
Eval->>Eval: Track best result
Eval->>Redis: publish(chatID, execution status)
Eval->>LLM: invoke(queryEvaluatorFixerPrompt)
LLM-->>Eval: raw text response
Eval->>Eval: _parseEvalResponse() — multi-strategy extraction
Eval->>Redis: publish(chatID, repair results)
end
end
Eval->>Redis: postUserChatContext("validatorChat:N")
Eval-->>Engine: {query, observation, results}
Engine-->>main.py: combined response
main.py-->>Client: JSON {code, chatID, generation, evaluation}
The /v1/chat/completions endpoint with stream: true uses a concurrent architecture to provide real-time progress:
sequenceDiagram
participant Client
participant FastAPI as FastAPI StreamingResponse
participant Redis as Redis Pub/Sub
participant Engine as Engine Thread (pool)
participant Mirror as Mirror Channel<br/>{chatID}:stream
FastAPI->>Redis: Subscribe to {chatID}
FastAPI->>Engine: Launch engine.run() in thread pool
loop Pipeline Progress
Engine->>Redis: Publish progress to {chatID}
Redis->>FastAPI: Receive progress message
FastAPI->>Client: SSE chunk (wrapped in think tags)
FastAPI->>Mirror: Mirror clean content
end
Engine-->>FastAPI: Return final result
FastAPI->>Redis: Drain residual messages
FastAPI->>Client: SSE chunk (final markdown result)
FastAPI->>Client: data: [DONE]
The streaming flow:
- Subscribe to Redis Pub/Sub channel {chatID} before launching the engine
- Run engine.run() in a thread-pool executor (non-blocking)
- Poll Redis for progress messages, extract content after SPLIT_IDENTIFIER (<|-/|-/>)
- Wrap progress in <think>...</think> tags (collapsible in OpenWebUI)
- After engine completes, drain residual messages (100 retries, 100 ms timeout)
- Format the final result as markdown (description + SQL code block + results table)
- Mirror all content to {chatID}:stream for external subscribers
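The subscribe, run, poll, and drain steps can be sketched with the standard library alone. In this sketch a `queue.Queue` stands in for the Redis Pub/Sub channel, the text before SPLIT_IDENTIFIER is a made-up prefix, and wrapping each message in its own think tags is an assumption; `stream_run` and `progress_to_sse` are hypothetical names. Mirroring to {chatID}:stream is left out.

```python
import asyncio
import json
import queue
from typing import AsyncIterator, Callable, List

SPLIT_IDENTIFIER = "<|-/|-/>"  # value quoted in the text


def sse(content: str) -> str:
    """Format text as an OpenAI-style streaming chunk."""
    chunk = {"choices": [{"index": 0, "delta": {"content": content}}]}
    return f"data: {json.dumps(chunk)}\n\n"


def progress_to_sse(raw: str) -> str:
    """Keep the text after SPLIT_IDENTIFIER and wrap it in think tags."""
    content = raw.split(SPLIT_IDENTIFIER, 1)[-1]
    return sse(f"<think>{content}</think>")


async def stream_run(run_engine: Callable[[queue.Queue], str]) -> AsyncIterator[str]:
    channel: queue.Queue = queue.Queue()  # "subscribe" before launching
    loop = asyncio.get_running_loop()
    task = loop.run_in_executor(None, run_engine, channel)  # thread pool
    while not task.done():
        try:
            yield progress_to_sse(channel.get_nowait())
        except queue.Empty:
            await asyncio.sleep(0.01)  # keep the event loop responsive
    while True:  # drain messages published just before the engine returned
        try:
            yield progress_to_sse(channel.get_nowait())
        except queue.Empty:
            break
    yield sse(task.result())  # final result, outside the think tags
    yield "data: [DONE]\n\n"


# Usage with a fake engine that publishes two progress messages.
def fake_engine(channel: queue.Queue) -> str:
    channel.put("meta" + SPLIT_IDENTIFIER + "Generating SQL")
    channel.put("meta" + SPLIT_IDENTIFIER + "Executing query")
    return "final markdown result"


async def collect() -> List[str]:
    return [chunk async for chunk in stream_run(fake_engine)]


chunks = asyncio.run(collect())
```

Creating the channel before the worker starts matters: a subscriber that attaches after the engine has begun publishing could miss early progress messages.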
The engine does not use LangChain's with_structured_output() or function calling. Instead, each stage passes the raw LLM text through a multi-strategy response parser (the _parseResponse() static method in Stage 1, _parseEvalResponse() in Stage 2) that tries progressively looser ways of extracting SQL. This keeps the engine usable with models that don't support structured output.
Stage 1 — QueryGenerator._parseResponse():
- Strip <think>...</think> tags (handles reasoning models like Qwen3, DeepSeek-R1)
- Try full JSON parse → AutomatedQuerySchema
- Try extracting embedded JSON object with "query" or "sql" key
- Try extracting SQL from ```sql code blocks
- Try matching a SELECT ... ; statement via regex
- Last resort: treat entire cleaned text as the query
Stage 2 — QueryEvaluator._parseEvalResponse():
Same cascade (tag stripping followed by the five extraction strategies), but it extracts fixedQuery from the JSON keys "fixedQuery", "fixed_query", "sql", or "query".
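To make the cascade concrete, here is a minimal sketch of a Stage 1 style parser. It follows the order listed above but is not the project's _parseResponse(); `parse_response` and `_from_json` are hypothetical names, and the regular expressions are simplified assumptions.

```python
import json
import re
from typing import Optional

FENCE = "`" * 3  # three backticks, built here to keep this listing readable
THINK_RE = re.compile(r"<think>.*?</think>", re.DOTALL | re.IGNORECASE)
FENCE_RE = re.compile(FENCE + r"sql\s*(.*?)" + FENCE, re.DOTALL | re.IGNORECASE)
SELECT_RE = re.compile(r"\bSELECT\b.*?;", re.DOTALL | re.IGNORECASE)
QUERY_KEYS = ("query", "sql")


def _from_json(text: str) -> Optional[str]:
    """Return the query from a JSON object, or None if parsing fails."""
    try:
        data = json.loads(text)
    except ValueError:
        return None
    if isinstance(data, dict):
        for key in QUERY_KEYS:
            value = data.get(key)
            if isinstance(value, str) and value.strip():
                return value.strip()
    return None


def parse_response(raw: str) -> str:
    text = THINK_RE.sub("", raw).strip()  # drop reasoning-model preamble
    query = _from_json(text)              # 1. whole response is JSON
    if query:
        return query
    start, end = text.find("{"), text.rfind("}")
    if 0 <= start < end:                  # 2. JSON object embedded in prose
        query = _from_json(text[start:end + 1])
        if query:
            return query
    match = FENCE_RE.search(text)         # 3. fenced sql block
    if match:
        return match.group(1).strip()
    match = SELECT_RE.search(text)        # 4. bare SELECT ... ; statement
    if match:
        return match.group(0).strip()
    return text                           # 5. last resort: cleaned text


# Usage: the same function copes with several response shapes.
as_json = parse_response('<think>plan</think>{"description": "d", "query": "SELECT 1;"}')
embedded = parse_response('Here you go: {"sql": "SELECT 2;"} hope it helps')
fenced = parse_response("Sure:\n" + FENCE + "sql\nSELECT 3;\n" + FENCE)
bare = parse_response("The answer is select id from t; ok")
```

Ordering the strategies from strictest to loosest means a well-formed response is never degraded by a looser match.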
Pydantic schemas with field aliases:
Both stages use Pydantic model_validator to normalize variant field names:
from pydantic import BaseModel

# Stage 1: AutomatedQuerySchema
class AutomatedQuerySchema(BaseModel):
    description: str          # Plain English description
    query: str                # The SQL query
    sql: str                  # Alias: model_validator normalizes sql → query

# Stage 2: QueryEvaluationSchema
class QueryEvaluationSchema(BaseModel):
    isValid: bool = False     # Always False; system verifies by executing
    modifiedUserPrompt: str   # Optionally adjusted user question
    observation: str          # Diagnosis of what went wrong
    fixedQuery: str           # Corrected SQL query
    fixed_query: str          # Alias: normalized to fixedQuery
    sql: str                  # Alias: normalized to fixedQuery
    query: str                # Alias: normalized to fixedQuery
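One way such alias normalization could be written with Pydantic v2 is a "before" validator that renames variant keys prior to field validation. The sketch below is an assumption about the approach, not the project's schemas: `GeneratedQuery` and `EvaluatedQuery` are hypothetical stand-ins with fewer fields.

```python
from typing import Any

from pydantic import BaseModel, model_validator


class GeneratedQuery(BaseModel):
    """Hypothetical stand-in for AutomatedQuerySchema."""

    description: str
    query: str

    @model_validator(mode="before")
    @classmethod
    def _normalize_aliases(cls, data: Any) -> Any:
        # Accept "sql" when the model did not emit "query".
        if isinstance(data, dict) and "query" not in data and "sql" in data:
            data = {**data, "query": data["sql"]}
        return data


class EvaluatedQuery(BaseModel):
    """Hypothetical stand-in for QueryEvaluationSchema."""

    isValid: bool = False
    observation: str = ""
    fixedQuery: str

    @model_validator(mode="before")
    @classmethod
    def _normalize_aliases(cls, data: Any) -> Any:
        if isinstance(data, dict) and "fixedQuery" not in data:
            for alias in ("fixed_query", "sql", "query"):
                if alias in data:
                    data = {**data, "fixedQuery": data[alias]}
                    break
        return data


# Usage: variant field names validate to the canonical attribute.
generated = GeneratedQuery.model_validate({"description": "d", "sql": "SELECT 1"})
evaluated = EvaluatedQuery.model_validate({"fixed_query": "SELECT 2"})
```

Because the rename happens before validation, the canonical field can stay required while the aliases remain optional inputs.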
- Read-only database access: psycopg connection enforced via conn.set_read_only(True), which makes INSERT, UPDATE, DELETE, or DROP impossible
- API key authentication: Bearer token validation on /v1/* routes; supports multiple comma-separated keys
- Isolated sessions: Each chatID gets its own Redis namespace; no cross-session data leakage
- No raw SQL exposure: User prompts and SQL are always mediated through LLM context with guidelines
- Result limiting: Query results capped at 50 rows (hardLimit) to prevent memory exhaustion
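Two of these measures, multi-key Bearer validation and the row cap, are simple enough to sketch with the standard library. The function names below are hypothetical, and the header handling is an assumption about how the check could be done rather than the project's implementation.

```python
import hmac
from typing import List, Optional, Sequence

HARD_LIMIT = 50  # row cap called hardLimit in the text


def parse_api_keys(raw: str) -> List[str]:
    """Split a comma-separated key list, ignoring blanks and whitespace."""
    return [key.strip() for key in raw.split(",") if key.strip()]


def is_authorized(authorization: Optional[str], valid_keys: Sequence[str]) -> bool:
    """Check an Authorization header of the form 'Bearer <token>'."""
    if not authorization:
        return False
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return False
    # compare_digest avoids leaking how many leading characters matched.
    return any(
        hmac.compare_digest(token.encode(), key.encode()) for key in valid_keys
    )


def cap_rows(rows: Sequence[tuple], limit: int = HARD_LIMIT) -> List[tuple]:
    """Return at most `limit` rows."""
    return list(rows[:limit])


# Usage
keys = parse_api_keys("alpha, beta,,")
allowed = is_authorized("Bearer beta", keys)
denied = is_authorized("Bearer gamma", keys)
capped = cap_rows([(i,) for i in range(120)])
```

Capping in application code bounds what is returned to the client; bounding what the database driver fetches would need a limit at query or cursor level.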
📄 Paper: arXiv:2604.16511 | 📊 Dataset: Hugging Face | 💻 Source: GitHub