From d2eda077019f610741bf40de71ffc14d4e3e2e23 Mon Sep 17 00:00:00 2001 From: Georgi Date: Fri, 22 Mar 2024 22:20:53 +0100 Subject: [PATCH 1/2] Centralize metrics submission in the forwarder --- .../caching/base_tags_cache.py | 3 +- .../caching/cloudwatch_log_group_cache.py | 3 +- aws/logs_monitoring/caching/common.py | 23 --------------- aws/logs_monitoring/caching/lambda_cache.py | 6 ++-- aws/logs_monitoring/caching/s3_tags_cache.py | 6 ++-- .../caching/step_functions_cache.py | 2 +- aws/logs_monitoring/forwarders.py | 28 ++++--------------- aws/logs_monitoring/steps/parsing.py | 13 ++------- aws/logs_monitoring/telemetry.py | 24 ++++++++++++++-- .../tests/test_awslogs_handler.py | 2 +- .../tests/test_enhanced_lambda_metrics.py | 2 +- aws/logs_monitoring/tests/test_parsing.py | 2 +- 12 files changed, 41 insertions(+), 73 deletions(-) diff --git a/aws/logs_monitoring/caching/base_tags_cache.py b/aws/logs_monitoring/caching/base_tags_cache.py index e69f349f6..e4d75bb15 100644 --- a/aws/logs_monitoring/caching/base_tags_cache.py +++ b/aws/logs_monitoring/caching/base_tags_cache.py @@ -10,7 +10,8 @@ DD_TAGS_CACHE_TTL_SECONDS, DD_S3_CACHE_LOCK_TTL_SECONDS, ) -from caching.common import get_last_modified_time, send_forwarder_internal_metrics +from caching.common import get_last_modified_time +from telemetry import send_forwarder_internal_metrics JITTER_MIN = 1 JITTER_MAX = 100 diff --git a/aws/logs_monitoring/caching/cloudwatch_log_group_cache.py b/aws/logs_monitoring/caching/cloudwatch_log_group_cache.py index 741608269..0cb1c73ee 100644 --- a/aws/logs_monitoring/caching/cloudwatch_log_group_cache.py +++ b/aws/logs_monitoring/caching/cloudwatch_log_group_cache.py @@ -1,7 +1,8 @@ import os import boto3 from caching.base_tags_cache import BaseTagsCache -from caching.common import sanitize_aws_tag_string, send_forwarder_internal_metrics +from caching.common import sanitize_aws_tag_string +from telemetry import send_forwarder_internal_metrics from settings import ( DD_S3_LOG_GROUP_CACHE_FILENAME, DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME, diff --git a/aws/logs_monitoring/caching/common.py b/aws/logs_monitoring/caching/common.py index 55f3af11e..7d7db8816 100644 --- a/aws/logs_monitoring/caching/common.py +++ b/aws/logs_monitoring/caching/common.py @@ -3,24 +3,10 @@ import logging import re from collections import defaultdict -from telemetry import ( - DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, - get_forwarder_telemetry_tags, -) logger = logging.getLogger() logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) -try: - from datadog_lambda.metric import lambda_stats - - DD_SUBMIT_ENHANCED_METRICS = True -except ImportError: - logger.debug( - "Could not import from the Datadog Lambda layer so enhanced metrics won't be submitted. " - "Add the Datadog Lambda layer to this function to submit enhanced metrics." - ) - DD_SUBMIT_ENHANCED_METRICS = False _other_chars = r"\w:\-\.\/" Sanitize = re.compile(r"[^%s]" % _other_chars, re.UNICODE).sub @@ -28,15 +14,6 @@ FixInit = re.compile(r"^[_\d]*", re.UNICODE).sub -def send_forwarder_internal_metrics(name, additional_tags=[]): - """Send forwarder's internal metrics to DD""" - lambda_stats.distribution( - "{}.{}".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, name), - 1, - tags=get_forwarder_telemetry_tags() + additional_tags, - ) - - def get_last_modified_time(s3_file): last_modified_str = s3_file["ResponseMetadata"]["HTTPHeaders"]["last-modified"] last_modified_date = datetime.datetime.strptime( diff --git a/aws/logs_monitoring/caching/lambda_cache.py b/aws/logs_monitoring/caching/lambda_cache.py index a8da76a5f..f23c645d7 100644 --- a/aws/logs_monitoring/caching/lambda_cache.py +++ b/aws/logs_monitoring/caching/lambda_cache.py @@ -1,10 +1,8 @@ import os from botocore.exceptions import ClientError from caching.base_tags_cache import BaseTagsCache -from caching.common import ( - send_forwarder_internal_metrics, - parse_get_resources_response_for_tags_by_arn, -) +from caching.common import parse_get_resources_response_for_tags_by_arn +from telemetry import send_forwarder_internal_metrics from settings import ( DD_S3_CACHE_FILENAME, DD_S3_CACHE_LOCK_FILENAME, diff --git a/aws/logs_monitoring/caching/s3_tags_cache.py b/aws/logs_monitoring/caching/s3_tags_cache.py index 3d7e320d5..b60c8737c 100644 --- a/aws/logs_monitoring/caching/s3_tags_cache.py +++ b/aws/logs_monitoring/caching/s3_tags_cache.py @@ -1,9 +1,7 @@ from botocore.exceptions import ClientError from caching.base_tags_cache import BaseTagsCache -from caching.common import ( - send_forwarder_internal_metrics, - parse_get_resources_response_for_tags_by_arn, -) +from caching.common import parse_get_resources_response_for_tags_by_arn +from telemetry import send_forwarder_internal_metrics from settings import ( DD_S3_TAGS_CACHE_FILENAME, DD_S3_TAGS_CACHE_LOCK_FILENAME, diff --git a/aws/logs_monitoring/caching/step_functions_cache.py b/aws/logs_monitoring/caching/step_functions_cache.py index 7cff68685..7c26411c1 100644 --- a/aws/logs_monitoring/caching/step_functions_cache.py +++ b/aws/logs_monitoring/caching/step_functions_cache.py @@ -4,8 +4,8 @@ from caching.common import ( sanitize_aws_tag_string, parse_get_resources_response_for_tags_by_arn, - send_forwarder_internal_metrics, ) +from telemetry import send_forwarder_internal_metrics from settings import ( DD_S3_STEP_FUNCTIONS_CACHE_FILENAME, DD_S3_STEP_FUNCTIONS_CACHE_LOCK_FILENAME, diff --git a/aws/logs_monitoring/forwarders.py b/aws/logs_monitoring/forwarders.py index dc5980876..ccadaac26 100644 --- a/aws/logs_monitoring/forwarders.py +++ b/aws/logs_monitoring/forwarders.py @@ -2,11 +2,7 @@ import json import os -from telemetry import ( - DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, - get_forwarder_telemetry_tags, -) -from datadog_lambda.metric import lambda_stats +from telemetry import send_event_metric, send_log_metric from trace_forwarder.connection import TraceConnection from logs.logs import ( DatadogScrubber, @@ -65,11 +61,7 @@ def forward_logs(logs): if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Forwarded log batch: {json.dumps(batch)}") - lambda_stats.distribution( - "{}.logs_forwarded".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX), - len(logs_to_forward), - tags=get_forwarder_telemetry_tags(), - ) + send_event_metric("logs_forwarded", len(logs_to_forward)) def forward_metrics(metrics): @@ -82,20 +74,14 @@ def forward_metrics(metrics): for metric in metrics: try: - lambda_stats.distribution( - metric["m"], metric["v"], timestamp=metric["e"], tags=metric["t"] - ) + send_log_metric(metric) except Exception: logger.exception(f"Exception while forwarding metric {json.dumps(metric)}") else: if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Forwarded metric: {json.dumps(metric)}") - lambda_stats.distribution( - "{}.metrics_forwarded".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX), - len(metrics), - tags=get_forwarder_telemetry_tags(), - ) + send_event_metric("metrics_forwarded", len(metrics)) def forward_traces(trace_payloads): @@ -112,8 +98,4 @@ def forward_traces(trace_payloads): if logger.isEnabledFor(logging.DEBUG): logger.debug(f"Forwarded traces: {json.dumps(trace_payloads)}") - lambda_stats.distribution( - "{}.traces_forwarded".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX), - len(trace_payloads), - tags=get_forwarder_telemetry_tags(), - ) + send_event_metric("traces_forwarded", len(trace_payloads)) diff --git a/aws/logs_monitoring/steps/parsing.py b/aws/logs_monitoring/steps/parsing.py index 87d9144a1..a70a71245 100644 --- a/aws/logs_monitoring/steps/parsing.py +++ b/aws/logs_monitoring/steps/parsing.py @@ -7,12 +7,7 @@ import os import itertools import logging -from datadog_lambda.metric import lambda_stats -from telemetry import ( - DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, - get_forwarder_telemetry_tags, - set_forwarder_telemetry_tags, -) +from telemetry import set_forwarder_telemetry_tags, send_event_metric from steps.handlers.awslogs_handler import awslogs_handler from steps.handlers.s3_handler import s3_handler from steps.common import ( @@ -182,10 +177,6 @@ def normalize_events(events, metadata): continue """Submit count of total events""" - lambda_stats.distribution( - "{}.incoming_events".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX), - events_counter, - tags=get_forwarder_telemetry_tags(), - ) + send_event_metric("incoming_events", events_counter) return normalized diff --git a/aws/logs_monitoring/telemetry.py b/aws/logs_monitoring/telemetry.py index 63263dc2c..34e849806 100644 --- a/aws/logs_monitoring/telemetry.py +++ b/aws/logs_monitoring/telemetry.py @@ -3,6 +3,7 @@ # This product includes software developed at Datadog (https://www.datadoghq.com/). # Copyright 2021 Datadog, Inc. +from datadog_lambda.metric import lambda_stats from settings import DD_FORWARDER_VERSION DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX = "aws.dd_forwarder" @@ -22,5 +23,24 @@ def set_forwarder_telemetry_tags(context, event_type): ] -def get_forwarder_telemetry_tags(): - return DD_FORWARDER_TELEMETRY_TAGS +def send_forwarder_internal_metrics(name, additional_tags=[]): + """Send forwarder's internal metrics to DD""" + lambda_stats.distribution( + "{}.{}".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, name), + 1, + tags=DD_FORWARDER_TELEMETRY_TAGS + additional_tags, + ) + + +def send_event_metric(metric_name, metric_value): + lambda_stats.distribution( + "{}.{}".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, metric_name), + metric_value, + tags=DD_FORWARDER_TELEMETRY_TAGS, + ) + + +def send_log_metric(metric): + lambda_stats.distribution( + metric["m"], metric["v"], timestamp=metric["e"], tags=metric["t"] + ) diff --git a/aws/logs_monitoring/tests/test_awslogs_handler.py b/aws/logs_monitoring/tests/test_awslogs_handler.py index a903bc5b5..9cf01c3ce 100644 --- a/aws/logs_monitoring/tests/test_awslogs_handler.py +++ b/aws/logs_monitoring/tests/test_awslogs_handler.py @@ -7,7 +7,6 @@ from unittest.mock import patch, MagicMock from approvaltests.approvals import verify_as_json from approvaltests.namer import NamerFactory -from caching.cache_layer import CacheLayer sys.modules["trace_forwarder.connection"] = MagicMock() sys.modules["datadog_lambda.wrapper"] = MagicMock() @@ -30,6 +29,7 @@ get_lower_cased_lambda_function_name, ) from steps.handlers.aws_attributes import AwsAttributes +from caching.cache_layer import CacheLayer env_patch.stop() diff --git a/aws/logs_monitoring/tests/test_enhanced_lambda_metrics.py b/aws/logs_monitoring/tests/test_enhanced_lambda_metrics.py index 16a50bf41..f71010f74 100644 --- a/aws/logs_monitoring/tests/test_enhanced_lambda_metrics.py +++ b/aws/logs_monitoring/tests/test_enhanced_lambda_metrics.py @@ -297,7 +297,7 @@ def test_generate_enhanced_lambda_metrics_timeout( del os.environ["DD_FETCH_LAMBDA_TAGS"] @patch("caching.lambda_cache.send_forwarder_internal_metrics") - @patch("caching.common.send_forwarder_internal_metrics") + @patch("telemetry.send_forwarder_internal_metrics") @patch("caching.lambda_cache.LambdaTagsCache.get_cache_from_s3") def test_generate_enhanced_lambda_metrics_out_of_memory( self, mock_get_s3_cache, mock_forward_metrics, mock_base_forward_metrics diff --git a/aws/logs_monitoring/tests/test_parsing.py b/aws/logs_monitoring/tests/test_parsing.py index 6e6186bb7..bb2ed83cc 100644 --- a/aws/logs_monitoring/tests/test_parsing.py +++ b/aws/logs_monitoring/tests/test_parsing.py @@ -4,11 +4,11 @@ parse_event_source, get_service_from_tags_and_remove_duplicates, ) +from steps.enums import AwsEventSource from settings import ( DD_CUSTOM_TAGS, DD_SOURCE, ) -from steps.enums import AwsEventSource class TestParseEventSource(unittest.TestCase): From 1e036becdb6dae08e2ac8800b107836bba648f7e Mon Sep 17 00:00:00 2001 From: Georgi Date: Mon, 25 Mar 2024 10:55:32 +0100 Subject: [PATCH 2/2] Add a check on lambda_stats import --- aws/logs_monitoring/telemetry.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/aws/logs_monitoring/telemetry.py b/aws/logs_monitoring/telemetry.py index 34e849806..7506a46ba 100644 --- a/aws/logs_monitoring/telemetry.py +++ b/aws/logs_monitoring/telemetry.py @@ -3,7 +3,13 @@ # This product includes software developed at Datadog (https://www.datadoghq.com/). # Copyright 2021 Datadog, Inc. -from datadog_lambda.metric import lambda_stats +try: + from datadog_lambda.metric import lambda_stats + + DD_SUBMIT_ENHANCED_METRICS = True +except ImportError: + DD_SUBMIT_ENHANCED_METRICS = False + from settings import DD_FORWARDER_VERSION DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX = "aws.dd_forwarder" @@ -24,6 +30,9 @@ def set_forwarder_telemetry_tags(context, event_type): def send_forwarder_internal_metrics(name, additional_tags=[]): + if not DD_SUBMIT_ENHANCED_METRICS: + return + """Send forwarder's internal metrics to DD""" lambda_stats.distribution( "{}.{}".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, name), @@ -33,6 +42,9 @@ def send_forwarder_internal_metrics(name, additional_tags=[]): def send_event_metric(metric_name, metric_value): + if not DD_SUBMIT_ENHANCED_METRICS: + return + lambda_stats.distribution( "{}.{}".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, metric_name), metric_value, @@ -41,6 +53,9 @@ def send_event_metric(metric_name, metric_value): def send_log_metric(metric): + if not DD_SUBMIT_ENHANCED_METRICS: + return + lambda_stats.distribution( metric["m"], metric["v"], timestamp=metric["e"], tags=metric["t"] )