Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion aws/logs_monitoring/caching/base_tags_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
DD_TAGS_CACHE_TTL_SECONDS,
DD_S3_CACHE_LOCK_TTL_SECONDS,
)
from caching.common import get_last_modified_time, send_forwarder_internal_metrics
from caching.common import get_last_modified_time
from telemetry import send_forwarder_internal_metrics

JITTER_MIN = 1
JITTER_MAX = 100
Expand Down
3 changes: 2 additions & 1 deletion aws/logs_monitoring/caching/cloudwatch_log_group_cache.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
import boto3
from caching.base_tags_cache import BaseTagsCache
from caching.common import sanitize_aws_tag_string, send_forwarder_internal_metrics
from caching.common import sanitize_aws_tag_string
from telemetry import send_forwarder_internal_metrics
from settings import (
DD_S3_LOG_GROUP_CACHE_FILENAME,
DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME,
Expand Down
23 changes: 0 additions & 23 deletions aws/logs_monitoring/caching/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,17 @@
import logging
import re
from collections import defaultdict
from telemetry import (
DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX,
get_forwarder_telemetry_tags,
)

logger = logging.getLogger()
logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))

try:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That DD_SUBMIT_ENHANCED_METRICS bit is important I think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were not using it in the cache metrics submission, only in enhanced metrics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In any case I added the same check here

from datadog_lambda.metric import lambda_stats

DD_SUBMIT_ENHANCED_METRICS = True
except ImportError:
logger.debug(
"Could not import from the Datadog Lambda layer so enhanced metrics won't be submitted. "
"Add the Datadog Lambda layer to this function to submit enhanced metrics."
)
DD_SUBMIT_ENHANCED_METRICS = False

_other_chars = r"\w:\-\.\/"
Sanitize = re.compile(r"[^%s]" % _other_chars, re.UNICODE).sub
Dedupe = re.compile(r"_+", re.UNICODE).sub
FixInit = re.compile(r"^[_\d]*", re.UNICODE).sub


def send_forwarder_internal_metrics(name, additional_tags=[]):
    # NOTE(review): mutable default argument `[]` is shared across calls;
    # harmless here only because it is never mutated — prefer `None` + rebuild.
    """Send forwarder's internal metrics to DD"""
    # Emits a distribution with value 1 under the forwarder telemetry
    # namespace, tagged with the standard telemetry tags plus any extras.
    lambda_stats.distribution(
        "{}.{}".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, name),
        1,
        tags=get_forwarder_telemetry_tags() + additional_tags,
    )


def get_last_modified_time(s3_file):
last_modified_str = s3_file["ResponseMetadata"]["HTTPHeaders"]["last-modified"]
last_modified_date = datetime.datetime.strptime(
Expand Down
6 changes: 2 additions & 4 deletions aws/logs_monitoring/caching/lambda_cache.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import os
from botocore.exceptions import ClientError
from caching.base_tags_cache import BaseTagsCache
from caching.common import (
send_forwarder_internal_metrics,
parse_get_resources_response_for_tags_by_arn,
)
from caching.common import parse_get_resources_response_for_tags_by_arn
from telemetry import send_forwarder_internal_metrics
from settings import (
DD_S3_CACHE_FILENAME,
DD_S3_CACHE_LOCK_FILENAME,
Expand Down
6 changes: 2 additions & 4 deletions aws/logs_monitoring/caching/s3_tags_cache.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from botocore.exceptions import ClientError
from caching.base_tags_cache import BaseTagsCache
from caching.common import (
send_forwarder_internal_metrics,
parse_get_resources_response_for_tags_by_arn,
)
from caching.common import parse_get_resources_response_for_tags_by_arn
from telemetry import send_forwarder_internal_metrics
from settings import (
DD_S3_TAGS_CACHE_FILENAME,
DD_S3_TAGS_CACHE_LOCK_FILENAME,
Expand Down
2 changes: 1 addition & 1 deletion aws/logs_monitoring/caching/step_functions_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from caching.common import (
sanitize_aws_tag_string,
parse_get_resources_response_for_tags_by_arn,
send_forwarder_internal_metrics,
)
from telemetry import send_forwarder_internal_metrics
from settings import (
DD_S3_STEP_FUNCTIONS_CACHE_FILENAME,
DD_S3_STEP_FUNCTIONS_CACHE_LOCK_FILENAME,
Expand Down
28 changes: 5 additions & 23 deletions aws/logs_monitoring/forwarders.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@
import json
import os

from telemetry import (
DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX,
get_forwarder_telemetry_tags,
)
from datadog_lambda.metric import lambda_stats
from telemetry import send_event_metric, send_log_metric
from trace_forwarder.connection import TraceConnection
from logs.logs import (
DatadogScrubber,
Expand Down Expand Up @@ -65,11 +61,7 @@ def forward_logs(logs):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Forwarded log batch: {json.dumps(batch)}")

lambda_stats.distribution(
"{}.logs_forwarded".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX),
len(logs_to_forward),
tags=get_forwarder_telemetry_tags(),
)
send_event_metric("logs_forwarded", len(logs_to_forward))


def forward_metrics(metrics):
Expand All @@ -82,20 +74,14 @@ def forward_metrics(metrics):

for metric in metrics:
try:
lambda_stats.distribution(
metric["m"], metric["v"], timestamp=metric["e"], tags=metric["t"]
)
send_log_metric(metric)
except Exception:
logger.exception(f"Exception while forwarding metric {json.dumps(metric)}")
else:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Forwarded metric: {json.dumps(metric)}")

lambda_stats.distribution(
"{}.metrics_forwarded".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX),
len(metrics),
tags=get_forwarder_telemetry_tags(),
)
send_event_metric("metrics_forwarded", len(metrics))


def forward_traces(trace_payloads):
Expand All @@ -112,8 +98,4 @@ def forward_traces(trace_payloads):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Forwarded traces: {json.dumps(trace_payloads)}")

lambda_stats.distribution(
"{}.traces_forwarded".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX),
len(trace_payloads),
tags=get_forwarder_telemetry_tags(),
)
send_event_metric("traces_forwarded", len(trace_payloads))
13 changes: 2 additions & 11 deletions aws/logs_monitoring/steps/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,7 @@
import os
import itertools
import logging
from datadog_lambda.metric import lambda_stats
from telemetry import (
DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX,
get_forwarder_telemetry_tags,
set_forwarder_telemetry_tags,
)
from telemetry import set_forwarder_telemetry_tags, send_event_metric
from steps.handlers.awslogs_handler import awslogs_handler
from steps.handlers.s3_handler import s3_handler
from steps.common import (
Expand Down Expand Up @@ -182,10 +177,6 @@ def normalize_events(events, metadata):
continue

"""Submit count of total events"""
lambda_stats.distribution(
"{}.incoming_events".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX),
events_counter,
tags=get_forwarder_telemetry_tags(),
)
send_event_metric("incoming_events", events_counter)

return normalized
39 changes: 37 additions & 2 deletions aws/logs_monitoring/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2021 Datadog, Inc.

try:
from datadog_lambda.metric import lambda_stats

DD_SUBMIT_ENHANCED_METRICS = True
except ImportError:
DD_SUBMIT_ENHANCED_METRICS = False

from settings import DD_FORWARDER_VERSION

DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX = "aws.dd_forwarder"
Expand All @@ -22,5 +29,33 @@ def set_forwarder_telemetry_tags(context, event_type):
]


def get_forwarder_telemetry_tags():
    """Return the module-level list of forwarder telemetry tags."""
    tags = DD_FORWARDER_TELEMETRY_TAGS
    return tags
def send_forwarder_internal_metrics(name, additional_tags=None):
    """Send the forwarder's internal metrics to Datadog.

    Submits a distribution metric with value 1 under the forwarder
    telemetry namespace prefix, tagged with the standard telemetry tags
    plus any caller-supplied extras. No-op when the Datadog Lambda layer
    could not be imported (DD_SUBMIT_ENHANCED_METRICS is False).

    Args:
        name: metric name suffix, appended to the namespace prefix.
        additional_tags: optional list of extra tags (defaults to none).
    """
    if not DD_SUBMIT_ENHANCED_METRICS:
        return

    # Avoid the mutable-default-argument pitfall: build the tag list per call.
    tags = DD_FORWARDER_TELEMETRY_TAGS + (additional_tags or [])
    lambda_stats.distribution(
        "{}.{}".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, name),
        1,
        tags=tags,
    )


def send_event_metric(metric_name, metric_value):
    """Report an event-count metric (e.g. logs_forwarded) to Datadog.

    No-op when the Datadog Lambda layer is unavailable
    (DD_SUBMIT_ENHANCED_METRICS is False).
    """
    if not DD_SUBMIT_ENHANCED_METRICS:
        return

    full_name = "{}.{}".format(DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX, metric_name)
    lambda_stats.distribution(full_name, metric_value, tags=DD_FORWARDER_TELEMETRY_TAGS)


def send_log_metric(metric):
    """Forward one parsed metric dict to Datadog as a distribution.

    Expects the keys "m" (name), "v" (value), "e" (timestamp) and
    "t" (tags). No-op when the Datadog Lambda layer is unavailable
    (DD_SUBMIT_ENHANCED_METRICS is False).
    """
    if not DD_SUBMIT_ENHANCED_METRICS:
        return

    name, value = metric["m"], metric["v"]
    lambda_stats.distribution(name, value, timestamp=metric["e"], tags=metric["t"])
2 changes: 1 addition & 1 deletion aws/logs_monitoring/tests/test_awslogs_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from unittest.mock import patch, MagicMock
from approvaltests.approvals import verify_as_json
from approvaltests.namer import NamerFactory
from caching.cache_layer import CacheLayer

sys.modules["trace_forwarder.connection"] = MagicMock()
sys.modules["datadog_lambda.wrapper"] = MagicMock()
Expand All @@ -30,6 +29,7 @@
get_lower_cased_lambda_function_name,
)
from steps.handlers.aws_attributes import AwsAttributes
from caching.cache_layer import CacheLayer

env_patch.stop()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def test_generate_enhanced_lambda_metrics_timeout(
del os.environ["DD_FETCH_LAMBDA_TAGS"]

@patch("caching.lambda_cache.send_forwarder_internal_metrics")
@patch("caching.common.send_forwarder_internal_metrics")
@patch("telemetry.send_forwarder_internal_metrics")
@patch("caching.lambda_cache.LambdaTagsCache.get_cache_from_s3")
def test_generate_enhanced_lambda_metrics_out_of_memory(
self, mock_get_s3_cache, mock_forward_metrics, mock_base_forward_metrics
Expand Down
2 changes: 1 addition & 1 deletion aws/logs_monitoring/tests/test_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
parse_event_source,
get_service_from_tags_and_remove_duplicates,
)
from steps.enums import AwsEventSource
from settings import (
DD_CUSTOM_TAGS,
DD_SOURCE,
)
from steps.enums import AwsEventSource


class TestParseEventSource(unittest.TestCase):
Expand Down