Module Reference

codeadeel edited this page Apr 7, 2026 · 3 revisions

Internal code documentation for contributors and developers extending the SQL Query Engine. This page covers every module in the sqlQueryEngine package — its classes, methods, and how they interconnect.

Package Structure

sqlQueryEngine/
├── __init__.py           # Public exports
├── main.py               # FastAPI app definition + native routes
├── engine.py             # SQLQueryEngine orchestrator
├── queryGenerator.py     # Stage 1: NL → SQL
├── queryEvaluator.py     # Stage 2: execute + repair
├── openaiCompat.py       # OpenAI-compatible API layer
├── dbHandler.py          # PostgreSQL database handler
├── sessionManager.py     # Redis session store + Pub/Sub
├── connConfig.py         # Environment config + dependency injection
├── promptTemplates.py    # LangChain prompt templates
└── sqlGuidelines.py      # PostgreSQL best-practices corpus

__init__.py — Public Exports

The package exposes the following for external import:

from sqlQueryEngine import app                # FastAPI application
from sqlQueryEngine import SQLQueryEngine     # Pipeline orchestrator
from sqlQueryEngine import QueryGenerator     # Stage 1
from sqlQueryEngine import QueryEvaluator     # Stage 2

main.py — FastAPI Application

Defines the FastAPI application and native inference routes.

App Instance

app = FastAPI(
    title="SQL Query Engine",
    version="1.0.0",
    openapi_tags=tagsMetadata
)

Tag groups: SQL Inference, SQL Generation, SQL Evaluation, Ping.

Routes

| Route | Method | Handler | Description |
| --- | --- | --- | --- |
| /ping | GET | ping() | Health check; returns {"code": 200, "status": "...", "agent": "sqlQueryEngine", ...} |
| /inference/sqlQueryEngine/{chatID} | POST | sql_query_engine_inference() | Full pipeline (Stage 1 + 2) |
| /inference/sqlQueryGeneration/{chatID} | POST | sql_query_engine_generation() | Stage 1 only |
| /inference/sqlQueryEvaluation/{chatID} | POST | sql_query_engine_evaluation() | Stage 2 only |

Request Models (Pydantic)

queryEngineRequest — Full pipeline:

class queryEngineRequest(BaseModel):
    basePrompt: str
    retryCount: Optional[int] = 5
    schemaDescriptionKey: Optional[str] = "dbSchemaDescription"
    schemaExamples: Optional[int] = 5
    feedbackExamples: Optional[int] = 3
    extraPayload: Optional[Dict[Any, Any]] = None

queryGenerationRequest — Stage 1 only:

class queryGenerationRequest(BaseModel):
    basePrompt: str
    schemaDescriptionKey: Optional[str] = "dbSchemaDescription"
    schemaExamples: Optional[int] = 5
    extraPayload: Optional[Dict[Any, Any]] = None

queryEvaluationRequest — Stage 2 only:

class queryEvaluationRequest(BaseModel):
    basePrompt: str
    baseQuery: str
    baseDescription: str
    retryCount: Optional[int] = 5
    schemaDescriptionKey: Optional[str] = "dbSchemaDescription"
    schemaExamples: Optional[int] = 5
    feedbackExamples: Optional[int] = 3
    extraPayload: Optional[Dict[Any, Any]] = None

All three use model_config with json_schema_extra for Swagger UI examples.
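
As a rough illustration of how a client might call the full-pipeline route, the sketch below builds a POST request whose body mirrors the queryEngineRequest fields. The base URL http://localhost:8000 and the helper name buildInferenceRequest are assumptions made for this example; the sketch also assumes the connection parameters are already supplied through environment variables, so none are passed as query parameters.

```python
import json
import urllib.parse
import urllib.request


def buildInferenceRequest(
    baseUrl: str,
    chatID: str,
    basePrompt: str,
    retryCount: int = 5,
    schemaExamples: int = 5,
    feedbackExamples: int = 3,
) -> urllib.request.Request:
    """Build (but do not send) a POST request for the full pipeline route."""
    # Field names and defaults follow the queryEngineRequest model.
    payload = {
        "basePrompt": basePrompt,
        "retryCount": retryCount,
        "schemaDescriptionKey": "dbSchemaDescription",
        "schemaExamples": schemaExamples,
        "feedbackExamples": feedbackExamples,
    }
    safeChatID = urllib.parse.quote(chatID, safe="")
    url = baseUrl.rstrip("/") + "/inference/sqlQueryEngine/" + safeChatID
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical host and chat ID, for illustration only.
request = buildInferenceRequest(
    "http://localhost:8000",
    "demo-chat",
    "How many orders were placed last month?",
)
```

Passing the request to urllib.request.urlopen() would send it; the sketch stops short of that so it can be run without a live service.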

Connection Parameters

All 13 LLM / PostgreSQL / Redis connection parameters are declared as FastAPI query parameters via the shared connectionDependency() function from connConfig.py. They appear as individual inputs in the Swagger UI.

engine.py — SQLQueryEngine

The orchestrator that coordinates Stage 1 and Stage 2.

Class: SQLQueryEngine

Constructor:

SQLQueryEngine(
    llmParams: dict,
    dbParams: dict,
    redisParams: dict,
    botName: str = "SQLBot",
    splitIdentifier: str = "<|-/|-/>"
)

Stores the connection parameters and instantiates QueryGenerator and QueryEvaluator anew on each method call.

Methods:

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| run() | chatID, basePrompt, retryCount=5, schemaExamples=5, feedbackExamples=3, schemaDescriptionKey | dict | Full pipeline: generate + evaluate |
| generate() | chatID, basePrompt, schemaExamples=5, schemaDescriptionKey | dict | Stage 1 only |
| evaluate() | chatID, basePrompt, baseQuery, baseDescription, retryCount=5, schemaExamples=5, feedbackExamples=3, schemaDescriptionKey | dict | Stage 2 only |

Return structure (full pipeline):

{
    "code": 200,
    "chatID": str,
    "generation": {"queryDescription": str, "sqlQuery": str},
    "evaluation": {"currentQuery": str | None, "currentObservation": str | None, "results": list[dict]}
}
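
A caller usually needs only the final SQL and the rows from this structure. The helper below is a minimal sketch of one way to read it; summarizeResult is a hypothetical name, and falling back to generation.sqlQuery when evaluation.currentQuery is None is an assumption of this example, not documented engine behaviour.

```python
from typing import Any


def summarizeResult(result: dict[str, Any]) -> dict[str, Any]:
    """Flatten the full-pipeline return structure into the parts a caller needs."""
    generation = result.get("generation") or {}
    evaluation = result.get("evaluation") or {}
    # Assumption: prefer the evaluator's query, else the generator's original.
    finalQuery = evaluation.get("currentQuery") or generation.get("sqlQuery")
    rows = evaluation.get("results") or []
    return {
        "ok": result.get("code") == 200,
        "sql": finalQuery,
        "rowCount": len(rows),
        "rows": rows,
    }


sample = {
    "code": 200,
    "chatID": "demo-chat",
    "generation": {"queryDescription": "Count orders", "sqlQuery": "SELECT 1;"},
    "evaluation": {"currentQuery": None, "currentObservation": None, "results": []},
}
summary = summarizeResult(sample)
```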

queryGenerator.py — Stage 1

Class: QueryGenerator

Constructor:

QueryGenerator(
    llmParams: dict,
    dbParams: dict,
    redisParams: dict,
    botName: str = "SQLBot",
    agentName: str = "SQLQueryEngine",
    splitIdentifier: str = "<|-/|-/>"
)

Internally creates: ChatOpenAI(**llmParams), PostgresDB(**dbParams), SessionManager(redisParams, agentName).

Methods:

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| process() | chatID, schemaExamples, basePrompt | dict | Generate SQL from natural language |
| _parseResponse() | content: str | AutomatedQuerySchema | Static. Multi-strategy response parser |

_parseResponse() extraction cascade:

  1. Strip <think>...</think> tags (reasoning models like Qwen3, DeepSeek-R1)
  2. Try full JSON parse → AutomatedQuerySchema
  3. Try extracting embedded JSON with "query" or "sql" key
  4. Try extracting SQL from ```sql code blocks
  5. Try matching SELECT ... ; via regex
  6. Last resort: treat entire cleaned text as the query
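
The cascade above can be sketched with the standard library alone. This is an illustration of the six strategies, not the module's actual code: parseResponse and _normalize are hypothetical names, a plain dict stands in for AutomatedQuerySchema, and the regular expressions are assumptions.

```python
import json
import re

FENCE = "`" * 3  # built at runtime so this example can sit inside a fenced block


def _normalize(data: dict) -> dict:
    """Map a parsed JSON object to description/query, treating sql as an alias."""
    return {
        "description": str(data.get("description", "")),
        "query": str(data.get("query") or data.get("sql") or ""),
    }


def parseResponse(content: str) -> dict:
    # 1. Strip <think>...</think> blocks emitted by reasoning models.
    cleaned = re.sub(r"<think>.*?</think>", "", content, flags=re.DOTALL).strip()

    # 2. Try to parse the whole response as JSON.
    try:
        data = json.loads(cleaned)
        if isinstance(data, dict):
            return _normalize(data)
    except json.JSONDecodeError:
        pass

    # 3. Look for an embedded JSON object carrying a "query" or "sql" key.
    decoder = json.JSONDecoder()
    for pos, char in enumerate(cleaned):
        if char != "{":
            continue
        try:
            data, _ = decoder.raw_decode(cleaned, pos)
        except json.JSONDecodeError:
            continue
        if isinstance(data, dict) and ("query" in data or "sql" in data):
            return _normalize(data)

    # 4. Look for a fenced sql code block.
    fenced = re.search(
        re.escape(FENCE) + r"sql\s*(.*?)" + re.escape(FENCE),
        cleaned,
        flags=re.DOTALL | re.IGNORECASE,
    )
    if fenced:
        return {"description": "", "query": fenced.group(1).strip()}

    # 5. Match a bare SELECT ... ; statement.
    select = re.search(r"SELECT\b.*?;", cleaned, flags=re.DOTALL | re.IGNORECASE)
    if select:
        return {"description": "", "query": select.group(0).strip()}

    # 6. Last resort: treat the cleaned text as the query.
    return {"description": "", "query": cleaned}
```

Ordering the strategies from strictest to loosest means a well-formed response is never degraded by a looser match.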

Internal flow:

  1. Check Redis for cached schema context (dbSchemaDescription)
  2. On cache miss: introspect DB → LLM describes schema (streamed to Pub/Sub) → cache result
  3. Build query generation context with queryGeneratorPrompt
  4. Invoke LLM → _parseResponse() extracts AutomatedQuerySchema
  5. Save chat history to Redis (dbQueryGenerator)
  6. Return {response: {queryDescription, sqlQuery}, data: {...}}

Pydantic Schema: AutomatedQuerySchema

class AutomatedQuerySchema(BaseModel):
    description: str = ""    # Plain English description
    query: str = ""          # The SQL query
    sql: str = ""            # Alias — model_validator normalizes sql → query

queryEvaluator.py — Stage 2

Class: QueryEvaluator

Constructor:

QueryEvaluator(
    llmParams: dict,
    dbParams: dict,
    redisParams: dict,
    botName: str = "SQLBot",
    agentName: str = "SQLQueryEngine",
    splitIdentifier: str = "<|-/|-/>"
)

Internally creates: ChatOpenAI(**llmParams), PostgresDB(**dbParams), SessionManager(redisParams, agentName).

Methods:

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| process() | chatID, basePrompt, baseQuery, baseDescription, retryCount=3, generatorContextKey, schemaExamples=5, feedbackExamples=5, localPayload, hardLimit=50 | dict | Execute + repair loop |
| _parseEvalResponse() | content: str | QueryEvaluationSchema | Static. Multi-strategy response parser |
| _buildFromPayload() | generatorContextKey, localPayload | dict | Extract schema context from generator's output payload |
| _buildFromRedis() | chatID, generatorContextKey | dict | Load cached schema description from Redis |
| _buildFromScratch() | chatID, generatorContextKey, schemaExamples | dict | Generate schema from DB introspection + LLM (fallback) |

Pydantic Schema: QueryEvaluationSchema

class QueryEvaluationSchema(BaseModel):
    isValid: bool = False          # Always False — system verifies by executing
    modifiedUserPrompt: str = ""   # Optionally adjusted user question
    observation: str = ""          # Diagnosis of what went wrong
    fixedQuery: str = ""           # Corrected SQL query
    fixed_query: str = ""          # Alias → fixedQuery
    sql: str = ""                  # Alias → fixedQuery
    query: str = ""                # Alias → fixedQuery

The model_validator normalizes: if fixedQuery is empty, it falls back to fixed_query, then sql, then query.
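
The fallback order can be illustrated without pydantic. The sketch below uses a standard-library dataclass as a stand-in for the real model and its model_validator, so it shows only the normalization rule; EvalResponseSketch is a hypothetical name.

```python
from dataclasses import dataclass


@dataclass
class EvalResponseSketch:
    """Dataclass stand-in for QueryEvaluationSchema (the real class is a pydantic model)."""

    isValid: bool = False
    modifiedUserPrompt: str = ""
    observation: str = ""
    fixedQuery: str = ""
    fixed_query: str = ""
    sql: str = ""
    query: str = ""

    def __post_init__(self) -> None:
        # Fall back through the aliases only when fixedQuery is empty.
        if not self.fixedQuery:
            self.fixedQuery = self.fixed_query or self.sql or self.query


fromSql = EvalResponseSketch(sql="SELECT 1;")
fromSnake = EvalResponseSketch(fixed_query="SELECT 2;", sql="SELECT 3;")
explicit = EvalResponseSketch(fixedQuery="SELECT 4;", query="SELECT 5;")
```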

Repair Loop Logic

The repair loop iterates up to retryCount times:

  1. Execute current query via PostgresDB.queryExecutor()
  2. Catch psycopg.Error → extract diag.sqlstate, diag.message_primary, diag.message_detail, diag.message_hint, full traceback
  3. Always call conn.rollback() after errors
  4. Track best result: if no errors and rows > best so far, update best result
  5. Early-accept on success: if no errors AND rows returned → accept immediately, skip LLM call, break loop
  6. If error or empty results → invoke LLM evaluator → _parseEvalResponse() extracts fix
  7. Update currentQuery to fixedQuery, increment counter, retry
  8. On loop exit: if validated → return current result; elif best result exists → return best; else → return None
  9. Results capped at hardLimit=50 rows
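
The control flow of the steps above can be sketched with the database and the LLM replaced by injected callables. Everything here is illustrative: repairLoop, execute, and proposeFix are hypothetical names, a generic Exception stands in for psycopg.Error (so there is no rollback step), and counting an error-free but empty result as the "best so far" is an assumption.

```python
from typing import Callable, Optional

Rows = list[dict]


def repairLoop(
    initialQuery: str,
    execute: Callable[[str], Rows],
    proposeFix: Callable[[str, Optional[str]], tuple[str, str]],
    retryCount: int = 5,
    hardLimit: int = 50,
) -> Optional[dict]:
    currentQuery = initialQuery
    observation: Optional[str] = None
    best: Optional[dict] = None

    for _ in range(retryCount):
        error: Optional[str] = None
        rows: Rows = []
        try:
            rows = execute(currentQuery)[:hardLimit]  # cap the result size
        except Exception as exc:  # stand-in for psycopg.Error handling
            error = str(exc)

        if error is None:
            candidate = {
                "currentQuery": currentQuery,
                "currentObservation": observation,
                "results": rows,
            }
            # Track the best error-free result seen so far.
            if best is None or len(rows) > len(best["results"]):
                best = candidate
            # Early accept: no error and rows returned, so skip the LLM call.
            if rows:
                return candidate

        # Error or empty result: ask the evaluator for a diagnosis and a fix.
        observation, fixedQuery = proposeFix(currentQuery, error)
        currentQuery = fixedQuery or currentQuery

    # Retries exhausted: return the best result, or None if nothing ever ran.
    return best


def fakeExecute(query: str) -> Rows:
    if query == "bad":
        raise RuntimeError("syntax error")
    return [] if query == "empty" else [{"n": 1}]


def fakeFix(query: str, error: Optional[str]) -> tuple[str, str]:
    return ("diagnosis", "empty" if query == "bad" else "good")


outcome = repairLoop("bad", fakeExecute, fakeFix)
```

In this run the first query fails, the second returns no rows, and the third is accepted immediately, so the evaluator stand-in is called twice rather than three times.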

openaiCompat.py — OpenAI-Compatible Layer

Routes

| Route | Method | Auth | Description |
| --- | --- | --- | --- |
| /v1/models | GET | Yes | List available models |
| /v1/chat/completions | POST | Yes | Chat completions (streaming + non-streaming) |
| /v1/completions | POST | Yes | Legacy text completions |

Request Models

ChatCompletionRequest

class ChatCompletionRequest(BaseModel):
    model: str                          # Model identifier (ignored — uses configured engine)
    messages: List[ChatMessage]         # Conversation history
    stream: Optional[bool] = True       # Enable SSE streaming
    chat_id: Optional[str] = None       # Session ID from OpenWebUI
    temperature: Optional[float] = None # Ignored
    max_tokens: Optional[int] = None    # Ignored

CompletionRequest

class CompletionRequest(BaseModel):
    model: str                          # Ignored
    prompt: str                         # Natural language question
    stream: Optional[bool] = True
    temperature: Optional[float] = None # Ignored
    max_tokens: Optional[int] = None    # Ignored

Key Functions

| Function | Description |
| --- | --- |
| verifyApiKey() | FastAPI dependency that validates Bearer tokens against OPENAI_API_KEY env var |
| _validateEnvConnParams() | Raises HTTP 500 if required connection env vars are missing; called at the start of each completions request |
| _stableChatID() | Derives a stable chat ID from MD5 hash of the first user message (fallback when chat_id is absent) |
| _streamSQLQueryEngine() | Async generator: subscribes to Pub/Sub before engine starts, runs engine in thread pool, yields SSE chunks with <think> wrapping |
| _collectFullResponse() | Collects the full SSE stream and returns a single OpenAI-format response object |
| _formatFinalResult() | Formats engine output as markdown (description + SQL code block + results table) |
| _formatSSEChunk() | Formats a content delta as an OpenAI-compatible SSE chunk |
| _formatSSEChunkRole() | First SSE chunk establishing role=assistant |
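
For readers unfamiliar with the streaming wire format, the sketch below shows what an OpenAI-style chat completion chunk looks like as a server-sent event. It is an approximation of what _formatSSEChunk() and _formatSSEChunkRole() produce, not their actual code; the function names, parameters, and the example ID and model strings are assumptions.

```python
import json
import time
from typing import Optional


def formatSSEChunk(
    content: str,
    completionID: str,
    model: str,
    finishReason: Optional[str] = None,
    created: Optional[int] = None,
) -> str:
    """Wrap a content delta in a chat.completion.chunk SSE event."""
    chunk = {
        "id": completionID,
        "object": "chat.completion.chunk",
        "created": int(time.time()) if created is None else created,
        "model": model,
        "choices": [
            {"index": 0, "delta": {"content": content}, "finish_reason": finishReason}
        ],
    }
    # Each SSE event is a "data:" line followed by a blank line.
    return "data: " + json.dumps(chunk) + "\n\n"


def formatSSEChunkRole(completionID: str, model: str, created: Optional[int] = None) -> str:
    """First chunk of a stream: establishes the assistant role, no content yet."""
    chunk = {
        "id": completionID,
        "object": "chat.completion.chunk",
        "created": int(time.time()) if created is None else created,
        "model": model,
        "choices": [{"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}],
    }
    return "data: " + json.dumps(chunk) + "\n\n"


# Hypothetical ID and model name, for illustration only.
event = formatSSEChunk("SELECT", "chatcmpl-demo", "sql-query-engine", created=0)
```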

Pipeline Defaults

Read from environment variables, with fallbacks:

| Variable | Default | Description |
| --- | --- | --- |
| DEFAULT_RETRY_COUNT | 5 | Max repair loop iterations |
| DEFAULT_SCHEMA_EXAMPLES | 5 | Sample rows per table |
| DEFAULT_FEEDBACK_EXAMPLES | 3 | Feedback rows in evaluator |
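
One way to read such defaults is sketched below. The helper readIntEnv is a hypothetical name, and falling back to the default when the variable is set but not a valid integer is an assumption of this example; the module may handle that case differently.

```python
import os
from typing import Mapping, Optional


def readIntEnv(name: str, default: int, env: Optional[Mapping[str, str]] = None) -> int:
    """Read an integer setting from the environment, with a fallback default."""
    source = os.environ if env is None else env
    raw = source.get(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        # Assumption: ignore malformed values rather than failing at import time.
        return default


# A plain dict stands in for os.environ so the example is reproducible.
fakeEnv = {"DEFAULT_RETRY_COUNT": "8", "DEFAULT_SCHEMA_EXAMPLES": "many"}
retryCount = readIntEnv("DEFAULT_RETRY_COUNT", 5, fakeEnv)
schemaExamples = readIntEnv("DEFAULT_SCHEMA_EXAMPLES", 5, fakeEnv)
feedbackExamples = readIntEnv("DEFAULT_FEEDBACK_EXAMPLES", 3, fakeEnv)
```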

Chat ID Derivation

When chat_id is not provided in the request body:

# Uses MD5 hash of the first user message, truncated to 16 chars
chat_id = hashlib.md5(first_user_message.encode()).hexdigest()[:16]

When chat_id IS provided (e.g., by OpenWebUI), it is used directly as the Redis namespace key.
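
Both branches can be combined into one small function. This is a sketch of the behaviour described above rather than the body of _stableChatID(); the function name, the message format (dicts with role and content keys), and hashing an empty string when no user message exists are assumptions.

```python
import hashlib
from typing import Optional


def stableChatID(messages: list[dict], chatID: Optional[str] = None) -> str:
    """Return the supplied chat ID, or derive one from the first user message."""
    if chatID:
        return chatID
    # Assumption: hash an empty string when the conversation has no user message.
    firstUserMessage = next(
        (m.get("content", "") for m in messages if m.get("role") == "user"),
        "",
    )
    return hashlib.md5(firstUserMessage.encode()).hexdigest()[:16]


conversation = [
    {"role": "system", "content": "You answer with SQL."},
    {"role": "user", "content": "How many customers signed up in March?"},
    {"role": "user", "content": "And in April?"},
]
derived = stableChatID(conversation)
```

Because only the first user message is hashed, follow-up turns in the same conversation map to the same ID, while two conversations that open with identical text would share one.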

dbHandler.py — PostgreSQL Handler

Class: PostgresDB

Constructor:

PostgresDB(host: str, port: int, dbname: str, user: str, password: str)

Opens a psycopg.connect() connection and sets it to read-only mode via conn.set_read_only(True).

Methods:

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| listTables() | (none) | list[str] | List all tables in public schema |
| getTableSchema() | table_name: str | list[tuple] | Returns (column_name, data_type) pairs |
| getFullTableDump() | table_name: str | list[tuple] | All rows from a table (uses psycopg.sql.SQL + Identifier for safe quoting) |
| getSchemaDump() | expLen: int = 5 | dict | Schema + sample rows per table |
| getParsedSchemaDump() | expLen: int = 5 | (dict, str) | Raw dict + formatted string for LLM prompts |
| queryExecutor() | query: str | list[dict] | Execute query, return results as list of dicts (Decimal→float, datetime→str) |
| close() | (none) | None | Close cursor and connection |
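
The type conversions that queryExecutor() applies exist so that rows can be serialized as JSON. The sketch below shows one way to do that conversion on a single row; normalizeRow is a hypothetical helper, and using str() for the datetime conversion (and applying it to date objects too) is an assumption about the exact output format.

```python
from datetime import date, datetime
from decimal import Decimal
from typing import Any, Sequence


def normalizeRow(columns: Sequence[str], row: Sequence[Any]) -> dict[str, Any]:
    """Turn one result tuple into a JSON-friendly dict keyed by column name."""
    normalized: dict[str, Any] = {}
    for name, value in zip(columns, row):
        if isinstance(value, Decimal):
            value = float(value)
        elif isinstance(value, (datetime, date)):
            value = str(value)  # assumption: plain str() formatting
        normalized[name] = value
    return normalized


example = normalizeRow(
    ["total", "created_at", "status"],
    (Decimal("19.50"), datetime(2024, 1, 2, 3, 4, 5), "paid"),
)
```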

sessionManager.py — Redis Session Manager

Class: SessionManager

Constructor:

SessionManager(redisParams: dict, agentName: str = "SQLQueryEngine")

Creates a redis.Redis client. All keys are namespaced as {chatID}:{agentName}.

Methods:

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| getUserChatContext() | chatID, retrievalKey | (list, list) | LangChain message objects + raw JSON-serializable dicts |
| postUserChatContext() | chatID, retrievalKey, chatParser | None | Serialize LangChain messages as JSON and store in Redis hash |
| getRawUserData() | chatID, retrievalKey | str | Raw string read from hash field |
| postRawUserData() | chatID, retrievalKey, data | None | Raw JSON write to hash field |
| updateUsageToken() | chatID, currentCounter=-1, retrievalKey | int | Auto-increment historyCounterManager counter (or set explicit value) |

Pub/Sub publishing is done directly via self.redisClient.publish(chatID, message) from the QueryGenerator and QueryEvaluator modules.
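
The key layout is easier to see with Redis replaced by a dictionary. The class below is an in-memory stand-in written for this page, not part of the package: it mimics only the {chatID}:{agentName} namespacing and the hash-field read and write, and returning None for a missing field is an assumption.

```python
from typing import Optional


class InMemorySessionSketch:
    """Dictionary-backed stand-in for the Redis hash layout used by SessionManager."""

    def __init__(self, agentName: str = "SQLQueryEngine") -> None:
        self.agentName = agentName
        self._hashes: dict[str, dict[str, str]] = {}

    def _key(self, chatID: str) -> str:
        # One hash per chat and agent, so agents sharing a chat never collide.
        return f"{chatID}:{self.agentName}"

    def postRawUserData(self, chatID: str, retrievalKey: str, data: str) -> None:
        self._hashes.setdefault(self._key(chatID), {})[retrievalKey] = data

    def getRawUserData(self, chatID: str, retrievalKey: str) -> Optional[str]:
        return self._hashes.get(self._key(chatID), {}).get(retrievalKey)


session = InMemorySessionSketch()
session.postRawUserData("demo-chat", "dbSchemaDescription", "orders(id, total)")
cached = session.getRawUserData("demo-chat", "dbSchemaDescription")
```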

connConfig.py — Configuration

Parses environment variables into module-level dictionaries:

| Dict | Keys |
| --- | --- |
| LLM_PARAMS | model, temperature, base_url, api_key |
| DB_PARAMS | host, port, dbname, user, password |
| REDIS_PARAMS | host, port, password, db, decode_responses |

Module-level constants:

Constant Default Source
BOT_NAME "SQLBot" BOT_NAME env var
SPLIT_IDENTIFIER "<|-/|-/>" SPLIT_IDENTIFIER env var

FastAPI Dependency: connectionDependency()

A FastAPI Depends function that declares 13 query parameters (4 LLM + 5 PostgreSQL + 4 Redis). Each parameter defaults to the env var value when configured, or becomes required (...) when the env var is absent. Returns a dict with effective llm, db, and redis connection params.

promptTemplates.py — LLM Prompts

Four SystemMessagePromptTemplate objects:

| Template | Input Variables | Used By |
| --- | --- | --- |
| postgreSystemPrompt | botName, botGoal, postgreManual | Not currently used in the pipeline (general-purpose template) |
| postgreSchemaDescriptionPrompt | botName, botGoal, dataContext, postgreManual | QueryGenerator + QueryEvaluator (schema description generation) |
| queryGeneratorPrompt | botName, botGoal, dataDescription, dataContext, postgreManual | QueryGenerator (SQL generation) |
| queryEvaluatorFixerPrompt | botName, botGoal, postgreManual | QueryEvaluator (repair loop) |

sqlGuidelines.py — PostgreSQL Guidelines

Static string constants injected into LLM prompts as best-practice guidance:

| Constant | Used By | Contents |
| --- | --- | --- |
| postgreManualData | QueryGenerator (via both postgreSchemaDescriptionPrompt and queryGeneratorPrompt) | Comprehensive SQL generation guidelines including: JSONB operators, ILIKE, EXISTS vs IN, EXTRACT/DATE_TRUNC, COALESCE, window functions, CTEs, indexes, ROUND for decimals, no bind parameters, plus CRITICAL OUTPUT RULES (only SELECT asked columns, always ROUND, no bind params, etc.) |
| postgreManualDataEval | QueryEvaluator (via queryEvaluatorFixerPrompt) | Condensed evaluation-specific fix rules: error diagnosis strategies, schema-aware column mapping, minimal-change repair philosophy |

Evaluation Modules

The evaluation/ directory contains two independent benchmark pipelines and shared utilities. See the Evaluation and BIRD Benchmark wiki pages for full documentation.

Shared Utilities (evaluation/shared/)

| Module | Description |
| --- | --- |
| resultComparator.py | Order-independent normalized comparison of predicted vs gold result sets. Handles both list[tuple] (psycopg) and list[dict] (engine) formats. |
| resourceMetrics.py | Tracks wall time, peak memory (via resource.getrusage()), latency percentiles (min, p50, p90, p95, p99, max), and throughput (questions per minute). |
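
An order-independent comparison of the kind resultComparator.py is described as performing can be sketched as a multiset comparison over normalized rows. The normalization rules here (numbers rounded to four decimals, other values compared as stripped strings, column names ignored) and the function names are assumptions chosen for illustration; the real module may normalize differently.

```python
from collections import Counter
from decimal import Decimal
from typing import Any, Iterable


def _normalizeValue(value: Any) -> Any:
    if value is None:
        return None
    if isinstance(value, (int, float, Decimal)):
        return round(float(value), 4)  # assumption: 4-decimal tolerance
    return str(value).strip()


def _normalizeRow(row: Any) -> tuple:
    # Dict rows (engine) and tuple rows (psycopg) both become plain tuples.
    values = row.values() if isinstance(row, dict) else row
    return tuple(_normalizeValue(v) for v in values)


def resultsMatch(predicted: Iterable[Any], gold: Iterable[Any]) -> bool:
    """Compare two result sets as multisets, ignoring row order."""
    return Counter(map(_normalizeRow, predicted)) == Counter(map(_normalizeRow, gold))


goldRows = [(1, Decimal("2.50")), (3, None)]
engineRows = [{"id": 3, "total": None}, {"id": 1.0, "total": 2.5}]
matches = resultsMatch(engineRows, goldRows)
```

Using a Counter rather than a set keeps duplicate rows significant, so a prediction that drops or repeats a row does not pass.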

Synthetic Evaluation (evaluation/synthetic/)

| Module | Description |
| --- | --- |
| entrypoint.py | Orchestrates the full pipeline: seed → run questions → evaluate → score |
| evalRunner.py | 3-config ablation runner with warmup phase + parallel execution via ThreadPoolExecutor |
| evalConfig.py | Environment-driven configuration and PostgreSQL connection helpers |
| questionRunner.py | Executes gold queries to produce reference results |
| seedData.py | Creates and populates three PostgreSQL databases with Faker data (fixed seed for reproducibility) |
| schemaDefinitions.py | Inline DDL for the three evaluation databases |
| scoreReport.py | Generates summary tables (overall, by difficulty, by database, self-healing breakdown) |

BIRD Benchmark (evaluation/bird/)

| Module | Description |
| --- | --- |
| birdEntrypoint.py | Orchestrates: load dataset → migrate SQLite → evaluate → score |
| birdDataLoader.py | Loads BIRD questions from JSON, converts gold SQL from SQLite to PostgreSQL dialect (14 transformation rules) |
| sqliteToPostgres.py | Introspects SQLite schemas, converts DDL to PostgreSQL, bulk-inserts data with topological FK ordering |
| birdEvalRunner.py | 3-config ablation runner adapted for BIRD (per-database warmup across 11 databases, evidence handling) |
| birdScoreReport.py | BIRD-specific scoring with published baseline comparison and official prediction file generation |
| birdConfig.py | Environment-driven configuration for BIRD dataset paths, evidence toggle, and connections |
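
Topological foreign-key ordering means inserting into referenced tables before the tables that point at them. The sketch below shows the idea with the standard-library graphlib module; it is not the code in sqliteToPostgres.py, and the function name, the input shape (table name mapped to the set of tables it references), and the example schema are assumptions.

```python
from graphlib import TopologicalSorter


def insertOrder(foreignKeys: dict[str, set[str]]) -> list[str]:
    """Order tables so every referenced table comes before its dependents."""
    # Drop self-references, which would otherwise be reported as cycles.
    graph = {
        table: {ref for ref in refs if ref != table}
        for table, refs in foreignKeys.items()
    }
    # TopologicalSorter treats the mapped values as predecessors.
    return list(TopologicalSorter(graph).static_order())


# Hypothetical schema, for illustration only.
order = insertOrder(
    {
        "customers": set(),
        "products": set(),
        "orders": {"customers"},
        "order_items": {"orders", "products"},
        "employees": {"employees"},  # self-referencing manager column
    }
)
```

A genuine cycle between two tables raises graphlib.CycleError, which a loader would need to handle separately, for example by deferring constraint creation.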
