diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index 1b555eb38aa..209a682e776 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -42,3 +42,4 @@ jobs: # Alert with a commit comment on possible performance regression alert-threshold: '200%' comment-on-alert: true + alert-comment-cc-users: "@open-telemetry/python-approvers,@open-telemetry/python-maintainers" diff --git a/CHANGELOG.md b/CHANGELOG.md index e3091794e0b..4c05b4c1e61 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,14 +12,28 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- `opentelemetry-exporter-otlp-proto-grpc`: Fix re-initialization of gRPC channel on UNAVAILABLE error + ([#4825](https://github.com/open-telemetry/opentelemetry-python/pull/4825)) +- `opentelemetry-exporter-prometheus`: Fix duplicate HELP/TYPE declarations for metrics with different label sets + ([#4868](https://github.com/open-telemetry/opentelemetry-python/issues/4868)) +- Allow loading all resource detectors by setting `OTEL_EXPERIMENTAL_RESOURCE_DETECTORS` to `*` + ([#4819](https://github.com/open-telemetry/opentelemetry-python/pull/4819)) - `opentelemetry-sdk`: Fix the type hint of the `_metrics_data` property to allow `None` - ([#4837](https://github.com/open-telemetry/opentelemetry-python/pull/4837) + ([#4837](https://github.com/open-telemetry/opentelemetry-python/pull/4837)). - Regenerate opentelemetry-proto code with v1.9.0 release ([#4840](https://github.com/open-telemetry/opentelemetry-python/pull/4840)) - Add python 3.14 support ([#4798](https://github.com/open-telemetry/opentelemetry-python/pull/4798)) - Silence events API warnings for internal users ([#4847](https://github.com/open-telemetry/opentelemetry-python/pull/4847)) +- Prevent possible endless recursion in `SimpleLogRecordProcessor.on_emit` + ([#4799](https://github.com/open-telemetry/opentelemetry-python/pull/4799)) and ([#4867](https://github.com/open-telemetry/opentelemetry-python/pull/4867)). +- Make ConcurrentMultiSpanProcessor fork-safe + ([#4862](https://github.com/open-telemetry/opentelemetry-python/pull/4862)) +- `opentelemetry-exporter-otlp-proto-http`: Fix retry logic and error handling for connection failures in trace, metric, and log exporters + ([#4709](https://github.com/open-telemetry/opentelemetry-python/pull/4709)) +- `opentelemetry-sdk`: Automatically generate configuration models using the OTel config JSON schema + ([#4879](https://github.com/open-telemetry/opentelemetry-python/pull/4879)) ## Version 1.39.0/0.60b0 (2025-12-03) @@ -87,7 +101,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4654](https://github.com/open-telemetry/opentelemetry-python/pull/4654)).
- Fix type checking for built-in metric exporters ([#4820](https://github.com/open-telemetry/opentelemetry-python/pull/4820)) - + ## Version 1.38.0/0.59b0 (2025-10-16) - Add `rstcheck` to pre-commit to stop introducing invalid RST diff --git a/README.md b/README.md index 90b9ceaeade..a84c0c7bae8 100644 --- a/README.md +++ b/README.md @@ -111,7 +111,7 @@ For more information about the maintainer role, see the [community repository](h ### Approvers - [Dylan Russell](https://github.com/dylanrussell), Google -- [Emídio Neto](https://github.com/emdneto), PicPay +- [Emídio Neto](https://github.com/emdneto), Independent - [Héctor Hernández](https://github.com/hectorhdzg), Microsoft - [Jeremy Voss](https://github.com/jeremydvoss), Microsoft - [Liudmila Molkova](https://github.com/lmolkova), Grafana Labs diff --git a/RELEASING.md b/RELEASING.md index 5c8f447be30..dc0b73b4bfd 100644 --- a/RELEASING.md +++ b/RELEASING.md @@ -33,6 +33,18 @@ * Review and merge the pull request that it creates for updating the version. * Note: If you are doing a patch release in `-core` repo, you should also do an equivalent patch release in `-contrib` repo (even if there's no fix to release), otherwise tests in CI will fail. +### Note on `contrib.yml` Workflow Behavior + +The [contrib.yml](https://github.com/open-telemetry/opentelemetry-python/blob/main/.github/workflows/contrib.yml) workflow in the core repository references reusable workflows from opentelemetry-python-contrib using the hard-coded `main` branch. + +Because `uses:` statements cannot receive environment variables and workflows cannot patch or modify other workflows, this reference cannot dynamically follow release branches the way other workflows do. + +As a result, when preparing a release branch that contains a different set of instrumentations (e.g., older branches without newly added tox environments), CI may attempt to run tox environments that do not exist in that branch. In this case: + +* It is safe to merge the release PR even if the contrib workflow fails for this reason, or + +* Optionally update the contrib.yml workflow to point to the corresponding release branch before running CI. + ## Making the release * Run the [Release workflow](https://github.com/open-telemetry/opentelemetry-python/actions/workflows/release.yml). diff --git a/docs/examples/sqlcommenter/README.rst b/docs/examples/sqlcommenter/README.rst index e090c32144c..99b5de526e7 100644 --- a/docs/examples/sqlcommenter/README.rst +++ b/docs/examples/sqlcommenter/README.rst @@ -5,7 +5,7 @@ This is an example of how to use OpenTelemetry Python instrumention with sqlcommenter to enrich database query statements with contextual information. For more information on sqlcommenter concepts, see: -* `Semantic Conventions - Database Spans `_ +* `Semantic Conventions - Database Spans `_ * `sqlcommenter `_ The source files of this example are available `here `_.
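In practice, the enrichment this example describes amounts to appending a `/*key='value'*/` comment to each SQL statement. A minimal sketch of that format follows; the `add_sql_comment` helper is hypothetical and for illustration only, not an API of the instrumentation packages, which add the comment automatically:

```python
from urllib.parse import quote


def add_sql_comment(statement: str, **tags: str) -> str:
    """Append tags to a SQL statement as a sqlcommenter-style /* ... */ comment."""
    comment = ",".join(
        # Keys and values are URL-encoded, and values are single-quoted.
        f"{quote(key)}='{quote(value)}'"
        for key, value in sorted(tags.items())
    )
    return f"{statement.rstrip(';')} /*{comment}*/"


print(
    add_sql_comment(
        "SELECT * FROM users;",
        traceparent="00-5bd66ef5095369c7b0d1f8f4bd33716a-c532cb4098ac3dd2-01",
    )
)
# SELECT * FROM users /*traceparent='00-5bd66ef5095369c7b0d1f8f4bd33716a-c532cb4098ac3dd2-01'*/
```

Tools such as slow-query logs can then correlate a captured statement back to the originating trace via the `traceparent` value.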
@@ -120,5 +120,5 @@ References * `OpenTelemetry Project `_ * `OpenTelemetry Collector `_ * `OpenTelemetry MySQL instrumentation `_ -* `Semantic Conventions - Database Spans `_ +* `Semantic Conventions - Database Spans `_ * `sqlcommenter `_ \ No newline at end of file diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index be86e5b0cf5..89c2608c30a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -12,7 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""OTLP Exporter""" +"""OTLP Exporter + +This module provides a mixin class for OTLP exporters that send telemetry data +to an OTLP-compatible receiver via gRPC. It includes configurable reconnection +logic to handle transient collector outages. + +""" import random import threading @@ -251,9 +257,11 @@ def _get_credentials( if certificate_file: client_key_file = environ.get(client_key_file_env_key) client_certificate_file = environ.get(client_certificate_file_env_key) - return _load_credentials( + credentials = _load_credentials( certificate_file, client_key_file, client_certificate_file ) + if credentials is not None: + return credentials return ssl_channel_credentials() @@ -261,10 +269,15 @@ def _get_credentials( class OTLPExporterMixin( ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT, ExportStubT] ): - """OTLP span exporter + """OTLP gRPC exporter mixin. + + This class provides the base functionality for OTLP exporters that send + telemetry data (spans or metrics) to an OTLP-compatible receiver via gRPC. + It includes a configurable reconnection mechanism to handle transient + receiver outages. Args: - endpoint: OpenTelemetry Collector receiver endpoint + endpoint: OTLP-compatible receiver endpoint insecure: Connection type credentials: ChannelCredentials object for server authentication headers: Headers to send when exporting @@ -308,6 +321,8 @@ def __init__( if parsed_url.netloc: self._endpoint = parsed_url.netloc + self._insecure = insecure + self._credentials = credentials self._headers = headers or environ.get(OTEL_EXPORTER_OTLP_HEADERS) if isinstance(self._headers, str): temp_headers = parse_env_headers(self._headers, liberal=True) @@ -336,37 +351,52 @@ def __init__( ) self._collector_kwargs = None - compression = ( + self._compression = ( environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION) if compression is None else compression ) or Compression.NoCompression - if insecure: - self._channel = insecure_channel( - self._endpoint, - compression=compression, - options=self._channel_options, - ) - else: + self._channel = None + self._client = None + + self._shutdown_in_progress = threading.Event() + self._shutdown = False + + if not self._insecure: self._credentials = _get_credentials( - credentials, + self._credentials, _OTEL_PYTHON_EXPORTER_OTLP_GRPC_CREDENTIAL_PROVIDER, OTEL_EXPORTER_OTLP_CERTIFICATE, OTEL_EXPORTER_OTLP_CLIENT_KEY, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, ) + + self._initialize_channel_and_stub() + + def _initialize_channel_and_stub(self): + """ + Create a new gRPC channel and stub.
+ + This method is used during initialization and by the reconnection + mechanism to reinitialize the channel on transient errors. + """ + if self._insecure: + self._channel = insecure_channel( + self._endpoint, + compression=self._compression, + options=self._channel_options, + ) + else: + assert self._credentials is not None self._channel = secure_channel( self._endpoint, self._credentials, - compression=compression, + compression=self._compression, options=self._channel_options, ) self._client = self._stub(self._channel) # type: ignore [reportCallIssue] - self._shutdown_in_progress = threading.Event() - self._shutdown = False - @abstractmethod def _translate_data( self, @@ -388,6 +418,8 @@ def _export( deadline_sec = time() + self._timeout for retry_num in range(_MAX_RETRYS): try: + if self._client is None: + return self._result.FAILURE self._client.Export( request=self._translate_data(data), metadata=self._headers, @@ -407,6 +439,26 @@ def _export( retry_info.retry_delay.seconds + retry_info.retry_delay.nanos / 1.0e9 ) + + # For UNAVAILABLE errors, reinitialize the channel to force reconnection + if error.code() == StatusCode.UNAVAILABLE and retry_num == 0: # type: ignore + logger.debug( + "Reinitializing gRPC channel for %s exporter due to UNAVAILABLE error", + self._exporting, + ) + try: + if self._channel: + self._channel.close() + except Exception as e: + logger.debug( + "Error closing channel for %s exporter to %s: %s", + self._exporting, + self._endpoint, + str(e), + ) + # Enable channel reconnection for subsequent calls + self._initialize_channel_and_stub() + if ( error.code() not in _RETRYABLE_ERROR_CODES # type: ignore [reportAttributeAccessIssue] or retry_num + 1 == _MAX_RETRYS @@ -436,12 +488,19 @@ def _export( return self._result.FAILURE # type: ignore [reportReturnType] def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + """ + Shut down the exporter. + + Args: + timeout_millis: Timeout in milliseconds for shutting down the exporter. 
+ """ if self._shutdown: logger.warning("Exporter already shutdown, ignoring call") return self._shutdown = True self._shutdown_in_progress.set() - self._channel.close() + if self._channel: + self._channel.close() @property @abstractmethod diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 8c2ed0e1501..de27d0fe792 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -24,6 +24,7 @@ from unittest import TestCase from unittest.mock import Mock, patch +import grpc from google.protobuf.duration_pb2 import ( # pylint: disable=no-name-in-module Duration, ) @@ -91,8 +92,8 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: def _exporting(self): return "traces" - def shutdown(self, timeout_millis=30_000): - return OTLPExporterMixin.shutdown(self, timeout_millis) + def shutdown(self, timeout_millis: float = 30_000, **kwargs): + return OTLPExporterMixin.shutdown(self, timeout_millis, **kwargs) class TraceServiceServicerWithExportParams(TraceServiceServicer): @@ -513,6 +514,16 @@ def test_timeout_set_correctly(self): self.assertEqual(mock_trace_service.num_requests, 2) self.assertAlmostEqual(after - before, 1.4, 1) + def test_channel_options_set_correctly(self): + """Test that gRPC channel options are set correctly for keepalive and reconnection""" + # This test verifies that the channel is created with the right options + # We patch grpc.insecure_channel to ensure it is called without errors + with patch( + "opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel" + ) as mock_channel: + OTLPSpanExporterForTesting(insecure=True) + self.assertTrue(mock_channel.called) + def test_otlp_headers_from_env(self): # pylint: disable=protected-access # This ensures that there is no other header than standard user-agent. @@ -536,3 +547,27 @@ def test_permanent_failure(self): warning.records[-1].message, "Failed to export traces to localhost:4317, error code: StatusCode.ALREADY_EXISTS", ) + + def test_unavailable_reconnects(self): + """Test that the exporter reconnects on UNAVAILABLE error""" + add_TraceServiceServicer_to_server( + TraceServiceServicerWithExportParams(StatusCode.UNAVAILABLE), + self.server, + ) + + # Spy on grpc.insecure_channel to verify it's called for reconnection + with patch( + "opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel", + side_effect=grpc.insecure_channel, + ) as mock_insecure_channel: + # Mock sleep to avoid waiting + with patch("time.sleep"): + # We expect FAILURE because the server keeps returning UNAVAILABLE + # but we want to verify reconnection attempts happened + self.exporter.export([self.span]) + + # Verify that we attempted to reinitialize the channel (called insecure_channel) + # Since the initial channel was created in setUp (unpatched), this call + # must be from the reconnection logic. 
+ self.assertTrue(mock_insecure_channel.called) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index b120a2cca45..7aea76be8d2 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -186,26 +186,42 @@ def export( serialized_data = encode_logs(batch).SerializeToString() deadline_sec = time() + self._timeout for retry_num in range(_MAX_RETRYS): - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return LogRecordExportResult.SUCCESS # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + try: + resp = self._export(serialized_data, deadline_sec - time()) + if resp.ok: + return LogRecordExportResult.SUCCESS + except requests.exceptions.RequestException as error: + reason = error + retryable = isinstance(error, ConnectionError) + status_code = None + else: + reason = resp.reason + retryable = _is_retryable(resp) + status_code = resp.status_code + + if not retryable: + _logger.error( + "Failed to export logs batch code: %s, reason: %s", + status_code, + reason, + ) + return LogRecordExportResult.FAILURE + if ( - not _is_retryable(resp) - or retry_num + 1 == _MAX_RETRYS + retry_num + 1 == _MAX_RETRYS or backoff_seconds > (deadline_sec - time()) or self._shutdown ): _logger.error( - "Failed to export logs batch code: %s, reason: %s", - resp.status_code, - resp.text, + "Failed to export logs batch due to timeout, " + "max retries or shutdown." ) return LogRecordExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting logs batch, retrying in %.2fs.", - resp.reason, + reason, backoff_seconds, ) shutdown = self._shutdown_is_occuring.wait(backoff_seconds) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index c6d657e7ae0..7e08f624375 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -231,26 +231,41 @@ def export( serialized_data = encode_metrics(metrics_data).SerializeToString() deadline_sec = time() + self._timeout for retry_num in range(_MAX_RETRYS): - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return MetricExportResult.SUCCESS # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff.
backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + try: + resp = self._export(serialized_data, deadline_sec - time()) + if resp.ok: + return MetricExportResult.SUCCESS + except requests.exceptions.RequestException as error: + reason = error + retryable = isinstance(error, ConnectionError) + status_code = None + else: + reason = resp.reason + retryable = _is_retryable(resp) + status_code = resp.status_code + + if not retryable: + _logger.error( + "Failed to export metrics batch code: %s, reason: %s", + status_code, + reason, + ) + return MetricExportResult.FAILURE if ( - not _is_retryable(resp) - or retry_num + 1 == _MAX_RETRYS + retry_num + 1 == _MAX_RETRYS or backoff_seconds > (deadline_sec - time()) or self._shutdown ): _logger.error( - "Failed to export metrics batch code: %s, reason: %s", - resp.status_code, - resp.text, + "Failed to export metrics batch due to timeout, " + "max retries or shutdown." ) return MetricExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting metrics batch, retrying in %.2fs.", - resp.reason, + reason, backoff_seconds, ) shutdown = self._shutdown_in_progress.wait(backoff_seconds) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index 055e829daba..d02f94adf05 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -179,26 +179,42 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: serialized_data = encode_spans(spans).SerializePartialToString() deadline_sec = time() + self._timeout for retry_num in range(_MAX_RETRYS): - resp = self._export(serialized_data, deadline_sec - time()) - if resp.ok: - return SpanExportResult.SUCCESS # multiplying by a random number between .8 and 1.2 introduces a +/20% jitter to each backoff. backoff_seconds = 2**retry_num * random.uniform(0.8, 1.2) + try: + resp = self._export(serialized_data, deadline_sec - time()) + if resp.ok: + return SpanExportResult.SUCCESS + except requests.exceptions.RequestException as error: + reason = error + retryable = isinstance(error, ConnectionError) + status_code = None + else: + reason = resp.reason + retryable = _is_retryable(resp) + status_code = resp.status_code + + if not retryable: + _logger.error( + "Failed to export span batch code: %s, reason: %s", + status_code, + reason, + ) + return SpanExportResult.FAILURE + if ( - not _is_retryable(resp) - or retry_num + 1 == _MAX_RETRYS + retry_num + 1 == _MAX_RETRYS or backoff_seconds > (deadline_sec - time()) or self._shutdown ): _logger.error( - "Failed to export span batch code: %s, reason: %s", - resp.status_code, - resp.text, + "Failed to export span batch due to timeout, " + "max retries or shutdown." 
) return SpanExportResult.FAILURE _logger.warning( "Transient error %s encountered while exporting span batch, retrying in %.2fs.", - resp.reason, + reason, backoff_seconds, ) shutdown = self._shutdown_in_progress.wait(backoff_seconds) diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py index eca1aed5d98..2dbbadccb9e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py @@ -19,7 +19,9 @@ from unittest import TestCase from unittest.mock import ANY, MagicMock, Mock, patch +import requests from requests import Session +from requests.exceptions import ConnectionError from requests.models import Response from opentelemetry.exporter.otlp.proto.common.metrics_encoder import ( @@ -555,6 +557,40 @@ def test_retry_timeout(self, mock_post): warning.records[0].message, ) + @patch.object(Session, "post") + def test_export_no_collector_available_retryable(self, mock_post): + exporter = OTLPMetricExporter(timeout=1.5) + msg = "Server not available." + mock_post.side_effect = ConnectionError(msg) + with self.assertLogs(level=WARNING) as warning: + self.assertEqual( + exporter.export(self.metrics["sum_int"]), + MetricExportResult.FAILURE, + ) + # Check for greater than 2 because the request is currently + # sent twice on each retry. + self.assertGreater(mock_post.call_count, 2) + self.assertIn( + f"Transient error {msg} encountered while exporting metrics batch, retrying in", + warning.records[0].message, + ) + + @patch.object(Session, "post") + def test_export_no_collector_available(self, mock_post): + exporter = OTLPMetricExporter(timeout=1.5) + + mock_post.side_effect = requests.exceptions.RequestException() + with self.assertLogs(level=WARNING) as warning: + self.assertEqual( + exporter.export(self.metrics["sum_int"]), + MetricExportResult.FAILURE, + ) + self.assertEqual(mock_post.call_count, 1) + self.assertIn( + "Failed to export metrics batch code", + warning.records[0].message, + ) + @patch.object(Session, "post") def test_timeout_set_correctly(self, mock_post): resp = Response() diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py index 31e824a980f..c86ac1f6ba1 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py @@ -24,6 +24,7 @@ import requests from google.protobuf.json_format import MessageToDict from requests import Session +from requests.exceptions import ConnectionError from requests.models import Response from opentelemetry._logs import LogRecord, SeverityNumber @@ -483,6 +484,40 @@ def test_retry_timeout(self, mock_post): warning.records[0].message, ) + + @patch.object(Session, "post") + def test_export_no_collector_available_retryable(self, mock_post): + exporter = OTLPLogExporter(timeout=1.5) + msg = "Server not available." + mock_post.side_effect = ConnectionError(msg) + with self.assertLogs(level=WARNING) as warning: + self.assertEqual( + exporter.export(self._get_sdk_log_data()), + LogRecordExportResult.FAILURE, + ) + # Check for greater than 2 because the request is currently + # sent twice on each retry.
+ self.assertGreater(mock_post.call_count, 2) + self.assertIn( + f"Transient error {msg} encountered while exporting logs batch, retrying in", + warning.records[0].message, + ) + + @patch.object(Session, "post") + def test_export_no_collector_available(self, mock_post): + exporter = OTLPLogExporter(timeout=1.5) + + mock_post.side_effect = requests.exceptions.RequestException() + with self.assertLogs(level=WARNING) as warning: + self.assertEqual( + exporter.export(self._get_sdk_log_data()), + LogRecordExportResult.FAILURE, + ) + self.assertEqual(mock_post.call_count, 1) + self.assertIn( + "Failed to export logs batch code", + warning.records[0].message, + ) + @patch.object(Session, "post") def test_timeout_set_correctly(self, mock_post): resp = Response() diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py index 10dcb1a9e01..5f61344bbf1 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py @@ -20,6 +20,7 @@ import requests from requests import Session +from requests.exceptions import ConnectionError from requests.models import Response from opentelemetry.exporter.otlp.proto.http import Compression @@ -303,6 +304,40 @@ def test_retry_timeout(self, mock_post): warning.records[0].message, ) + + @patch.object(Session, "post") + def test_export_no_collector_available_retryable(self, mock_post): + exporter = OTLPSpanExporter(timeout=1.5) + msg = "Server not available." + mock_post.side_effect = ConnectionError(msg) + with self.assertLogs(level=WARNING) as warning: + self.assertEqual( + exporter.export([BASIC_SPAN]), + SpanExportResult.FAILURE, + ) + # Check for greater than 2 because the request is currently + # sent twice on each retry.
+ self.assertGreater(mock_post.call_count, 2) + self.assertIn( + f"Transient error {msg} encountered while exporting span batch, retrying in", + warning.records[0].message, + ) + + @patch.object(Session, "post") + def test_export_no_collector_available(self, mock_post): + exporter = OTLPSpanExporter(timeout=1.5) + + mock_post.side_effect = requests.exceptions.RequestException() + with self.assertLogs(level=WARNING) as warning: + self.assertEqual( + exporter.export([BASIC_SPAN]), + SpanExportResult.FAILURE, + ) + self.assertEqual(mock_post.call_count, 1) + self.assertIn( + "Failed to export span batch code", + warning.records[0].message, + ) + @patch.object(Session, "post") def test_timeout_set_correctly(self, mock_post): resp = Response() diff --git a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py index 475cfb1266e..fa89da4e71e 100644 --- a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py @@ -225,36 +225,23 @@ def _translate_to_prometheus( for metric in metrics: label_values_data_points = [] - label_keys_data_points = [] values = [] - per_metric_family_ids = [] - metric_name = sanitize_full_name(metric.name) metric_description = metric.description or "" metric_unit = map_unit(metric.unit) + # First pass: collect all unique label keys across all data points + all_label_keys_set = set() + data_point_attributes = [] for number_data_point in metric.data.data_points: - label_keys = [] - label_values = [] - - for key, value in sorted(number_data_point.attributes.items()): - label_keys.append(sanitize_attribute(key)) - label_values.append(self._check_value(value)) - - per_metric_family_ids.append( - "|".join( - [ - metric_name, - metric_description, - "%".join(label_keys), - metric_unit, - ] - ) - ) + attrs = {} + for key, value in number_data_point.attributes.items(): + sanitized_key = sanitize_attribute(key) + all_label_keys_set.add(sanitized_key) + attrs[sanitized_key] = self._check_value(value) + data_point_attributes.append(attrs) - label_values_data_points.append(label_values) - label_keys_data_points.append(label_keys) if isinstance(number_data_point, HistogramDataPoint): values.append( { @@ -268,87 +255,106 @@ def _translate_to_prometheus( else: values.append(number_data_point.value) - for per_metric_family_id, label_keys, label_values, value in zip( - per_metric_family_ids, - label_keys_data_points, - label_values_data_points, - values, - ): - is_non_monotonic_sum = ( - isinstance(metric.data, Sum) - and metric.data.is_monotonic is False - ) - is_cumulative = ( - isinstance(metric.data, Sum) - and metric.data.aggregation_temporality - == AggregationTemporality.CUMULATIVE - ) + # Sort label keys for consistent ordering + all_label_keys = sorted(all_label_keys_set) - # The prometheus compatibility spec for sums says: If the aggregation temporality is cumulative and the sum is non-monotonic, it MUST be converted to a Prometheus Gauge. 
- should_convert_sum_to_gauge = ( - is_non_monotonic_sum and is_cumulative - ) + # Second pass: build label values with empty strings for missing labels + for attrs in data_point_attributes: + label_values = [] + for key in all_label_keys: + label_values.append(attrs.get(key, "")) + label_values_data_points.append(label_values) - if ( - isinstance(metric.data, Sum) - and not should_convert_sum_to_gauge - ): - metric_family_id = "|".join( - [per_metric_family_id, CounterMetricFamily.__name__] - ) + # Create metric family ID without label keys + per_metric_family_id = "|".join( + [ + metric_name, + metric_description, + metric_unit, + ] + ) + + is_non_monotonic_sum = ( + isinstance(metric.data, Sum) + and metric.data.is_monotonic is False + ) + is_cumulative = ( + isinstance(metric.data, Sum) + and metric.data.aggregation_temporality + == AggregationTemporality.CUMULATIVE + ) + + # The prometheus compatibility spec for sums says: If the aggregation temporality is cumulative and the sum is non-monotonic, it MUST be converted to a Prometheus Gauge. + should_convert_sum_to_gauge = ( + is_non_monotonic_sum and is_cumulative + ) + + if ( + isinstance(metric.data, Sum) + and not should_convert_sum_to_gauge + ): + metric_family_id = "|".join( + [per_metric_family_id, CounterMetricFamily.__name__] + ) - if metric_family_id not in metric_family_id_metric_family: - metric_family_id_metric_family[metric_family_id] = ( - CounterMetricFamily( - name=metric_name, - documentation=metric_description, - labels=label_keys, - unit=metric_unit, - ) + if metric_family_id not in metric_family_id_metric_family: + metric_family_id_metric_family[metric_family_id] = ( + CounterMetricFamily( + name=metric_name, + documentation=metric_description, + labels=all_label_keys, + unit=metric_unit, ) + ) + for label_values, value in zip( + label_values_data_points, values + ): metric_family_id_metric_family[ metric_family_id ].add_metric(labels=label_values, value=value) - elif ( - isinstance(metric.data, Gauge) - or should_convert_sum_to_gauge - ): - metric_family_id = "|".join( - [per_metric_family_id, GaugeMetricFamily.__name__] - ) + elif isinstance(metric.data, Gauge) or should_convert_sum_to_gauge: + metric_family_id = "|".join( + [per_metric_family_id, GaugeMetricFamily.__name__] + ) - if ( - metric_family_id - not in metric_family_id_metric_family.keys() - ): - metric_family_id_metric_family[metric_family_id] = ( - GaugeMetricFamily( - name=metric_name, - documentation=metric_description, - labels=label_keys, - unit=metric_unit, - ) + if ( + metric_family_id + not in metric_family_id_metric_family.keys() + ): + metric_family_id_metric_family[metric_family_id] = ( + GaugeMetricFamily( + name=metric_name, + documentation=metric_description, + labels=all_label_keys, + unit=metric_unit, ) + ) + for label_values, value in zip( + label_values_data_points, values + ): metric_family_id_metric_family[ metric_family_id ].add_metric(labels=label_values, value=value) - elif isinstance(metric.data, Histogram): - metric_family_id = "|".join( - [per_metric_family_id, HistogramMetricFamily.__name__] - ) + elif isinstance(metric.data, Histogram): + metric_family_id = "|".join( + [per_metric_family_id, HistogramMetricFamily.__name__] + ) - if ( - metric_family_id - not in metric_family_id_metric_family.keys() - ): - metric_family_id_metric_family[metric_family_id] = ( - HistogramMetricFamily( - name=metric_name, - documentation=metric_description, - labels=label_keys, - unit=metric_unit, - ) + if ( + metric_family_id + not in 
metric_family_id_metric_family.keys() + ): + metric_family_id_metric_family[metric_family_id] = ( + HistogramMetricFamily( + name=metric_name, + documentation=metric_description, + labels=all_label_keys, + unit=metric_unit, ) + ) + for label_values, value in zip( + label_values_data_points, values + ): metric_family_id_metric_family[ metric_family_id ].add_metric( @@ -358,10 +364,10 @@ def _translate_to_prometheus( ), sum_value=value["sum"], ) - else: - _logger.warning( - "Unsupported metric data. %s", type(metric.data) - ) + else: + _logger.warning( + "Unsupported metric data. %s", type(metric.data) + ) # pylint: disable=no-self-use def _check_value(self, value: Union[int, float, str, Sequence]) -> str: diff --git a/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py b/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py index a7a3868a8a0..d98c69cb860 100644 --- a/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py +++ b/exporter/opentelemetry-exporter-prometheus/tests/test_prometheus_exporter.py @@ -684,13 +684,11 @@ def test_multiple_data_points_with_different_label_sets(self): http_server_request_duration_seconds_bucket{http_target="/foobar",le="+Inf",net_host_port="8080"} 6.0 http_server_request_duration_seconds_count{http_target="/foobar",net_host_port="8080"} 6.0 http_server_request_duration_seconds_sum{http_target="/foobar",net_host_port="8080"} 579.0 - # HELP http_server_request_duration_seconds test multiple label sets - # TYPE http_server_request_duration_seconds histogram - http_server_request_duration_seconds_bucket{le="123.0",net_host_port="8080"} 1.0 - http_server_request_duration_seconds_bucket{le="456.0",net_host_port="8080"} 4.0 - http_server_request_duration_seconds_bucket{le="+Inf",net_host_port="8080"} 7.0 - http_server_request_duration_seconds_count{net_host_port="8080"} 7.0 - http_server_request_duration_seconds_sum{net_host_port="8080"} 579.0 + http_server_request_duration_seconds_bucket{http_target="",le="123.0",net_host_port="8080"} 1.0 + http_server_request_duration_seconds_bucket{http_target="",le="456.0",net_host_port="8080"} 4.0 + http_server_request_duration_seconds_bucket{http_target="",le="+Inf",net_host_port="8080"} 7.0 + http_server_request_duration_seconds_count{http_target="",net_host_port="8080"} 7.0 + http_server_request_duration_seconds_sum{http_target="",net_host_port="8080"} 579.0 """ ), ) diff --git a/opentelemetry-api/src/opentelemetry/context/__init__.py b/opentelemetry-api/src/opentelemetry/context/__init__.py index cad7f951428..39772554d5a 100644 --- a/opentelemetry-api/src/opentelemetry/context/__init__.py +++ b/opentelemetry-api/src/opentelemetry/context/__init__.py @@ -160,6 +160,7 @@ def detach(token: Token[Context]) -> None: # FIXME This is a temporary location for the suppress instrumentation key. # Once the decision around how to suppress instrumentation is made in the # spec, this key should be moved accordingly. 
+_ON_EMIT_RECURSION_COUNT_KEY = create_key("on_emit_recursion_count") _SUPPRESS_INSTRUMENTATION_KEY = create_key("suppress_instrumentation") _SUPPRESS_HTTP_INSTRUMENTATION_KEY = create_key( "suppress_http_instrumentation" diff --git a/opentelemetry-api/src/opentelemetry/util/_importlib_metadata.py b/opentelemetry-api/src/opentelemetry/util/_importlib_metadata.py index 94b0e4db55d..a527bd76fe1 100644 --- a/opentelemetry-api/src/opentelemetry/util/_importlib_metadata.py +++ b/opentelemetry-api/src/opentelemetry/util/_importlib_metadata.py @@ -36,7 +36,7 @@ def _original_entry_points_cached(): return original_entry_points() -def entry_points(**params): +def entry_points(**params) -> EntryPoints: """Replacement for importlib_metadata.entry_points that caches getting all the entry points. That part can be very slow, and OTel uses this function many times.""" diff --git a/opentelemetry-api/test-requirements.txt b/opentelemetry-api/test-requirements.txt index d13bcf6875c..360573104e6 100644 --- a/opentelemetry-api/test-requirements.txt +++ b/opentelemetry-api/test-requirements.txt @@ -12,5 +12,5 @@ wrapt==1.16.0 zipp==3.20.2 -e opentelemetry-sdk -e opentelemetry-semantic-conventions --e tests/opentelemetry-test-utils -e opentelemetry-api +-e tests/opentelemetry-test-utils diff --git a/opentelemetry-sdk/benchmarks/logs/test_benchmark_logging_handler.py b/opentelemetry-sdk/benchmarks/logs/test_benchmark_logging_handler.py index 02d665f8b5a..d1b9cf543c5 100644 --- a/opentelemetry-sdk/benchmarks/logs/test_benchmark_logging_handler.py +++ b/opentelemetry-sdk/benchmarks/logs/test_benchmark_logging_handler.py @@ -1,3 +1,17 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging import pytest diff --git a/opentelemetry-sdk/benchmarks/logs/test_benchmark_logs.py b/opentelemetry-sdk/benchmarks/logs/test_benchmark_logs.py new file mode 100644 index 00000000000..f1578af58e3 --- /dev/null +++ b/opentelemetry-sdk/benchmarks/logs/test_benchmark_logs.py @@ -0,0 +1,87 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest + +from opentelemetry._logs import SeverityNumber +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import ( + BatchLogRecordProcessor, + InMemoryLogRecordExporter, + SimpleLogRecordProcessor, +) +from opentelemetry.sdk.resources import Resource + +resource = Resource( + { + "service.name": "A123456789", + "service.version": "1.34567890", + "service.instance.id": "123ab456-a123-12ab-12ab-12340a1abc12", + } +) + +simple_exporter = InMemoryLogRecordExporter() +simple_provider = LoggerProvider(resource=resource) +simple_provider.add_log_record_processor( + SimpleLogRecordProcessor(simple_exporter) +) +simple_logger = simple_provider.get_logger("simple_logger") + +batch_exporter = InMemoryLogRecordExporter() +batch_provider = LoggerProvider(resource=resource) +batch_provider.add_log_record_processor( + BatchLogRecordProcessor(batch_exporter) +) +batch_logger = batch_provider.get_logger("batch_logger") + + +@pytest.mark.parametrize("num_attributes", [0, 1, 3, 5, 10]) +def test_simple_log_record_processor(benchmark, num_attributes): + attributes = {f"key{i}": f"value{i}" for i in range(num_attributes)} + + def benchmark_emit(): + simple_logger.emit( + severity_number=SeverityNumber.INFO, + body="benchmark log message", + attributes=attributes, + event_name="test.event", + ) + + benchmark(benchmark_emit) + + +@pytest.mark.parametrize("num_attributes", [0, 1, 3, 5, 10]) +def test_batch_log_record_processor(benchmark, num_attributes): + attributes = {f"key{i}": f"value{i}" for i in range(num_attributes)} + + def benchmark_emit(): + batch_logger.emit( + severity_number=SeverityNumber.INFO, + body="benchmark log message", + attributes=attributes, + event_name="test.event", + ) + + benchmark(benchmark_emit) + + +def test_get_logger(benchmark): + def benchmark_get_logger(): + simple_provider.get_logger( + "test_logger", + version="1.0.0", + schema_url="https://opentelemetry.io/schemas/1.38.0", + ) + + benchmark(benchmark_get_logger) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_configuration/models.py b/opentelemetry-sdk/src/opentelemetry/sdk/_configuration/models.py index 06e5107f957..75697ed97b3 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_configuration/models.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_configuration/models.py @@ -1,25 +1,25 @@ # generated by datamodel-codegen: # filename: https://raw.githubusercontent.com/open-telemetry/opentelemetry-configuration/refs/tags/v1.0.0-rc.3/opentelemetry_configuration.json -# timestamp: 2026-01-15T17:57:30+00:00 +# timestamp: 2026-02-04T12:43:05+00:00 from __future__ import annotations from dataclasses import dataclass from enum import Enum -from typing import Any, Optional, Union +from typing import Any from typing_extensions import TypeAlias -AlwaysOffSampler: TypeAlias = Optional[dict[str, Any]] +AlwaysOffSampler: TypeAlias = dict[str, Any] | None -AlwaysOnSampler: TypeAlias = Optional[dict[str, Any]] +AlwaysOnSampler: TypeAlias = dict[str, Any] | None @dataclass class AttributeLimits: - attribute_value_length_limit: Optional[int] = None - attribute_count_limit: Optional[int] = None + attribute_value_length_limit: int | None = None + attribute_count_limit: int | None = None Value: TypeAlias = list[str] @@ -42,44 +42,44 @@ class AttributeType(Enum): double_array = "double_array" -B3MultiPropagator: TypeAlias = Optional[dict[str, Any]] +B3MultiPropagator: TypeAlias = dict[str, Any] | None -B3Propagator: TypeAlias = Optional[dict[str, Any]] +B3Propagator: TypeAlias = dict[str, 
Any] | None -BaggagePropagator: TypeAlias = Optional[dict[str, Any]] +BaggagePropagator: TypeAlias = dict[str, Any] | None @dataclass class Base2ExponentialBucketHistogramAggregation: - max_scale: Optional[int] = None - max_size: Optional[int] = None - record_min_max: Optional[bool] = None + max_scale: int | None = None + max_size: int | None = None + record_min_max: bool | None = None @dataclass class CardinalityLimits: - default: Optional[int] = None - counter: Optional[int] = None - gauge: Optional[int] = None - histogram: Optional[int] = None - observable_counter: Optional[int] = None - observable_gauge: Optional[int] = None - observable_up_down_counter: Optional[int] = None - up_down_counter: Optional[int] = None + default: int | None = None + counter: int | None = None + gauge: int | None = None + histogram: int | None = None + observable_counter: int | None = None + observable_gauge: int | None = None + observable_up_down_counter: int | None = None + up_down_counter: int | None = None -ConsoleExporter: TypeAlias = Optional[dict[str, Any]] +ConsoleExporter: TypeAlias = dict[str, Any] | None -DefaultAggregation: TypeAlias = Optional[dict[str, Any]] +DefaultAggregation: TypeAlias = dict[str, Any] | None Distribution: TypeAlias = dict[str, dict[str, Any]] -DropAggregation: TypeAlias = Optional[dict[str, Any]] +DropAggregation: TypeAlias = dict[str, Any] | None class ExemplarFilter(Enum): @@ -88,22 +88,22 @@ class ExemplarFilter(Enum): trace_based = "trace_based" -ExperimentalComposableAlwaysOffSampler: TypeAlias = Optional[dict[str, Any]] +ExperimentalComposableAlwaysOffSampler: TypeAlias = dict[str, Any] | None -ExperimentalComposableAlwaysOnSampler: TypeAlias = Optional[dict[str, Any]] +ExperimentalComposableAlwaysOnSampler: TypeAlias = dict[str, Any] | None @dataclass class ExperimentalComposableProbabilitySampler: - ratio: Optional[float] = None + ratio: float | None = None @dataclass class ExperimentalComposableRuleBasedSamplerRuleAttributePatterns: key: str - included: Optional[list[str]] = None - excluded: Optional[list[str]] = None + included: list[str] | None = None + excluded: list[str] | None = None @dataclass @@ -112,22 +112,22 @@ class ExperimentalComposableRuleBasedSamplerRuleAttributeValues: values: list[str] -ExperimentalContainerResourceDetector: TypeAlias = Optional[dict[str, Any]] +ExperimentalContainerResourceDetector: TypeAlias = dict[str, Any] | None -ExperimentalHostResourceDetector: TypeAlias = Optional[dict[str, Any]] +ExperimentalHostResourceDetector: TypeAlias = dict[str, Any] | None @dataclass class ExperimentalHttpClientInstrumentation: - request_captured_headers: Optional[list[str]] = None - response_captured_headers: Optional[list[str]] = None + request_captured_headers: list[str] | None = None + response_captured_headers: list[str] | None = None @dataclass class ExperimentalHttpServerInstrumentation: - request_captured_headers: Optional[list[str]] = None - response_captured_headers: Optional[list[str]] = None + request_captured_headers: list[str] | None = None + response_captured_headers: list[str] | None = None ExperimentalLanguageSpecificInstrumentation: TypeAlias = dict[ @@ -137,7 +137,7 @@ class ExperimentalHttpServerInstrumentation: @dataclass class ExperimentalMeterConfig: - disabled: Optional[bool] = None + disabled: bool | None = None @dataclass @@ -148,7 +148,7 @@ class ExperimentalMeterMatcherAndConfig: @dataclass class ExperimentalOtlpFileExporter: - output_stream: Optional[str] = None + output_stream: str | None = None @dataclass @@ -159,10 
+159,10 @@ class ExperimentalPeerServiceMapping: @dataclass class ExperimentalProbabilitySampler: - ratio: Optional[float] = None + ratio: float | None = None -ExperimentalProcessResourceDetector: TypeAlias = Optional[dict[str, Any]] +ExperimentalProcessResourceDetector: TypeAlias = dict[str, Any] | None class ExperimentalPrometheusTranslationStrategy(Enum): @@ -174,7 +174,7 @@ class ExperimentalPrometheusTranslationStrategy(Enum): no_translation = "no_translation" -ExperimentalServiceResourceDetector: TypeAlias = Optional[dict[str, Any]] +ExperimentalServiceResourceDetector: TypeAlias = dict[str, Any] | None class ExperimentalSpanParent(Enum): @@ -185,7 +185,7 @@ class ExperimentalSpanParent(Enum): @dataclass class ExperimentalTracerConfig: - disabled: Optional[bool] = None + disabled: bool | None = None @dataclass @@ -196,8 +196,8 @@ class ExperimentalTracerMatcherAndConfig: @dataclass class ExplicitBucketHistogramAggregation: - boundaries: Optional[list[float]] = None - record_min_max: Optional[bool] = None + boundaries: list[float] | None = None + record_min_max: bool | None = None class ExporterDefaultHistogramAggregation(Enum): @@ -213,23 +213,23 @@ class ExporterTemporalityPreference(Enum): @dataclass class GrpcTls: - ca_file: Optional[str] = None - key_file: Optional[str] = None - cert_file: Optional[str] = None - insecure: Optional[bool] = None + ca_file: str | None = None + key_file: str | None = None + cert_file: str | None = None + insecure: bool | None = None @dataclass class HttpTls: - ca_file: Optional[str] = None - key_file: Optional[str] = None - cert_file: Optional[str] = None + ca_file: str | None = None + key_file: str | None = None + cert_file: str | None = None @dataclass class IncludeExclude: - included: Optional[list[str]] = None - excluded: Optional[list[str]] = None + included: list[str] | None = None + excluded: list[str] | None = None class InstrumentType(Enum): @@ -242,52 +242,52 @@ class InstrumentType(Enum): up_down_counter = "up_down_counter" -JaegerPropagator: TypeAlias = Optional[dict[str, Any]] +JaegerPropagator: TypeAlias = dict[str, Any] | None -LastValueAggregation: TypeAlias = Optional[dict[str, Any]] +LastValueAggregation: TypeAlias = dict[str, Any] | None @dataclass class LogRecordLimits: - attribute_value_length_limit: Optional[int] = None - attribute_count_limit: Optional[int] = None + attribute_value_length_limit: int | None = None + attribute_count_limit: int | None = None @dataclass class NameStringValuePair: name: str - value: Optional[str] + value: str | None -OpenCensusMetricProducer: TypeAlias = Optional[dict[str, Any]] +OpenCensusMetricProducer: TypeAlias = dict[str, Any] | None -OpenTracingPropagator: TypeAlias = Optional[dict[str, Any]] +OpenTracingPropagator: TypeAlias = dict[str, Any] | None @dataclass class OtlpGrpcExporter: - endpoint: Optional[str] = None - tls: Optional[GrpcTls] = None - headers: Optional[list[NameStringValuePair]] = None - headers_list: Optional[str] = None - compression: Optional[str] = None - timeout: Optional[int] = None + endpoint: str | None = None + tls: GrpcTls | None = None + headers: list[NameStringValuePair] | None = None + headers_list: str | None = None + compression: str | None = None + timeout: int | None = None @dataclass class OtlpGrpcMetricExporter: - endpoint: Optional[str] = None - tls: Optional[GrpcTls] = None - headers: Optional[list[NameStringValuePair]] = None - headers_list: Optional[str] = None - compression: Optional[str] = None - timeout: Optional[int] = None - temporality_preference: 
Optional[ExporterTemporalityPreference] = None - default_histogram_aggregation: Optional[ - ExporterDefaultHistogramAggregation - ] = None + endpoint: str | None = None + tls: GrpcTls | None = None + headers: list[NameStringValuePair] | None = None + headers_list: str | None = None + compression: str | None = None + timeout: int | None = None + temporality_preference: ExporterTemporalityPreference | None = None + default_histogram_aggregation: ( + ExporterDefaultHistogramAggregation | None + ) = None class OtlpHttpEncoding(Enum): @@ -297,28 +297,28 @@ class OtlpHttpEncoding(Enum): @dataclass class OtlpHttpExporter: - endpoint: Optional[str] = None - tls: Optional[HttpTls] = None - headers: Optional[list[NameStringValuePair]] = None - headers_list: Optional[str] = None - compression: Optional[str] = None - timeout: Optional[int] = None - encoding: Optional[OtlpHttpEncoding] = None + endpoint: str | None = None + tls: HttpTls | None = None + headers: list[NameStringValuePair] | None = None + headers_list: str | None = None + compression: str | None = None + timeout: int | None = None + encoding: OtlpHttpEncoding | None = None @dataclass class OtlpHttpMetricExporter: - endpoint: Optional[str] = None - tls: Optional[HttpTls] = None - headers: Optional[list[NameStringValuePair]] = None - headers_list: Optional[str] = None - compression: Optional[str] = None - timeout: Optional[int] = None - encoding: Optional[OtlpHttpEncoding] = None - temporality_preference: Optional[ExporterTemporalityPreference] = None - default_histogram_aggregation: Optional[ - ExporterDefaultHistogramAggregation - ] = None + endpoint: str | None = None + tls: HttpTls | None = None + headers: list[NameStringValuePair] | None = None + headers_list: str | None = None + compression: str | None = None + timeout: int | None = None + encoding: OtlpHttpEncoding | None = None + temporality_preference: ExporterTemporalityPreference | None = None + default_histogram_aggregation: ( + ExporterDefaultHistogramAggregation | None + ) = None class SeverityNumber(Enum): @@ -350,10 +350,10 @@ class SeverityNumber(Enum): @dataclass class SpanExporter: - otlp_http: Optional[OtlpHttpExporter] = None - otlp_grpc: Optional[OtlpGrpcExporter] = None - otlp_file_development: Optional[ExperimentalOtlpFileExporter] = None - console: Optional[ConsoleExporter] = None + otlp_http: OtlpHttpExporter | None = None + otlp_grpc: OtlpGrpcExporter | None = None + otlp_file_development: ExperimentalOtlpFileExporter | None = None + console: ConsoleExporter | None = None class SpanKind(Enum): @@ -366,84 +366,82 @@ class SpanKind(Enum): @dataclass class SpanLimits: - attribute_value_length_limit: Optional[int] = None - attribute_count_limit: Optional[int] = None - event_count_limit: Optional[int] = None - link_count_limit: Optional[int] = None - event_attribute_count_limit: Optional[int] = None - link_attribute_count_limit: Optional[int] = None + attribute_value_length_limit: int | None = None + attribute_count_limit: int | None = None + event_count_limit: int | None = None + link_count_limit: int | None = None + event_attribute_count_limit: int | None = None + link_attribute_count_limit: int | None = None -SumAggregation: TypeAlias = Optional[dict[str, Any]] +SumAggregation: TypeAlias = dict[str, Any] | None -TraceContextPropagator: TypeAlias = Optional[dict[str, Any]] +TraceContextPropagator: TypeAlias = dict[str, Any] | None @dataclass class TraceIdRatioBasedSampler: - ratio: Optional[float] = None + ratio: float | None = None @dataclass class ViewSelector: - 
instrument_name: Optional[str] = None - instrument_type: Optional[InstrumentType] = None - unit: Optional[str] = None - meter_name: Optional[str] = None - meter_version: Optional[str] = None - meter_schema_url: Optional[str] = None + instrument_name: str | None = None + instrument_type: InstrumentType | None = None + unit: str | None = None + meter_name: str | None = None + meter_version: str | None = None + meter_schema_url: str | None = None @dataclass class Aggregation: - default: Optional[DefaultAggregation] = None - drop: Optional[DropAggregation] = None - explicit_bucket_histogram: Optional[ExplicitBucketHistogramAggregation] = ( - None - ) - base2_exponential_bucket_histogram: Optional[ - Base2ExponentialBucketHistogramAggregation - ] = None - last_value: Optional[LastValueAggregation] = None - sum: Optional[SumAggregation] = None + default: DefaultAggregation | None = None + drop: DropAggregation | None = None + explicit_bucket_histogram: ExplicitBucketHistogramAggregation | None = None + base2_exponential_bucket_histogram: ( + Base2ExponentialBucketHistogramAggregation | None + ) = None + last_value: LastValueAggregation | None = None + sum: SumAggregation | None = None @dataclass class AttributeNameValue: name: str - value: Optional[Union[str, float, bool, Value, Value1, Value2]] - type: Optional[AttributeType] = None + value: str | float | bool | Value | Value1 | Value2 | None + type: AttributeType | None = None @dataclass class BatchSpanProcessor: exporter: SpanExporter - schedule_delay: Optional[int] = None - export_timeout: Optional[int] = None - max_queue_size: Optional[int] = None - max_export_batch_size: Optional[int] = None + schedule_delay: int | None = None + export_timeout: int | None = None + max_queue_size: int | None = None + max_export_batch_size: int | None = None @dataclass class ConsoleMetricExporter: - temporality_preference: Optional[ExporterTemporalityPreference] = None - default_histogram_aggregation: Optional[ - ExporterDefaultHistogramAggregation - ] = None + temporality_preference: ExporterTemporalityPreference | None = None + default_histogram_aggregation: ( + ExporterDefaultHistogramAggregation | None + ) = None @dataclass class ExperimentalHttpInstrumentation: - client: Optional[ExperimentalHttpClientInstrumentation] = None - server: Optional[ExperimentalHttpServerInstrumentation] = None + client: ExperimentalHttpClientInstrumentation | None = None + server: ExperimentalHttpServerInstrumentation | None = None @dataclass class ExperimentalLoggerConfig: - disabled: Optional[bool] = None - minimum_severity: Optional[SeverityNumber] = None - trace_based: Optional[bool] = None + disabled: bool | None = None + minimum_severity: SeverityNumber | None = None + trace_based: bool | None = None @dataclass @@ -454,83 +452,81 @@ class ExperimentalLoggerMatcherAndConfig: @dataclass class ExperimentalMeterConfigurator: - default_config: Optional[ExperimentalMeterConfig] = None - meters: Optional[list[ExperimentalMeterMatcherAndConfig]] = None + default_config: ExperimentalMeterConfig | None = None + meters: list[ExperimentalMeterMatcherAndConfig] | None = None @dataclass class ExperimentalOtlpFileMetricExporter: - output_stream: Optional[str] = None - temporality_preference: Optional[ExporterTemporalityPreference] = None - default_histogram_aggregation: Optional[ - ExporterDefaultHistogramAggregation - ] = None + output_stream: str | None = None + temporality_preference: ExporterTemporalityPreference | None = None + default_histogram_aggregation: ( + 
ExporterDefaultHistogramAggregation | None + ) = None @dataclass class ExperimentalPeerInstrumentation: - service_mapping: Optional[list[ExperimentalPeerServiceMapping]] = None + service_mapping: list[ExperimentalPeerServiceMapping] | None = None @dataclass class ExperimentalPrometheusMetricExporter: - host: Optional[str] = None - port: Optional[int] = None - without_scope_info: Optional[bool] = None - without_target_info: Optional[bool] = None - with_resource_constant_labels: Optional[IncludeExclude] = None - translation_strategy: Optional[ - ExperimentalPrometheusTranslationStrategy - ] = None + host: str | None = None + port: int | None = None + without_scope_info: bool | None = None + without_target_info: bool | None = None + with_resource_constant_labels: IncludeExclude | None = None + translation_strategy: ExperimentalPrometheusTranslationStrategy | None = ( + None + ) @dataclass class ExperimentalResourceDetector: - container: Optional[ExperimentalContainerResourceDetector] = None - host: Optional[ExperimentalHostResourceDetector] = None - process: Optional[ExperimentalProcessResourceDetector] = None - service: Optional[ExperimentalServiceResourceDetector] = None + container: ExperimentalContainerResourceDetector | None = None + host: ExperimentalHostResourceDetector | None = None + process: ExperimentalProcessResourceDetector | None = None + service: ExperimentalServiceResourceDetector | None = None @dataclass class ExperimentalTracerConfigurator: - default_config: Optional[ExperimentalTracerConfig] = None - tracers: Optional[list[ExperimentalTracerMatcherAndConfig]] = None + default_config: ExperimentalTracerConfig | None = None + tracers: list[ExperimentalTracerMatcherAndConfig] | None = None @dataclass class LogRecordExporter: - otlp_http: Optional[OtlpHttpExporter] = None - otlp_grpc: Optional[OtlpGrpcExporter] = None - otlp_file_development: Optional[ExperimentalOtlpFileExporter] = None - console: Optional[ConsoleExporter] = None + otlp_http: OtlpHttpExporter | None = None + otlp_grpc: OtlpGrpcExporter | None = None + otlp_file_development: ExperimentalOtlpFileExporter | None = None + console: ConsoleExporter | None = None @dataclass class MetricProducer: - opencensus: Optional[OpenCensusMetricProducer] = None + opencensus: OpenCensusMetricProducer | None = None @dataclass class PullMetricExporter: - prometheus_development: Optional[ExperimentalPrometheusMetricExporter] = ( - None - ) + prometheus_development: ExperimentalPrometheusMetricExporter | None = None @dataclass class PullMetricReader: exporter: PullMetricExporter - producers: Optional[list[MetricProducer]] = None - cardinality_limits: Optional[CardinalityLimits] = None + producers: list[MetricProducer] | None = None + cardinality_limits: CardinalityLimits | None = None @dataclass class PushMetricExporter: - otlp_http: Optional[OtlpHttpMetricExporter] = None - otlp_grpc: Optional[OtlpGrpcMetricExporter] = None - otlp_file_development: Optional[ExperimentalOtlpFileMetricExporter] = None - console: Optional[ConsoleMetricExporter] = None + otlp_http: OtlpHttpMetricExporter | None = None + otlp_grpc: OtlpGrpcMetricExporter | None = None + otlp_file_development: ExperimentalOtlpFileMetricExporter | None = None + console: ConsoleMetricExporter | None = None @dataclass @@ -545,99 +541,99 @@ class SimpleSpanProcessor: @dataclass class SpanProcessor: - batch: Optional[BatchSpanProcessor] = None - simple: Optional[SimpleSpanProcessor] = None + batch: BatchSpanProcessor | None = None + simple: SimpleSpanProcessor | None = None 
@dataclass class TextMapPropagator: - tracecontext: Optional[TraceContextPropagator] = None - baggage: Optional[BaggagePropagator] = None - b3: Optional[B3Propagator] = None - b3multi: Optional[B3MultiPropagator] = None - jaeger: Optional[JaegerPropagator] = None - ottrace: Optional[OpenTracingPropagator] = None + tracecontext: TraceContextPropagator | None = None + baggage: BaggagePropagator | None = None + b3: B3Propagator | None = None + b3multi: B3MultiPropagator | None = None + jaeger: JaegerPropagator | None = None + ottrace: OpenTracingPropagator | None = None @dataclass class ViewStream: - name: Optional[str] = None - description: Optional[str] = None - aggregation: Optional[Aggregation] = None - aggregation_cardinality_limit: Optional[int] = None - attribute_keys: Optional[IncludeExclude] = None + name: str | None = None + description: str | None = None + aggregation: Aggregation | None = None + aggregation_cardinality_limit: int | None = None + attribute_keys: IncludeExclude | None = None @dataclass class BatchLogRecordProcessor: exporter: LogRecordExporter - schedule_delay: Optional[int] = None - export_timeout: Optional[int] = None - max_queue_size: Optional[int] = None - max_export_batch_size: Optional[int] = None + schedule_delay: int | None = None + export_timeout: int | None = None + max_queue_size: int | None = None + max_export_batch_size: int | None = None @dataclass class ExperimentalGeneralInstrumentation: - peer: Optional[ExperimentalPeerInstrumentation] = None - http: Optional[ExperimentalHttpInstrumentation] = None + peer: ExperimentalPeerInstrumentation | None = None + http: ExperimentalHttpInstrumentation | None = None @dataclass class ExperimentalInstrumentation: - general: Optional[ExperimentalGeneralInstrumentation] = None - cpp: Optional[ExperimentalLanguageSpecificInstrumentation] = None - dotnet: Optional[ExperimentalLanguageSpecificInstrumentation] = None - erlang: Optional[ExperimentalLanguageSpecificInstrumentation] = None - go: Optional[ExperimentalLanguageSpecificInstrumentation] = None - java: Optional[ExperimentalLanguageSpecificInstrumentation] = None - js: Optional[ExperimentalLanguageSpecificInstrumentation] = None - php: Optional[ExperimentalLanguageSpecificInstrumentation] = None - python: Optional[ExperimentalLanguageSpecificInstrumentation] = None - ruby: Optional[ExperimentalLanguageSpecificInstrumentation] = None - rust: Optional[ExperimentalLanguageSpecificInstrumentation] = None - swift: Optional[ExperimentalLanguageSpecificInstrumentation] = None + general: ExperimentalGeneralInstrumentation | None = None + cpp: ExperimentalLanguageSpecificInstrumentation | None = None + dotnet: ExperimentalLanguageSpecificInstrumentation | None = None + erlang: ExperimentalLanguageSpecificInstrumentation | None = None + go: ExperimentalLanguageSpecificInstrumentation | None = None + java: ExperimentalLanguageSpecificInstrumentation | None = None + js: ExperimentalLanguageSpecificInstrumentation | None = None + php: ExperimentalLanguageSpecificInstrumentation | None = None + python: ExperimentalLanguageSpecificInstrumentation | None = None + ruby: ExperimentalLanguageSpecificInstrumentation | None = None + rust: ExperimentalLanguageSpecificInstrumentation | None = None + swift: ExperimentalLanguageSpecificInstrumentation | None = None @dataclass class ExperimentalLoggerConfigurator: - default_config: Optional[ExperimentalLoggerConfig] = None - loggers: Optional[list[ExperimentalLoggerMatcherAndConfig]] = None + default_config: ExperimentalLoggerConfig | 
None = None + loggers: list[ExperimentalLoggerMatcherAndConfig] | None = None @dataclass class ExperimentalResourceDetection: - attributes: Optional[IncludeExclude] = None - detectors: Optional[list[ExperimentalResourceDetector]] = None + attributes: IncludeExclude | None = None + detectors: list[ExperimentalResourceDetector] | None = None @dataclass class LogRecordProcessor: - batch: Optional[BatchLogRecordProcessor] = None - simple: Optional[SimpleLogRecordProcessor] = None + batch: BatchLogRecordProcessor | None = None + simple: SimpleLogRecordProcessor | None = None @dataclass class PeriodicMetricReader: exporter: PushMetricExporter - interval: Optional[int] = None - timeout: Optional[int] = None - producers: Optional[list[MetricProducer]] = None - cardinality_limits: Optional[CardinalityLimits] = None + interval: int | None = None + timeout: int | None = None + producers: list[MetricProducer] | None = None + cardinality_limits: CardinalityLimits | None = None @dataclass class Propagator: - composite: Optional[list[TextMapPropagator]] = None - composite_list: Optional[str] = None + composite: list[TextMapPropagator] | None = None + composite_list: str | None = None @dataclass class Resource: - attributes: Optional[list[AttributeNameValue]] = None - detection_development: Optional[ExperimentalResourceDetection] = None - schema_url: Optional[str] = None - attributes_list: Optional[str] = None + attributes: list[AttributeNameValue] | None = None + detection_development: ExperimentalResourceDetection | None = None + schema_url: str | None = None + attributes_list: str | None = None @dataclass @@ -649,41 +645,39 @@ class View: @dataclass class LoggerProvider: processors: list[LogRecordProcessor] - limits: Optional[LogRecordLimits] = None - logger_configurator_development: Optional[ - ExperimentalLoggerConfigurator - ] = None + limits: LogRecordLimits | None = None + logger_configurator_development: ExperimentalLoggerConfigurator | None = ( + None + ) @dataclass class MetricReader: - periodic: Optional[PeriodicMetricReader] = None - pull: Optional[PullMetricReader] = None + periodic: PeriodicMetricReader | None = None + pull: PullMetricReader | None = None @dataclass class MeterProvider: readers: list[MetricReader] - views: Optional[list[View]] = None - exemplar_filter: Optional[ExemplarFilter] = None - meter_configurator_development: Optional[ExperimentalMeterConfigurator] = ( - None - ) + views: list[View] | None = None + exemplar_filter: ExemplarFilter | None = None + meter_configurator_development: ExperimentalMeterConfigurator | None = None @dataclass class OpenTelemetryConfiguration: file_format: str - disabled: Optional[bool] = None - log_level: Optional[SeverityNumber] = None - attribute_limits: Optional[AttributeLimits] = None - logger_provider: Optional[LoggerProvider] = None - meter_provider: Optional[MeterProvider] = None - propagator: Optional[Propagator] = None - tracer_provider: Optional[TracerProvider] = None - resource: Optional[Resource] = None - instrumentation_development: Optional[ExperimentalInstrumentation] = None - distribution: Optional[Distribution] = None + disabled: bool | None = None + log_level: SeverityNumber | None = None + attribute_limits: AttributeLimits | None = None + logger_provider: LoggerProvider | None = None + meter_provider: MeterProvider | None = None + propagator: Propagator | None = None + tracer_provider: TracerProvider | None = None + resource: Resource | None = None + instrumentation_development: ExperimentalInstrumentation | None = None + 
distribution: Distribution | None = None @dataclass @@ -693,7 +687,7 @@ class ExperimentalComposableParentThresholdSampler: @dataclass class ExperimentalComposableRuleBasedSampler: - rules: Optional[list[ExperimentalComposableRuleBasedSamplerRule]] = None + rules: list[ExperimentalComposableRuleBasedSamplerRule] | None = None @dataclass @@ -705,59 +699,59 @@ class ExperimentalComposableRuleBasedSamplerRule: """ sampler: ExperimentalComposableSampler - attribute_values: Optional[ - ExperimentalComposableRuleBasedSamplerRuleAttributeValues - ] = None - attribute_patterns: Optional[ - ExperimentalComposableRuleBasedSamplerRuleAttributePatterns - ] = None - span_kinds: Optional[list[Optional[SpanKind]]] = None - parent: Optional[list[Optional[ExperimentalSpanParent]]] = None + attribute_values: ( + ExperimentalComposableRuleBasedSamplerRuleAttributeValues | None + ) = None + attribute_patterns: ( + ExperimentalComposableRuleBasedSamplerRuleAttributePatterns | None + ) = None + span_kinds: list[SpanKind | None] | None = None + parent: list[ExperimentalSpanParent | None] | None = None @dataclass class ExperimentalComposableSampler: - always_off: Optional[ExperimentalComposableAlwaysOffSampler] = None - always_on: Optional[ExperimentalComposableAlwaysOnSampler] = None - parent_threshold: Optional[ - ExperimentalComposableParentThresholdSampler - ] = None - probability: Optional[ExperimentalComposableProbabilitySampler] = None - rule_based: Optional[ExperimentalComposableRuleBasedSampler] = None + always_off: ExperimentalComposableAlwaysOffSampler | None = None + always_on: ExperimentalComposableAlwaysOnSampler | None = None + parent_threshold: ExperimentalComposableParentThresholdSampler | None = ( + None + ) + probability: ExperimentalComposableProbabilitySampler | None = None + rule_based: ExperimentalComposableRuleBasedSampler | None = None @dataclass class ExperimentalJaegerRemoteSampler: endpoint: str initial_sampler: Sampler - interval: Optional[int] = None + interval: int | None = None @dataclass class ParentBasedSampler: - root: Optional[Sampler] = None - remote_parent_sampled: Optional[Sampler] = None - remote_parent_not_sampled: Optional[Sampler] = None - local_parent_sampled: Optional[Sampler] = None - local_parent_not_sampled: Optional[Sampler] = None + root: Sampler | None = None + remote_parent_sampled: Sampler | None = None + remote_parent_not_sampled: Sampler | None = None + local_parent_sampled: Sampler | None = None + local_parent_not_sampled: Sampler | None = None @dataclass class Sampler: - always_off: Optional[AlwaysOffSampler] = None - always_on: Optional[AlwaysOnSampler] = None - composite_development: Optional[ExperimentalComposableSampler] = None - jaeger_remote_development: Optional[ExperimentalJaegerRemoteSampler] = None - parent_based: Optional[ParentBasedSampler] = None - probability_development: Optional[ExperimentalProbabilitySampler] = None - trace_id_ratio_based: Optional[TraceIdRatioBasedSampler] = None + always_off: AlwaysOffSampler | None = None + always_on: AlwaysOnSampler | None = None + composite_development: ExperimentalComposableSampler | None = None + jaeger_remote_development: ExperimentalJaegerRemoteSampler | None = None + parent_based: ParentBasedSampler | None = None + probability_development: ExperimentalProbabilitySampler | None = None + trace_id_ratio_based: TraceIdRatioBasedSampler | None = None @dataclass class TracerProvider: processors: list[SpanProcessor] - limits: Optional[SpanLimits] = None - sampler: Optional[Sampler] = None - 
tracer_configurator_development: Optional[ - ExperimentalTracerConfigurator - ] = None + limits: SpanLimits | None = None + sampler: Sampler | None = None + tracer_configurator_development: ExperimentalTracerConfigurator | None = ( + None + ) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py index ad79f3e687f..f12b9dd8a2d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py @@ -23,9 +23,11 @@ from typing_extensions import deprecated from opentelemetry.context import ( + _ON_EMIT_RECURSION_COUNT_KEY, _SUPPRESS_INSTRUMENTATION_KEY, attach, detach, + get_value, set_value, ) from opentelemetry.sdk._logs import ( @@ -52,6 +54,9 @@ _logger = logging.getLogger(__name__) _logger.addFilter(DuplicateFilter()) +_propagate_false_logger = logging.getLogger(__name__ + ".propagate.false") +_propagate_false_logger.propagate = False + class LogRecordExportResult(enum.Enum): SUCCESS = 0 @@ -145,11 +150,28 @@ def __init__(self, exporter: LogRecordExporter): self._shutdown = False def on_emit(self, log_record: ReadWriteLogRecord): - if self._shutdown: - _logger.warning("Processor is already shutdown, ignoring call") + # Prevent entering a recursive loop. + cnt = get_value(_ON_EMIT_RECURSION_COUNT_KEY) or 0 + # A recursion depth limit of 3 is somewhat arbitrary. An Exporter.export call may emit a log + # that re-enters this function; if the repeated Exporter.export call no longer emits that log, + # the recursion ends naturally. A limit of 3 tolerates that case, while anything deeper is + # almost certainly an endless loop, so we drop the log and exit. + if cnt > 3: # pyright: ignore[reportOperatorIssue] + _propagate_false_logger.warning( + "SimpleLogRecordProcessor.on_emit has entered a recursive loop. Dropping log and exiting the loop." + ) return - token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) + token = attach( + set_value( + _SUPPRESS_INSTRUMENTATION_KEY, + True, + set_value(_ON_EMIT_RECURSION_COUNT_KEY, cnt + 1), # pyright: ignore[reportOperatorIssue] + ) + ) try: + if self._shutdown: + _logger.warning("Processor is already shutdown, ignoring call") + return # Convert ReadWriteLogRecord to ReadableLogRecord before exporting # Note: resource should not be None at this point as it's set during Logger.emit() resource = ( @@ -166,7 +188,8 @@ def on_emit(self, log_record: ReadWriteLogRecord): self._exporter.export((readable_log_record,)) except Exception: # pylint: disable=broad-exception-caught _logger.exception("Exception while exporting logs.") - detach(token) + finally: + detach(token) def shutdown(self): self._shutdown = True diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py index cb617253698..d18acfd029d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py @@ -42,8 +42,8 @@ class DuplicateFilter(logging.Filter): """Filter that can be applied to internal `logger`'s.
- Currently applied to `logger`s on the export logs path that could otherwise cause endless logging of errors or a - recursion depth exceeded issue in cases where logging itself results in an exception.""" + Currently applied to `logger`s on the export logs path to prevent endlessly logging the same log + in cases where logging itself is failing.""" def filter(self, record): current_log = ( @@ -81,6 +81,10 @@ def shutdown(self): raise NotImplementedError +_logger = logging.getLogger(__name__) +_logger.addFilter(DuplicateFilter()) + + class BatchProcessor(Generic[Telemetry]): """This class can be used with exporter's that implement the above Exporter interface to buffer and send telemetry in batch through @@ -111,8 +115,6 @@ def __init__( target=self.worker, daemon=True, ) - self._logger = logging.getLogger(__name__) - self._logger.addFilter(DuplicateFilter()) self._exporting = exporting self._shutdown = False @@ -189,20 +191,20 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None: ] ) except Exception: # pylint: disable=broad-exception-caught - self._logger.exception( + _logger.exception( "Exception while exporting %s.", self._exporting ) detach(token) - # Do not add any logging.log statements to this function, they can be being routed back to this `emit` function, - # resulting in endless recursive calls that crash the program. - # See https://github.com/open-telemetry/opentelemetry-python/issues/4261 def emit(self, data: Telemetry) -> None: if self._shutdown: + _logger.info("Shutdown called, ignoring %s.", self._exporting) return if self._pid != os.getpid(): self._bsp_reset_once.do_once(self._at_fork_reinit) - # This will drop a log from the right side if the queue is at _max_queue_length. + if len(self._queue) == self._max_queue_size: + _logger.warning("Queue full, dropping %s.", self._exporting) + # This will drop a log from the right side if the queue is at _max_queue_size. 
self._queue.appendleft(data) if len(self._queue) >= self._max_export_batch_size: self._worker_awaken.set() diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py index e0eabd35b5e..a04d27e9ab1 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py @@ -70,7 +70,7 @@ from json import dumps from os import environ from types import ModuleType -from typing import List, Optional, cast +from typing import List, Optional, Set, cast from urllib import parse from opentelemetry.attributes import BoundedAttributes @@ -195,7 +195,7 @@ def create( if not attributes: attributes = {} - otel_experimental_resource_detectors = {"otel"}.union( + otel_experimental_resource_detectors: Set[str] = {"otel"}.union( { otel_experimental_resource_detector.strip() for otel_experimental_resource_detector in environ.get( @@ -207,7 +207,11 @@ def create( resource_detectors: List[ResourceDetector] = [] - resource_detector: str + if "*" in otel_experimental_resource_detectors: + otel_experimental_resource_detectors = entry_points( + group="opentelemetry_resource_detector" + ).names + for resource_detector in otel_experimental_resource_detectors: try: resource_detectors.append( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index 0e7e1f6db3b..9ae9a2234ef 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -18,9 +18,11 @@ import concurrent.futures import json import logging +import os import threading import traceback import typing +import weakref from os import environ from time import time_ns from types import MappingProxyType, TracebackType @@ -238,6 +240,16 @@ def __init__(self, num_threads: int = 2): # iterating through it on "on_start" and "on_end". self._span_processors = () # type: Tuple[SpanProcessor, ...] 
self._lock = threading.Lock() + self._init_executor(num_threads) + if hasattr(os, "register_at_fork"): + # Only the main thread is kept in forked processes; the executor + # needs to be re-instantiated to get a fresh pool of threads: + weak_reinit = weakref.WeakMethod(self._init_executor) + os.register_at_fork( + after_in_child=lambda: weak_reinit()(num_threads) + ) + + def _init_executor(self, num_threads: int) -> None: self._executor = concurrent.futures.ThreadPoolExecutor( max_workers=num_threads ) diff --git a/opentelemetry-sdk/test-requirements.txt b/opentelemetry-sdk/test-requirements.txt index 859a2196e1a..96eb7601eed 100644 --- a/opentelemetry-sdk/test-requirements.txt +++ b/opentelemetry-sdk/test-requirements.txt @@ -11,7 +11,7 @@ tomli==2.0.1 typing_extensions==4.10.0 wrapt==1.16.0 zipp==3.19.2 --e tests/opentelemetry-test-utils -e opentelemetry-api +-e tests/opentelemetry-test-utils -e opentelemetry-semantic-conventions -e opentelemetry-sdk \ No newline at end of file diff --git a/opentelemetry-sdk/tests/logs/test_export.py b/opentelemetry-sdk/tests/logs/test_export.py index b58addf44c8..12909b2e225 100644 --- a/opentelemetry-sdk/tests/logs/test_export.py +++ b/opentelemetry-sdk/tests/logs/test_export.py @@ -15,12 +15,13 @@ # pylint: disable=protected-access import logging import os +import sys import time import unittest from concurrent.futures import ( # pylint: disable=no-name-in-module ThreadPoolExecutor, ) -from sys import version_info +from typing import Sequence from unittest.mock import Mock, patch from pytest import mark @@ -38,6 +39,7 @@ BatchLogRecordProcessor, ConsoleLogRecordExporter, InMemoryLogRecordExporter, + LogRecordExporter, SimpleLogRecordProcessor, ) from opentelemetry.sdk.environment_variables import ( @@ -63,6 +65,46 @@ class TestSimpleLogRecordProcessor(unittest.TestCase): + @mark.skipif( + (3, 13, 0) <= sys.version_info <= (3, 13, 5), + reason="This will fail on 3.13.5 due to https://github.com/python/cpython/pull/131812 which prevents recursive log messages but was rolled back in 3.13.6.", + ) + def test_simple_log_record_processor_doesnt_enter_recursive_loop(self): + class Exporter(LogRecordExporter): + def shutdown(self): + pass + + def export(self, batch: Sequence[ReadableLogRecord]): + logger = logging.getLogger("any logger..") + logger.warning("Something happened.") + + exporter = Exporter() + logger_provider = LoggerProvider() + logger_provider.add_log_record_processor( + SimpleLogRecordProcessor(exporter) + ) + root_logger = logging.getLogger() + # Add the OTLP handler to the root logger, as is done in auto instrumentation. + # This causes logs generated from within SimpleLogRecordProcessor.on_emit (such as the above log in export) + # to be sent back to SimpleLogRecordProcessor.on_emit. + handler = LoggingHandler( + level=logging.DEBUG, logger_provider=logger_provider + ) + root_logger.addHandler(handler) + propagate_false_logger = logging.getLogger( + "opentelemetry.sdk._logs._internal.export.propagate.false" + ) + # Without the recursion guard, this would raise a maximum recursion depth exceeded error.
+ try: + with self.assertLogs(propagate_false_logger) as cm: + root_logger.warning("hello!") + assert ( + "SimpleLogRecordProcessor.on_emit has entered a recursive loop" + in cm.output[0] + ) + finally: + root_logger.removeHandler(handler) + def test_simple_log_record_processor_default_level(self): exporter = InMemoryLogRecordExporter() logger_provider = LoggerProvider() @@ -406,39 +448,6 @@ def bulk_emit(num_emit): time.sleep(2) assert len(exporter.get_finished_logs()) == total_expected_logs - @mark.skipif( - version_info < (3, 10), - reason="assertNoLogs only exists in python 3.10+.", - ) - def test_logging_lib_not_invoked_in_batch_log_record_emit(self): # pylint: disable=no-self-use - # See https://github.com/open-telemetry/opentelemetry-python/issues/4261 - exporter = Mock() - processor = BatchLogRecordProcessor(exporter) - logger_provider = LoggerProvider( - resource=SDKResource.create( - { - "service.name": "shoppingcart", - "service.instance.id": "instance-12", - } - ), - ) - logger_provider.add_log_record_processor(processor) - handler = LoggingHandler( - level=logging.INFO, logger_provider=logger_provider - ) - sdk_logger = logging.getLogger("opentelemetry.sdk") - # Attach OTLP handler to SDK logger - sdk_logger.addHandler(handler) - # If `emit` calls logging.log then this test will throw a maximum recursion depth exceeded exception and fail. - try: - with self.assertNoLogs(sdk_logger, logging.NOTSET): - processor.on_emit(EMPTY_LOG) - processor.shutdown() - with self.assertNoLogs(sdk_logger, logging.NOTSET): - processor.on_emit(EMPTY_LOG) - finally: - sdk_logger.removeHandler(handler) - def test_args(self): exporter = InMemoryLogRecordExporter() log_record_processor = BatchLogRecordProcessor( diff --git a/opentelemetry-sdk/tests/resources/test_resources.py b/opentelemetry-sdk/tests/resources/test_resources.py index b080519a867..c083eff1460 100644 --- a/opentelemetry-sdk/tests/resources/test_resources.py +++ b/opentelemetry-sdk/tests/resources/test_resources.py @@ -474,6 +474,7 @@ def test_service_name_env(self): self.assertEqual(resource.attributes["service.name"], "from-code") +# pylint: disable=too-many-public-methods class TestOTELResourceDetector(unittest.TestCase): def setUp(self) -> None: environ[OTEL_RESOURCE_ATTRIBUTES] = "" @@ -697,6 +698,31 @@ def test_resource_detector_entry_points_os(self): self.assertIn(OS_TYPE, resource.attributes) self.assertIn(OS_VERSION, resource.attributes) + @patch.dict( + environ, {OTEL_EXPERIMENTAL_RESOURCE_DETECTORS: "*"}, clear=True + ) + def test_resource_detector_entry_points_all(self): + resource = Resource({}).create() + + self.assertIn( + TELEMETRY_SDK_NAME, + resource.attributes, + "'otel' resource detector not enabled", + ) + self.assertIn( + OS_TYPE, resource.attributes, "'os' resource detector not enabled" + ) + self.assertIn( + HOST_ARCH, + resource.attributes, + "'host' resource detector not enabled", + ) + self.assertIn( + PROCESS_RUNTIME_NAME, + resource.attributes, + "'process' resource detector not enabled", + ) + def test_resource_detector_entry_points_otel(self): """ Test that OTELResourceDetector-resource-generated attributes are diff --git a/opentelemetry-sdk/tests/trace/test_span_processor.py b/opentelemetry-sdk/tests/trace/test_span_processor.py index d1cf1e3df00..0ba4ab8ed47 100644 --- a/opentelemetry-sdk/tests/trace/test_span_processor.py +++ b/opentelemetry-sdk/tests/trace/test_span_processor.py @@ -13,9 +13,13 @@ # limitations under the License. 
import abc +import gc +import multiprocessing +import os import time import typing import unittest +import weakref from platform import python_implementation, system from threading import Event from typing import Optional @@ -26,6 +30,10 @@ from opentelemetry import trace as trace_api from opentelemetry.context import Context from opentelemetry.sdk import trace +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) def span_event_start_fmt(span_processor_name, span_name): @@ -486,3 +494,66 @@ def test_force_flush_late_by_span_processor(self): for mock_processor in mocks: self.assertEqual(1, mock_processor.force_flush.call_count) multi_processor.shutdown() + + def test_processor_gc(self): + multi_processor = trace.ConcurrentMultiSpanProcessor(5) + weak_ref = weakref.ref(multi_processor) + multi_processor.shutdown() + + # When the processor is garbage collected + del multi_processor + gc.collect() + + # Then the reference to the processor should no longer exist + self.assertIsNone( + weak_ref(), + "The ConcurrentMultiSpanProcessor object created by this test wasn't garbage collected", + ) + + @unittest.skipUnless(hasattr(os, "fork"), "needs *nix") + def test_concurrent_multi_span_processor_fork(self): + multiprocessing_context = multiprocessing.get_context("fork") + tracer_provider = trace.TracerProvider() + tracer = tracer_provider.get_tracer(__name__) + exporter = InMemorySpanExporter() + multi_processor = trace.ConcurrentMultiSpanProcessor(2) + multi_processor.add_span_processor(SimpleSpanProcessor(exporter)) + tracer_provider.add_span_processor(multi_processor) + + # Use the ConcurrentMultiSpanProcessor in the main process. + # This is necessary in this test to start using the underlying ThreadPoolExecutor and avoid a false positive: + with tracer.start_as_current_span("main process before fork span"): + pass + assert ( + exporter.get_finished_spans()[-1].name + == "main process before fork span" + ) + + # The forked ConcurrentMultiSpanProcessor is usable in the child process: + def child(conn): + with tracer.start_as_current_span("child process span"): + pass + conn.send(exporter.get_finished_spans()[-1].name) + conn.close() + + parent_conn, child_conn = multiprocessing_context.Pipe() + process = multiprocessing_context.Process( + target=child, args=(child_conn,) + ) + process.start() + has_response = parent_conn.poll(timeout=5) + if not has_response: + process.kill() + self.fail( + "The child process did not send any message within 5 seconds; it is most likely deadlocked" + ) + process.join(timeout=5) + assert parent_conn.recv() == "child process span" + + # The ConcurrentMultiSpanProcessor is still usable in the main process after the child process terminates: + with tracer.start_as_current_span("main process after fork span"): + pass + assert ( + exporter.get_finished_spans()[-1].name + == "main process after fork span" + ) diff --git a/pyproject.toml b/pyproject.toml index 5b53109c04a..873af4f2332 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -111,7 +111,8 @@ formatters = ["ruff-format", "ruff-check"] use-standard-collections = true use-schema-description = true use-title-as-name = true -target-python-version = "3.9" +use-union-operator = false +target-python-version = "3.10" include = [ "opentelemetry-semantic-conventions", diff --git a/tests/opentelemetry-test-utils/src/opentelemetry/test/test_base.py b/tests/opentelemetry-test-utils/src/opentelemetry/test/test_base.py index 
69da617bb69..0ec7d594ca5 100644 --- a/tests/opentelemetry-test-utils/src/opentelemetry/test/test_base.py +++ b/tests/opentelemetry-test-utils/src/opentelemetry/test/test_base.py @@ -142,7 +142,13 @@ def disable_logging(highest_level=logging.CRITICAL): finally: logging.disable(logging.NOTSET) - def get_sorted_metrics(self): + def get_sorted_metrics(self, scope: Optional[str] = None): + """Returns recorded metrics sorted by name. + + Args: + scope: Optional scope name to filter metrics by. If unset, + all metrics are returned. + """ metrics_data = self.memory_metrics_reader.get_metrics_data() resource_metrics = ( metrics_data.resource_metrics if metrics_data else [] @@ -151,6 +157,8 @@ def get_sorted_metrics(self): all_metrics = [] for metrics in resource_metrics: for scope_metrics in metrics.scope_metrics: + if scope is not None and scope_metrics.scope.name != scope: + continue all_metrics.extend(scope_metrics.metrics) return self.sorted_metrics(all_metrics) diff --git a/tox.ini b/tox.ini index b5deafafbcd..01bdff059cb 100644 --- a/tox.ini +++ b/tox.ini @@ -332,7 +332,10 @@ commands = [testenv:generate-config-from-jsonschema] deps = - tox + datamodel-code-generator[http] + datamodel-code-generator[ruff] +allowlist_externals = + datamodel-codegen commands = datamodel-codegen
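
A note on the recursion guard added to SimpleLogRecordProcessor.on_emit above: the counter travels in the OpenTelemetry context, so a log emitted from inside Exporter.export cannot re-enter on_emit unboundedly. The following is a minimal, self-contained sketch of the same pattern using stdlib contextvars in place of the SDK's internal _ON_EMIT_RECURSION_COUNT_KEY machinery; all names here are illustrative, not SDK API.

import contextvars

_recursion_count: contextvars.ContextVar[int] = contextvars.ContextVar(
    "on_emit_recursion_count", default=0
)
_MAX_DEPTH = 3


def on_emit(log_record, export):
    # Read the depth for the current context; 0 on first entry.
    depth = _recursion_count.get()
    if depth > _MAX_DEPTH:
        # export() keeps emitting a log that routes back here; almost
        # certainly an endless loop, so drop the record and bail out.
        return
    token = _recursion_count.set(depth + 1)
    try:
        # If export() logs, the handler re-enters on_emit and sees the
        # incremented depth in this same context.
        export(log_record)
    finally:
        _recursion_count.reset(token)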
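
The queue-full warning added to BatchProcessor.emit above leans on the drop-oldest semantics of a bounded collections.deque: appendleft on a full deque silently evicts the item at the right end, which is exactly what the "drop a log from the right side" comment describes. A minimal demonstration:

from collections import deque

max_queue_size = 3
queue = deque(maxlen=max_queue_size)

for item in ("log-1", "log-2", "log-3"):
    queue.appendleft(item)

if len(queue) == max_queue_size:
    print("Queue full, dropping oldest log.")

# Evicts "log-1" from the right end without raising.
queue.appendleft("log-4")
assert list(queue) == ["log-4", "log-3", "log-2"]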
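
The resources change makes a literal "*" in OTEL_EXPERIMENTAL_RESOURCE_DETECTORS expand to every detector registered under the opentelemetry_resource_detector entry-point group. A hedged usage sketch; which attributes actually appear depends on the detector packages installed in the environment:

import os

from opentelemetry.sdk.resources import Resource

# "*" loads every installed resource detector instead of a comma-separated list.
os.environ["OTEL_EXPERIMENTAL_RESOURCE_DETECTORS"] = "*"

resource = Resource.create()
# With the built-in detectors this should include e.g. os.type, host.arch
# and process.runtime.name alongside the default telemetry.sdk.* attributes.
print(dict(resource.attributes))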
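
The os.register_at_fork hook added to ConcurrentMultiSpanProcessor above addresses two concerns at once: only the calling thread survives a fork, so the child needs a fresh ThreadPoolExecutor, and the hook must not keep the processor alive, which is why it goes through weakref.WeakMethod (and is what test_processor_gc verifies). A standalone sketch of the pattern with illustrative names; the guard against a dead weak reference is an addition not present in the SDK's lambda:

import concurrent.futures
import os
import weakref


class Worker:
    def __init__(self, num_threads: int = 2):
        self._init_executor(num_threads)
        if hasattr(os, "register_at_fork"):
            # Hold only a weak reference; a strong one would keep this
            # object alive for the lifetime of the process.
            weak_reinit = weakref.WeakMethod(self._init_executor)

            def _reinit_in_child():
                reinit = weak_reinit()
                if reinit is not None:  # owner may have been collected
                    reinit(num_threads)

            os.register_at_fork(after_in_child=_reinit_in_child)

    def _init_executor(self, num_threads: int) -> None:
        # Worker threads do not survive fork, so the child must build a
        # fresh pool before submitting any work.
        self._executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=num_threads
        )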
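
The new scope parameter on TestBase.get_sorted_metrics narrows the flattened metric list to a single instrumentation scope. A hypothetical test using it could look like the sketch below; the class, scope, and instrument names are illustrative, and it assumes TestBase exposes a MeterProvider wired to memory_metrics_reader as self.meter_provider:

from opentelemetry.test.test_base import TestBase


class TestScopedMetrics(TestBase):
    def test_only_requested_scope_is_returned(self):
        # Assumes TestBase provides self.meter_provider backed by
        # self.memory_metrics_reader; names below are illustrative.
        meter = self.meter_provider.get_meter("my.instrumentation.scope")
        meter.create_counter("requests").add(1)

        other = self.meter_provider.get_meter("some.other.scope")
        other.create_counter("noise").add(1)

        metrics = self.get_sorted_metrics(scope="my.instrumentation.scope")
        self.assertEqual([metric.name for metric in metrics], ["requests"])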