# Evaluate the Full Pipeline Using the DSPy Framework

## Preparations

Before starting, launch the MLflow tracking server by running `mlflow server --backend-store-uri sqlite:///mydb.sqlite`. The code below expects it to be reachable at `http://127.0.0.1:5000`.

In [None]:
import dspy
import mlflow
import pyodbc
from datasets import load_dataset

from src.experiments.qpl.text_to_qpl_dspy import TextToQPL, SchemaRepresentation, connection_string

# Enable MLflow autologging for DSPy. Going by the flag names: compile and
# eval runs are logged, and traces are recorded for eval calls but not for
# compile calls.
mlflow.dspy.autolog(
    log_compiles=True,
    log_evals=True,
    log_traces_from_compile=False,
    log_traces_from_eval=True,
)

# Configure MLflow tracking
mlflow.set_tracking_uri("http://127.0.0.1:5000")  # Use local MLflow server
mlflow.set_experiment("QPL")

# Connect to the database. `conn` and `cursor` are module-level; this single
# `cursor` is what the metric functions in this notebook hand to the SQL helpers.
conn = pyodbc.connect(connection_string, autocommit=True)
cursor = conn.cursor()

# Schema representation used both to build the TextToQPL program and to render
# schemas for the training examples (M-Schema here; the other branch used in
# this notebook renders DDL).
SCHEMA_REPR = SchemaRepresentation.M_SCHEMA

## Prepare Data

In [None]:
# full pipeline test
# Build the evaluation set from the "development" split.
# - `db_id` is the text before the first '|' in the row's `qpl` field, stripped.
# - `db_id` and `question` are the inputs; `query` stays on the example as a
#   label and is read by the execution-accuracy metric as the gold SQL.
eval_dataset = [
    dspy.Example(
        db_id=row['qpl'].split('|')[0].strip(),
        question=row['question'],
        query=row['query']
    ).with_inputs('db_id', 'question')
    for row in load_dataset("d4nieldev/nl2qpl-ds", split="development")
]

## Evaluation

In [None]:
# load program
# Instantiate the text-to-QPL pipeline with the schema representation selected
# in the setup cell (SCHEMA_REPR).
text_to_qpl = TextToQPL(schema_repr=SCHEMA_REPR)

In [None]:
# define metric
from src.experiments.qpl.validate_qpl import compare_qpl_sql

# define metric (execution accuracy)
import threading

# dspy.Evaluate calls the metric from several worker threads (num_threads > 1),
# but every call uses the single module-level pyodbc `cursor`. pyodbc does not
# allow a connection/cursor to be used by two threads at the same time, so
# database access is serialized with a lock. An already existing lock is reused
# so that every metric in this session guards `cursor` with the same object.
if "_CURSOR_LOCK" not in globals():
    _CURSOR_LOCK = threading.Lock()


def metric(example, prediction, trace=None, pred_name=None, pred_trace=None):
    """Execution-accuracy metric for the full text-to-QPL pipeline.

    Args:
        example: Gold example; its ``query`` (gold SQL) and ``db_id`` are read.
        prediction: Program output; its ``qpl`` field is the predicted QPL.
        trace, pred_name, pred_trace: Accepted for compatibility with the
            DSPy metric calling convention; unused here.

    Returns:
        The first element of the pair returned by ``compare_qpl_sql`` (the
        correctness verdict). The second element (error information) is only
        printed.
    """
    gold_sql = example['query']
    db_id = example['db_id']
    predicted_qpl = prediction['qpl']
    # Only the database round-trip is serialized; LM calls still run in parallel.
    with _CURSOR_LOCK:
        correct, err = compare_qpl_sql(qpl=predicted_qpl, sql=gold_sql, db_id=db_id, cursor=cursor)
    print(correct, err)
    return correct

In [None]:
# evaluate program

# configure model (sets the default LM for all DSPy calls that follow)
dspy.settings.configure(lm=dspy.LM("openai/gpt-5-mini", temperature=1.0, max_tokens=20000))

# define evaluation recipe
# NOTE: `eval_dataset[:1]` restricts this run to the first example only, i.e.
# it is a smoke test of the pipeline rather than a full evaluation.
evaluate = dspy.Evaluate(
    devset=eval_dataset[:1],
    metric=metric,
    num_threads=4,
    display_table=True,
    display_progress=True,
    provide_traceback=True
)

# evaluate
results = evaluate(text_to_qpl)

## Optimization

### Training Datasets

In [None]:
# decomposer train
from src.utilsema import DBSchema

# Database schemas, indexed below by `db_id`.
schemas = DBSchema.from_db_schemas_file()

decomposer_ds = load_dataset("d4nieldev/qpl-decomposer-cot-ds", 'original', split="train")
# Index the decomposer rows by question text so that a row's sub-questions can
# be resolved to their own rows when the decomposition tree is built.
# NOTE(review): the key is the question text alone; if the same question occurs
# in more than one row (e.g. for different databases) the last row wins.
# Confirm questions are unique in this dataset.
q_to_row = {
    row['question']: row
    for row in decomposer_ds
}

def is_err(func, *args, **kwargs):
    """Tell whether ``func(*args, **kwargs)`` raises an ``Exception``.

    The call's return value is discarded. Returns ``True`` when the call
    raised, ``False`` when it completed normally.
    """
    try:
        func(*args, **kwargs)
    except Exception:
        return True
    else:
        return False

def construct_qd(row) -> dict:
    """Build the question-decomposition tree rooted at ``row``.

    Each node has the keys ``question``, ``operator`` and ``arguments``;
    ``arguments`` holds one sub-tree per non-empty sub-question of the row,
    resolved through the module-level ``q_to_row`` index.

    Raises:
        KeyError: if a sub-question has no row in ``q_to_row``.
    """
    candidates = [row['sub_question_1'], row['sub_question_2']]
    present = list(filter(None, candidates))

    node = {"question": row['question'], "operator": row['op']}
    # Recurse only after the node's own fields were read, child by child.
    node["arguments"] = [construct_qd(q_to_row[child]) for child in present]
    return node


def _build_decomposer_trainset() -> list:
    """Build the decomposer training examples from ``decomposer_ds``.

    Rows whose decomposition tree cannot be constructed (``construct_qd``
    raises, e.g. because a sub-question has no row of its own) are skipped.
    The tree is built once per row and reused for the example, instead of
    being built once for filtering and a second time for the example.
    """
    # Loop-invariant: which schema rendering to use.
    use_m_schema = SCHEMA_REPR == SchemaRepresentation.M_SCHEMA

    examples = []
    for row in decomposer_ds:
        try:
            qd = construct_qd(row)
        except Exception:
            # Best-effort filter: an incomplete tree means the row is unusable.
            continue

        schema = schemas[row['db_id']]
        examples.append(
            dspy.Example(
                db_schema=schema.m_schema() if use_m_schema else schema.ddl(),
                question=row['question'],

                qd=qd,

                reasoning=row['cot'],
                operator=row['op'],
                sub_questions=[sq for sq in [row['sub_question_1'], row['sub_question_2']] if sq]
            ).with_inputs('db_schema', 'question')
        )
    return examples


decomposer_trainset = _build_decomposer_trainset()

In [None]:
# completer trainset
import re

# A flat QPL "Scan" line: index, table, optional Predicate / Distinct, Output.
flat_qpl_scan_pattern = re.compile(
    r"#(?P<idx>\d+) = Scan Table \[ (?P<table>\w+) \]( Predicate \[ (?P<pred>[^\]]+) \])?( Distinct \[ (?P<distinct>true) \])? Output \[ (?P<out>[^\]]+) \]"
)
# Any other flat QPL line: index, operator, inputs, optional options, Output.
flat_qpl_line_pattern = re.compile(
    r"#(?P<idx>\d+) = (?P<op>\w+) \[ (?P<ins>[^\]]+) \] ((?P<opt>\w+) \[ (?P<arg>[^\]]+) \] )*Output \[ (?P<out>[^\]]+) \]"
)

def line_prefix(qpl_line: str) -> str:
    """Return the leading part of a flat QPL line.

    For a Scan line this is ``"#<idx> = Scan Table"``; for any other operator
    it is ``"#<idx> = <op> [ <inputs> ]"``.

    Raises:
        ValueError: if the line matches neither pattern.
    """
    scan = flat_qpl_scan_pattern.match(qpl_line)
    if scan is not None:
        return "#" + scan.group('idx') + " = Scan Table"

    generic = flat_qpl_line_pattern.match(qpl_line)
    if generic is None:
        raise ValueError(f"Could not parse QPL line: {qpl_line}")

    idx, op, ins = generic.group('idx', 'op', 'ins')
    return f"#{idx} = {op} [ {ins} ]"


# Build the completer training examples.
# - Inputs: rendered schema, question, and `prefix_qpl`, which is the row's QPL
#   prefix followed by one extra hint line: the prefix of the target line plus " ...".
# - `operator` and `db_id` are carried along for sampling and for the metric.
# - Labels: `reasoning` (chain of thought) and `last_line` (the gold QPL line).
# NOTE: line_prefix raises ValueError on a line it cannot parse, which aborts
# the whole list construction.
completer_trainset = [
    dspy.Example(
        db_schema=schemas[row['db_id']].m_schema() if SCHEMA_REPR == SchemaRepresentation.M_SCHEMA else schemas[row['db_id']].ddl(),
        question=row['question'],
        prefix_qpl=row['prefix_qpl']+"\n"+line_prefix(row['qpl_line'])+" ...",

        operator=row['op'],
        db_id=row['db_id'],

        reasoning=row['cot'],
        last_line=row['qpl_line']
    ).with_inputs('db_schema', 'question', 'prefix_qpl')
    for row in load_dataset("d4nieldev/qpl-completer-cot-ds", 'original', split="train")
]

In [None]:
# sample subsets for more efficient training
import random
from collections import defaultdict

random.seed(42)  # seed the global RNG so the shuffles below are reproducible

def sample_trainset(trainset: list[dspy.Example], n_per_op: int = 10, n_splits: int = 1) -> tuple[list[dspy.Example], ...]:
    """Sample operator-balanced, non-overlapping subsets of ``trainset``.

    Examples are grouped by their ``"operator"`` field (examples whose
    operator is ``"Top"`` are ignored), each group is shuffled with the global
    ``random`` module, and split ``i`` receives items
    ``[i * n_per_op, (i + 1) * n_per_op)`` of every shuffled group.

    Args:
        trainset: Examples supporting ``example["operator"]``.
        n_per_op: Number of examples per operator in each split.
        n_splits: Number of splits to return.

    Returns:
        A tuple with ``n_splits`` lists, each holding ``n_per_op`` examples
        per operator.

    Raises:
        AssertionError: if some operator has fewer than
            ``n_per_op * n_splits`` examples.
    """
    groups = defaultdict(list)
    for example in trainset:
        operator = example["operator"]
        if operator != "Top":
            groups[operator].append(example)

    # shuffle examples in every group
    required = n_per_op * n_splits
    for op, examples in groups.items():
        # Exactly `required` examples are enough to fill every split.
        assert len(examples) >= required, f"Not enough examples for operator {op}: {len(examples)} < {required}"
        random.shuffle(examples)

    # split accordingly
    return tuple(
        [example for examples in groups.values() for example in examples[i * n_per_op:(i + 1) * n_per_op]]
        for i in range(n_splits)
    )

# Draw a train and a validation subset (5 examples per operator each, taken
# from non-overlapping slices of the shuffled groups) for both modules.
# NOTE: the *_trainset names are rebound to the sampled subsets, so re-running
# this cell without rebuilding the full sets samples from the reduced lists.
decomposer_trainset, decomposer_valset = sample_trainset(decomposer_trainset, n_per_op=5, n_splits=2)
completer_trainset, completer_valset = sample_trainset(completer_trainset, n_per_op=5, n_splits=2)

### Metrics

In [None]:
# define completer metric
from src.experiments.qpl.validate_qpl import flat_qpl_to_cte, execute_sql, same_rs

def qpl_to_flat(qpl: str) -> list[str]:
    """Split a QPL program into its non-blank lines.

    Everything from the first ``;`` of a line onwards is cut off; lines that
    are empty or whitespace-only after the cut are dropped. Kept lines are not
    stripped.
    """
    flat = []
    for raw_line in qpl.split('\n'):
        code, _, _ = raw_line.partition(';')
        if code.strip():
            flat.append(code)
    return flat

import threading

# The optimizer evaluates the metric from several worker threads, all of which
# use the single module-level pyodbc `cursor`; pyodbc does not allow a
# connection/cursor to be used by two threads at once. Reuse an existing lock so
# that every metric in this session guards `cursor` with the same object.
if "_CURSOR_LOCK" not in globals():
    _CURSOR_LOCK = threading.Lock()


def completer_metric_with_feedback(gold, pred, trace=None, pred_name=None, pred_trace=None):
    """Score a predicted QPL line by execution and produce textual feedback.

    The predicted line and the gold line are each appended to the same QPL
    prefix, converted to a CTE query and executed; the prediction is correct
    when ``same_rs`` reports matching result sets.

    Args:
        gold: Example with ``db_id``, ``prefix_qpl``, ``last_line`` and
            ``reasoning``.
        pred: Prediction with ``last_line``.
        trace, pred_name, pred_trace: Accepted for the feedback-metric calling
            convention; unused here.

    Returns:
        ``dspy.Prediction`` with ``score`` (1 if correct, else 0) and
        ``feedback`` (conversion/execution errors, the verdict, the gold line
        and the gold reasoning).

    Raises:
        Whatever converting or executing the *gold* QPL raises; only failures
        of the predicted QPL are turned into feedback.
    """
    db_id = gold['db_id']
    # The last line of `prefix_qpl` is the "<line prefix> ..." hint added when
    # the training examples were built; drop it to get the real prefix.
    actual_prefix = "\n".join(gold['prefix_qpl'].split("\n")[:-1])
    gold_qpl = actual_prefix+"\n"+gold['last_line']
    pred_qpl = actual_prefix+"\n"+pred['last_line']

    same = False
    feedback_lines = []
    err = False

    try:
        # convert QPL to CTE
        pred_cte = flat_qpl_to_cte(qpl_to_flat(pred_qpl), db_id)
    except Exception as e:
        feedback_lines.append("Error converting QPL to CTE - " + str(e))
        feedback_lines.append("")
        err = True
    else:
        # execute and compare
        feedback_lines.append(f"Converted QPL to CTE: `{pred_cte}`.")
        feedback_lines.append("")
        # Hold the lock for the whole execute-and-compare step so that no other
        # thread touches the shared cursor in between.
        with _CURSOR_LOCK:
            try:
                prs = execute_sql(cursor, pred_cte)
            except Exception as e:
                feedback_lines.append("Could not execute predicted SQL: " + str(e))
                feedback_lines.append("")
                err = True
            else:
                grs = execute_sql(cursor, flat_qpl_to_cte(qpl_to_flat(gold_qpl), db_id))
                same = same_rs(grs, prs, qpl_to_flat(pred_qpl))

    feedback_lines.append(f"Your answer is **{'correct' if same else ('incorrect' if not err else 'invalid')}**. The best QPL line is: `{gold['last_line']}`")
    feedback_lines.append("")
    feedback_lines.append("Here is the full step-by-step reasoning that led to that conclusion:")
    feedback_lines.append(gold['reasoning'])
    feedback_lines.append("Think about what takeaways you can learn from this solution to improve your future answers and approach to similar problems.")

    # NOTE: maybe add the execution results? (can be very big)
    # NOTE: maybe give higher scores to queries that are executable but incorrect

    return dspy.Prediction(score=1 if same else 0, feedback="\n".join(feedback_lines))

In [None]:
# decomposer judge
import dspy
from src.utilsema import DBSchema

# Set the default LM. The judge created in this cell is called without an
# explicit LM, so it presumably runs on this one — confirm.
dspy.settings.configure(lm=dspy.LM("openai/gpt-5-mini", temperature=1.0, max_tokens=20000))

# Load the database schemas (indexed by db_id in the cells that follow).
schemas = DBSchema.from_db_schemas_file()

# LLM-as-judge signature for scoring a single decomposition step.
# NOTE: DSPy uses a signature's class docstring as the instruction text, so
# the docstring below is part of the judge's prompt; editing it changes the
# judge's behavior.
class DecomposerJudge(dspy.Signature):
    """Evaluate the quality of an immediate question decomposition into a toplevel QPL operator and its arguments in the form of natural language questions.
    
The toplevel QPL operators are:
- **Scan** - Scan all rows in a table with optional filtering predicate (no decomposition needed - the question is atomic)
- **Aggregate** - Aggregate a stream of tuples, optionally using a grouping criterion into a stream of groups (1 sub-question)
- **Filter** - Remove tuples from a stream that do not match a predicate (1 sub-question)
- **Sort** - Sort a stream according to a sorting expression (1 sub-question)
- **TopSort** - Select the top-K tuples from a stream according to a sorting expression (1 sub-question)
- **Join** - Perform a logical join operation between two streams based on a join condition (2 sub-questions)
- **Except** - Compute the set difference between two streams of tuples (2 sub-questions)
- **Intersect** - Compute the set intersection between two streams of tuples (2 sub-questions)
- **Union** - Compute the set union between two streams of tuples (2 sub-questions)

Notes:
- The operation determined by the operator and arguments predicted in the immediate sub questions are what will determine how to answer the question. Make sure that no critical details are omitted in the arguments (sub questions) unless covered by the operator.
- Even if the immediate decomposition does not match the first step in the full decomposition, you should take a look at what happens down the tree because some operations can be done in different orders and still be valid.
- The sub-questions should not contain the entire decomposition plan, and should be simple questions, that will later be decomposed in the same way.
"""

    # Inputs: the schema and question, the gold tree for context, and the
    # candidate one-level decomposition that is actually being judged.
    db_schema: str = dspy.InputField(desc="The schema of the database the question is being asked about")
    question: str = dspy.InputField(desc="The question to be decomposed")
    full_question_decomposition: dict = dspy.InputField(desc="The full question decomposition tree, where the first element is the gold immediate decomposition, but it also includes the decomposition for the arguments (recursively), which is not required but can provide context.")
    immediate_question_decomposition: dict = dspy.InputField(desc="The decomposition only for the immediate sub-questions, including only the toplevel operator and its arguments. This is what is being evaluated.")

    # Output: requested as a float in [0, 1]; the range is stated only in the
    # field description, nothing in this class enforces it.
    score: float = dspy.OutputField(desc="A score between 0 and 1 indicating how good the immediate question decomposition is.")


# Wrap the signature in ChainOfThought; callers in this notebook read both
# `.reasoning` and `.score` from the result.
judge = dspy.ChainOfThought(DecomposerJudge)

In [None]:
# bad example
# Sanity check of the judge on a decomposition that should score low: the
# proposed top-level Join leaves out the counting step of the gold tree.
# The inputs have the same shape as the ones the decomposer metric sends to the
# judge: the schema is passed as rendered text (the signature field is `str`),
# and both decompositions carry the root "question".
result = judge(
    db_schema=(
        schemas['concert_singer'].m_schema()
        if SCHEMA_REPR == SchemaRepresentation.M_SCHEMA
        else schemas['concert_singer'].ddl()
    ),
    question="Find the number of concerts happened in the stadium with the highest capacity .",
    full_question_decomposition={
        "question": "Find the number of concerts happened in the stadium with the highest capacity .",
        "operator": "Aggregate",
        "arguments": [
            {
                "question": "List 1 for each concert that happened in the stadium with the highest capacity.",
                "operator": "Join",
                "arguments": [
                    {
                        "question": "Find the id of the stadium with the highest capacity.",
                        "operator": "TopSort",
                        "arguments": [
                            {
                                "question": "Find the id and capacity of all stadiums.",
                                "operator": "Scan",
                                "arguments": []
                            }
                        ]
                    },
                    {
                        "question": "Find the id of the stadium of all concerts.",
                        "operator": "Scan",
                        "arguments": []
                    }
                ]
            }
        ]
    },
    immediate_question_decomposition={
        "question": "Find the number of concerts happened in the stadium with the highest capacity .",
        "operator": "Join",
        "arguments": [
            {"question": "Find the id of the stadium with the highest capacity."},
            {"question": "Find the stadium ids of all concerts."}
        ]
    }
)

print(result.reasoning)
print(result.score)

In [None]:
# good example
# Sanity check of the judge on a decomposition that should score high: it
# differs from the gold first step (Join instead of Aggregate) but still
# yields the requested count, because the second argument already counts
# concerts per stadium.
# The inputs have the same shape as the ones the decomposer metric sends to the
# judge: the schema is passed as rendered text (the signature field is `str`),
# and both decompositions carry the root "question".
result = judge(
    db_schema=(
        schemas['concert_singer'].m_schema()
        if SCHEMA_REPR == SchemaRepresentation.M_SCHEMA
        else schemas['concert_singer'].ddl()
    ),
    question="Find the number of concerts happened in the stadium with the highest capacity .",
    full_question_decomposition={
        "question": "Find the number of concerts happened in the stadium with the highest capacity .",
        "operator": "Aggregate",
        "arguments": [
            {
                "question": "List 1 for each concert that happened in the stadium with the highest capacity.",
                "operator": "Join",
                "arguments": [
                    {
                        "question": "Find the id of the stadium with the highest capacity.",
                        "operator": "TopSort",
                        "arguments": [
                            {
                                "question": "Find the id and capacity of all stadiums.",
                                "operator": "Scan",
                                "arguments": []
                            }
                        ]
                    },
                    {
                        "question": "Find the id of the stadium of all concerts.",
                        "operator": "Scan",
                        "arguments": []
                    }
                ]
            }
        ]
    },
    immediate_question_decomposition={
        "question": "Find the number of concerts happened in the stadium with the highest capacity .",
        "operator": "Join",
        "arguments": [
            {"question": "Find the id of the stadium with the highest capacity."},
            {"question": "For each stadium id, find how many concerts happened in it."}
        ]
    }
)

print(result.reasoning)
print(result.score)

In [None]:
# decomposer metric
import json

def decomposer_metric_with_feedback(gold, pred, trace=None, pred_name=None, pred_trace=None):
    """Score a predicted one-level decomposition with the LLM judge.

    The predicted operator and sub-questions are packed into a one-level tree
    and handed to ``judge`` together with the gold tree, schema and question.

    Returns:
        ``dspy.Prediction`` whose ``score`` is the judge's score and whose
        ``feedback`` lists the gold tree, the predicted decomposition and the
        judge's reasoning.
    """
    immediate_decomposition = {"question": gold.question, "operator": pred.operator}
    immediate_decomposition["arguments"] = [
        {"question": sub_question} for sub_question in pred.sub_questions
    ]

    verdict = judge(
        db_schema=gold.db_schema,
        question=gold.question,
        full_question_decomposition=gold.qd,
        immediate_question_decomposition=immediate_decomposition
    )

    # Assemble the three feedback sections in order.
    feedback = "".join([
        "Full correct decomposition tree:\n", json.dumps(gold.qd, indent=2),
        "\n\nYour immediate decomposition:\n", json.dumps(immediate_decomposition, indent=2),
        "\n\nEvaluation of your decomposition:\n", verdict.reasoning,
    ])

    return dspy.Prediction(score=verdict.score, feedback=feedback)

### GEPA

In [None]:
# optimize decomposer
# GEPA optimizer for the decomposer: scored by the judge-based feedback metric,
# with a separate (larger) LM used for reflection.
decomposer_optimizer = dspy.GEPA(
    metric=decomposer_metric_with_feedback,
    auto="light",
    num_threads=4,
    track_stats=True,
    reflection_lm=dspy.LM("openai/gpt-5", temperature=1.0, max_tokens=20000)
)

# NOTE(review): DSPy optimizers are normally invoked as
# `optimizer.compile(student, trainset=..., valset=...)` and return the
# optimized program. This call relies on `decomposer` exposing its own
# `compile(optimizer=...)` that updates the module in place — confirm;
# otherwise the `save` below persists the unoptimized module.
text_to_qpl.decomposer.compile(optimizer=decomposer_optimizer, trainset=decomposer_trainset, valset=decomposer_valset)
# NOTE(review): presumably requires the `output/dspy/` directory to exist — verify.
text_to_qpl.decomposer.save("output/dspy/decomposer_gepa.json")

In [None]:
# optimize completer
# GEPA optimizer for the completer: scored by the execution-based feedback
# metric, with a separate (larger) LM used for reflection.
completer_optimizer = dspy.GEPA(
    metric=completer_metric_with_feedback,
    auto="light",
    num_threads=4,
    track_stats=True,
    reflection_lm=dspy.LM("openai/gpt-5", temperature=1.0, max_tokens=20000)
)

# NOTE(review): DSPy optimizers are normally invoked as
# `optimizer.compile(student, trainset=..., valset=...)` and return the
# optimized program. This call relies on `completer` exposing its own
# `compile(optimizer=...)` that updates the module in place — confirm;
# otherwise the `save` below persists the unoptimized module.
text_to_qpl.completer.compile(optimizer=completer_optimizer, trainset=completer_trainset, valset=completer_valset)
# NOTE(review): presumably requires the `output/dspy/` directory to exist — verify.
text_to_qpl.completer.save("output/dspy/completer_gepa.json")

In [None]:
# evaluate optimized program

# Restore both optimized modules from the files written by the GEPA cells.
text_to_qpl.decomposer.load("output/dspy/decomposer_gepa.json")
text_to_qpl.completer.load("output/dspy/completer_gepa.json")

# define evaluation recipe
# Unlike the baseline run, this uses the whole evaluation set and 32 worker
# threads; the metric is the same execution-accuracy function.
evaluate = dspy.Evaluate(
    devset=eval_dataset,
    metric=metric,
    num_threads=32,
    display_table=True,
    display_progress=True,
    provide_traceback=True
)

# evaluate
results = evaluate(text_to_qpl)