## 0. Connect Database

In [16]:
import sys
import re

from pathlib import Path
sys.path.append(str(Path("D:/code/text-to-sql-agent")))

from src.db.db_connection import get_db_connection

conn = get_db_connection()
cursor = conn.cursor()

## 1. Clean SQL 

In [17]:
def preprocess_sql(sql: str) -> str:
    if not sql:
        return ""

    # Remove triple backtick code fences (```sql ... ``` or ``` ... ```)
    sql = re.sub(r"```(?:sql)?", "", sql, flags=re.IGNORECASE)
    sql = re.sub(r"```", "", sql)

    # Remove triple-quoted strings (""" ... """)
    sql = re.sub(r'^"""|"""$', "", sql.strip(), flags=re.DOTALL)

    return sql.strip()

def normalize_sql(sql: str) -> str:
    """
    Normalize SQL for safety checks:
    - Remove SQL comments
    - Collapse whitespace
    - Lowercase everything
    """
    if not sql:
        return ""

    # Remove single-line comments (-- ...)
    sql = re.sub(r"--.*?$", "", sql, flags=re.MULTILINE)

    # Remove multi-line comments (/* ... */)
    sql = re.sub(r"/\*.*?\*/", "", sql, flags=re.DOTALL)

    # Normalize whitespace
    sql = re.sub(r"\s+", " ", sql)

    return sql.strip().lower()

def clean_sql(raw_sql):
    preprocessed_sql = preprocess_sql(raw_sql)
    clean_sql = normalize_sql(preprocessed_sql)

    return clean_sql

## 2. Detect Forbidden SQL Operations (DDL / DML)

In [18]:
FORBIDDEN_KEYWORDS = {
    "insert",
    "update",
    "delete",
    "drop",
    "alter",
    "create",
    "truncate",
    "grant",
    "revoke",
    "replace",
    "merge",
    "call",
    "execute",
    "commit",
    "rollback",
    "savepoint"
}

def contains_forbidden_sql(sql: str) -> bool:
    """
    Check whether normalized SQL contains forbidden operations.
    Assumes input SQL is already normalized.
    """
    tokens = set(sql.split())
    return not FORBIDDEN_KEYWORDS.isdisjoint(tokens)

## 3. Check if its Read-Only query

In [19]:
def is_read_only_query(sql: str) -> bool:
    """
    Allow only SELECT or WITH ... SELECT queries.
    """
    return sql.startswith("select") or sql.startswith("with")

## 4. Check If it has multiple queries

In [20]:
def is_single_statement(sql: str) -> bool:
    """
    Ensure only one SQL statement is present.
    """
    return sql.count(";") <= 1

## 5. Final SQL validation

In [21]:
def validate_sql(sql: str) -> tuple[bool, str]:
    """
    Validate SQL against guardrails.
    Returns (is_valid, reason).
    """
    sql = clean_sql(sql)

    if not sql:
        return False, "Empty SQL query"

    if not is_single_statement(sql):
        return False, "Multiple SQL statements detected"

    if contains_forbidden_sql(sql):
        return False, "Forbidden SQL operation detected"

    if not is_read_only_query(sql):
        return False, "Only SELECT queries are allowed"

    return True, "SQL is safe to execute"

## 6. Test with PostgreSQL database

In [22]:
def safe_execute_sql(cursor, sql: str):
    is_valid, reason = validate_sql(sql)

    if not is_valid:
        raise ValueError(f"SQL blocked by guardrails: {reason}")

    cursor.execute(sql)
    return cursor.fetchall()

In [23]:
try:
    safe_execute_sql(cursor, "DELETE FROM orders")
except Exception as e:
    print(e)

SQL blocked by guardrails: Forbidden SQL operation detected


## 7. Full sanity checks

In [24]:
tests = [
    "```SELECT * FROM orders```",
    "WITH t AS (SELECT * FROM orders) SELECT * FROM t",
    "DELETE FROM orders",
    "DROP TABLE products",
    "SELECT * FROM orders; DELETE FROM orders",
    "UPDATE orders SET order_dow = 1"
]

for q in tests:
    valid, reason = validate_sql(q)
    print(f"\nQuery: {q}")
    print("Valid:", valid)
    print("Reason:", reason)


Query: ```SELECT * FROM orders```
Valid: True
Reason: SQL is safe to execute

Query: WITH t AS (SELECT * FROM orders) SELECT * FROM t
Valid: True
Reason: SQL is safe to execute

Query: DELETE FROM orders
Valid: False
Reason: Forbidden SQL operation detected

Query: DROP TABLE products
Valid: False
Reason: Forbidden SQL operation detected

Query: SELECT * FROM orders; DELETE FROM orders
Valid: False
Reason: Forbidden SQL operation detected

Query: UPDATE orders SET order_dow = 1
Valid: False
Reason: Forbidden SQL operation detected
