# TPC-DS Text-to-SQL Execution Benchmark

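This notebook extends the text-to-SQL pipeline: it runs one or more text-to-SQL models over a single TPC-DS question set and measures execution accuracy, i.e. whether each generated query returns the same rows as the ground-truth query.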


In [None]:
# If needed (Kaggle), run once:
# !pip -q install -r ../requirements.txt


In [None]:
from pathlib import Path
import json
import random
import time
import re
import gc
from decimal import Decimal
from datetime import date, datetime

import duckdb
import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoModelForSeq2SeqLM


def find_repo_root(start: Path) -> Path:
    """Locate the repository root by searching upward from *start*.

    The root is the closest directory (``start`` itself, then each of its
    ancestors in order) that contains an entry named ``research_pipeline``.
    When no such directory exists, ``start`` is returned unchanged.
    """
    candidates = (start, *start.parents)
    return next(
        (folder for folder in candidates if (folder / "research_pipeline").exists()),
        start,
    )

# --- Paths -----------------------------------------------------------------
# Nearest directory at or above the current working directory that contains
# "research_pipeline"; falls back to the working directory itself.
REPO_ROOT = find_repo_root(Path.cwd())
# DuckDB database file the benchmark queries run against.
DB_PATH = REPO_ROOT / "research_pipeline" / "data" / "ecommerce_dw.duckdb"
# Preferred benchmark file; FALLBACK_BENCHMARK is used only when this one is missing.
PRIMARY_BENCHMARK = REPO_ROOT / "research_pipeline" / "data" / "test_queries_vi_200.json"
FALLBACK_BENCHMARK = REPO_ROOT / "research_pipeline" / "test_queries.json"
# Per-model CSVs and the combined CSV are written into this directory.
OUTPUT_DIR = REPO_ROOT / "research_pipeline"
OUTPUT_CSV_ALL = OUTPUT_DIR / "benchmark_text_to_sql_all.csv"

# --- Models ----------------------------------------------------------------
# Each entry is consumed by load_model_and_tokenizer. "type" selects the
# loading path: "seq2seq", "lora_causal" (base model + PEFT adapter), and any
# other value is loaded as a plain causal LM from "id".
MODEL_CHOICES = {
    "qwen_3_4b_text_to_sql": {
        "type": "causal",
        "id": "Ellbendls/Qwen-3-4b-Text_to_SQL",
    },
    "t5_small_awesome": {
        "type": "seq2seq",
        "id": "cssupport/t5-small-awesome-text-to-sql",
    },
    "llama3_1_8b_lora": {
        "type": "lora_causal",
        "adapter_id": "philschmid/code-llama-3-1-8b-text-to-sql-lora",
        # None -> the base model name is read from the adapter's PEFT config.
        "base_id": None,
    },
}
# Order in which models are benchmarked when RUN_ALL_MODELS is True.
MODEL_ORDER = ["qwen_3_4b_text_to_sql", "t5_small_awesome", "llama3_1_8b_lora"]
RUN_ALL_MODELS = True
# Only used when RUN_ALL_MODELS is False.
MODEL_CHOICE = "qwen_3_4b_text_to_sql"
# When True, a model whose run raises is reported and skipped instead of
# aborting the whole notebook.
CONTINUE_ON_ERROR = True

# --- Benchmark size / prompt knobs -----------------------------------------
MAX_SAMPLES = 50  # set None to run full benchmark
SAMPLE_SEED = 42  # seed for the random subsample taken when MAX_SAMPLES is set
DEFAULT_LIMIT = None  # set to an int to force LIMIT on both GT and generated SQL
MAX_TABLES = None  # set to an int to shorten schema prompt
MAX_NEW_TOKENS = 256  # generation budget passed to model.generate

# --- Hardware --------------------------------------------------------------
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Causal models are loaded with load_in_4bit=True whenever CUDA is available.
USE_4BIT = torch.cuda.is_available()
print(f"Using device: {DEVICE}")


In [None]:
# Set to True to build the database with research_pipeline/setup_dw.py when
# the DuckDB file is missing; otherwise a missing file is a hard error.
AUTO_SETUP_DB = False
if not DB_PATH.exists():
    if not AUTO_SETUP_DB:
        raise FileNotFoundError(f"TPC-DS DuckDB not found: {DB_PATH}")

    import subprocess
    import sys

    setup_script = REPO_ROOT / "research_pipeline" / "setup_dw.py"
    if not setup_script.exists():
        raise FileNotFoundError(f"Missing setup script: {setup_script}")
    # Run with the current interpreter; check_call raises if the script fails.
    subprocess.check_call([sys.executable, str(setup_script)])

    # A zero exit code does not prove the database was written where this
    # notebook expects it, so fail here with a clear message rather than
    # later inside duckdb.connect.
    if not DB_PATH.exists():
        raise FileNotFoundError(
            f"Setup script finished but TPC-DS DuckDB is still missing: {DB_PATH}"
        )


In [None]:
# Open the DuckDB file in read-only mode. This one connection is shared by the
# schema dump and by every ground-truth / generated query the benchmark runs.
con = duckdb.connect(str(DB_PATH), read_only=True)

def duckdb_schema_prompt(con, *, table_schema: str = "main", max_tables: int | None = None) -> str:
    rows = con.execute(
        """
        SELECT table_name, column_name, data_type, ordinal_position
        FROM information_schema.columns
        WHERE table_schema = ?
        ORDER BY table_name, ordinal_position
        """
        [table_schema],
    ).fetchall()

    tables: dict[str, list[tuple[str, str]]] = {}
    for table_name, column_name, data_type, _ in rows:
        tables.setdefault(str(table_name), []).append((str(column_name), str(data_type)))

    table_names = sorted(tables.keys())
    if max_tables is not None:
        table_names = table_names[:max_tables]

    lines: list[str] = []
    for t in table_names:
        lines.append(f"TABLE {t} (")
        for col, typ in tables[t]:
            lines.append(f"  {col} {typ}")
        lines.append(")")
        lines.append("")
    return "\n".join(lines).strip()

# Render the schema once; this text is passed to generate_sql for every question.
schema_text = duckdb_schema_prompt(con, max_tables=MAX_TABLES)
# Counts the "TABLE " header markers emitted by duckdb_schema_prompt.
print(f"Schema tables: {schema_text.count('TABLE ')}")


In [None]:
# Prefer the primary question set; use the fallback file only if it is absent.
if PRIMARY_BENCHMARK.exists():
    benchmark_path = PRIMARY_BENCHMARK
elif FALLBACK_BENCHMARK.exists():
    benchmark_path = FALLBACK_BENCHMARK
else:
    raise FileNotFoundError("No benchmark JSON found.")

# Decode explicitly as UTF-8: without an encoding, read_text() uses the
# platform locale, which can mangle or reject non-ASCII question text.
raw_items = json.loads(benchmark_path.read_text(encoding="utf-8"))

# Keep only entries that have both a question ("text" or "question") and a
# ground-truth "sql"; entries without an "id" get a positional one.
items = []
for item in raw_items:
    question = item.get("text") or item.get("question")
    sql = item.get("sql")
    if not question or not sql:
        continue
    items.append({
        "id": item.get("id", f"q{len(items)+1}"),
        "text": question,
        "sql": sql,
    })

# Optional reproducible subsample (a falsy MAX_SAMPLES keeps the full set).
if MAX_SAMPLES:
    random.seed(SAMPLE_SEED)
    items = random.sample(items, min(MAX_SAMPLES, len(items)))

print(f"Benchmark items: {len(items)} from {benchmark_path}")


In [None]:
def load_model_and_tokenizer(spec: dict):
    """Instantiate the tokenizer and model described by one model spec.

    ``spec["type"]`` picks the loading path:

    * ``"seq2seq"``     - ``AutoModelForSeq2SeqLM`` from ``spec["id"]``.
    * ``"lora_causal"`` - causal base model (``spec["base_id"]`` or, if that is
      falsy, the base named in the adapter's PEFT config) wrapped with the
      adapter ``spec["adapter_id"]``.
    * anything else     - ``AutoModelForCausalLM`` from ``spec["id"]``.

    Causal models additionally receive ``load_in_4bit=USE_4BIT``.

    Returns:
        ``(tokenizer, model, model_kind, model_id)`` where ``model_kind`` is
        ``"seq2seq"`` or ``"causal"`` and ``model_id`` is a display string.
        The model is put in eval mode, and the tokenizer's pad token falls
        back to its EOS token when none is defined.
    """
    on_gpu = DEVICE == "cuda"
    # Keyword arguments common to every from_pretrained call for a model.
    shared_kwargs = {
        "device_map": "auto" if on_gpu else None,
        "torch_dtype": torch.float16 if on_gpu else torch.float32,
        "trust_remote_code": True,
    }
    kind = spec["type"]

    if kind == "seq2seq":
        model_id = spec["id"]
        tokenizer = AutoTokenizer.from_pretrained(model_id, use_fast=True, trust_remote_code=True)
        model = AutoModelForSeq2SeqLM.from_pretrained(model_id, **shared_kwargs)
        model_kind = "seq2seq"
    elif kind == "lora_causal":
        # Imported lazily so peft is only required when a LoRA model is used.
        from peft import PeftConfig, PeftModel

        adapter_id = spec["adapter_id"]
        adapter_config = PeftConfig.from_pretrained(adapter_id)
        base_id = spec.get("base_id") or adapter_config.base_model_name_or_path
        tokenizer = AutoTokenizer.from_pretrained(base_id, use_fast=True, trust_remote_code=True)
        base_model = AutoModelForCausalLM.from_pretrained(
            base_id,
            load_in_4bit=USE_4BIT,
            **shared_kwargs,
        )
        model = PeftModel.from_pretrained(base_model, adapter_id)
        model_kind = "causal"
        model_id = f"{base_id} + {adapter_id}"
    else:
        model_id = spec["id"]
        tokenizer = AutoTokenizer.from_pretrained(model_id, use_fast=True, trust_remote_code=True)
        model = AutoModelForCausalLM.from_pretrained(
            model_id,
            load_in_4bit=USE_4BIT,
            **shared_kwargs,
        )
        model_kind = "causal"

    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    model.eval()
    return tokenizer, model, model_kind, model_id


In [None]:
# Keywords that make is_safe_select reject a query when they appear anywhere
# in it (after "--" comments are removed). Matched as whole words,
# case-insensitively.
_FORBIDDEN_SQL = re.compile(
    r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|COPY|PRAGMA|ATTACH|DETACH|EXPORT|IMPORT|CALL)\b",
    re.IGNORECASE,
)

def extract_sql(text: str) -> str:
    """Pull a single SQL statement out of raw model output.

    Steps, in order:

    1. If the text contains a fenced code block (three backticks, optionally
       tagged ``sql``), keep only its contents. If a fence is opened at the
       start but never closed (e.g. generation was cut off), drop just the
       opening fence.
    2. Drop a leading ``sql:`` label (case-insensitive).
    3. Keep everything before the first ``;`` that is *outside* a quoted
       literal or quoted identifier, so ``'a;b'`` is not cut in half.

    Args:
        text: Decoded model output.

    Returns:
        The extracted statement, stripped, without a trailing semicolon.
    """
    text = text.strip()
    fenced = re.search(r"```(?:sql)?\s*(.*?)```", text, flags=re.IGNORECASE | re.DOTALL)
    if fenced:
        text = fenced.group(1).strip()
    else:
        # Unclosed fence: remove the opener so the statement keyword comes first.
        opener = re.match(r"```(?:sql)?\s*", text, flags=re.IGNORECASE)
        if opener:
            text = text[opener.end():].strip()

    if text.lower().startswith("sql:"):
        text = text[4:].strip()

    # Quote-aware scan for the statement terminator. SQL escapes a quote by
    # doubling it, which this toggle handles naturally (close, then reopen).
    open_quote = None
    for pos, ch in enumerate(text):
        if open_quote is not None:
            if ch == open_quote:
                open_quote = None
        elif ch in "'\"":
            open_quote = ch
        elif ch == ";":
            return text[:pos].strip()
    return text

def is_safe_select(sql: str) -> bool:
    """Return True only for a read-only looking SELECT / WITH statement.

    ``--`` line comments are removed first. The statement is rejected when
    nothing remains, when any ``_FORBIDDEN_SQL`` keyword occurs, or when its
    first whitespace-delimited word is neither SELECT nor WITH
    (case-insensitive).
    """
    without_comments = re.sub(r"--.*?$", "", sql, flags=re.MULTILINE).strip()
    if not without_comments or _FORBIDDEN_SQL.search(without_comments):
        return False
    leading_word, *_ = re.split(r"\s+", without_comments, maxsplit=1)
    return leading_word.upper() in {"SELECT", "WITH"}

def ensure_limit(sql: str, limit: int | None) -> str:
    if limit is None:
        return sql
    s = sql.strip().rstrip(";").strip()
    if re.search(r"\bLIMIT\b", s, flags=re.IGNORECASE):
        return s
    return f"{s}\nLIMIT {limit}"

# Instruction text used by build_prompt for causal models: sent as the system
# message when the tokenizer has a chat template, otherwise prepended to the
# user text. It is not used for seq2seq prompts.
SYSTEM_PROMPT = (
    "You translate user questions into SQL for DuckDB (TPC-DS). "
    "Return only SQL, no markdown, no explanations. "
    "Use only tables and columns from the schema."
)

def build_prompt(question: str, schema_text: str, tokenizer, model_kind: str) -> str:
    """Build the text prompt for one question.

    * ``model_kind == "seq2seq"``: a flat "translate to SQL" prompt with the
      question first and the schema after it; the tokenizer is not consulted.
    * otherwise: schema and question form the user message. If the tokenizer
      defines a chat template, the system prompt and user message are rendered
      through it (with the generation prompt appended); if not, they are simply
      joined with a blank line.
    """
    if model_kind == "seq2seq":
        return f"translate to SQL:\n{question}\n\nSCHEMA:\n{schema_text}\n\nSQL:"

    user = f"SCHEMA:\n{schema_text}\n\nQUESTION:\n{question}\n\nSQL:"
    if not getattr(tokenizer, "chat_template", None):
        return f"{SYSTEM_PROMPT}\n\n{user}"

    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user},
    ]
    return tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )

def generate_sql(question: str, schema_text: str, tokenizer, model, model_kind: str) -> str:
    """Generate SQL for one question with greedy decoding.

    Args:
        question: Natural-language question.
        schema_text: Schema description embedded in the prompt.
        tokenizer: Tokenizer matching *model*.
        model: Loaded generation model.
        model_kind: ``"seq2seq"`` or ``"causal"``; decides the prompt format
            and whether the prompt tokens must be sliced off the output.

    Returns:
        The SQL extracted from the model output, with ``DEFAULT_LIMIT``
        applied via ``ensure_limit``.
    """
    prompt = build_prompt(question, schema_text, tokenizer, model_kind)
    # NOTE(review): truncation=True has no explicit max_length here, so a very
    # long schema may be clipped to the tokenizer's own limit - confirm that
    # the question still fits for the causal prompt layout.
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True).to(model.device)
    pad_id = tokenizer.eos_token_id if tokenizer.eos_token_id is not None else tokenizer.pad_token_id
    with torch.no_grad():
        # Greedy decoding: do_sample=False already makes the output
        # deterministic. No temperature is passed because it is a
        # sampling-only knob; 0.0 is outside its valid range and only
        # produces generation-config warnings.
        output_ids = model.generate(
            **inputs,
            max_new_tokens=MAX_NEW_TOKENS,
            do_sample=False,
            pad_token_id=pad_id,
        )
    if model_kind == "seq2seq":
        text = tokenizer.decode(output_ids[0], skip_special_tokens=True)
    else:
        # Causal models echo the prompt; keep only the newly generated tokens.
        gen_ids = output_ids[0][inputs["input_ids"].shape[1]:]
        text = tokenizer.decode(gen_ids, skip_special_tokens=True)
    sql = extract_sql(text)
    return ensure_limit(sql, DEFAULT_LIMIT)


In [None]:
def normalize_value(v):
    """Map one result cell to a form that compares equal across equivalent queries.

    * ``float``   -> rounded to 6 decimal places
    * ``Decimal`` -> rounded to 6 decimal places, then converted to ``float``
    * ``datetime`` / ``date`` -> ISO-8601 string
    * anything else (including ``None``) is returned unchanged
    """
    if isinstance(v, float):
        return round(v, 6)
    if isinstance(v, Decimal):
        return float(round(v, 6))
    if isinstance(v, (datetime, date)):
        return v.isoformat()
    return v

def _null_safe_row_key(row):
    """Sort key that orders ``None`` cells after real values instead of comparing them."""
    # Pairing each cell with an "is NULL" flag means a None is never compared
    # with a non-None value: the flags differ and decide the order first.
    return tuple((cell is None, cell) for cell in row)

def normalize_rows(rows):
    """Normalize every cell and return the rows in a canonical order.

    Sorting makes the comparison independent of row order. SQL NULLs arrive
    as ``None``, which Python cannot order against other values, so rows are
    sorted with a NULL-aware key (NULLs last). If cells are still not
    orderable (e.g. dict values), the rows are ordered by their ``repr`` as a
    deterministic fallback instead of raising.

    Args:
        rows: Iterable of row tuples, or ``None``.

    Returns:
        A sorted list of normalized tuples, or ``None`` when *rows* is ``None``.
    """
    if rows is None:
        return None
    norm = [tuple(normalize_value(x) for x in row) for row in rows]
    try:
        return sorted(norm, key=_null_safe_row_key)
    except TypeError:
        return sorted(norm, key=repr)

def run_sql(con, sql: str):
    """Execute *sql* on *con* and report the outcome as a pair.

    Returns:
        ``(rows, None)`` with all fetched rows on success, or
        ``(None, message)`` with the exception text on any failure. Errors are
        returned rather than raised so that one bad query does not stop the
        benchmark loop.
    """
    try:
        fetched = con.execute(sql).fetchall()
    except Exception as exc:
        return None, str(exc)
    else:
        return fetched, None


In [None]:
def run_benchmark_for_model(model_choice: str):
    """Run every benchmark item through one model and collect per-item results.

    For each item the ground-truth SQL is executed, SQL is generated for the
    question (timed), and - if the generated SQL passes ``is_safe_select`` -
    it is executed and its normalized rows are compared with the
    ground-truth rows.

    Args:
        model_choice: Key into ``MODEL_CHOICES``.

    Returns:
        A DataFrame with one row per item and the columns ``id``, ``question``,
        ``gt_sql``, ``gen_sql``, ``exec_match``, ``gen_error``, ``gt_error``,
        ``gen_time_sec``, ``model_choice`` and ``model_id``. ``gen_error`` is
        ``"INVALID_SQL"`` when the generated SQL was rejected without being
        executed.

    Raises:
        Whatever model loading or generation raises. The model is released in
        all cases so a failed run does not hold memory while the next model
        loads.
    """
    spec = MODEL_CHOICES[model_choice]
    print(f"\nLoading model choice: {model_choice}")
    tokenizer, model, model_kind, model_id = load_model_and_tokenizer(spec)

    results = []
    try:
        for idx, item in enumerate(items, 1):
            question = item["text"]
            gt_sql = ensure_limit(item["sql"], DEFAULT_LIMIT)
            gt_res, gt_err = run_sql(con, gt_sql)

            # perf_counter is monotonic, so the duration cannot be skewed by
            # wall-clock adjustments.
            start = time.perf_counter()
            gen_sql = generate_sql(question, schema_text, tokenizer, model, model_kind)
            gen_time = time.perf_counter() - start

            exec_match = False
            if is_safe_select(gen_sql):
                gen_res, gen_err = run_sql(con, gen_sql)
                # Only comparable when both queries actually ran.
                if gt_err is None and gen_err is None:
                    exec_match = normalize_rows(gt_res) == normalize_rows(gen_res)
            else:
                # Rejected up front; never sent to the database.
                gen_err = "INVALID_SQL"

            results.append({
                "id": item["id"],
                "question": question,
                "gt_sql": gt_sql,
                "gen_sql": gen_sql,
                "exec_match": exec_match,
                "gen_error": gen_err,
                "gt_error": gt_err,
                "gen_time_sec": gen_time,
                "model_choice": model_choice,
                "model_id": model_id,
            })

            # Reached for every item, including rejected ones, so progress
            # output is never skipped.
            if idx % 10 == 0:
                print(f"Processed {idx}/{len(items)}")
    finally:
        # Drop the model even when an item raised, then return cached GPU
        # memory before the next model is loaded.
        del model
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    return pd.DataFrame(results)


In [None]:
# Benchmark either every model in MODEL_ORDER or just MODEL_CHOICE.
model_choices = MODEL_ORDER if RUN_ALL_MODELS else [MODEL_CHOICE]
all_results = []
summary_rows = []

for choice in model_choices:
    try:
        results_df = run_benchmark_for_model(choice)
    except Exception as e:
        # Model-level failure (load error, out of memory, ...): report it and
        # either move on or abort, depending on CONTINUE_ON_ERROR.
        print(f"Model {choice} failed: {e}")
        if CONTINUE_ON_ERROR:
            continue
        raise

    # An empty frame has no columns, so the column lookups below would raise
    # KeyError; there is nothing to save or summarize for it anyway.
    if results_df.empty:
        print(f"Model {choice} produced no results; skipping.")
        continue

    out_path = OUTPUT_DIR / f"benchmark_text_to_sql_{choice}.csv"
    results_df.to_csv(out_path, index=False)
    all_results.append(results_df)

    gen_failed = results_df["gen_error"].notna()
    gt_failed = results_df["gt_error"].notna()
    rejected = results_df["gen_error"] == "INVALID_SQL"

    # "valid" = both the ground-truth and the generated SQL executed cleanly.
    valid_mask = ~gen_failed & ~gt_failed
    exec_acc_all = results_df["exec_match"].mean()
    exec_acc_valid = results_df.loc[valid_mask, "exec_match"].mean() if valid_mask.any() else 0.0

    summary_rows.append({
        "model_choice": choice,
        "total": len(results_df),
        "exec_acc_all": exec_acc_all,
        "exec_acc_valid": exec_acc_valid,
        "invalid_sql": int(rejected.sum()),
        # Execution errors only: rejected queries were never executed and are
        # already reported under "invalid_sql".
        "gen_exec_errors": int((gen_failed & ~rejected).sum()),
        "gt_exec_errors": int(gt_failed.sum()),
        "output_csv": str(out_path),
    })

if not all_results:
    raise RuntimeError("No model results produced.")

combined_df = pd.concat(all_results, ignore_index=True)
combined_df.to_csv(OUTPUT_CSV_ALL, index=False)

summary_df = pd.DataFrame(summary_rows)
print("\nSummary")
print(summary_df)
print(f"Combined results saved to: {OUTPUT_CSV_ALL}")
