diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/README.rst b/exporter/opentelemetry-exporter-otlp-proto-common/README.rst index 9756a49bc35..725144e02aa 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-common/README.rst +++ b/exporter/opentelemetry-exporter-otlp-proto-common/README.rst @@ -6,7 +6,8 @@ OpenTelemetry Protobuf Encoding .. |pypi| image:: https://badge.fury.io/py/opentelemetry-exporter-otlp-proto-common.svg :target: https://pypi.org/project/opentelemetry-exporter-otlp-proto-common/ -This library is provided as a convenience to encode to Protobuf. Currently used by: + +This library provides the shared exporter interface as well as convenience modules to encode to Protobuf. Currently used by: * opentelemetry-exporter-otlp-proto-grpc * opentelemetry-exporter-otlp-proto-http diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/exporter.py new file mode 100644 index 00000000000..befdc150ab2 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/exporter.py @@ -0,0 +1,278 @@ +import math +import threading +from itertools import count +from logging import getLogger +from os import environ +from random import uniform +from time import time +from typing import ( + Generic, + Iterator, + Optional, + Protocol, + Type, + TypeVar, +) + +from opentelemetry.sdk.environment_variables import ( + OTEL_EXPORTER_OTLP_TIMEOUT, +) + +logger = getLogger(__name__) + +_DEFAULT_EXPORT_TIMEOUT_S = 10 + +ExportResultT = TypeVar("ExportResultT", covariant=True) + + +class _ExportProtocol(Protocol[ExportResultT]): + def __call__(self, timeout_s: float, *args, **kwargs) -> ExportResultT: + ... + + +class RetryableExportError(Exception): + def __init__(self, retry_delay_s: Optional[float] = None): + super().__init__() + + self.retry_delay_s = retry_delay_s + + +class RetryingExporter(Generic[ExportResultT]): + def __init__( + self, + export_function: _ExportProtocol[ExportResultT], + result_type: Type[ExportResultT], + timeout_s: Optional[float] = None, + ): + """OTLP exporter helper class. + + Encapsulates timeout behavior for shutdown and export tasks. + + Accepts a callable `export_function` of the form + + def export_function( + timeout_s: float, + *args, + **kwargs + ) -> result_type: + .... + + that either returns the appropriate export result, or raises a + RetryableExportError exception if the encountered error should + be retried. + + Args: + export_function: A callable handling a single export attempt to + be used by export_with_retry() + result_type: Enum-like type defining SUCCESS and FAILURE values + returned by export. + timeout_s: Optional timeout for exports in seconds. Set to smaller + of provided arg and value in OTEL_EXPORTER_OTLP_TIMEOUT. Defaults + to constant if both are unset. + """ + self._result_type = result_type + self._export_function = export_function + if timeout_s: + # If the user provided a timeout, don't use the default as a lower + # bound. 
+ self._timeout_s = min( + timeout_s, + float(environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, math.inf)), + ) + else: + self._timeout_s = float( + environ.get( + OTEL_EXPORTER_OTLP_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_S + ) + ) + + self._shutdown_event = threading.Event() + self._export_lock = threading.Lock() + + def shutdown(self, timeout_millis: float = 30_000): + if self._shutdown_event.is_set(): + logger.warning("Exporter already shutdown, ignoring call") + return + locked = self._export_lock.acquire(timeout=timeout_millis * 1e-3) + self._shutdown_event.set() + if locked: + self._export_lock.release() + + def export_with_retry( + self, + timeout_s: float, + *args, + **kwargs, + ) -> ExportResultT: + """Exports data with handling of retryable errors. + + Calls the export_function provided at initialization with the following + signature: + + export_function(*args, timeout_s=remaining_time, **kwargs) + + where `remaining_time` is updated with each retry, and *args and + **kwargs are forwarded as-is. + + Retries will be attempted using exponential backoff with full jitter. + If retry_delay_s is specified in the raised error, a retry attempt will + not occur before that delay. If a retry after that delay is + not possible, will immediately abort without retrying. + + Will reattempt the export until timeout has passed, at which point + the export will be abandoned and a failure will be returned. + A pending shutdown timing out will also cause retries to time out. + + Note: Can block longer than timeout if export_function is blocking. + Ensure export_function blocks minimally and does not attempt + retries. + + Args: + timeout_s: Timeout in seconds. No more reattempts will occur after + this time. + *args: Variable length argument list forwarded to underlying export + **kwargs: Arbitrary keyword arguments forwarded to underlying export + + """ + # After the call to shutdown, subsequent calls to Export are + # not allowed and should return a Failure result. + if self._shutdown_event.is_set(): + logger.warning("Exporter already shutdown, ignoring batch") + return self._result_type.FAILURE + # If negative timeout passed (from e.g. external batch deadline) + # fail immediately + if timeout_s <= 0: + logger.warning("Export deadline passed, ignoring data") + return self._result_type.FAILURE + + # Use the lowest of the possible timeouts + timeout_s = ( + min(timeout_s, self._timeout_s) + if timeout_s is not None + else self._timeout_s + ) + deadline_s = time() + timeout_s + # We acquire a lock to prevent shutdown from interrupting us + try: + if not self._export_lock.acquire(timeout=timeout_s): + logger.warning( + "Exporter failed to acquire lock before timeout" + ) + return self._result_type.FAILURE + # _create_exp_backoff_with_jitter returns a generator that yields random delay + # values whose upper bounds grow exponentially. 
The upper bound will cap at max + # value (never wait more than 64 seconds at once) + max_value = 64 + for delay_s in _create_exp_backoff_with_jitter_generator( + max_value=max_value + ): + remaining_time_s = deadline_s - time() + + if remaining_time_s < 1e-09: + # Timed out + return self._result_type.FAILURE + + if self._shutdown_event.is_set(): + logger.warning( + "Export cancelled due to shutdown timing out", + ) + return self._result_type.FAILURE + + try: + return self._export_function( + remaining_time_s, + *args, + **kwargs, + ) + except RetryableExportError as err: + time_remaining_s = deadline_s - time() + delay_s = min(time_remaining_s, delay_s) + if err.retry_delay_s is not None: + if err.retry_delay_s > time_remaining_s: + # We should not retry before the requested interval, so + # we must fail out prematurely. + return self._result_type.FAILURE + delay_s = max(err.retry_delay_s, delay_s) + logger.warning( + "Retrying in %ss", + delay_s, + ) + self._shutdown_event.wait(delay_s) + finally: + self._export_lock.release() + + return self._result_type.FAILURE + + +def _create_exp_backoff_generator(max_value: int = 0) -> Iterator[int]: + """ + Generates an infinite sequence of exponential backoff values. The sequence starts + from 1 (2^0) and doubles each time (2^1, 2^2, 2^3, ...). If a max_value is specified + and non-zero, the generated values will not exceed this maximum, capping at max_value + instead of growing indefinitely. + + Parameters: + - max_value (int, optional): The maximum value to yield. If 0 or not provided, the + sequence grows without bound. + + Returns: + Iterator[int]: An iterator that yields the exponential backoff values, either uncapped or + capped at max_value. + + Example: + ``` + gen = _create_exp_backoff_generator(max_value=10) + for _ in range(5): + print(next(gen)) + ``` + This will print: + 1 + 2 + 4 + 8 + 10 + + Note: this functionality used to be handled by the 'backoff' package. + """ + for i in count(0): + out = 2**i + yield min(out, max_value) if max_value else out + + +def _create_exp_backoff_with_jitter_generator( + max_value: int = 0, +) -> Iterator[float]: + """ + Generates an infinite sequence of exponential backoff values with jitter using the + FullJitter approach. For each element "n" in the exponential backoff series created + by _create_exp_backoff(max_value), yields a random number in the half-open range [0,n). + + This algorithm is originally documented at + https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + + Parameters: + - max_value (int, optional): The maximum value to yield. If 0 or not provided, the + sequence grows without bound. + + Returns: + Iterator[int]: An iterator that yields the exponential backoff values, either uncapped or + capped at max_value. 
+ + Example: + ``` + import random + random.seed(20240220) + gen = _create_exp_backoff_with_jitter_generator(max_value=10) + for _ in range(5): + print(next(gen)) + ``` + This will print: + 0.1341603010697452 + 0.34773275270578097 + 3.6022913287022913 + 6.663388602254524 + 10 + + """ + for i in _create_exp_backoff_generator(max_value): + yield uniform(0, i) diff --git a/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_retryable_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_retryable_exporter.py new file mode 100644 index 00000000000..3c4c4625178 --- /dev/null +++ b/exporter/opentelemetry-exporter-otlp-proto-common/tests/test_retryable_exporter.py @@ -0,0 +1,304 @@ +import threading +import time +import unittest +from itertools import repeat +from logging import WARNING +from unittest.mock import ANY, Mock, patch + +from opentelemetry.exporter.otlp.proto.common.exporter import ( + _DEFAULT_EXPORT_TIMEOUT_S, + RetryableExportError, + RetryingExporter, +) +from opentelemetry.exporter.otlp.proto.common.exporter import ( + logger as exporter_logger, +) +from opentelemetry.sdk.environment_variables import OTEL_EXPORTER_OTLP_TIMEOUT + +result_type = Mock() + + +class TestRetryableExporter(unittest.TestCase): + def test_export_no_retry(self): + export_func = Mock() + exporter = RetryingExporter(export_func, result_type) + with self.subTest("Export success"): + export_func.reset_mock() + export_func.configure_mock(return_value=result_type.SUCCESS) + pos_arg = Mock() + with self.assertRaises(AssertionError): + with self.assertLogs(level=WARNING): + result = exporter.export_with_retry( + _DEFAULT_EXPORT_TIMEOUT_S, pos_arg, foo="bar" + ) + self.assertIs(result, result_type.SUCCESS) + export_func.assert_called_once_with(ANY, pos_arg, foo="bar") + + with self.subTest("Export Fail"): + export_func.reset_mock() + export_func.configure_mock(return_value=result_type.FAILURE) + pos_arg = Mock() + with self.assertNoLogs(exporter_logger, level=WARNING): + result = exporter.export_with_retry( + _DEFAULT_EXPORT_TIMEOUT_S, pos_arg, foo="bar" + ) + self.assertIs(result, result_type.FAILURE) + export_func.assert_called_once_with(ANY, pos_arg, foo="bar") + + @patch( + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_with_jitter_generator", + return_value=repeat(0), + ) + def test_export_retry(self, mock_backoff): + """ + Test we retry until success/failure. + """ + side_effect = [ + RetryableExportError, + RetryableExportError, + result_type.SUCCESS, + ] + export_func = Mock(side_effect=side_effect) + exporter = RetryingExporter(export_func, result_type) + + with self.subTest("Retry until success"): + result = exporter.export_with_retry(10) + self.assertEqual(export_func.call_count, len(side_effect)) + self.assertIs(result, result_type.SUCCESS) + + with self.subTest("Retry until failure"): + export_func.reset_mock() + side_effect.insert(0, RetryableExportError) + side_effect[-1] = result_type.FAILURE + export_func.configure_mock(side_effect=side_effect) + result = exporter.export_with_retry(10) + self.assertEqual(export_func.call_count, len(side_effect)) + self.assertIs(result, result_type.FAILURE) + + def test_export_uses_smallest_timeout(self): + """ + Test that the exporter uses the smallest of attribute, argument, + environment variable as timeout. 
+ """ + + def patch_and_time(attrib_timeout, environ_timeout, arg_timeout): + export_func = Mock(side_effect=RetryableExportError()) + with patch.dict( + "os.environ", + {OTEL_EXPORTER_OTLP_TIMEOUT: str(environ_timeout)}, + ): + start = time.time() + exporter = RetryingExporter( + export_func, result_type, timeout_s=attrib_timeout + ) + exporter.export_with_retry(arg_timeout) + duration = time.time() - start + self.assertAlmostEqual( + duration, + min(attrib_timeout, environ_timeout, arg_timeout), + places=1, + ) + + patch_and_time(2, 10, 10) + patch_and_time(10, 2, 10) + patch_and_time(10, 10, 2) + + def test_explicit_environ_timeout_beats_default(self): + """Ensure a specific timeout in environment can be higher than default.""" + with patch.dict( + "os.environ", + {OTEL_EXPORTER_OTLP_TIMEOUT: str(2 * _DEFAULT_EXPORT_TIMEOUT_S)}, + ): + self.assertEqual( + # pylint: disable=protected-access + RetryingExporter(Mock(), result_type)._timeout_s, + 2 * _DEFAULT_EXPORT_TIMEOUT_S, + ) + + @patch( + ( + "opentelemetry.exporter.otlp.proto.common.exporter" + "._create_exp_backoff_with_jitter_generator" + ), + return_value=repeat(0.25), + ) + def test_export_uses_retry_delay(self, mock_backoff): + """ + Test we retry using the delay specified in the RPC error as a lower bound. + """ + side_effects = [ + RetryableExportError(0), + RetryableExportError(0.25), + RetryableExportError(0.75), + RetryableExportError(1), + result_type.SUCCESS, + ] + exporter = RetryingExporter( + Mock(side_effect=side_effects), result_type + ) + + # pylint: disable=protected-access + with patch.object(exporter._shutdown_event, "wait") as wait_mock: + result = exporter.export_with_retry(timeout_s=10, foo="bar") + self.assertIs(result, result_type.SUCCESS) + self.assertEqual(wait_mock.call_count, len(side_effects) - 1) + self.assertEqual(wait_mock.call_args_list[0].args, (0.25,)) + self.assertEqual(wait_mock.call_args_list[1].args, (0.25,)) + self.assertEqual(wait_mock.call_args_list[2].args, (0.75,)) + self.assertEqual(wait_mock.call_args_list[3].args, (1.00,)) + + @patch( + ( + "opentelemetry.exporter.otlp.proto.common.exporter" + "._create_exp_backoff_with_jitter_generator" + ), + return_value=repeat(0.1), + ) + def test_retry_delay_exceeds_timeout(self, mock_backoff): + """ + Test we timeout if we can't respect retry_delay. 
+ """ + side_effects = [ + RetryableExportError(0.25), + RetryableExportError(1), # should timeout here + result_type.SUCCESS, + ] + + mock_export_func = Mock(side_effect=side_effects) + exporter = RetryingExporter( + mock_export_func, + result_type, + ) + + self.assertEqual(exporter.export_with_retry(0.5), result_type.FAILURE) + self.assertEqual(mock_export_func.call_count, 2) + + def test_shutdown(self): + """Test we refuse to export if shut down.""" + mock_export_func = Mock(return_value=result_type.SUCCESS) + exporter = RetryingExporter( + mock_export_func, + result_type, + ) + + self.assertEqual(exporter.export_with_retry(10), result_type.SUCCESS) + mock_export_func.assert_called_once() + exporter.shutdown() + with self.assertLogs(level=WARNING) as warning: + self.assertEqual( + exporter.export_with_retry(10), result_type.FAILURE + ) + self.assertEqual( + warning.records[0].message, + "Exporter already shutdown, ignoring batch", + ) + mock_export_func.assert_called_once() + + def test_shutdown_wait_last_export(self): + """Test that shutdown waits for ongoing export to complete.""" + + timeout_s = 10 + + class ExportFunc: + is_exporting = threading.Event() + ready_to_continue = threading.Event() + side_effect = [ + RetryableExportError(), + RetryableExportError(), + result_type.SUCCESS, + ] + mock_export_func = Mock(side_effect=side_effect) + + def __call__(self, *args, **kwargs): + self.is_exporting.set() + self.ready_to_continue.wait() + return self.mock_export_func(*args, **kwargs) + + export_func = ExportFunc() + + exporter = RetryingExporter( + export_func, result_type, timeout_s=timeout_s + ) + + class ExportWrap: + def __init__(self) -> None: + self.result = None + + def __call__(self, *args, **kwargs): + self.result = exporter.export_with_retry(timeout_s) + return self.result + + export_wrapped = ExportWrap() + + export_thread = threading.Thread(target=export_wrapped) + try: + export_thread.start() + export_func.is_exporting.wait() + start_time = time.time() + shutdown_thread = threading.Thread(target=exporter.shutdown) + shutdown_thread.start() + time.sleep(0.25) + export_func.ready_to_continue.set() + finally: + export_thread.join() + shutdown_thread.join() + + duration = time.time() - start_time + self.assertLessEqual(duration, timeout_s) + # pylint: disable=protected-access + self.assertTrue(exporter._shutdown_event.is_set()) + self.assertIs(export_wrapped.result, result_type.SUCCESS) + + def test_shutdown_timeout_cancels_export_retries(self): + """Test that shutdown timing out cancels ongoing retries.""" + + class ExportFunc: + is_exporting = threading.Event() + ready_to_continue = threading.Event() + mock_export_func = Mock(side_effect=RetryableExportError()) + + def __call__(self, *args, **kwargs): + self.is_exporting.set() + self.ready_to_continue.wait() + return self.mock_export_func(*args, **kwargs) + + export_func = ExportFunc() + + exporter = RetryingExporter(export_func, result_type, timeout_s=30) + + class ExportWrap: + def __init__(self) -> None: + self.result = None + + def __call__(self, *args, **kwargs): + self.result = exporter.export_with_retry(30) + return self.result + + export_wrapped = ExportWrap() + + shutdown_timeout = 1 + + export_thread = threading.Thread(target=export_wrapped) + with self.assertLogs(level=WARNING) as warning: + try: + export_thread.start() + export_func.is_exporting.wait() + start_time = time.time() + shutdown_thread = threading.Thread( + target=exporter.shutdown, args=[shutdown_timeout * 1e3] + ) + shutdown_thread.start() + 
time.sleep(0) + export_func.ready_to_continue.set() + finally: + export_thread.join() + shutdown_thread.join() + duration = time.time() - start_time + self.assertAlmostEqual(duration, shutdown_timeout, places=1) + # pylint: disable=protected-access + self.assertTrue(exporter._shutdown_event.is_set()) + self.assertIs(export_wrapped.result, result_type.FAILURE) + self.assertEqual( + warning.records[-1].message, + "Export cancelled due to shutdown timing out", + ) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py index 3a87ef1223c..766e68785c5 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py @@ -104,8 +104,10 @@ def _translate_data( ) -> ExportLogsServiceRequest: return encode_logs(data) - def export(self, batch: Sequence[LogData]) -> LogExportResult: - return self._export(batch) + def export( + self, data: Sequence[LogData], timeout_millis: float = 10_000, **kwargs + ) -> LogExportResult: + return self._exporter.export_with_retry(timeout_millis * 1e-3, data) def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index b4226828280..533172507c8 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -14,23 +14,19 @@ """OTLP Exporter""" -import threading from abc import ABC, abstractmethod -from collections.abc import Sequence # noqa: F401 from logging import getLogger from os import environ -from time import sleep -from typing import ( # noqa: F401 - Any, +from typing import ( Callable, Dict, Generic, List, Optional, + Sequence, Tuple, Union, ) -from typing import Sequence as TypingSequence from typing import TypeVar from urllib.parse import urlparse @@ -38,7 +34,6 @@ from opentelemetry.exporter.otlp.proto.common._internal import ( _get_resource_data, - _create_exp_backoff_generator, ) from google.rpc.error_details_pb2 import RetryInfo from grpc import ( @@ -51,6 +46,10 @@ ssl_channel_credentials, ) +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryingExporter, + RetryableExportError, +) from opentelemetry.exporter.otlp.proto.grpc import ( _OTLP_GRPC_HEADERS, ) @@ -84,6 +83,7 @@ None: None, "gzip": Compression.Gzip, } +_DEFAULT_EXPORT_TIMEOUT_S = 10 class InvalidCompressionValueException(Exception): @@ -137,11 +137,11 @@ def _get_credentials(creds, environ_key): return ssl_channel_credentials() -# pylint: disable=no-member class OTLPExporterMixin( - ABC, Generic[SDKDataT, ExportServiceRequestT, ExportResultT] + ABC, + Generic[SDKDataT, ExportServiceRequestT, ExportResultT], ): - """OTLP span exporter + """OTLP exporter Args: endpoint: OpenTelemetry Collector receiver endpoint @@ -158,9 +158,9 @@ def __init__( insecure: Optional[bool] = None, credentials: Optional[ChannelCredentials] = None, headers: Optional[ - 
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] + Union[Sequence[Tuple[str, str]], Dict[str, str], str] ] = None, - timeout: Optional[int] = None, + timeout: Optional[float] = None, compression: Optional[Compression] = None, ): super().__init__() @@ -197,8 +197,8 @@ def __init__( else: self._headers = tuple(self._headers) + tuple(_OTLP_GRPC_HEADERS) - self._timeout = timeout or int( - environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, 10) + self._timeout = timeout or float( + environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_S) ) self._collector_kwargs = None @@ -222,110 +222,92 @@ def __init__( ) ) - self._export_lock = threading.Lock() self._shutdown = False + self._exporter = RetryingExporter( + self._export, self._result, self._timeout + ) @abstractmethod def _translate_data( - self, data: TypingSequence[SDKDataT] + self, data: Sequence[SDKDataT] ) -> ExportServiceRequestT: pass def _export( - self, data: Union[TypingSequence[ReadableSpan], MetricsData] + self, + timeout_s: float, + data: Union[Sequence[ReadableSpan], MetricsData], + *args, + **kwargs, ) -> ExportResultT: - # After the call to shutdown, subsequent calls to Export are - # not allowed and should return a Failure result. - if self._shutdown: - logger.warning("Exporter already shutdown, ignoring batch") - return self._result.FAILURE - - # FIXME remove this check if the export type for traces - # gets updated to a class that represents the proto - # TracesData and use the code below instead. - # logger.warning( - # "Transient error %s encountered while exporting %s, retrying in %ss.", - # error.code(), - # data.__class__.__name__, - # delay, - # ) - max_value = 64 - # expo returns a generator that yields delay values which grow - # exponentially. Once delay is greater than max_value, the yielded - # value will remain constant. - for delay in _create_exp_backoff_generator(max_value=max_value): - if delay == max_value or self._shutdown: - return self._result.FAILURE - - with self._export_lock: - try: - self._client.Export( - request=self._translate_data(data), - metadata=self._headers, - timeout=self._timeout, - ) - - return self._result.SUCCESS - - except RpcError as error: - - if error.code() in [ - StatusCode.CANCELLED, - StatusCode.DEADLINE_EXCEEDED, - StatusCode.RESOURCE_EXHAUSTED, - StatusCode.ABORTED, - StatusCode.OUT_OF_RANGE, - StatusCode.UNAVAILABLE, - StatusCode.DATA_LOSS, - ]: - - retry_info_bin = dict(error.trailing_metadata()).get( - "google.rpc.retryinfo-bin" - ) - if retry_info_bin is not None: - retry_info = RetryInfo() - retry_info.ParseFromString(retry_info_bin) - delay = ( - retry_info.retry_delay.seconds - + retry_info.retry_delay.nanos / 1.0e9 - ) - - logger.warning( - ( - "Transient error %s encountered while exporting " - "%s to %s, retrying in %ss." 
- ), - error.code(), - self._exporting, - self._endpoint, - delay, - ) - sleep(delay) - continue - else: - logger.error( - "Failed to export %s to %s, error code: %s", - self._exporting, - self._endpoint, - error.code(), - exc_info=error.code() == StatusCode.UNKNOWN, - ) - - if error.code() == StatusCode.OK: - return self._result.SUCCESS - - return self._result.FAILURE - - return self._result.FAILURE + try: + self._client.Export( + request=self._translate_data(data), + metadata=self._headers, + timeout=timeout_s, + ) + return self._result.SUCCESS + + except RpcError as error: + if error.code() not in [ + StatusCode.CANCELLED, + StatusCode.DEADLINE_EXCEEDED, + StatusCode.RESOURCE_EXHAUSTED, + StatusCode.ABORTED, + StatusCode.OUT_OF_RANGE, + StatusCode.UNAVAILABLE, + StatusCode.DATA_LOSS, + ]: + # Not retryable, bail out + logger.error( + "Failed to export %s to %s, error code: %s", + self._exporting, + self._endpoint, + error.code(), + exc_info=error.code() == StatusCode.UNKNOWN, + ) + + return ( + self._result.SUCCESS + if error.code() == StatusCode.OK + else self._result.FAILURE + ) + + retry_info_bin = dict(error.trailing_metadata()).get( + "google.rpc.retryinfo-bin" + ) + delay_s = None + if retry_info_bin is not None: + retry_info = RetryInfo() + retry_info.ParseFromString(retry_info_bin) + delay_s = ( + retry_info.retry_delay.seconds + + retry_info.retry_delay.nanos / 1.0e9 + ) + logger.warning( + "Transient error %s encountered while exporting %s to %s", + error.code(), + self._exporting, + self._endpoint, + ) + raise RetryableExportError( + retry_delay_s=delay_s, + ) + + @abstractmethod + def export( + self, data, timeout_millis: float = 10_000, **kwargs + ) -> ExportResultT: + pass def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: if self._shutdown: logger.warning("Exporter already shutdown, ignoring call") return - # wait for the last export if any - self._export_lock.acquire(timeout=timeout_millis / 1e3) + # Wait for the current export to finish. Shutdown timeout preempts export + # to prevent application hanging after completion. 
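The server-driven delay handled in the gRPC `_export` above arrives via the `google.rpc.retryinfo-bin` trailing-metadata entry. A minimal sketch of that decoding step, using a hand-built metadata dict in place of a real `RpcError` (the metadata value below is illustrative only):

```python
from google.protobuf.duration_pb2 import Duration
from google.rpc.error_details_pb2 import RetryInfo

# A server asking the client to wait 1.5 s before retrying.
trailing_metadata = {
    "google.rpc.retryinfo-bin": RetryInfo(
        retry_delay=Duration(seconds=1, nanos=500_000_000)
    ).SerializeToString()
}

retry_info = RetryInfo()
retry_info.ParseFromString(trailing_metadata["google.rpc.retryinfo-bin"])
delay_s = retry_info.retry_delay.seconds + retry_info.retry_delay.nanos / 1.0e9
assert delay_s == 1.5  # forwarded as RetryableExportError(retry_delay_s=delay_s)
```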
+ self._exporter.shutdown(timeout_millis=timeout_millis) self._shutdown = True - self._export_lock.release() @property @abstractmethod diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index 0ceca25c867..955f548265b 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -14,6 +14,7 @@ from dataclasses import replace from logging import getLogger from os import environ +from time import time from typing import Dict, Iterable, List, Optional, Tuple, Union from typing import Sequence as TypingSequence from grpc import ChannelCredentials, Compression @@ -151,18 +152,22 @@ def _translate_data( def export( self, - metrics_data: MetricsData, + data: MetricsData, timeout_millis: float = 10_000, **kwargs, ) -> MetricExportResult: - # TODO(#2663): OTLPExporterMixin should pass timeout to gRPC if self._max_export_batch_size is None: - return self._export(data=metrics_data) + return self._exporter.export_with_retry( + timeout_millis * 1e-3, data + ) export_result = MetricExportResult.SUCCESS - - for split_metrics_data in self._split_metrics_data(metrics_data): - split_export_result = self._export(data=split_metrics_data) + deadline_s = time() + timeout_millis * 1e-3 + for split_metrics_data in self._split_metrics_data(data): + time_remaining_s = deadline_s - time() + split_export_result = self._exporter.export_with_retry( + time_remaining_s, split_metrics_data + ) if split_export_result is MetricExportResult.FAILURE: export_result = MetricExportResult.FAILURE diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py index bd120ac7874..bbbe6cc2843 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py @@ -137,8 +137,13 @@ def _translate_data( ) -> ExportTraceServiceRequest: return encode_spans(data) - def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: - return self._export(spans) + def export( + self, + spans: Sequence[ReadableSpan], + timeout_millis: float = 10000, + **kwargs + ) -> SpanExportResult: + return self._exporter.export_with_retry(timeout_millis, spans) def shutdown(self) -> None: OTLPExporterMixin.shutdown(self) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py index a6479a14741..e26ed3c964e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/logs/test_otlp_logs_exporter.py @@ -292,38 +292,6 @@ def test_otlp_headers_from_env(self): (("user-agent", "OTel-OTLP-Exporter-Python/" + __version__),), ) - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - 
@patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable(self, mock_sleep, mock_expo): - - mock_expo.configure_mock(**{"return_value": [1]}) - - add_LogsServiceServicer_to_server( - LogsServiceServicerUNAVAILABLE(), self.server - ) - self.assertEqual( - self.exporter.export([self.log_data_1]), LogExportResult.FAILURE - ) - mock_sleep.assert_called_with(1) - - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable_delay(self, mock_sleep, mock_expo): - - mock_expo.configure_mock(**{"return_value": [1]}) - - add_LogsServiceServicer_to_server( - LogsServiceServicerUNAVAILABLEDelay(), self.server - ) - self.assertEqual( - self.exporter.export([self.log_data_1]), LogExportResult.FAILURE - ) - mock_sleep.assert_called_with(4) - def test_success(self): add_LogsServiceServicer_to_server( LogsServiceServicerSUCCESS(), self.server diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 4dfed3e1541..0abb05be05a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -12,16 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import threading -import time -from logging import WARNING +from logging import ERROR, WARNING from types import MethodType from typing import Sequence from unittest import TestCase from unittest.mock import Mock, patch -from google.protobuf.duration_pb2 import Duration -from google.rpc.error_details_pb2 import RetryInfo from grpc import Compression from opentelemetry.exporter.otlp.proto.grpc.exporter import ( @@ -61,10 +57,10 @@ def test_environ_to_compression(self): environ_to_compression("test_invalid") @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_with_jitter_generator" ) def test_export_warning(self, mock_expo): - mock_expo.configure_mock(**{"return_value": [0]}) + mock_expo.configure_mock(return_value=[0]) rpc_error = RpcError() @@ -73,10 +69,12 @@ def code(self): rpc_error.code = MethodType(code, rpc_error) + _result_type = Mock() + class OTLPMockExporter(OTLPExporterMixin): - _result = Mock() + _result = _result_type _stub = Mock( - **{"return_value": Mock(**{"Export.side_effect": rpc_error})} + return_value=Mock(**{"Export.side_effect": rpc_error}) ) def _translate_data( @@ -84,6 +82,11 @@ def _translate_data( ) -> ExportServiceRequestT: pass + def export(self, data, timeout_millis: float = 10_000, **kwargs): + return self._exporter.export_with_retry( + timeout_millis * 1e-3, data + ) + @property def _exporting(self) -> str: return "mock" @@ -92,7 +95,7 @@ def _exporting(self) -> str: with self.assertLogs(level=WARNING) as warning: # pylint: disable=protected-access - otlp_mock_exporter._export(Mock()) + otlp_mock_exporter.export(Mock()) self.assertEqual( warning.records[0].message, "Failed to export mock to localhost:4317, error code: None", @@ -109,69 +112,35 @@ def trailing_metadata(self): with self.assertLogs(level=WARNING) as warning: # pylint: disable=protected-access - otlp_mock_exporter._export([]) + otlp_mock_exporter.export([]) self.assertEqual( warning.records[0].message, ( 
"Transient error StatusCode.CANCELLED encountered " - "while exporting mock to localhost:4317, retrying in 0s." + "while exporting mock to localhost:4317" ), ) + self.assertEqual(warning.records[1].message, "Retrying in 0s") - def test_shutdown(self): - result_mock = Mock() - - class OTLPMockExporter(OTLPExporterMixin): - _result = result_mock - _stub = Mock(**{"return_value": Mock()}) - - def _translate_data( - self, data: Sequence[SDKDataT] - ) -> ExportServiceRequestT: - pass - - @property - def _exporting(self) -> str: - return "mock" - - otlp_mock_exporter = OTLPMockExporter() - - with self.assertLogs(level=WARNING) as warning: - # pylint: disable=protected-access - self.assertEqual( - otlp_mock_exporter._export(data={}), result_mock.SUCCESS - ) - otlp_mock_exporter.shutdown() - # pylint: disable=protected-access - self.assertEqual( - otlp_mock_exporter._export(data={}), result_mock.FAILURE - ) - self.assertEqual( - warning.records[0].message, - "Exporter already shutdown, ignoring batch", - ) + @patch( + "opentelemetry.exporter.otlp.proto.common.exporter._create_exp_backoff_with_jitter_generator" + ) + def test_err_code_unknown_logs_exception(self, mock_expo): + mock_expo.configure_mock(return_value=[0]) - def test_shutdown_wait_last_export(self): - result_mock = Mock() rpc_error = RpcError() def code(self): - return StatusCode.UNAVAILABLE - - def trailing_metadata(self): - return { - "google.rpc.retryinfo-bin": RetryInfo( - retry_delay=Duration(seconds=1) - ).SerializeToString() - } + return StatusCode.UNKNOWN rpc_error.code = MethodType(code, rpc_error) - rpc_error.trailing_metadata = MethodType(trailing_metadata, rpc_error) + + result_type = Mock() class OTLPMockExporter(OTLPExporterMixin): - _result = result_mock + _result = result_type _stub = Mock( - **{"return_value": Mock(**{"Export.side_effect": rpc_error})} + return_value=Mock(**{"Export.side_effect": rpc_error}) ) def _translate_data( @@ -179,28 +148,25 @@ def _translate_data( ) -> ExportServiceRequestT: pass + def export(self, data, timeout_millis: float = 10_000, **kwargs): + return self._exporter.export_with_retry( + timeout_millis * 1e-3, data + ) + @property def _exporting(self) -> str: return "mock" otlp_mock_exporter = OTLPMockExporter() - # pylint: disable=protected-access - export_thread = threading.Thread( - target=otlp_mock_exporter._export, args=({},) + with self.assertLogs(level=ERROR) as error: + self.assertEqual( + otlp_mock_exporter.export([]), + result_type.FAILURE, + ) + + self.assertEqual( + error.records[0].message, + f"Failed to export mock to localhost:4317, error code: {StatusCode.UNKNOWN}", ) - export_thread.start() - try: - # pylint: disable=protected-access - self.assertTrue(otlp_mock_exporter._export_lock.locked()) - # delay is 1 second while the default shutdown timeout is 30_000 milliseconds - start_time = time.time() - otlp_mock_exporter.shutdown() - now = time.time() - self.assertGreaterEqual(now, (start_time + 30 / 1000)) - # pylint: disable=protected-access - self.assertTrue(otlp_mock_exporter._shutdown) - # pylint: disable=protected-access - self.assertFalse(otlp_mock_exporter._export_lock.locked()) - finally: - export_thread.join() + self.assertIsNotNone(error.records[0].exc_info) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index 95733b917bf..954f57137f7 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py 
+++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import threading -import time from concurrent.futures import ThreadPoolExecutor # pylint: disable=too-many-lines @@ -377,13 +375,10 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): mock_method.reset_mock() # pylint: disable=no-self-use - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") @patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"}) def test_otlp_exporter_otlp_compression_envvar( - self, mock_insecure_channel, mock_expo + self, mock_insecure_channel ): """Just OTEL_EXPORTER_OTLP_COMPRESSION should work""" OTLPMetricExporter(insecure=True) @@ -415,65 +410,6 @@ def test_otlp_exporter_otlp_compression_unspecified( "localhost:4317", compression=Compression.NoCompression ) - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable(self, mock_sleep, mock_expo): - - mock_expo.configure_mock(**{"return_value": [1]}) - - add_MetricsServiceServicer_to_server( - MetricsServiceServicerUNAVAILABLE(), self.server - ) - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.FAILURE, - ) - mock_sleep.assert_called_with(1) - - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable_delay(self, mock_sleep, mock_expo): - - mock_expo.configure_mock(**{"return_value": [1]}) - - add_MetricsServiceServicer_to_server( - MetricsServiceServicerUNAVAILABLEDelay(), self.server - ) - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.FAILURE, - ) - mock_sleep.assert_called_with(4) - - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.logger.error") - def test_unknown_logs(self, mock_logger_error, mock_sleep, mock_expo): - - mock_expo.configure_mock(**{"return_value": [1]}) - - add_MetricsServiceServicer_to_server( - MetricsServiceServicerUNKNOWN(), self.server - ) - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.FAILURE, - ) - mock_sleep.assert_not_called() - mock_logger_error.assert_called_with( - "Failed to export %s to %s, error code: %s", - "metrics", - "localhost:4317", - StatusCode.UNKNOWN, - exc_info=True, - ) - def test_success(self): add_MetricsServiceServicer_to_server( MetricsServiceServicerSUCCESS(), self.server @@ -767,50 +703,6 @@ def test_insecure_https_endpoint(self, mock_secure_channel): OTLPMetricExporter(endpoint="https://ab.c:123", insecure=True) mock_secure_channel.assert_called() - def test_shutdown(self): - add_MetricsServiceServicer_to_server( - MetricsServiceServicerSUCCESS(), self.server - ) - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.SUCCESS, - ) - self.exporter.shutdown() - with self.assertLogs(level=WARNING) as warning: - self.assertEqual( - self.exporter.export(self.metrics["sum_int"]), - MetricExportResult.FAILURE, - ) - self.assertEqual( - 
warning.records[0].message, - "Exporter already shutdown, ignoring batch", - ) - self.exporter = OTLPMetricExporter() - - def test_shutdown_wait_last_export(self): - add_MetricsServiceServicer_to_server( - MetricsServiceServicerUNAVAILABLEDelay(), self.server - ) - - export_thread = threading.Thread( - target=self.exporter.export, args=(self.metrics["sum_int"],) - ) - export_thread.start() - try: - # pylint: disable=protected-access - self.assertTrue(self.exporter._export_lock.locked()) - # delay is 4 seconds while the default shutdown timeout is 30_000 milliseconds - start_time = time.time() - self.exporter.shutdown() - now = time.time() - self.assertGreaterEqual(now, (start_time + 30 / 1000)) - # pylint: disable=protected-access - self.assertTrue(self.exporter._shutdown) - # pylint: disable=protected-access - self.assertFalse(self.exporter._export_lock.locked()) - finally: - export_thread.join() - def test_aggregation_temporality(self): # pylint: disable=protected-access diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index bb17e35b7b7..7ca264b90cc 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -13,8 +13,6 @@ # limitations under the License. import os -import threading -import time from concurrent.futures import ThreadPoolExecutor from logging import WARNING from unittest import TestCase @@ -458,37 +456,6 @@ def test_otlp_headers(self, mock_ssl_channel, mock_secure): (("user-agent", "OTel-OTLP-Exporter-Python/" + __version__),), ) - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable(self, mock_sleep, mock_expo): - - mock_expo.configure_mock(**{"return_value": [1]}) - - add_TraceServiceServicer_to_server( - TraceServiceServicerUNAVAILABLE(), self.server - ) - result = self.exporter.export([self.span]) - self.assertEqual(result, SpanExportResult.FAILURE) - mock_sleep.assert_called_with(1) - - @patch( - "opentelemetry.exporter.otlp.proto.grpc.exporter._create_exp_backoff_generator" - ) - @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.sleep") - def test_unavailable_delay(self, mock_sleep, mock_expo): - - mock_expo.configure_mock(**{"return_value": [1]}) - - add_TraceServiceServicer_to_server( - TraceServiceServicerUNAVAILABLEDelay(), self.server - ) - self.assertEqual( - self.exporter.export([self.span]), SpanExportResult.FAILURE - ) - mock_sleep.assert_called_with(4) - def test_success(self): add_TraceServiceServicer_to_server( TraceServiceServicerSUCCESS(), self.server @@ -933,30 +900,6 @@ def test_shutdown(self): "Exporter already shutdown, ignoring batch", ) - def test_shutdown_wait_last_export(self): - add_TraceServiceServicer_to_server( - TraceServiceServicerUNAVAILABLEDelay(), self.server - ) - - export_thread = threading.Thread( - target=self.exporter.export, args=([self.span],) - ) - export_thread.start() - try: - # pylint: disable=protected-access - self.assertTrue(self.exporter._export_lock.locked()) - # delay is 4 seconds while the default shutdown timeout is 30_000 milliseconds - start_time = time.time() - self.exporter.shutdown() - now = time.time() - self.assertGreaterEqual(now, (start_time + 30 / 1000)) - # pylint: disable=protected-access - 
self.assertTrue(self.exporter._shutdown) - # pylint: disable=protected-access - self.assertFalse(self.exporter._export_lock.locked()) - finally: - export_thread.join() - def _create_span_with_status(status: SDKStatus): span = _Span( diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py index 902ac5f2429..c9523889e64 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py @@ -18,12 +18,12 @@ from io import BytesIO from os import environ from typing import Dict, Optional, Sequence -from time import sleep import requests -from opentelemetry.exporter.otlp.proto.common._internal import ( - _create_exp_backoff_generator, +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, + RetryingExporter, ) from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs from opentelemetry.sdk.environment_variables import ( @@ -59,9 +59,15 @@ DEFAULT_TIMEOUT = 10 # in seconds -class OTLPLogExporter(LogExporter): +def _retryable(resp: requests.Response) -> bool: + if resp.status_code == 408: + return True + if resp.status_code >= 500 and resp.status_code <= 599: + return True + return False - _MAX_RETRY_TIMEOUT = 64 + +class OTLPLogExporter(LogExporter): def __init__( self, @@ -102,8 +108,13 @@ def __init__( {"Content-Encoding": self._compression.value} ) self._shutdown = False + self._exporter = RetryingExporter( + self._export, LogExportResult, self._timeout + ) - def _export(self, serialized_data: bytes): + def _export( + self, timeout_s: float, serialized_data: bytes, *args, **kwargs + ) -> LogExportResult: data = serialized_data if self._compression == Compression.Gzip: gzip_data = BytesIO() @@ -113,57 +124,50 @@ def _export(self, serialized_data: bytes): elif self._compression == Compression.Deflate: data = zlib.compress(serialized_data) - return self._session.post( + resp = self._session.post( url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_s, ) - @staticmethod - def _retryable(resp: requests.Response) -> bool: - if resp.status_code == 408: - return True - if resp.status_code >= 500 and resp.status_code <= 599: - return True - return False - - def export(self, batch: Sequence[LogData]) -> LogExportResult: - # After the call to Shutdown subsequent calls to Export are - # not allowed and should return a Failure result. 
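The HTTP exporters now share the same retry classification: 408 and any 5xx response is treated as transient and surfaced to the shared retry loop as a `RetryableExportError`. A small standalone sketch of that rule, with stubbed `requests.Response` objects (the helper below mirrors the `_retryable` function in this diff; the status codes are examples):

```python
from requests.models import Response


def _response(code: int) -> Response:
    resp = Response()
    resp.status_code = code
    return resp


def _retryable(resp: Response) -> bool:
    # Same rule as the module-level helper above: retry on 408 and any 5xx.
    return resp.status_code == 408 or 500 <= resp.status_code <= 599


for code in (408, 500, 503):
    assert _retryable(_response(code))
for code in (200, 400, 404):
    assert not _retryable(_response(code))

# Inside _export, a retryable response is reported as
#     raise RetryableExportError()
# which lets RetryingExporter apply jittered backoff and try again.
```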
+ if resp.ok: + return LogExportResult.SUCCESS + elif _retryable(resp): + _logger.warning( + "Transient error %s encountered while exporting logs batch.", + resp.reason, + ) + raise RetryableExportError() + _logger.error( + "Failed to export logs batch code: %s, reason: %s", + resp.status_code, + resp.text, + ) + return LogExportResult.FAILURE + + def export( + self, + data: Sequence[LogData], + timeout_millis: float = 10_000, + **kwargs, + ) -> LogExportResult: if self._shutdown: _logger.warning("Exporter already shutdown, ignoring batch") return LogExportResult.FAILURE - serialized_data = encode_logs(batch).SerializeToString() - - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - - if delay == self._MAX_RETRY_TIMEOUT: - return LogExportResult.FAILURE - - resp = self._export(serialized_data) - # pylint: disable=no-else-return - if resp.ok: - return LogExportResult.SUCCESS - elif self._retryable(resp): - _logger.warning( - "Transient error %s encountered while exporting logs batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue - else: - _logger.error( - "Failed to export logs batch code: %s, reason: %s", - resp.status_code, - resp.text, - ) - return LogExportResult.FAILURE - return LogExportResult.FAILURE + serialized_data = encode_logs(data).SerializeToString() + if self._compression == Compression.Gzip: + gzip_data = BytesIO() + with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: + gzip_stream.write(serialized_data) + serialized_data = gzip_data.getvalue() + elif self._compression == Compression.Deflate: + serialized_data = zlib.compress(serialized_data) + + return self._exporter.export_with_retry( + timeout_millis * 1e-3, serialized_data + ) def force_flush(self, timeout_millis: float = 10_000) -> bool: """Nothing is buffered in this exporter, so this method does nothing.""" diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py index 57e030bd549..73fb134c4a0 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/metric_exporter/__init__.py @@ -16,15 +16,16 @@ import zlib from os import environ from typing import Dict, Optional, Any, Callable, List -from typing import Sequence, Mapping # noqa: F401 from io import BytesIO -from time import sleep from deprecated import deprecated from opentelemetry.exporter.otlp.proto.common._internal import ( _get_resource_data, - _create_exp_backoff_generator, +) +from opentelemetry.exporter.otlp.proto.common.exporter import ( + RetryableExportError, + RetryingExporter, ) from opentelemetry.exporter.otlp.proto.common._internal.metrics_encoder import ( OTLPMetricExporterMixin, @@ -88,9 +89,15 @@ DEFAULT_TIMEOUT = 10 # in seconds -class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin): +def _retryable(resp: requests.Response) -> bool: + if resp.status_code == 408: + return True + if resp.status_code >= 500 and resp.status_code <= 599: + return True + return False + - _MAX_RETRY_TIMEOUT = 64 +class OTLPMetricExporter(MetricExporter, OTLPMetricExporterMixin): def __init__( self, @@ -138,8 +145,14 @@ def __init__( self._common_configuration( preferred_temporality, preferred_aggregation ) + self._shutdown = 
False + self._exporter = RetryingExporter( + self._export, MetricExportResult, self._timeout + ) - def _export(self, serialized_data: bytes): + def _export( + self, timeout_s: float, serialized_data: bytes, *args, **kwargs + ): data = serialized_data if self._compression == Compression.Gzip: gzip_data = BytesIO() @@ -149,20 +162,27 @@ def _export(self, serialized_data: bytes): elif self._compression == Compression.Deflate: data = zlib.compress(serialized_data) - return self._session.post( + resp = self._session.post( url=self._endpoint, data=data, verify=self._certificate_file, - timeout=self._timeout, + timeout=timeout_s, ) - @staticmethod - def _retryable(resp: requests.Response) -> bool: - if resp.status_code == 408: - return True - if resp.status_code >= 500 and resp.status_code <= 599: - return True - return False + if resp.ok: + return MetricExportResult.SUCCESS + elif _retryable(resp): + _logger.warning( + "Transient error %s encountered while exporting metric batch.", + resp.reason, + ) + raise RetryableExportError() + _logger.error( + "Failed to export batch code: %s, reason: %s", + resp.status_code, + resp.text, + ) + return MetricExportResult.FAILURE def export( self, @@ -170,37 +190,30 @@ def export( timeout_millis: float = 10_000, **kwargs, ) -> MetricExportResult: - serialized_data = encode_metrics(metrics_data) - for delay in _create_exp_backoff_generator( - max_value=self._MAX_RETRY_TIMEOUT - ): - - if delay == self._MAX_RETRY_TIMEOUT: - return MetricExportResult.FAILURE - - resp = self._export(serialized_data.SerializeToString()) - # pylint: disable=no-else-return - if resp.ok: - return MetricExportResult.SUCCESS - elif self._retryable(resp): - _logger.warning( - "Transient error %s encountered while exporting metric batch, retrying in %ss.", - resp.reason, - delay, - ) - sleep(delay) - continue - else: - _logger.error( - "Failed to export batch code: %s, reason: %s", - resp.status_code, - resp.text, - ) - return MetricExportResult.FAILURE - return MetricExportResult.FAILURE + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring batch") + return MetricExportResult.FAILURE + + serialized_data = encode_metrics(metrics_data).SerializeToString() + if self._compression == Compression.Gzip: + gzip_data = BytesIO() + with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream: + gzip_stream.write(serialized_data) + serialized_data = gzip_data.getvalue() + elif self._compression == Compression.Deflate: + serialized_data = zlib.compress(serialized_data) + + return self._exporter.export_with_retry( + timeout_millis * 1e-3, serialized_data + ) def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: - pass + if self._shutdown: + _logger.warning("Exporter already shutdown, ignoring call") + return + self._exporter.shutdown(timeout_millis=timeout_millis) + self._session.close() + self._shutdown = True @property def _exporting(self) -> str: diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py index c624dfe476b..b40fd831ace 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/trace_exporter/__init__.py @@ -18,12 +18,12 @@ from io import BytesIO from os import environ from typing import 
Dict, Optional
-from time import sleep

 import requests

-from opentelemetry.exporter.otlp.proto.common._internal import (
-    _create_exp_backoff_generator,
+from opentelemetry.exporter.otlp.proto.common.exporter import (
+    RetryableExportError,
+    RetryingExporter,
 )
 from opentelemetry.exporter.otlp.proto.common.trace_encoder import (
     encode_spans,
@@ -57,9 +57,15 @@
 DEFAULT_TIMEOUT = 10  # in seconds


-class OTLPSpanExporter(SpanExporter):
+def _retryable(resp: requests.Response) -> bool:
+    if resp.status_code == 408:
+        return True
+    if resp.status_code >= 500 and resp.status_code <= 599:
+        return True
+    return False

-    _MAX_RETRY_TIMEOUT = 64
+
+class OTLPSpanExporter(SpanExporter):

     def __init__(
         self,
@@ -100,73 +106,58 @@ def __init__(
             {"Content-Encoding": self._compression.value}
         )
         self._shutdown = False
+        self._exporter = RetryingExporter(
+            self._export, SpanExportResult, self._timeout
+        )

-    def _export(self, serialized_data: bytes):
-        data = serialized_data
-        if self._compression == Compression.Gzip:
-            gzip_data = BytesIO()
-            with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream:
-                gzip_stream.write(serialized_data)
-            data = gzip_data.getvalue()
-        elif self._compression == Compression.Deflate:
-            data = zlib.compress(serialized_data)
-
-        return self._session.post(
+    def _export(self, timeout_s: float, serialized_data: bytes):
+        resp = self._session.post(
             url=self._endpoint,
-            data=data,
+            data=serialized_data,
             verify=self._certificate_file,
-            timeout=self._timeout,
+            timeout=timeout_s,
         )
+        if resp.ok:
+            return SpanExportResult.SUCCESS
+        elif _retryable(resp):
+            _logger.warning(
+                "Transient error %s encountered while exporting span batch",
+                resp.reason,
+            )
+            raise RetryableExportError()
+        _logger.error(
+            "Failed to export batch code: %s, reason: %s",
+            resp.status_code,
+            resp.text,
+        )
+        return SpanExportResult.FAILURE

-    @staticmethod
-    def _retryable(resp: requests.Response) -> bool:
-        if resp.status_code == 408:
-            return True
-        if resp.status_code >= 500 and resp.status_code <= 599:
-            return True
-        return False
-
-    def export(self, spans) -> SpanExportResult:
+    def export(self, data, timeout_millis=10_000) -> SpanExportResult:
         # After the call to Shutdown subsequent calls to Export are
         # not allowed and should return a Failure result.
         if self._shutdown:
             _logger.warning("Exporter already shutdown, ignoring batch")
             return SpanExportResult.FAILURE

-        serialized_data = encode_spans(spans).SerializeToString()
-
-        for delay in _create_exp_backoff_generator(
-            max_value=self._MAX_RETRY_TIMEOUT
-        ):
-
-            if delay == self._MAX_RETRY_TIMEOUT:
-                return SpanExportResult.FAILURE
-
-            resp = self._export(serialized_data)
-            # pylint: disable=no-else-return
-            if resp.ok:
-                return SpanExportResult.SUCCESS
-            elif self._retryable(resp):
-                _logger.warning(
-                    "Transient error %s encountered while exporting span batch, retrying in %ss.",
-                    resp.reason,
-                    delay,
-                )
-                sleep(delay)
-                continue
-            else:
-                _logger.error(
-                    "Failed to export batch code: %s, reason: %s",
-                    resp.status_code,
-                    resp.text,
-                )
-                return SpanExportResult.FAILURE
-        return SpanExportResult.FAILURE
+        serialized_data = encode_spans(data).SerializeToString()
+
+        if self._compression == Compression.Gzip:
+            gzip_data = BytesIO()
+            with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream:
+                gzip_stream.write(serialized_data)
+            serialized_data = gzip_data.getvalue()
+        elif self._compression == Compression.Deflate:
+            serialized_data = zlib.compress(serialized_data)
+
+        return self._exporter.export_with_retry(
+            timeout_millis * 1e-3, serialized_data
+        )

-    def shutdown(self):
+    def shutdown(self, timeout_millis: int = 30000):
         if self._shutdown:
             _logger.warning("Exporter already shutdown, ignoring call")
             return
+        self._exporter.shutdown(timeout_millis=timeout_millis)
         self._session.close()
         self._shutdown = True
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py
index 674785056a5..24c9e0e6f88 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/metrics/test_otlp_metrics_exporter.py
@@ -15,11 +15,12 @@
 from logging import WARNING
 from os import environ
 from unittest import TestCase
-from unittest.mock import MagicMock, Mock, call, patch
+from unittest.mock import ANY, MagicMock, Mock, patch

 from requests import Session
 from requests.models import Response
 from responses import POST, activate, add
+from responses.registries import OrderedRegistry

 from opentelemetry.exporter.otlp.proto.common.metrics_encoder import (
     encode_metrics,
@@ -275,7 +276,7 @@ def test_failure(self, mock_post):
         )

     @patch.object(Session, "post")
-    def test_serialization(self, mock_post):
+    def test_serialization(self, mock_post: Mock):

         resp = Response()
         resp.status_code = 200
@@ -293,29 +294,50 @@ def test_serialization(self, mock_post):
             url=exporter._endpoint,
             data=serialized_data.SerializeToString(),
             verify=exporter._certificate_file,
-            timeout=exporter._timeout,
+            timeout=ANY,
         )

-    @activate
-    @patch("opentelemetry.exporter.otlp.proto.http.metric_exporter.sleep")
-    def test_exponential_backoff(self, mock_sleep):
-        # return a retryable error
+    # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
+    @activate(registry=OrderedRegistry)
+    def test_exponential_backoff(self):
+        # return 3 retryable errors and then success
         add(
             POST,
             "http://metrics.example.com/export",
             json={"error": "something exploded"},
             status=500,
         )
+        add(
+            POST,
+            "http://metrics.example.com/export",
+            json={"error": "something exploded"},
+            status=500,
+        )
+        add(
+            POST,
+            "http://metrics.example.com/export",
+            json={"error": "something exploded"},
+            status=500,
+        )
+        add(
+            POST,
+            "http://metrics.example.com/export",
+            status=200,
+        )

         exporter = OTLPMetricExporter(
             endpoint="http://metrics.example.com/export"
         )

         metrics_data = self.metrics["sum_int"]

-        exporter.export(metrics_data)
-        mock_sleep.assert_has_calls(
-            [call(1), call(2), call(4), call(8), call(16), call(32)]
-        )
+        with patch.object(
+            exporter._exporter._shutdown_event, "wait"
+        ) as mock_wait:
+            with self.assertLogs(level="WARNING"):
+                self.assertIs(
+                    exporter.export(metrics_data), MetricExportResult.SUCCESS
+                )
+        self.assertEqual(mock_wait.call_count, 3)

     def test_aggregation_temporality(self):

@@ -469,15 +491,16 @@ def test_exponential_explicit_bucket_histogram(self):
             ExplicitBucketHistogramAggregation,
         )

-    @patch.object(OTLPMetricExporter, "_export", return_value=Mock(ok=True))
-    def test_2xx_status_code(self, mock_otlp_metric_exporter):
+    def test_2xx_status_code(self):
         """
         Test that any HTTP 2XX code returns a successful result
         """
+        exporter = OTLPMetricExporter(
+            session=Mock(**{"post.return_value": Mock(ok=True)})
+        )
         self.assertEqual(
-            OTLPMetricExporter().export(MagicMock()),
-            MetricExportResult.SUCCESS,
+            exporter.export(MagicMock()), MetricExportResult.SUCCESS
         )

     def test_preferred_aggregation_override(self):
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py
index 6b6aafd465f..5a04d2b1e6e 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_log_exporter.py
@@ -16,10 +16,11 @@

 import unittest
 from typing import List
-from unittest.mock import MagicMock, Mock, call, patch
+from unittest.mock import MagicMock, Mock, patch

 import requests
 import responses
+from responses.registries import OrderedRegistry

 from opentelemetry._logs import SeverityNumber
 from opentelemetry.exporter.otlp.proto.http import Compression
@@ -167,24 +168,38 @@ def test_exporter_env(self):
         )
         self.assertIsInstance(exporter._session, requests.Session)

-    @responses.activate
-    @patch("opentelemetry.exporter.otlp.proto.http._log_exporter.sleep")
-    def test_exponential_backoff(self, mock_sleep):
-        # return a retryable error
-        responses.add(
-            responses.POST,
+    # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
+    @responses.activate(registry=OrderedRegistry)
+    def test_exponential_backoff(self):
+        # return 3 retryable errors and then success
+        responses.post(
             "http://logs.example.com/export",
             json={"error": "something exploded"},
             status=500,
         )
-
+        responses.post(
+            "http://logs.example.com/export",
+            json={"error": "something exploded"},
+            status=500,
+        )
+        responses.post(
+            "http://logs.example.com/export",
+            json={"error": "something exploded"},
+            status=500,
+        )
+        responses.post(
+            "http://logs.example.com/export",
+            status=200,
+        )
         exporter = OTLPLogExporter(endpoint="http://logs.example.com/export")
         logs = self._get_sdk_log_data()

-        exporter.export(logs)
-        mock_sleep.assert_has_calls(
-            [call(1), call(2), call(4), call(8), call(16), call(32)]
-        )
+        with patch.object(
+            exporter._exporter._shutdown_event, "wait"
+        ) as mock_wait:
+            with self.assertLogs(level="WARNING"):
+                self.assertIs(exporter.export(logs), LogExportResult.SUCCESS)
+        self.assertEqual(mock_wait.call_count, 3)

     @staticmethod
     def _get_sdk_log_data() -> List[LogData]:
@@ -256,12 +271,11 @@ def _get_sdk_log_data() -> List[LogData]:

         return [log1, log2, log3, log4]

-    @patch.object(OTLPLogExporter, "_export", return_value=Mock(ok=True))
-    def test_2xx_status_code(self, mock_otlp_metric_exporter):
+    def test_2xx_status_code(self):
         """
         Test that any HTTP 2XX code returns a successful result
         """
-
-        self.assertEqual(
-            OTLPLogExporter().export(MagicMock()), LogExportResult.SUCCESS
+        exporter = OTLPLogExporter(
+            session=Mock(**{"post.return_value": Mock(ok=True)})
         )
+        self.assertEqual(exporter.export(MagicMock()), LogExportResult.SUCCESS)
diff --git a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py
index 69874664c7a..a1c37243ac2 100644
--- a/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py
+++ b/exporter/opentelemetry-exporter-otlp-proto-http/tests/test_proto_span_exporter.py
@@ -13,10 +13,11 @@
 # limitations under the License.

 import unittest
-from unittest.mock import MagicMock, Mock, call, patch
+from unittest.mock import MagicMock, Mock, patch

 import requests
 import responses
+from responses.registries import OrderedRegistry

 from opentelemetry.exporter.otlp.proto.http import Compression
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
@@ -201,21 +202,34 @@ def test_headers_parse_from_env(self):
             ),
         )

-    # pylint: disable=no-self-use
-    @responses.activate
-    @patch("opentelemetry.exporter.otlp.proto.http.trace_exporter.sleep")
-    def test_exponential_backoff(self, mock_sleep):
-        # return a retryable error
-        responses.add(
-            responses.POST,
+    # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
+    @responses.activate(registry=OrderedRegistry)
+    def test_retryable_error(self):
+        # return 3 retryable errors and then success
+        responses.post(
             "http://traces.example.com/export",
             json={"error": "something exploded"},
             status=500,
         )
+        responses.post(
+            "http://traces.example.com/export",
+            json={"error": "something exploded"},
+            status=500,
+        )
+        responses.post(
+            "http://traces.example.com/export",
+            json={"error": "something exploded"},
+            status=500,
+        )
+        responses.post(
+            "http://traces.example.com/export",
+            status=200,
+        )

         exporter = OTLPSpanExporter(
             endpoint="http://traces.example.com/export"
         )
+
         span = _Span(
             "abc",
             context=Mock(
@@ -227,17 +241,23 @@ def test_exponential_backoff(self, mock_sleep):
             ),
         )

-        exporter.export([span])
-        mock_sleep.assert_has_calls(
-            [call(1), call(2), call(4), call(8), call(16), call(32)]
-        )
+        with patch.object(
+            exporter._exporter._shutdown_event, "wait"
+        ) as mock_wait:
+            with self.assertLogs(level="WARNING"):
+                self.assertIs(
+                    exporter.export([span]), SpanExportResult.SUCCESS
+                )
+        self.assertEqual(mock_wait.call_count, 3)

-    @patch.object(OTLPSpanExporter, "_export", return_value=Mock(ok=True))
-    def test_2xx_status_code(self, mock_otlp_metric_exporter):
+    def test_2xx_status_code(self):
         """
         Test that any HTTP 2XX code returns a successful result
         """
-
+        exporter = OTLPSpanExporter(
+            session=Mock(**{"post.return_value": Mock(ok=True)})
+        )
         self.assertEqual(
-            OTLPSpanExporter().export(MagicMock()), SpanExportResult.SUCCESS
+            exporter.export(MagicMock()),
+            SpanExportResult.SUCCESS,
         )
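
For reference, a minimal usage sketch of the shared retry helpers exercised by the changes above; the FakeResult enum, send_once function, and collector endpoint are hypothetical stand-ins rather than code from this patch::

    # Sketch only: wrap a single-attempt HTTP send in RetryingExporter so that
    # transient failures (HTTP 408 / 5xx) are retried with exponential backoff,
    # mirroring the classification used by the _retryable() helper above.
    from enum import Enum

    import requests

    from opentelemetry.exporter.otlp.proto.common.exporter import (
        RetryableExportError,
        RetryingExporter,
    )


    class FakeResult(Enum):  # hypothetical stand-in for SpanExportResult et al.
        SUCCESS = 0
        FAILURE = 1


    def send_once(timeout_s: float, payload: bytes) -> FakeResult:
        # Single attempt only; the wrapper owns retry, backoff, and timeout logic.
        resp = requests.post(
            "http://collector.example.com/v1/traces",  # hypothetical endpoint
            data=payload,
            timeout=timeout_s,
        )
        if resp.ok:
            return FakeResult.SUCCESS
        if resp.status_code == 408 or 500 <= resp.status_code <= 599:
            raise RetryableExportError()  # ask the wrapper to back off and retry
        return FakeResult.FAILURE


    exporter = RetryingExporter(send_once, FakeResult, timeout_s=10)
    result = exporter.export_with_retry(10, b"serialized-protobuf-bytes")
    exporter.shutdown(timeout_millis=30_000)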