From b442bcd078e74ef2f8bc114241d524156cd6ba9e Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Thu, 2 Oct 2025 11:09:56 +0200 Subject: [PATCH 1/2] opamp: log transport send exception --- src/opentelemetry/_opamp/transport/requests.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/opentelemetry/_opamp/transport/requests.py b/src/opentelemetry/_opamp/transport/requests.py index 975c671..555c75a 100644 --- a/src/opentelemetry/_opamp/transport/requests.py +++ b/src/opentelemetry/_opamp/transport/requests.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging from typing import Mapping import requests @@ -22,6 +23,8 @@ from opentelemetry._opamp.transport.exceptions import OpAMPException from opentelemetry._opamp.transport.base import HttpTransport, base_headers +logger = logging.getLogger(__name__) + class RequestsTransport(HttpTransport): # TODO: move some stuff here instead of send? @@ -35,7 +38,8 @@ def send(self, url: str, headers: Mapping[str, str], data: bytes, timeout_millis try: response = self.session.post(url, headers=headers, data=data, timeout=timeout) response.raise_for_status() - except Exception: + except Exception as exc: + logger.error(str(exc)) raise OpAMPException message = messages._decode_message(response.content) From 450a5af21e87b1de215e479987149eef1c6d9431 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Thu, 2 Oct 2025 11:49:33 +0200 Subject: [PATCH 2/2] distro: message handler should handle remote config decode exceptions So that we don't go into a loop if we don't ackwnoledge the message to the server. --- src/elasticotel/distro/config.py | 30 +++++++++------ src/opentelemetry/_opamp/messages.py | 3 +- tests/distro/test_distro.py | 56 ++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 13 deletions(-) diff --git a/src/elasticotel/distro/config.py b/src/elasticotel/distro/config.py index 88820ed..9ca3371 100644 --- a/src/elasticotel/distro/config.py +++ b/src/elasticotel/distro/config.py @@ -25,6 +25,10 @@ from opentelemetry._opamp import messages from opentelemetry._opamp.agent import OpAMPAgent from opentelemetry._opamp.client import OpAMPClient +from opentelemetry._opamp.exceptions import ( + OpAMPRemoteConfigDecodeException, + OpAMPRemoteConfigParseException, +) from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2 @@ -145,17 +149,21 @@ def opamp_handler(agent: OpAMPAgent, client: OpAMPClient, message: opamp_pb2.Ser _config = _get_config() error_messages = [] - for config_filename, remote_config in messages._decode_remote_config(message.remote_config): - # we don't have standardized config values so limit to configs coming from our backend - if config_filename == "elastic": - logger.debug("Config %s: %s", config_filename, remote_config) - config_update = _handle_logging_level(remote_config) - if config_update.error_message: - error_messages.append(config_update.error_message) - - config_update = _handle_sampling_rate(remote_config) - if config_update.error_message: - error_messages.append(config_update.error_message) + try: + for config_filename, remote_config in messages._decode_remote_config(message.remote_config): + # we don't have standardized config values so limit to configs coming from our backend + if config_filename == "elastic": + logger.debug("Config %s: %s", config_filename, remote_config) + config_update = _handle_logging_level(remote_config) + if config_update.error_message: + error_messages.append(config_update.error_message) + + config_update = _handle_sampling_rate(remote_config) + if config_update.error_message: + error_messages.append(config_update.error_message) + except (OpAMPRemoteConfigParseException, OpAMPRemoteConfigDecodeException) as exc: + logger.error(str(exc)) + error_messages.append(str(exc)) error_message = "\n".join(error_messages) status = opamp_pb2.RemoteConfigStatuses_FAILED if error_message else opamp_pb2.RemoteConfigStatuses_APPLIED diff --git a/src/opentelemetry/_opamp/messages.py b/src/opentelemetry/_opamp/messages.py index f8a366f..4878f83 100644 --- a/src/opentelemetry/_opamp/messages.py +++ b/src/opentelemetry/_opamp/messages.py @@ -157,9 +157,8 @@ def _decode_remote_config(remote_config: opamp_pb2.AgentRemoteConfig) -> Generat config_data = json.loads(body) except (UnicodeDecodeError, json.JSONDecodeError) as exc: raise OpAMPRemoteConfigDecodeException( - f"Failed to decode {config_file} with content type {config_file.content_type}: {exc}" + f"Failed to decode {config_file_name} with content type {config_file.content_type}: {exc}" ) - continue yield config_file_name, config_data else: diff --git a/tests/distro/test_distro.py b/tests/distro/test_distro.py index 0340a02..4f2ed33 100644 --- a/tests/distro/test_distro.py +++ b/tests/distro/test_distro.py @@ -273,6 +273,62 @@ def test_ignores_non_elastic_filename(self, get_logger_mock, get_config_mock): agent.send.assert_called_once_with(payload=mock.ANY) client._build_full_state_message.assert_not_called() + @mock.patch("elasticotel.distro.config._get_config") + @mock.patch("elasticotel.distro.config.logger") + def test_fails_if_cannot_decode_elastic_config_json(self, logger_mock, get_config_mock): + get_config_mock.return_value = Config() + agent = mock.Mock() + client = mock.Mock() + config = opamp_pb2.AgentConfigMap() + config.config_map["elastic"].body = b"{" + config.config_map["elastic"].content_type = "application/json" + remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234") + message = opamp_pb2.ServerToAgent(remote_config=remote_config) + opamp_handler(agent, client, message) + + error_message = "Failed to decode elastic with content type application/json: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)" + logger_mock.error.assert_called_once_with(error_message) + + client._update_remote_config_status.assert_called_once_with( + remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_FAILED, error_message=error_message + ) + client._update_effective_config.assert_called_once_with( + {"elastic": {"logging_level": "info", "sampling_rate": "1.0"}} + ) + client._build_remote_config_status_response_message.assert_called_once_with( + client._update_remote_config_status() + ) + agent.send.assert_called_once_with(payload=mock.ANY) + client._build_full_state_message.assert_not_called() + + @mock.patch("elasticotel.distro.config._get_config") + @mock.patch("elasticotel.distro.config.logger") + def test_fails_if_elastic_config_is_not_json(self, logger_mock, get_config_mock): + get_config_mock.return_value = Config() + agent = mock.Mock() + client = mock.Mock() + config = opamp_pb2.AgentConfigMap() + config.config_map["elastic"].body = b"not-json" + config.config_map["elastic"].content_type = "not/json" + remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234") + message = opamp_pb2.ServerToAgent(remote_config=remote_config) + opamp_handler(agent, client, message) + + error_message = "Cannot parse elastic with content type not/json" + logger_mock.error.assert_called_once_with(error_message) + + client._update_remote_config_status.assert_called_once_with( + remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_FAILED, error_message=error_message + ) + client._update_effective_config.assert_called_once_with( + {"elastic": {"logging_level": "info", "sampling_rate": "1.0"}} + ) + client._build_remote_config_status_response_message.assert_called_once_with( + client._update_remote_config_status() + ) + agent.send.assert_called_once_with(payload=mock.ANY) + client._build_full_state_message.assert_not_called() + @mock.patch("elasticotel.distro.config._get_config") @mock.patch.object(logging, "getLogger") def test_sets_matching_logging_level(self, get_logger_mock, get_config_mock):