diff --git a/py/requirements-dev.txt b/py/requirements-dev.txt index 14fb3c47..8400c47f 100644 --- a/py/requirements-dev.txt +++ b/py/requirements-dev.txt @@ -7,7 +7,6 @@ pylint==4.0.5 pyperf==2.10.0 pytest==9.0.2 pytest-asyncio==1.3.0 -pytest-forked==1.6.0 pytest-vcr==1.0.2 -r requirements-build.txt diff --git a/py/src/braintrust/test_context.py b/py/src/braintrust/test_context.py index 313756cf..17b3c6d1 100644 --- a/py/src/braintrust/test_context.py +++ b/py/src/braintrust/test_context.py @@ -4,8 +4,8 @@ This test suite validates context propagation behavior across various concurrency patterns. TEST ISOLATION STRATEGY: -- Tests use pytest-forked to run each test in an isolated process -- This ensures setup_threads() patches don't leak between tests +- Tests that call setup_threads() run in isolated subprocesses (via subprocess.run) +- This ensures setup_threads() monkey-patches don't leak between tests - Use unpatched(scenario) for xfail tests (documents context loss) - Use patched(scenario) for tests that prove setup_threads() fixes it @@ -16,12 +16,12 @@ def _threadpool_scenario(test_logger, with_memory_logger): test_threadpool_loses_context = unpatched(_threadpool_scenario) test_threadpool_with_patch = patched(_threadpool_scenario) -Run with: pytest --forked src/braintrust/test_context.py +Run with: pytest src/braintrust/test_context.py """ import asyncio import concurrent.futures -import functools +import subprocess import sys import threading from typing import AsyncGenerator, Callable, Generator, TypeVar @@ -30,44 +30,91 @@ def _threadpool_scenario(test_logger, with_memory_logger): import pytest from braintrust import current_span, start_span from braintrust.test_helpers import init_test_logger, with_memory_logger # noqa: F401 -from braintrust.wrappers.threads import setup_threads F = TypeVar("F", bound=Callable) +# --------------------------------------------------------------------------- +# Subprocess isolation helpers +# --------------------------------------------------------------------------- +# Tests that call setup_threads() must run in isolated subprocesses so that +# the monkey-patches applied to threading.Thread / ThreadPoolExecutor do not +# leak between tests. Each isolated test spawns a fresh ``python -c`` +# subprocess. +# --------------------------------------------------------------------------- + +# BRAINTRUST_DISABLE_ATEXIT_FLUSH is set because setup_threads() patches +# ThreadPoolExecutor.submit globally. Without this flag the background +# logger's atexit handler tries to flush via the patched executor during +# Python shutdown, which crashes the subprocess (SIGABRT / 0xC0000409). +_SCENARIO_TEMPLATE = """\ +import os, inspect, asyncio +os.environ["BRAINTRUST_APP_URL"] = "https://www.braintrust.dev" +os.environ["BRAINTRUST_DISABLE_ATEXIT_FLUSH"] = "true" +os.environ.setdefault("OPENAI_API_KEY", "sk-test-dummy-api-key-for-vcr-tests") +os.environ.setdefault("ANTHROPIC_API_KEY", "sk-ant-test-dummy-api-key-for-vcr-tests") +os.environ.setdefault("MISTRAL_API_KEY", "mistral-test-dummy-api-key-for-vcr-tests") +os.environ.setdefault("GOOGLE_API_KEY", os.environ.get("GEMINI_API_KEY", "your_google_api_key_here")) +from braintrust import logger as _logger +from braintrust.test_helpers import init_test_logger +from braintrust.test_context import {fn_name} as _fn +_logger._state.reset_parent_state() +with _logger._internal_with_memory_background_logger() as _bgl: + _tl = init_test_logger("test-context-project") + if {instrument}: + from braintrust.wrappers.threads import setup_threads + setup_threads() + if inspect.iscoroutinefunction(_fn): + asyncio.run(_fn(_tl, _bgl)) + else: + _fn(_tl, _bgl) +_logger._state.reset_parent_state() +""" + + +def _run_in_subprocess(code: str, label: str, timeout: int = 60) -> None: + """Execute *code* in a fresh ``python -c`` subprocess and assert it exits cleanly.""" + try: + result = subprocess.run( + [sys.executable, "-c", code], + capture_output=True, + text=True, + timeout=timeout, + ) + except subprocess.TimeoutExpired as exc: + raise AssertionError(f"Isolated test {label} timed out after {timeout}s") from exc + if result.returncode != 0: + raise AssertionError( + f"Isolated test {label} failed (exit code {result.returncode}):\n{result.stderr}\n{result.stdout}" + ) + def isolate(instrument: bool) -> Callable[[F], F]: """ Decorator for isolated context propagation tests. - - Always runs in forked process (pytest-forked) - - If instrument=True: calls setup_threads() before test + Runs each test in a separate subprocess for full process isolation, + ensuring setup_threads() patches don't leak between tests. + + - If instrument=True: calls setup_threads() before the test - If instrument=False: marks test as xfail (context loss expected) """ def decorator(fn: F) -> F: - if asyncio.iscoroutinefunction(fn): - - @functools.wraps(fn) - async def async_wrapper(*args, **kwargs): - if instrument: - setup_threads() - return await fn(*args, **kwargs) + fn_name = fn.__name__ - wrapped = pytest.mark.forked(async_wrapper) - else: + def isolated_test(): + code = _SCENARIO_TEMPLATE.format(fn_name=fn_name, instrument=instrument) + _run_in_subprocess(code, fn_name) - @functools.wraps(fn) - def wrapper(*args, **kwargs): - if instrument: - setup_threads() - return fn(*args, **kwargs) - - wrapped = pytest.mark.forked(wrapper) + isolated_test.__name__ = fn_name + isolated_test.__qualname__ = fn_name + isolated_test.__doc__ = fn.__doc__ if not instrument: - wrapped = pytest.mark.xfail(reason="context lost without patch")(wrapped) - return wrapped # type: ignore + isolated_test = pytest.mark.xfail(reason="context lost without patch")(isolated_test) + + return isolated_test # type: ignore return decorator @@ -1266,21 +1313,39 @@ def failing_function(): # ============================================================================ -@pytest.mark.forked -def test_setup_threads_returns_true(): - """setup_threads() returns True on success.""" +def _setup_threads_returns_true_check(): + """Subprocess helper: verify setup_threads() returns True.""" + from braintrust.wrappers.threads import setup_threads + result = setup_threads() assert result is True -@pytest.mark.forked -def test_setup_threads_idempotent(): - """Calling setup_threads() multiple times is safe.""" +def _setup_threads_idempotent_check(): + """Subprocess helper: verify setup_threads() is idempotent.""" + from braintrust.wrappers.threads import setup_threads + result1 = setup_threads() result2 = setup_threads() assert result1 is True assert result2 is True +def test_setup_threads_returns_true(): + """setup_threads() returns True on success.""" + _run_in_subprocess( + "from braintrust.test_context import _setup_threads_returns_true_check; _setup_threads_returns_true_check()", + "test_setup_threads_returns_true", + ) + + +def test_setup_threads_idempotent(): + """Calling setup_threads() multiple times is safe.""" + _run_in_subprocess( + "from braintrust.test_context import _setup_threads_idempotent_check; _setup_threads_idempotent_check()", + "test_setup_threads_idempotent", + ) + + if __name__ == "__main__": pytest.main([__file__, "-v", "-s"])