diff --git a/README.md b/README.md index ecb13f6..0d40114 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,7 @@ from basalt.observability import observe, start_observe # Root span with identity tracking @start_observe( + feature_slug="dataset-processing", name="process_workflow", identity={ "organization": {"id": "123", "name": "ACME"}, @@ -261,6 +262,55 @@ Your Basalt instance exposes a `prompts` property for interacting with your Basa basalt.shutdown() ``` +- **Context Managers for Observability** (Recommended) + + Use prompts as context managers to automatically nest LLM calls under a prompt span for better trace organization and observability: + + **Sync Example:** + + ```python + from basalt import Basalt + import openai + + basalt = Basalt(api_key="your-api-key") + client = openai.OpenAI() + + # Use context manager for automatic span nesting + with basalt.prompts.get_sync('summary-prompt', tag='production') as prompt: + response = client.chat.completions.create( + model=prompt.model.model, + messages=[{'role': 'user', 'content': prompt.text}] + ) + print(response.choices[0].message.content) + + basalt.shutdown() + ``` + + **Async Example:** + + ```python + import asyncio + from basalt import Basalt + import openai + + async def generate(): + basalt = Basalt(api_key="your-api-key") + client = openai.AsyncOpenAI() + + async with await basalt.prompts.get('summary-prompt', tag='production') as prompt: + response = await client.chat.completions.create( + model=prompt.model.model, + messages=[{'role': 'user', 'content': prompt.text}] + ) + print(response.choices[0].message.content) + + basalt.shutdown() + + asyncio.run(generate()) + ``` + +See the [Prompts guide](./docs/03-prompts.md#observability-with-context-managers) for complete details. + - **Describe a Prompt** Get metadata about a prompt including available versions and tags. diff --git a/basalt/observability/config.py b/basalt/observability/config.py index 5301817..100ed54 100644 --- a/basalt/observability/config.py +++ b/basalt/observability/config.py @@ -127,7 +127,8 @@ class TelemetryConfig: Example: ["langchain", "llamaindex"] """ - exporter: SpanExporter | None = None + exporter: SpanExporter | list[SpanExporter] | None = None + extra_resource_attributes: dict[str, Any] = field(default_factory=dict) def clone(self) -> TelemetryConfig: @@ -136,6 +137,9 @@ def clone(self) -> TelemetryConfig: cloned.extra_resource_attributes = dict(self.extra_resource_attributes) cloned.enabled_providers = list(self.enabled_providers) if self.enabled_providers else None cloned.disabled_providers = list(self.disabled_providers) if self.disabled_providers else None + # Clone exporter list if it's a list (shallow copy of list, not exporters themselves) + if isinstance(self.exporter, list): + cloned.exporter = list(self.exporter) return cloned def with_env_overrides(self) -> TelemetryConfig: diff --git a/basalt/observability/instrumentation.py b/basalt/observability/instrumentation.py index 2b2cc5a..a81b152 100644 --- a/basalt/observability/instrumentation.py +++ b/basalt/observability/instrumentation.py @@ -74,14 +74,17 @@ def __init__( def create_tracer_provider( config: BasaltConfig, - exporter: SpanExporter | None = None, + exporter: SpanExporter | list[SpanExporter] | None = None, ) -> TracerProvider: """ Create and configure an OpenTelemetry TracerProvider for Basalt. Args: config: BasaltConfig instance with service and environment info. - exporter: Optional SpanExporter. Defaults to ConsoleSpanExporter for debugging. + exporter: Optional SpanExporter or list of SpanExporters. Can be: + - None: Defaults to ConsoleSpanExporter for debugging + - Single SpanExporter: Exports to one destination + - List of SpanExporters: Exports to multiple destinations simultaneously Returns: A configured TracerProvider instance. @@ -102,8 +105,9 @@ def create_tracer_provider( resource = Resource.create(resource_attrs) provider = TracerProvider(resource=resource) + # Normalize exporter to list if exporter is None: - exporter = ConsoleSpanExporter() + exporters = [ConsoleSpanExporter()] warnings.warn( "No span exporter configured and default Basalt OTEL endpoint unavailable. " "Using ConsoleSpanExporter for debugging. For production, configure an exporter " @@ -111,37 +115,65 @@ def create_tracer_provider( UserWarning, stacklevel=3, ) + elif isinstance(exporter, list): + # Handle empty list + exporters = exporter if exporter else [ConsoleSpanExporter()] + if not exporter: + warnings.warn( + "Empty exporter list provided. Using ConsoleSpanExporter for debugging.", + UserWarning, + stacklevel=3, + ) + else: + exporters = [exporter] - processor_cls = SimpleSpanProcessor if isinstance(exporter, ConsoleSpanExporter) else BatchSpanProcessor - provider.add_span_processor(processor_cls(exporter)) + # Add a span processor for each exporter + for exp in exporters: + processor_cls = SimpleSpanProcessor if isinstance(exp, ConsoleSpanExporter) else BatchSpanProcessor + provider.add_span_processor(processor_cls(exp)) return provider def setup_tracing( config: BasaltConfig, - exporter: SpanExporter | None = None, + exporter: SpanExporter | list[SpanExporter] | None = None, ) -> TracerProvider: """ Set up global OpenTelemetry tracing for the Basalt SDK. Args: config: Tracing configuration. - exporter: Optional SpanExporter to use. + exporter: Optional SpanExporter or list of SpanExporters to use. Returns: The configured TracerProvider. Note: - If a TracerProvider is already set globally, this will return the existing - provider instead of creating a new one to avoid "Overriding of current - TracerProvider is not allowed" errors. + If a TracerProvider is already set globally (e.g., by Datadog, Honeycomb, + or another observability tool), this will return the existing provider instead + of creating a new one to avoid "Overriding of current TracerProvider is not allowed" errors. + + When an existing provider is detected, Basalt's span processors + (BasaltContextProcessor, BasaltCallEvaluatorProcessor, BasaltAutoInstrumentationProcessor) + will be attached to it via _install_basalt_processors() in initialize(). + This ensures that all spans (including those from external tools) are enriched + with Basalt's custom metadata, evaluators, and prompt context. + + Integration order: For best results, initialize external observability tools + (Datadog, Honeycomb) before Basalt. If Basalt initializes first, external tools + may fail to override the global provider. """ # Check if a tracer provider is already set globally existing_provider = trace.get_tracer_provider() # If it's a real TracerProvider (not the default proxy), reuse it if hasattr(existing_provider, 'add_span_processor'): - logger.debug("Reusing existing global TracerProvider") + provider_type = type(existing_provider).__name__ + provider_module = type(existing_provider).__module__ + logger.info( + f"Reusing existing global TracerProvider: {provider_module}.{provider_type}. " + f"Basalt processors will be attached to this provider to enrich all spans." + ) return existing_provider # type: ignore[return-value] # Otherwise create and set a new one @@ -180,14 +212,32 @@ def initialize( self._initialized = True return - exporter = effective_config.exporter or self._build_exporter_from_env() + # Combine user-provided exporters with environment-built exporter + user_exporters = effective_config.exporter + env_exporter = self._build_exporter_from_env() + + # Normalize user_exporters to list + if user_exporters is None: + exporters_list = [] + elif isinstance(user_exporters, list): + exporters_list = user_exporters.copy() + else: + exporters_list = [user_exporters] + + # Add environment exporter if available and not already in list + if env_exporter and env_exporter not in exporters_list: + exporters_list.append(env_exporter) + + # Pass to setup_tracing (will handle None/empty list → ConsoleSpanExporter) + final_exporter = exporters_list if exporters_list else None + basalt_config = BasaltConfig( service_name=effective_config.service_name, service_version=effective_config.service_version or "", environment=effective_config.environment, extra_resource_attributes=effective_config.extra_resource_attributes, ) - self._tracer_provider = setup_tracing(basalt_config, exporter=exporter) + self._tracer_provider = setup_tracing(basalt_config, exporter=final_exporter) if self._tracer_provider: self._install_basalt_processors(self._tracer_provider) @@ -403,6 +453,7 @@ def _initialize_instrumentation(self, config: TelemetryConfig) -> None: def _install_basalt_processors(self, provider: TracerProvider) -> None: if getattr(provider, "_basalt_processors_installed", False): + logger.debug("Basalt processors already installed on this provider, skipping") return processors: list[OTelSpanProcessor] = [ @@ -410,11 +461,19 @@ def _install_basalt_processors(self, provider: TracerProvider) -> None: BasaltCallEvaluatorProcessor(), BasaltAutoInstrumentationProcessor(), ] + + provider_type = type(provider).__name__ + logger.info( + f"Installing {len(processors)} Basalt span processors on {provider_type}: " + f"{', '.join(type(p).__name__ for p in processors)}" + ) + for processor in processors: provider.add_span_processor(processor) provider._basalt_processors_installed = True # type: ignore[attr-defined] self._span_processors = processors + logger.debug(f"Successfully installed Basalt processors on {provider_type}") def _uninstrument_providers(self) -> None: diff --git a/examples/multi_exporter_example.py b/examples/multi_exporter_example.py new file mode 100644 index 0000000..b9a0310 --- /dev/null +++ b/examples/multi_exporter_example.py @@ -0,0 +1,67 @@ +import os + +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCSpanExporter +from opentelemetry.sdk.trace.export import ConsoleSpanExporter + +from basalt import Basalt, TelemetryConfig +from basalt.observability import start_observe + +# Configuration +BASALT_API_KEY = os.environ.get("BASALT_API_KEY", "your-basalt-api-key") +BASALT_COLLECTOR_ENDPOINT = "https://grpc.otel.getbasalt.ai" # Basalt's default collector +LOCAL_COLLECTOR_ENDPOINT = "localhost:4317" # Local OTel collector + + +# Create exporters for three destinations + +basalt_exporter = GRPCSpanExporter( + endpoint=BASALT_COLLECTOR_ENDPOINT, + headers=(("authorization", f"Bearer {BASALT_API_KEY}"),), +) + +local_exporter = GRPCSpanExporter( + endpoint=LOCAL_COLLECTOR_ENDPOINT, + insecure=True, + # Local collector typically doesn't need auth headers +) + +console_exporter = ConsoleSpanExporter() + +# Configure Basalt with THREE exporters +telemetry_config = TelemetryConfig( + service_name="multi-export-demo", + environment="production", + exporter=[ + basalt_exporter, # Export to Basalt for advanced features + local_exporter, # Export to local collector + console_exporter, # Export to console for debugging + ], +) + +basalt = Basalt(api_key=BASALT_API_KEY, telemetry_config=telemetry_config) + + +# Simulate a traced workflow + + +@start_observe(feature_slug="support-ticket", name="onboard_user") +def onboard_user(user_id: str): + """Simulate a customer onboarding workflow.""" + from basalt.observability import observe + + observe.set_input({"user_id": user_id}) + observe.set_metadata({"workflow_version": "2.1"}) + + # In a real app, you might call OpenAI here + # The auto-instrumentation will capture those calls too + result = {"status": "success", "user_id": user_id, "onboarding_complete": True} + + observe.set_output(result) + return result + + +# Execute the workflow +result = onboard_user("user-12345") + +# Flush to ensure traces are sent before exit +basalt.shutdown() diff --git a/tests/observability/test_instrumentation.py b/tests/observability/test_instrumentation.py index 0b608a7..a600d5d 100644 --- a/tests/observability/test_instrumentation.py +++ b/tests/observability/test_instrumentation.py @@ -198,3 +198,56 @@ def test_grpc_exporter_not_wrapped(self, mock_grpc_exporter): # Should NOT be wrapped, should be the gRPC exporter directly self.assertNotIsInstance(exporter, ResilientSpanExporter) self.assertIs(exporter, mock_grpc_exporter_instance) + + @mock.patch("basalt.observability.instrumentation.trace") + def test_install_processors_on_existing_provider(self, mock_trace): + """Test that Basalt processors are installed on an existing TracerProvider (e.g., Datadog).""" + from opentelemetry.sdk.trace import TracerProvider + + # Simulate an external tool (like Datadog) creating a provider first + external_provider = TracerProvider() + mock_trace.get_tracer_provider.return_value = external_provider + mock_trace.set_tracer_provider = mock.Mock() # Should not be called + + manager = InstrumentationManager() + config = TelemetryConfig(service_name="test", enabled=True) + + manager.initialize(config) + + # Verify that setup_tracing reused the existing provider + mock_trace.set_tracer_provider.assert_not_called() + + # Verify that Basalt processors were installed on the external provider + self.assertTrue(hasattr(external_provider, "_basalt_processors_installed")) + self.assertTrue(external_provider._basalt_processors_installed) + + # Verify that the manager has references to the processors + self.assertEqual(len(manager._span_processors), 3) + + # Verify that the manager stored the external provider + self.assertIs(manager._tracer_provider, external_provider) + + @mock.patch("basalt.observability.instrumentation.trace") + def test_processors_not_installed_twice_on_same_provider(self, mock_trace): + """Test that Basalt processors are not installed twice on the same provider.""" + from opentelemetry.sdk.trace import TracerProvider + + external_provider = TracerProvider() + mock_trace.get_tracer_provider.return_value = external_provider + + manager1 = InstrumentationManager() + manager2 = InstrumentationManager() + + config = TelemetryConfig(service_name="test", enabled=True) + + # First initialization should install processors + manager1.initialize(config) + processor_count_after_first = len(external_provider._active_span_processor._span_processors) + + # Second initialization should NOT add processors again (idempotent) + manager2.initialize(config) + processor_count_after_second = len(external_provider._active_span_processor._span_processors) + + # Verify processors were only added once + self.assertEqual(processor_count_after_first, processor_count_after_second) + self.assertTrue(external_provider._basalt_processors_installed) diff --git a/tests/observability/test_multi_exporters.py b/tests/observability/test_multi_exporters.py new file mode 100644 index 0000000..f1dc429 --- /dev/null +++ b/tests/observability/test_multi_exporters.py @@ -0,0 +1,299 @@ +"""Tests for multiple span exporters functionality.""" + +import unittest +from unittest import mock + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ConsoleSpanExporter +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from basalt.observability.config import TelemetryConfig +from basalt.observability.instrumentation import ( + BasaltConfig, + InstrumentationManager, + create_tracer_provider, +) + + +class TestMultipleExporters(unittest.TestCase): + """Test cases for multiple span exporters support.""" + + @classmethod + def setUpClass(cls): + """Save original global state before any tests run.""" + cls._original_provider = trace.get_tracer_provider() + cls._original_once = trace._TRACER_PROVIDER_SET_ONCE + + @classmethod + def tearDownClass(cls): + """Restore original global state after all tests complete.""" + # Restore the Once flag so other test files can set providers + trace._TRACER_PROVIDER_SET_ONCE = cls._original_once + # If there was a real provider originally, try to restore it + if cls._original_provider and not isinstance(cls._original_provider, trace.ProxyTracerProvider): + trace._TRACER_PROVIDER = None + trace._TRACER_PROVIDER_SET_ONCE = trace.Once() + try: + trace.set_tracer_provider(cls._original_provider) + except Exception: + pass + + def setUp(self): + """Reset provider state before each test so we can set new ones.""" + # Allow each test to set its own provider by resetting the Once flag + trace._TRACER_PROVIDER = None + trace._TRACER_PROVIDER_SET_ONCE = trace.Once() + + def tearDown(self): + """Don't clean up - let setUpClass/tearDownClass handle global state.""" + pass + + def test_single_exporter_backward_compatibility(self): + """Test that single exporter still works (backward compatibility).""" + exporter = InMemorySpanExporter() + config = BasaltConfig(service_name="test-service") + + provider = create_tracer_provider(config, exporter=exporter) + + # Verify provider was created + self.assertIsInstance(provider, TracerProvider) + # Verify exporter was added (check _active_span_processor has processors) + self.assertGreater(len(provider._active_span_processor._span_processors), 0) + + def test_multiple_exporters_list(self): + """Test configuring with list of 2 exporters.""" + exporter1 = InMemorySpanExporter() + exporter2 = InMemorySpanExporter() + config = BasaltConfig(service_name="test-service") + + provider = create_tracer_provider(config, exporter=[exporter1, exporter2]) + + # Verify provider was created + self.assertIsInstance(provider, TracerProvider) + # Verify both exporters were added (2 processors) + self.assertEqual(len(provider._active_span_processor._span_processors), 2) + + # Test that both exporters receive spans + trace.set_tracer_provider(provider) + tracer = trace.get_tracer("test") + + with tracer.start_as_current_span("test-span"): + pass + + # Force flush to ensure spans are exported + provider.force_flush() + + # Both exporters should have received the span + self.assertEqual(len(exporter1.get_finished_spans()), 1) + self.assertEqual(len(exporter2.get_finished_spans()), 1) + + # Verify span content is identical + span1 = exporter1.get_finished_spans()[0] + span2 = exporter2.get_finished_spans()[0] + self.assertEqual(span1.name, span2.name) + self.assertEqual(span1.context.trace_id, span2.context.trace_id) + self.assertEqual(span1.context.span_id, span2.context.span_id) + + def test_empty_list_uses_console_exporter(self): + """Test that empty list falls back to ConsoleSpanExporter with warning.""" + config = BasaltConfig(service_name="test-service") + + with self.assertWarns(UserWarning) as cm: + provider = create_tracer_provider(config, exporter=[]) + + # Verify warning message + self.assertIn("Empty exporter list", str(cm.warning)) + + # Verify ConsoleSpanExporter was used + self.assertIsInstance(provider, TracerProvider) + # Check that a processor was added + self.assertGreater(len(provider._active_span_processor._span_processors), 0) + + @mock.patch.dict("os.environ", {"BASALT_OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318"}, clear=False) + @mock.patch("basalt.observability.instrumentation.OTLPSpanExporter") + def test_user_exporters_plus_env_exporter(self, mock_otlp_exporter): + """Test that user exporters are combined with environment exporter.""" + # Mock the OTLP exporter creation + mock_env_exporter = mock.Mock() + mock_otlp_exporter.return_value = mock_env_exporter + + user_exporter = InMemorySpanExporter() + config = TelemetryConfig( + service_name="test-service", + exporter=user_exporter, + ) + + manager = InstrumentationManager() + manager.initialize(config) + + # Verify both exporters were used + provider = manager._tracer_provider + self.assertIsInstance(provider, TracerProvider) + # Should have 2 exporters + 3 Basalt processors = 5 total processors + self.assertEqual(len(provider._active_span_processor._span_processors), 5) + + def test_mixed_console_and_otlp_exporters(self): + """Test mix of ConsoleSpanExporter and regular exporters.""" + console_exporter = ConsoleSpanExporter() + memory_exporter = InMemorySpanExporter() + config = BasaltConfig(service_name="test-service") + + provider = create_tracer_provider( + config, exporter=[console_exporter, memory_exporter] + ) + + # Verify both exporters were added + self.assertIsInstance(provider, TracerProvider) + self.assertEqual(len(provider._active_span_processor._span_processors), 2) + + # Test span export + trace.set_tracer_provider(provider) + tracer = trace.get_tracer("test") + + with tracer.start_as_current_span("test-span"): + pass + + provider.force_flush() + + # Memory exporter should have received the span + self.assertEqual(len(memory_exporter.get_finished_spans()), 1) + + def test_exporter_isolation_on_error(self): + """Test that one failing exporter doesn't affect others.""" + failing_exporter = mock.Mock() + failing_exporter.export.side_effect = Exception("Export failed") + + working_exporter = InMemorySpanExporter() + config = BasaltConfig(service_name="test-service") + + provider = create_tracer_provider( + config, exporter=[failing_exporter, working_exporter] + ) + + trace.set_tracer_provider(provider) + tracer = trace.get_tracer("test") + + with tracer.start_as_current_span("test-span"): + pass + + # Force flush (failing exporter will raise but shouldn't stop working exporter) + try: + provider.force_flush() + except Exception: + pass # Expected from failing exporter + + # Working exporter should still have received the span + self.assertEqual(len(working_exporter.get_finished_spans()), 1) + + def test_duplicate_exporters_allowed(self): + """Test that duplicate exporters in list are allowed (user responsibility).""" + exporter = InMemorySpanExporter() + config = BasaltConfig(service_name="test-service") + + # Same exporter instance twice + provider = create_tracer_provider(config, exporter=[exporter, exporter]) + + # Should have 2 processors (both using same exporter) + self.assertEqual(len(provider._active_span_processor._span_processors), 2) + + def test_none_exporter_uses_console_with_warning(self): + """Test that None exporter defaults to ConsoleSpanExporter with warning.""" + config = BasaltConfig(service_name="test-service") + + with self.assertWarns(UserWarning) as cm: + provider = create_tracer_provider(config, exporter=None) + + # Verify warning message + self.assertIn("No span exporter configured", str(cm.warning)) + + # Verify provider was created + self.assertIsInstance(provider, TracerProvider) + + +class TestTelemetryConfigWithMultipleExporters(unittest.TestCase): + """Test TelemetryConfig with multiple exporters.""" + + def test_config_accepts_exporter_list(self): + """Test that TelemetryConfig accepts list of exporters.""" + exporter1 = InMemorySpanExporter() + exporter2 = InMemorySpanExporter() + + config = TelemetryConfig( + service_name="test-service", + exporter=[exporter1, exporter2], + ) + + self.assertIsInstance(config.exporter, list) + self.assertEqual(len(config.exporter), 2) + self.assertIs(config.exporter[0], exporter1) + self.assertIs(config.exporter[1], exporter2) + + def test_config_accepts_single_exporter(self): + """Test backward compatibility: single exporter still works.""" + exporter = InMemorySpanExporter() + + config = TelemetryConfig( + service_name="test-service", + exporter=exporter, + ) + + # Should be the exporter itself, not wrapped in list + self.assertIsInstance(config.exporter, InMemorySpanExporter) + self.assertIs(config.exporter, exporter) + + def test_clone_with_exporter_list(self): + """Test that clone() properly copies exporter lists.""" + exporter1 = InMemorySpanExporter() + exporter2 = InMemorySpanExporter() + + original = TelemetryConfig( + service_name="test-service", + exporter=[exporter1, exporter2], + ) + + cloned = original.clone() + + # Verify it's a new list instance + self.assertIsNot(cloned.exporter, original.exporter) + # But contains same exporter objects + self.assertEqual(len(cloned.exporter), 2) + self.assertIs(cloned.exporter[0], exporter1) + self.assertIs(cloned.exporter[1], exporter2) + + def test_clone_list_independence(self): + """Test that modifying cloned exporter list doesn't affect original.""" + exporter1 = InMemorySpanExporter() + exporter2 = InMemorySpanExporter() + + original = TelemetryConfig( + service_name="test-service", + exporter=[exporter1, exporter2], + ) + + cloned = original.clone() + + # Modify cloned list + if isinstance(cloned.exporter, list): + cloned.exporter.append(InMemorySpanExporter()) + + # Original should be unchanged + self.assertEqual(len(original.exporter), 2) + + def test_clone_with_single_exporter(self): + """Test that clone() handles single exporter correctly.""" + exporter = InMemorySpanExporter() + + original = TelemetryConfig( + service_name="test-service", + exporter=exporter, + ) + + cloned = original.clone() + + # Should be same exporter object (not cloned) + self.assertIs(cloned.exporter, exporter) + + +if __name__ == "__main__": + unittest.main()