In [6]:
import json
import os
from copy import deepcopy
from pathlib import Path

import dspy
import pandas as pd
import typer
from bellem.musique.eval import (
    aggregate_scores,
    compute_scores,
    compute_scores_dataframe,
)
from bellem.utils import set_seed
from datasets import load_dataset
from dotenv import load_dotenv
from dspy.evaluate import Evaluate
from dspy.teleprompt.ensemble import Ensemble
from rich.console import Console

# Rebind the notebook-level name `print` to a rich Console bound to stderr, so
# every later `print(...)` call in this notebook goes through rich instead of
# the builtin.
print = Console(stderr=True).print

# python-dotenv: load variables from a `.env` file into the process environment
# (`configure_lm` reads OPENAI_BASE_URL / OPENAI_API_KEY via os.getenv).
load_dotenv()

# Seed with a fixed value via the project helper (presumably for run-to-run
# reproducibility -- which RNGs it seeds is not visible here).
set_seed(89)

In [7]:
import weave
# Start Weave under the "mhqa-dspy" project; the call returns a WeaveClient,
# which is what the cell displays as its output.
weave.init(project_name="mhqa-dspy")

<weave.trace.weave_client.WeaveClient at 0x70b8b367b860>

In [8]:
# import mlflow

# mlflow.set_tracking_uri("http://127.0.0.1:5000/")
# mlflow.set_experiment("mhqa-dspy")
# mlflow.dspy.autolog()

In [9]:
def configure_lm(model, temperature):
    """Create a DSPy language model and install it as the global default.

    Args:
        model: Model name; it is prefixed with ``"openai/"`` before being
            handed to ``dspy.LM``.
        temperature: Sampling temperature forwarded to the LM.

    The endpoint and key are read from the ``OPENAI_BASE_URL`` and
    ``OPENAI_API_KEY`` environment variables; caching is switched off.
    """
    model_id = "openai/" + model
    lm_options = {
        "temperature": temperature,
        "cache": False,
        "api_base": os.getenv("OPENAI_BASE_URL"),
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
    dspy.configure(lm=dspy.LM(model_id, **lm_options))


In [10]:
from rerankers import Reranker

# Module-level reranker shared by `retrieve` below (T5-style ranker using the
# unicamp-dl/mt5-base-mmarco-v2 checkpoint).
ranker = Reranker(model_type="t5", model_name="unicamp-dl/mt5-base-mmarco-v2")
# Fail fast if no ranker object came back. In the recorded run of this cell the
# library reported missing T5Ranker dependencies
# (`pip install "rerankers[transformers]"`) right before this error was raised,
# so a None result here most likely means the optional dependencies are absent.
if ranker is None:
    raise ValueError("Ranker is not initialized")

def retrieve(docs: list[dict], query: str, top_k: int = 3) -> list[dict]:
    """Rerank ``docs`` against ``query`` and return the best ``top_k`` of them.

    Args:
        docs: Documents to search in. Each document is a dict with at least a
            'text' field (callers in this notebook also provide 'idx').
        query: Query string to rank the documents against.
        top_k: Maximum number of documents to return (default: 3).

    Returns:
        Up to ``top_k`` of the original document dicts, in the order produced
        by the reranker. An empty list is returned when ``docs`` is empty or
        ``top_k`` is not positive.
    """
    # Nothing to rank, or nothing requested: skip the model call entirely.
    # This also stops a negative top_k from silently meaning "all but the last
    # N results" through slice semantics.
    if not docs or top_k <= 0:
        return []

    texts = [doc["text"] for doc in docs]
    # Positional indices are used as doc ids so each ranked result can be
    # mapped back to the original dict, whatever its own 'idx' value is.
    ranking = ranker.rank(query=query, docs=texts, doc_ids=list(range(len(texts))))
    return [docs[result.doc_id] for result in ranking.results[:top_k]]

Loading T5Ranker model unicamp-dl/mt5-base-mmarco-v2 (this message can be suppressed by setting verbose=0)
You don't have the necessary dependencies installed to use T5Ranker.
Please install the necessary dependencies for T5Ranker by running `pip install "rerankers[transformers]"` or `pip install "rerankers[all]" to install the dependencies for all reranker types.


ValueError: Ranker is not initialized

In [6]:
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Literal, get_origin, get_type_hints

import dspy
from dspy.primitives.program import Module
from dspy.signatures.signature import ensure_signature
from dspy.utils.callback import with_callbacks
from pydantic import BaseModel, TypeAdapter


@dataclass
class RunContext:
    """Per-call context handed to tools as the `run_context` keyword argument.

    `ReAct.forward` builds one for every tool invocation so that a tool can
    read the inputs of the current call (e.g. `search` reads `input["docs"]`).
    """

    # The keyword arguments `ReAct.forward` was called with.
    input: dict[str, Any]


class Tool:
    """Wrap a callable together with the metadata `ReAct` puts into its prompt.

    Attributes set by `__init__`:
        func: the wrapped callable.
        name: tool name shown to the LM and used as the lookup key in `ReAct`.
        desc: description text (explicit `desc`, else the callable's docstring).
        args: argument name -> JSON schema (or the string "Any"); this is what
            the prompt lists. `return` and `run_context` are never included.
        arg_types: argument name -> declared type for every entry in the
            hints, including `return` / `run_context` when present.
    """

    def __init__(
        self,
        func: Callable,
        name: str = None,
        desc: str = None,
        args: dict[str, Any] | None = None,
    ):
        """Collect name, description and argument schemas for `func`.

        Args:
            func: a function, method, or object with a `__call__` method.
            name: optional name override; defaults to `func.__name__`, or the
                class name when `func` has no `__name__`.
            desc: optional description override; defaults to the docstring.
            args: optional explicit mapping of argument name -> type. When
                given, the signature of `func` is not inspected.
        """
        # For callable objects, introspect their `__call__` rather than the object itself.
        annotations_func = func if inspect.isfunction(func) or inspect.ismethod(func) else func.__call__

        self.func = func
        self.name = name or getattr(func, "__name__", type(func).__name__)
        # May end up as None when neither `func` nor `annotations_func` has a docstring.
        self.desc = desc or getattr(func, "__doc__", None) or getattr(annotations_func, "__doc__", "")
        self.args = {}
        self.arg_types = {}

        # If an explicit args dict is passed, use that; otherwise, extract from the function.
        if args is not None:
            hints = args
        else:
            # Use inspect.signature to get all parameter names
            sig = inspect.signature(annotations_func)
            # Get available type hints
            available_hints = get_type_hints(annotations_func)
            # Build a dictionary of parameter name -> type (defaulting to Any when missing)
            hints = {param_name: available_hints.get(param_name, Any) for param_name in sig.parameters.keys()}

        # Process each argument's type to generate its JSON schema.
        for k, v in hints.items():
            # Recorded for every key, including the ones skipped below.
            self.arg_types[k] = v
            if k == "return":
                continue
            # `run_context` is supplied by ReAct.forward, not by the LM, so it is
            # kept out of the schema that is advertised in the prompt.
            if k == "run_context":
                continue
            # Check if the type (or its origin) is a subclass of Pydantic's BaseModel
            origin = get_origin(v) or v
            if isinstance(origin, type) and issubclass(origin, BaseModel):
                self.args[k] = v.model_json_schema()
            else:
                # Falls back to the string "Any" when the generated schema is empty/falsy.
                self.args[k] = TypeAdapter(v).json_schema() or "Any"

    @with_callbacks
    def __call__(self, *args, **kwargs):
        """Invoke the wrapped callable with the given arguments (wrapped by dspy's `with_callbacks`)."""
        return self.func(*args, **kwargs)


class ReAct(Module):
    """Tool-using agent loop: pick a tool, run it, record the observation, repeat.

    Two sub-modules are built in `__init__`:
        react:   `dspy.Predict` that, given the inputs and the trajectory so far,
                 emits `next_thought`, `next_tool_name` and `next_tool_args`.
        extract: `dspy.ChainOfThought` that produces the signature's output
                 fields from the inputs plus the final trajectory.

    Every tool call made by `forward` additionally receives
    `run_context=RunContext(input=<forward kwargs>)`.
    """

    def __init__(self, signature, tools: list[Callable], max_iters=5):
        """
        Build the tool registry and the two prompt signatures.

        `signature` is normalized with `ensure_signature`.
        `tools` is a list of functions, callable classes, or `Tool` instances;
        objects exposing `input_variable` are also kept as-is, everything else
        is wrapped in `Tool`. A `finish` tool is always added.
        `max_iters` is the maximum number of tool-selection rounds in `forward`.
        """

        self.signature = signature = ensure_signature(signature)
        self.max_iters = max_iters

        # Wrap plain callables; index tools by name (a later duplicate name replaces an earlier one).
        tools = [t if isinstance(t, Tool) or hasattr(t, "input_variable") else Tool(t) for t in tools]
        tools = {tool.name: tool for tool in tools}

        inputs = ", ".join([f"`{k}`" for k in signature.input_fields.keys()])
        outputs = ", ".join([f"`{k}`" for k in signature.output_fields.keys()])
        # Start from the user's own instructions (if any), then append the ReAct protocol text.
        instr = [f"{signature.instructions}\n"] if signature.instructions else []

        instr.extend(
            [
                f"You will be given {inputs} and your goal is to finish with {outputs}.\n",
                "To do this, you will interleave Thought, Tool Name, and Tool Args, and receive a resulting Observation.\n",
                "Thought can reason about the current situation, and Tool Name can be the following types:\n",
            ]
        )

        # Built-in terminating tool: takes no declared arguments and just returns "Completed.".
        finish_desc = (
            f"Signals that the final outputs, i.e. {outputs}, are now available and marks the task as complete."
        )
        finish_args = {}  # k: v.annotation for k, v in signature.output_fields.items()}
        tools["finish"] = Tool(
            func=lambda **kwargs: "Completed.",
            name="finish",
            desc=finish_desc,
            args=finish_args,
        )

        # One numbered instruction line per tool: name, description, argument schema.
        for idx, tool in enumerate(tools.values()):
            args = tool.args if hasattr(tool, "args") else str({tool.input_variable: str})
            desc = (f", whose description is <desc>{tool.desc}</desc>." if tool.desc else ".").replace("\n", "  ")
            desc += f" It takes arguments {args} in JSON format."
            instr.append(f"({idx + 1}) {tool.name}{desc}")

        # Step signature: original inputs + trajectory -> next thought / tool name / tool args.
        # The tool name is constrained to the registered tool names via a Literal type.
        react_signature = (
            dspy.Signature({**signature.input_fields}, "\n".join(instr))
            .append("trajectory", dspy.InputField(), type_=str)
            .append("next_thought", dspy.OutputField(), type_=str)
            .append("next_tool_name", dspy.OutputField(), type_=Literal[tuple(tools.keys())])
            .append("next_tool_args", dspy.OutputField(), type_=dict[str, Any])
        )

        # Final-answer signature: the original inputs and outputs plus the trajectory as an extra input.
        fallback_signature = dspy.Signature(
            {**signature.input_fields, **signature.output_fields},
            signature.instructions,
        ).append("trajectory", dspy.InputField(), type_=str)

        self.tools = tools
        self.react = dspy.Predict(react_signature)
        self.extract = dspy.ChainOfThought(fallback_signature)

    def forward(self, **input_args):
        """Run up to `max_iters` tool rounds, then extract the final outputs.

        Args:
            **input_args: values for the signature's input fields; they are also
                exposed to tools through `RunContext.input`.

        Returns:
            `dspy.Prediction` holding the raw `trajectory` dict plus the fields
            produced by the `extract` sub-module.
        """

        def format(trajectory: dict[str, Any], last_iteration: bool):
            """Render the trajectory dict to text with the active adapter.

            `last_iteration` is accepted but not used by this function.
            """
            adapter = dspy.settings.adapter or dspy.ChatAdapter()
            # Throwaway signature whose inputs are exactly the trajectory keys.
            trajectory_signature = dspy.Signature(f"{', '.join(trajectory.keys())} -> x")
            return adapter.format_fields(trajectory_signature, trajectory, role="user")

        # Keys are suffixed with the round index: thought_i, tool_name_i, tool_args_i, observation_i.
        trajectory = {}
        for idx in range(self.max_iters):
            pred = self.react(
                **input_args,
                trajectory=format(trajectory, last_iteration=(idx == self.max_iters - 1)),
            )

            trajectory[f"thought_{idx}"] = pred.next_thought
            trajectory[f"tool_name_{idx}"] = pred.next_tool_name
            trajectory[f"tool_args_{idx}"] = pred.next_tool_args

            try:
                parsed_tool_args = {}
                tool = self.tools[pred.next_tool_name]
                for k, v in pred.next_tool_args.items():
                    # Arguments declared as pydantic models are validated into model
                    # instances; everything else is passed through unchanged.
                    if hasattr(tool, "arg_types") and k in tool.arg_types:
                        arg_type = tool.arg_types[k]
                        if isinstance((origin := get_origin(arg_type) or arg_type), type) and issubclass(
                            origin, BaseModel
                        ):
                            parsed_tool_args[k] = arg_type.model_validate(v)
                            continue
                    parsed_tool_args[k] = v
                # NOTE(review): `run_context` is passed to every tool unconditionally, so a
                # tool whose callable accepts neither `run_context` nor **kwargs will raise
                # here and be reported as "Failed to execute" -- confirm this is intended.
                trajectory[f"observation_{idx}"] = self.tools[pred.next_tool_name](
                    **parsed_tool_args,
                    run_context=RunContext(input=input_args),
                )
            except Exception as e:
                # Any failure (unknown tool, bad args, tool error) becomes the observation
                # text so the LM sees it on the next round instead of aborting the call.
                trajectory[f"observation_{idx}"] = f"Failed to execute: {e}"

            if pred.next_tool_name == "finish":
                break

        # Always run the extractor, whether the loop ended via `finish` or by hitting max_iters.
        extract = self.extract(**input_args, trajectory=format(trajectory, last_iteration=False))
        return dspy.Prediction(trajectory=trajectory, **extract)


"""
Thoughts and Planned Improvements for dspy.ReAct.

TOPIC 01: How Trajectories are Formatted, or rather when they are formatted.

Right now, both sub-modules are invoked with a `trajectory` argument, which is a string formatted in `forward`. Though
the formatter uses a general adapter.format_fields, the tracing of DSPy only sees the string, not the formatting logic.

What this means is that, in demonstrations, even if the user adjusts the adapter for a fixed program, the demos' format
will not update accordingly, but the inference-time trajectories will.

One way to fix this is to support `format=fn` in the dspy.InputField() for "trajectory" in the signatures. But this
means that care must be taken that the adapter is accessed at `forward` runtime, not signature definition time.

Another potential fix is to more natively support a "variadic" input field, where the input is a list of dictionaries,
or a big dictionary, and have each adapter format it accordingly.

Trajectories also affect meta-programming modules that view the trace later. It's inefficient O(n^2) to view the
trace of every module repeating the prefix.


TOPIC 02: Handling default arguments in the Tool class.


TOPIC 03: Simplifying ReAct's __init__ by moving modular logic to the Tool class.
    * Handling descriptions and casting.
    * Handling exceptions and error messages.
    * More cleanly defining the "finish" tool, perhaps as a runtime-defined function?


TOPIC 04: Default behavior when the trajectory gets too long.


TOPIC 05: Adding more structure around how the instruction is formatted.
    * Concretely, it's now a string, so an optimizer can and does rewrite it freely.
    * An alternative would be to add more structure, such that a certain template is fixed but values are variable?


TOPIC 06: Idiomatically allowing tools that maintain state across iterations, but not across different `forward` calls.
    * So the tool would be newly initialized at the start of each `forward` call, but maintain state across iterations.
    * This is pretty useful for allowing the agent to keep notes or count certain things, etc.

TOPIC 07: Make max_iters a bit more expressive.
    * Allow passing `max_iters` in forward to overwrite the default.
    * Get rid of `last_iteration: bool` in the format function. It's not necessary now.
"""


'\nThoughts and Planned Improvements for dspy.ReAct.\n\nTOPIC 01: How Trajectories are Formatted, or rather when they are formatted.\n\nRight now, both sub-modules are invoked with a `trajectory` argument, which is a string formatted in `forward`. Though\nthe formatter uses a general adapter.format_fields, the tracing of DSPy only sees the string, not the formatting logic.\n\nWhat this means is that, in demonstrations, even if the user adjusts the adapter for a fixed program, the demos\' format\nwill not update accordingly, but the inference-time trajectories will.\n\nOne way to fix this is to support `format=fn` in the dspy.InputField() for "trajectory" in the signatures. But this\nmeans that care must be taken that the adapter is accessed at `forward` runtime, not signature definition time.\n\nAnother potential fix is to more natively support a "variadic" input field, where the input is a list of dictionaries,\nor a big dictionary, and have each adatper format it accordingly.\n\nTraj

In [7]:
def format_paragraph(paragraph):
    """Render a paragraph record as a markdown-style heading followed by its text.

    Args:
        paragraph: Mapping with 'paragraph_text' and 'title' keys.

    Returns:
        The string "# <title>" and the paragraph text, separated by a newline.
    """
    body, heading = paragraph["paragraph_text"], paragraph["title"]
    return f"# {heading}\n{body}"


def make_example(record):
    """Turn one dataset record into a `dspy.Example` for the QA program.

    Args:
        record: Mapping with 'id', 'question', 'paragraphs',
            'question_decomposition', 'answer' and 'answer_aliases'.

    Returns:
        A `dspy.Example` whose input fields are 'question' and 'docs'. 'docs'
        holds one ``{"text", "idx"}`` dict per paragraph, and 'answers' is the
        gold answer followed by its aliases.
    """
    docs = []
    for paragraph in record["paragraphs"]:
        docs.append({"text": format_paragraph(paragraph), "idx": paragraph["idx"]})

    fields = dict(
        id=record["id"],
        question=record["question"],
        docs=docs,
        question_decomposition=record["question_decomposition"],
        answers=[record["answer"], *record["answer_aliases"]],
    )
    return dspy.Example(**fields).with_inputs("question", "docs")


def search(query: str, run_context: RunContext) -> list[str]:
    """Find relevant documents for the query."""
    # The docstring above is what `Tool` picks up as the tool description shown
    # to the LM, so its wording is part of the prompt.
    # The candidate documents come from the inputs of the current program call;
    # the three best-ranked ones are returned as plain text.
    texts = []
    for doc in retrieve(run_context.input["docs"], query, 3):
        texts.append(doc["text"])
    return texts


def make_program():
    """Build the question-answering agent: a ReAct program with the `search` tool."""
    available_tools = [search]
    return ReAct("question -> answer", tools=available_tools)


def evaluate_answer(example, pred, trace=None):
    """Metric: the 'f1' score of the predicted answer against the reference answers.

    Args:
        example: Object exposing the reference answers as `.answers`.
        pred: Object exposing the predicted answer as `.answer`.
        trace: Accepted for compatibility with the metric call signature; unused.

    Returns:
        The 'f1' entry of the dict returned by `compute_scores`.
    """
    return compute_scores(pred.answer, example.answers)["f1"]


def dynamic_import(module, name):
    """Import `module` by its dotted path and return its attribute `name`.

    Raises:
        ImportError: if the module cannot be imported.
        AttributeError: if the module has no attribute called `name`.
    """
    from importlib import import_module

    loaded_module = import_module(module)
    return getattr(loaded_module, name)


def make_optimizer(optimizer_config: dict):
    """Instantiate a `dspy.teleprompt` optimizer from a config dict.

    Args:
        optimizer_config: Mapping with
            'class': name of the optimizer class inside `dspy.teleprompt`;
            'params': keyword arguments for its constructor (deep-copied, so
                the config itself is never mutated);
            'with_metric': when truthy, `evaluate_answer` is passed as `metric`.

    Returns:
        The constructed optimizer instance.
    """
    optimizer_cls = dynamic_import("dspy.teleprompt", optimizer_config["class"])
    init_kwargs = deepcopy(optimizer_config["params"])
    if optimizer_config["with_metric"]:
        init_kwargs.update({"metric": evaluate_answer})
    return optimizer_cls(**init_kwargs)


def preprocess_result(result):
    """Flatten one evaluation result into a single record.

    Args:
        result: An ``(example, prediction, score)`` triple; example and
            prediction must both be convertible with ``dict(...)``.

    Returns:
        A dict with the example's fields, the prediction's fields prefixed
        with ``predicted_``, and the score as a float under 'score'.
    """
    example, pred, score = result
    prefixed = {f"predicted_{field}": value for field, value in dict(pred).items()}

    row = {}
    row.update(dict(example))
    row.update(prefixed)
    row["score"] = float(score)
    return row


def make_results_dataframe(results):
    """Build the scored results table from raw evaluation results.

    Each result is flattened with `preprocess_result` and normalized into a
    DataFrame. Two columns are then derived: 'n_hops' (length of
    'question_decomposition') and 'predicted_answer' with missing values
    replaced by "No Answer". The frame is finally passed through
    `compute_scores_dataframe`, whose return value is returned.
    """
    records = [preprocess_result(result) for result in results]
    frame = pd.json_normalize(records)
    frame = frame.assign(
        n_hops=frame["question_decomposition"].apply(len),
        predicted_answer=frame["predicted_answer"].fillna("No Answer"),
    )
    return compute_scores_dataframe(frame)


def train_main(
    dataset_path: str = typer.Option(..., help="Path to the dataset"),
    dataset_name: str = typer.Option(..., help="Name of the dataset"),
    dataset_split: str = typer.Option(..., help="Dataset split to use (e.g., 'train', 'validation')"),
    model: str = typer.Option(..., help="Name of the model to use"),
    temperature: float = typer.Option(..., help="Temperature parameter for the model"),
    load_from: str = typer.Option(default="UNSET", help="Path to a saved model to load"),
    optimizer_path: Path = typer.Option(..., help="Path to the optimizer config"),
    ensemble: str = typer.Option("no", help="Whether to use an ensemble of models"),
    out: Path = typer.Option(..., help="Output file for trained program"),
):
    """Optimize the ReAct program on a dataset split and save the result.

    Args:
        dataset_path: Dataset path/identifier passed to `load_dataset`.
        dataset_name: Dataset configuration name.
        dataset_split: Split to train on.
        model: Model name (see `configure_lm`).
        temperature: Sampling temperature for the LM.
        load_from: Saved program to start from; "UNSET" or empty means none.
        optimizer_path: JSON file describing the optimizer ('class', 'params',
            'with_metric', optional 'compile_params'). An empty config skips
            optimization.
        ensemble: "yes" to combine the optimizer's candidate programs with a
            majority-vote ensemble.
        out: File the trained program is saved to; its parent directory is
            created if needed.

    Returns:
        The trained (and possibly ensembled) program.
    """
    # When this function is called directly (e.g. from a notebook cell) rather
    # than through the Typer CLI, options the caller leaves out still hold
    # their `typer.Option(...)` placeholder object. Swap in the declared
    # default so the string comparisons below see real values.
    if isinstance(load_from, typer.models.OptionInfo):
        load_from = load_from.default
    if isinstance(ensemble, typer.models.OptionInfo):
        ensemble = ensemble.default

    # Direct callers may pass a plain string path.
    out = Path(out)
    out.parent.mkdir(parents=True, exist_ok=True)

    # Set up LLM
    configure_lm(model, temperature)

    # Load and preprocess datasets
    ds = load_dataset(dataset_path, dataset_name, split=dataset_split)
    examples = [make_example(record) for record in ds]
    print(f"Loaded {len(examples)} examples")

    # Create the program
    program = make_program()
    if load_from and load_from != "UNSET":
        print(f"Loading model from {load_from}")
        program.load(load_from)

    # Train the program
    with open(optimizer_path) as f:
        optimizer_config = json.load(f)

    if optimizer_config:
        optimizer = make_optimizer(optimizer_config)
        compile_params = optimizer_config.get("compile_params", {})
        trained_program = optimizer.compile(program, trainset=examples, **compile_params)
    else:
        # Empty config: keep the (possibly loaded) program as-is.
        trained_program = program

    if ensemble == "yes":
        ensemble_optimizer = Ensemble(reduce_fn=dspy.majority)
        # Each candidate entry is a sequence whose last element is the program.
        candidate_programs = [x[-1] for x in trained_program.candidate_programs]
        trained_program = ensemble_optimizer.compile(candidate_programs)

    # Save the trained program
    trained_program.save(out)

    return trained_program

def evaluate_main(
    dataset_path: str = typer.Option(..., help="Path to the dataset"),
    dataset_name: str = typer.Option(..., help="Name of the dataset"),
    dataset_split: str = typer.Option(..., help="Dataset split to use (e.g., 'train', 'validation')"),
    model: str = typer.Option(..., help="Name of the model to use"),
    temperature: float = typer.Option(..., help="Temperature parameter for the model"),
    load_from: str = typer.Option(default="UNSET", help="Path to a saved model to load"),
    out: Path = typer.Option(..., help="Output directory for generated results"),
):
    """Evaluate the ReAct program on a dataset split and write results to disk.

    Args:
        dataset_path: Dataset path/identifier passed to `load_dataset`.
        dataset_name: Dataset configuration name.
        dataset_split: Split to evaluate on.
        model: Model name (see `configure_lm`).
        temperature: Sampling temperature for the LM.
        load_from: Saved program to evaluate; "UNSET" or empty evaluates the
            untrained program.
        out: Output directory (created if missing). Receives `results.jsonl`
            (one record per example) and `scores.json` (overall scores plus one
            "<n>hops" entry per distinct hop count).
    """
    # When this function is called directly (e.g. from a notebook cell) rather
    # than through the Typer CLI, an omitted `load_from` is still the
    # `typer.Option(...)` placeholder object, which is truthy and != "UNSET".
    # Swap in the declared default so it is not passed to `program.load`.
    if isinstance(load_from, typer.models.OptionInfo):
        load_from = load_from.default

    # Direct callers may pass a plain string path.
    out = Path(out)
    out.mkdir(parents=True, exist_ok=True)

    # Set up LLM
    configure_lm(model, temperature)

    # Load and preprocess datasets
    ds = load_dataset(dataset_path, dataset_name, split=dataset_split)
    examples = [make_example(record) for record in ds]
    print(f"Loaded {len(examples)} examples")

    # Create the program
    program = make_program()
    if load_from and load_from != "UNSET":
        print(f"Loading model from {load_from}")
        program.load(load_from)

    # Evaluate the program
    evaluate_program = Evaluate(
        metric=evaluate_answer,
        devset=examples,
        num_threads=4,
        display_progress=True,
        return_outputs=True,
    )
    # With return_outputs=True the evaluator returns (score, per-example results).
    _, results = evaluate_program(program)

    # Save the results
    result_df = make_results_dataframe(results)
    result_df.to_json(out / "results.jsonl", orient="records", lines=True)

    # Save the scores
    scores = aggregate_scores(result_df)
    # Add a per-hop-count breakdown next to the overall scores.
    for n_hops in result_df["n_hops"].unique():
        scores[f"{n_hops}hops"] = aggregate_scores(result_df[result_df["n_hops"] == n_hops])

    with open(out / "scores.json", "w") as f:
        json.dump(scores, f, indent=2)


In [8]:
# Model name handed to `train_main` below; `configure_lm` prefixes it with
# "openai/" and sends requests to the endpoint in OPENAI_BASE_URL.
# The commented-out lines are alternative model names.
model='llama-3.3-70b-tgi'
# model='meta-llama/Llama-3.3-70B-Instruct-Turbo'
# model='llama3.1:8b-instruct-q8_0'
# model='llama-3.1-8b-instant'
# model='gemini-2.0-flash-lite-preview-02-05'

In [None]:
out = Path('out')

# `train_main` forwards its `out` argument to `program.save(...)`, which needs
# a file path ending in `.json` (or `.pkl`). Passing the bare directory would
# only fail after the whole optimization run has finished, so save into a
# file inside `out/` instead.
# Every option is passed explicitly because calling the function directly
# bypasses Typer, which would otherwise fill in the declared defaults.
trained_program = train_main(
    dataset_path='bdsaglam/musique-mini',
    dataset_name='answerable',
    dataset_split='train',
    model=model,
    temperature=0.1,
    load_from='UNSET',
    optimizer_path=Path('../data/raw/optimizer-configs/bfsrs-medium.json'),
    ensemble='no',
    out=out / 'trained-program.json',
)

Going to sample between 1 and 8 traces per predictor.
Will attempt to bootstrap 16 candidate sets.
  0%|          | 0/300 [00:00<?, ?it/s]



🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-c363-73a0-9157-b6c9b81fc0f7
🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-c364-7533-b165-a9cc3e1329a0
🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-c365-78f0-99a0-9ebcf689d5ee
🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-c363-73a0-9157-b6d20c890afe
🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-c361-7663-8e83-54b8028e3a28
🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-c362-7b00-bed1-eb823137426f
🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-c365-78f0-99a0-9ec04989931d
🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-c363-73a0-9157-b6ef824027e3
🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-c360-7f93-b2f5-21dddd9e762b
🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-c361-7663-8e83-54aee84b5bae
🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-c366-7ce0-a3d1-5fe152fcbdac
🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-c366-7ce0-a3d1-5ff6612f2291
🍩 https://wandb.

🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-e1ca-7010-88da-ace98b6defef🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-e2e8-78d2-b48d-c86c465b20aa

🍩 https://wandb.ai/bdsaglam/mhqa-dspy/r/call/0194f44a-dfea-7d10-a942-6f9dea83ce94
