Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions src/elasticotel/distro/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
from opentelemetry._opamp import messages
from opentelemetry._opamp.agent import OpAMPAgent
from opentelemetry._opamp.client import OpAMPClient
from opentelemetry._opamp.exceptions import (
OpAMPRemoteConfigDecodeException,
OpAMPRemoteConfigParseException,
)
from opentelemetry._opamp.proto import opamp_pb2 as opamp_pb2


Expand Down Expand Up @@ -145,17 +149,21 @@ def opamp_handler(agent: OpAMPAgent, client: OpAMPClient, message: opamp_pb2.Ser

_config = _get_config()
error_messages = []
for config_filename, remote_config in messages._decode_remote_config(message.remote_config):
# we don't have standardized config values so limit to configs coming from our backend
if config_filename == "elastic":
logger.debug("Config %s: %s", config_filename, remote_config)
config_update = _handle_logging_level(remote_config)
if config_update.error_message:
error_messages.append(config_update.error_message)

config_update = _handle_sampling_rate(remote_config)
if config_update.error_message:
error_messages.append(config_update.error_message)
try:
for config_filename, remote_config in messages._decode_remote_config(message.remote_config):
# we don't have standardized config values so limit to configs coming from our backend
if config_filename == "elastic":
logger.debug("Config %s: %s", config_filename, remote_config)
config_update = _handle_logging_level(remote_config)
if config_update.error_message:
error_messages.append(config_update.error_message)

config_update = _handle_sampling_rate(remote_config)
if config_update.error_message:
error_messages.append(config_update.error_message)
except (OpAMPRemoteConfigParseException, OpAMPRemoteConfigDecodeException) as exc:
logger.error(str(exc))
error_messages.append(str(exc))

error_message = "\n".join(error_messages)
status = opamp_pb2.RemoteConfigStatuses_FAILED if error_message else opamp_pb2.RemoteConfigStatuses_APPLIED
Expand Down
3 changes: 1 addition & 2 deletions src/opentelemetry/_opamp/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,8 @@ def _decode_remote_config(remote_config: opamp_pb2.AgentRemoteConfig) -> Generat
config_data = json.loads(body)
except (UnicodeDecodeError, json.JSONDecodeError) as exc:
raise OpAMPRemoteConfigDecodeException(
f"Failed to decode {config_file} with content type {config_file.content_type}: {exc}"
f"Failed to decode {config_file_name} with content type {config_file.content_type}: {exc}"
)
continue

yield config_file_name, config_data
else:
Expand Down
6 changes: 5 additions & 1 deletion src/opentelemetry/_opamp/transport/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Mapping

import requests
Expand All @@ -22,6 +23,8 @@
from opentelemetry._opamp.transport.exceptions import OpAMPException
from opentelemetry._opamp.transport.base import HttpTransport, base_headers

logger = logging.getLogger(__name__)


class RequestsTransport(HttpTransport):
# TODO: move some stuff here instead of send?
Expand All @@ -35,7 +38,8 @@ def send(self, url: str, headers: Mapping[str, str], data: bytes, timeout_millis
try:
response = self.session.post(url, headers=headers, data=data, timeout=timeout)
response.raise_for_status()
except Exception:
except Exception as exc:
logger.error(str(exc))
raise OpAMPException

message = messages._decode_message(response.content)
Expand Down
56 changes: 56 additions & 0 deletions tests/distro/test_distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,62 @@ def test_ignores_non_elastic_filename(self, get_logger_mock, get_config_mock):
agent.send.assert_called_once_with(payload=mock.ANY)
client._build_full_state_message.assert_not_called()

@mock.patch("elasticotel.distro.config._get_config")
@mock.patch("elasticotel.distro.config.logger")
def test_fails_if_cannot_decode_elastic_config_json(self, logger_mock, get_config_mock):
get_config_mock.return_value = Config()
agent = mock.Mock()
client = mock.Mock()
config = opamp_pb2.AgentConfigMap()
config.config_map["elastic"].body = b"{"
config.config_map["elastic"].content_type = "application/json"
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
opamp_handler(agent, client, message)

error_message = "Failed to decode elastic with content type application/json: Expecting property name enclosed in double quotes: line 1 column 2 (char 1)"
logger_mock.error.assert_called_once_with(error_message)

client._update_remote_config_status.assert_called_once_with(
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_FAILED, error_message=error_message
)
client._update_effective_config.assert_called_once_with(
{"elastic": {"logging_level": "info", "sampling_rate": "1.0"}}
)
client._build_remote_config_status_response_message.assert_called_once_with(
client._update_remote_config_status()
)
agent.send.assert_called_once_with(payload=mock.ANY)
client._build_full_state_message.assert_not_called()

@mock.patch("elasticotel.distro.config._get_config")
@mock.patch("elasticotel.distro.config.logger")
def test_fails_if_elastic_config_is_not_json(self, logger_mock, get_config_mock):
get_config_mock.return_value = Config()
agent = mock.Mock()
client = mock.Mock()
config = opamp_pb2.AgentConfigMap()
config.config_map["elastic"].body = b"not-json"
config.config_map["elastic"].content_type = "not/json"
remote_config = opamp_pb2.AgentRemoteConfig(config=config, config_hash=b"1234")
message = opamp_pb2.ServerToAgent(remote_config=remote_config)
opamp_handler(agent, client, message)

error_message = "Cannot parse elastic with content type not/json"
logger_mock.error.assert_called_once_with(error_message)

client._update_remote_config_status.assert_called_once_with(
remote_config_hash=b"1234", status=opamp_pb2.RemoteConfigStatuses_FAILED, error_message=error_message
)
client._update_effective_config.assert_called_once_with(
{"elastic": {"logging_level": "info", "sampling_rate": "1.0"}}
)
client._build_remote_config_status_response_message.assert_called_once_with(
client._update_remote_config_status()
)
agent.send.assert_called_once_with(payload=mock.ANY)
client._build_full_state_message.assert_not_called()

@mock.patch("elasticotel.distro.config._get_config")
@mock.patch.object(logging, "getLogger")
def test_sets_matching_logging_level(self, get_logger_mock, get_config_mock):
Expand Down