From 6660ac408048c9c54f62db5fdd467403513481f9 Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Wed, 27 May 2026 08:44:09 -0700 Subject: [PATCH 1/3] follow ups --- py/src/braintrust/dataset_pipeline.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/py/src/braintrust/dataset_pipeline.py b/py/src/braintrust/dataset_pipeline.py index 7c6f81e1..2ed3ad59 100644 --- a/py/src/braintrust/dataset_pipeline.py +++ b/py/src/braintrust/dataset_pipeline.py @@ -4,7 +4,6 @@ from typing_extensions import NotRequired, TypedDict -from .generated_types import ObjectReference from .logger import Metadata from .trace import Trace @@ -19,7 +18,6 @@ "DatasetPipelineTransform", "DatasetPipelineTransformArgs", "DatasetPipelineTransformResult", - "get_registered_dataset_pipelines", ] @@ -49,13 +47,13 @@ class DatasetPipelineRow(TypedDict, total=False): expected: Any | None tags: Sequence[str] | None metadata: Metadata | None - origin: ObjectReference Row = TypeVar("Row", bound=DatasetPipelineRow, covariant=True) class DatasetPipelineTransformArgs(TypedDict, total=False): + id: str input: Any | None output: Any | None metadata: Metadata | None @@ -69,6 +67,7 @@ class DatasetPipelineTransformArgs(TypedDict, total=False): class DatasetPipelineTransform(Protocol[Row]): def __call__( self, + id: str | None = None, input: Any | None = None, output: Any | None = None, metadata: Metadata | None = None, @@ -88,10 +87,6 @@ class DatasetPipelineDefinition(Generic[Row]): _DATASET_PIPELINES: list[DatasetPipelineDefinition[Any]] = [] -def get_registered_dataset_pipelines() -> list[DatasetPipelineDefinition[Any]]: - return list(_DATASET_PIPELINES) - - def DatasetPipeline( name: str | None = None, *, From 7c5e6f1431c2678d850b84b5ea0dc996fa630045 Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Wed, 27 May 2026 09:04:54 -0700 Subject: [PATCH 2/3] add docs --- py/src/braintrust/dataset_pipeline.py | 50 ++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/py/src/braintrust/dataset_pipeline.py b/py/src/braintrust/dataset_pipeline.py index 2ed3ad59..17d0e9d3 100644 --- a/py/src/braintrust/dataset_pipeline.py +++ b/py/src/braintrust/dataset_pipeline.py @@ -25,40 +25,70 @@ class DatasetPipelineSource(TypedDict, total=False): + """Information about what spans or traces should be passed into the dataset pipeline.""" + project_id: str + """Project ID to take spans or traces from. Takes precedence over project_name.""" project_name: str + """Project name to take spans or traces from.""" org_name: str + """Organization name to take spans or traces from.""" filter: str + """Optional BTQL filter. When omitted, all spans or traces are eligible.""" scope: DatasetPipelineScope + """Whether to pass spans or entire traces to the pipeline. Defaults to "span".""" class DatasetPipelineTarget(TypedDict): + """Information about the target dataset.""" + dataset_name: str + """Dataset name. This can be an existing dataset name or a name to create.""" project_id: NotRequired[str] + """Project ID where the dataset lives or should be created.""" project_name: NotRequired[str] + """Project name where the dataset lives or should be created.""" org_name: NotRequired[str] + """Organization name where the dataset lives or should be created.""" description: NotRequired[str] + """Dataset description to use when creating the dataset.""" metadata: NotRequired[Metadata] + """Dataset metadata to use when creating the dataset.""" class DatasetPipelineRow(TypedDict, total=False): + """A row returned by a dataset pipeline transform.""" + id: str + """Stable row ID for the target dataset. Defaults to the source span or trace ID.""" input: Any | None + """Input value for the target dataset row.""" expected: Any | None + """Expected value for the target dataset row.""" tags: Sequence[str] | None + """Tags for the target dataset row.""" metadata: Metadata | None + """Metadata for the target dataset row.""" Row = TypeVar("Row", bound=DatasetPipelineRow, covariant=True) class DatasetPipelineTransformArgs(TypedDict, total=False): + """Arguments passed to a dataset pipeline transform.""" + id: str + """Source span row ID for span-scoped transforms.""" input: Any | None + """Source span input for span-scoped transforms.""" output: Any | None + """Source span output for span-scoped transforms.""" metadata: Metadata | None + """Source span metadata for span-scoped transforms.""" expected: Any | None + """Source span expected value for span-scoped transforms.""" trace: Trace + """Source trace. This is always available.""" DatasetPipelineTransformResult: TypeAlias = Row | Sequence[Row] | None @@ -78,6 +108,8 @@ def __call__( @dataclass(frozen=True) class DatasetPipelineDefinition(Generic[Row]): + """A registered dataset pipeline definition consumed by the bt CLI.""" + source: DatasetPipelineSource transform: DatasetPipelineTransform[Row] target: DatasetPipelineTarget @@ -94,9 +126,25 @@ def DatasetPipeline( transform: DatasetPipelineTransform[DatasetPipelineRow], target: DatasetPipelineTarget, ) -> DatasetPipelineDefinition[DatasetPipelineRow]: + """Create a runnable dataset pipeline. + + Dataset pipelines take trace data stored in Braintrust, filter and transform it, + and feed it back into a Braintrust dataset. + + Run a dataset pipeline with the bt CLI: + + bt datasets pipeline run path/to/pipeline.py --limit 100 + + The limit controls how many spans or traces, depending on source["scope"], are + discovered for the pipeline. + + This API is experimental and may change or be removed across non-major versions. + """ + stored_source = source.copy() + stored_source["scope"] = stored_source.get("scope", "span") definition = DatasetPipelineDefinition( name=name, - source=source.copy(), + source=stored_source, transform=transform, target=target.copy(), ) From ab388ac2799b5430a23785abd1a5e881cd6171a4 Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Wed, 27 May 2026 13:35:26 -0700 Subject: [PATCH 3/3] bump --- py/src/braintrust/test_context.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/py/src/braintrust/test_context.py b/py/src/braintrust/test_context.py index 80e47048..e6b4e4f6 100644 --- a/py/src/braintrust/test_context.py +++ b/py/src/braintrust/test_context.py @@ -48,6 +48,9 @@ def _threadpool_scenario(test_logger, with_memory_logger): # ThreadPoolExecutor.submit globally. Without this flag the background # logger's atexit handler tries to flush via the patched executor during # Python shutdown, which crashes the subprocess (SIGABRT / 0xC0000409). +# The memory logger override is thread-local, so worker threads also need a +# process-local fallback to avoid the real HTTP background logger in this +# isolated test process. _SCENARIO_TEMPLATE = """\ import os, inspect, asyncio os.environ["BRAINTRUST_APP_URL"] = "https://www.braintrust.dev" @@ -58,9 +61,11 @@ def _threadpool_scenario(test_logger, with_memory_logger): os.environ.setdefault("GOOGLE_API_KEY", os.environ.get("GEMINI_API_KEY", "your_google_api_key_here")) from braintrust import logger as _logger from braintrust.test_helpers import init_test_logger +from braintrust.util import LazyValue from braintrust.test_context import {fn_name} as _fn _logger._state.reset_parent_state() with _logger._internal_with_memory_background_logger() as _bgl: + _logger._state._global_bg_logger = LazyValue(lambda: _bgl, use_mutex=False) _tl = init_test_logger("test-context-project") if {instrument}: from braintrust.wrappers.threads import setup_threads