Skip to content

Commit ecf1278

Browse files
authored
Forward Step Functions Tags with logs to the backend (#618)
1 parent dfeb6b2 commit ecf1278

23 files changed

+849
-246
lines changed

aws/logs_monitoring/.dockerignore

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,4 @@
11
*
2-
!lambda_function.py
3-
!enhanced_lambda_metrics.py
4-
!logs.py
5-
!parsing.py
6-
!cache.py
7-
!telemetry.py
8-
!settings.py
9-
!setup.py
2+
!**.py
103
!template.yaml
11-
!trace_forwarder/bin
12-
!trace_forwarder/__init__.py
13-
!trace_forwarder/connection.py
4+
!trace_forwarder/bin

aws/logs_monitoring/cache.py renamed to aws/logs_monitoring/base_tags_cache.py

Lines changed: 6 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,12 @@
1414
DD_S3_BUCKET_NAME,
1515
DD_TAGS_CACHE_TTL_SECONDS,
1616
DD_S3_CACHE_LOCK_TTL_SECONDS,
17-
DD_S3_CACHE_FILENAME,
18-
DD_S3_CACHE_LOCK_FILENAME,
19-
DD_S3_LOG_GROUP_CACHE_FILENAME,
20-
DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME,
2117
)
2218
from telemetry import (
2319
DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX,
2420
get_forwarder_telemetry_tags,
2521
)
2622

27-
2823
JITTER_MIN = 1
2924
JITTER_MAX = 100
3025

@@ -99,6 +94,11 @@ def should_fetch_log_group_tags():
9994
return os.environ.get("DD_FETCH_LOG_GROUP_TAGS", "false").lower() == "true"
10095

10196

97+
def should_fetch_step_functions_tags():
98+
"""Checks the env var to determine if the customer has opted-in to fetching step functions tags"""
99+
return os.environ.get("DD_FETCH_STEP_FUNCTIONS_TAGS", "false").lower() == "true"
100+
101+
102102
def get_last_modified_time(s3_file):
103103
last_modified_str = s3_file["ResponseMetadata"]["HTTPHeaders"]["last-modified"]
104104
last_modified_date = datetime.datetime.strptime(
@@ -108,8 +108,7 @@ def get_last_modified_time(s3_file):
108108
return last_modified_unix_time
109109

110110

111-
class LambdaTagsCache(object):
112-
111+
class BaseTagsCache(object):
113112
CACHE_FILENAME = None
114113
CACHE_LOCK_FILENAME = None
115114

@@ -233,10 +232,6 @@ def build_tags_cache(self):
233232
raise Exception("BUILD TAGS MUST BE DEFINED FOR TAGS CACHES")
234233

235234

236-
######################
237-
# Lambda Custom Tags #
238-
######################
239-
240235
resource_tagging_client = boto3.client("resourcegroupstaggingapi")
241236
GET_RESOURCES_LAMBDA_FILTER = "lambda"
242237

@@ -289,159 +284,3 @@ def parse_get_resources_response_for_tags_by_arn(get_resources_page):
289284
tags_by_arn[lowercase_function_arn] += tags
290285

291286
return tags_by_arn
292-
293-
294-
class LambdaCustomTagsCache(LambdaTagsCache):
295-
CACHE_FILENAME = DD_S3_CACHE_FILENAME
296-
CACHE_LOCK_FILENAME = DD_S3_CACHE_LOCK_FILENAME
297-
298-
def should_fetch_tags(self):
299-
return should_fetch_lambda_tags()
300-
301-
def build_tags_cache(self):
302-
"""Makes API calls to GetResources to get the live tags of the account's Lambda functions
303-
304-
Returns an empty dict instead of fetching custom tags if the tag fetch env variable is not set to true
305-
306-
Returns:
307-
tags_by_arn_cache (dict<str, str[]>): each Lambda's tags in a dict keyed by ARN
308-
"""
309-
tags_fetch_success = False
310-
tags_by_arn_cache = {}
311-
get_resources_paginator = resource_tagging_client.get_paginator("get_resources")
312-
313-
try:
314-
for page in get_resources_paginator.paginate(
315-
ResourceTypeFilters=[GET_RESOURCES_LAMBDA_FILTER], ResourcesPerPage=100
316-
):
317-
send_forwarder_internal_metrics("get_resources_api_calls")
318-
page_tags_by_arn = parse_get_resources_response_for_tags_by_arn(page)
319-
tags_by_arn_cache.update(page_tags_by_arn)
320-
tags_fetch_success = True
321-
322-
except ClientError as e:
323-
logger.exception(
324-
"Encountered a ClientError when trying to fetch tags. You may need to give "
325-
"this Lambda's role the 'tag:GetResources' permission"
326-
)
327-
additional_tags = [
328-
f"http_status_code:{e.response['ResponseMetadata']['HTTPStatusCode']}"
329-
]
330-
send_forwarder_internal_metrics(
331-
"client_error", additional_tags=additional_tags
332-
)
333-
tags_fetch_success = False
334-
335-
logger.debug(
336-
"Built this tags cache from GetResources API calls: %s", tags_by_arn_cache
337-
)
338-
339-
return tags_fetch_success, tags_by_arn_cache
340-
341-
def get(self, key):
342-
"""Get the tags for the Lambda function from the cache
343-
344-
Will refetch the tags if they are out of date, or a lambda arn is encountered
345-
which isn't in the tag list
346-
347-
Note: the ARNs in the cache have been lowercased, so resource_arn must be lowercased
348-
349-
Args:
350-
key (str): the key we're getting tags from the cache for
351-
352-
Returns:
353-
lambda_tags (str[]): the list of "key:value" Datadog tag strings
354-
"""
355-
if self._is_expired():
356-
send_forwarder_internal_metrics("local_cache_expired")
357-
logger.debug("Local cache expired, fetching cache from S3")
358-
self._refresh()
359-
360-
function_tags = self.tags_by_id.get(key, [])
361-
return function_tags
362-
363-
364-
#############################
365-
# Cloudwatch Log Group Tags #
366-
#############################
367-
368-
cloudwatch_logs_client = boto3.client("logs")
369-
370-
371-
def get_log_group_tags(log_group):
372-
response = None
373-
try:
374-
send_forwarder_internal_metrics("list_tags_log_group_api_call")
375-
response = cloudwatch_logs_client.list_tags_log_group(logGroupName=log_group)
376-
except Exception as e:
377-
logger.exception(f"Failed to get log group tags due to {e}")
378-
formatted_tags = None
379-
if response is not None:
380-
formatted_tags = [
381-
"{key}:{value}".format(
382-
key=sanitize_aws_tag_string(k, remove_colons=True),
383-
value=sanitize_aws_tag_string(v, remove_leading_digits=False),
384-
)
385-
if v
386-
else sanitize_aws_tag_string(k, remove_colons=True)
387-
for k, v in response["tags"].items()
388-
]
389-
return formatted_tags
390-
391-
392-
class CloudwatchLogGroupTagsCache(LambdaTagsCache):
393-
CACHE_FILENAME = DD_S3_LOG_GROUP_CACHE_FILENAME
394-
CACHE_LOCK_FILENAME = DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME
395-
396-
def should_fetch_tags(self):
397-
return should_fetch_log_group_tags()
398-
399-
def build_tags_cache(self):
400-
"""Makes API calls to GetResources to get the live tags of the account's Lambda functions
401-
402-
Returns an empty dict instead of fetching custom tags if the tag fetch env variable is not set to true
403-
404-
Returns:
405-
tags_by_arn_cache (dict<str, str[]>): each Lambda's tags in a dict keyed by ARN
406-
"""
407-
new_tags = {}
408-
for log_group in self.tags_by_id.keys():
409-
log_group_tags = get_log_group_tags(log_group)
410-
# If we didn't get back log group tags we'll use the locally cached ones if they exist
411-
# This avoids losing tags on a failed api call
412-
if log_group_tags is None:
413-
log_group_tags = self.tags_by_id.get(log_group, [])
414-
new_tags[log_group] = log_group_tags
415-
416-
logger.debug("All tags in Cloudwatch Log Groups refresh: {}".format(new_tags))
417-
return True, new_tags
418-
419-
def get(self, log_group):
420-
"""Get the tags for the Cloudwatch Log Group from the cache
421-
422-
Will refetch the tags if they are out of date, or a log group is encountered
423-
which isn't in the tag list
424-
425-
Args:
426-
key (str): the key we're getting tags from the cache for
427-
428-
Returns:
429-
log_group_tags (str[]): the list of "key:value" Datadog tag strings
430-
"""
431-
if self._is_expired():
432-
send_forwarder_internal_metrics("local_cache_expired")
433-
logger.debug("Local cache expired, fetching cache from S3")
434-
self._refresh()
435-
436-
log_group_tags = self.tags_by_id.get(log_group, None)
437-
if log_group_tags is None:
438-
# If the custom tag fetch env var is not set to true do not fetch
439-
if not self.should_fetch_tags():
440-
logger.debug(
441-
"Not fetching custom tags because the env variable DD_FETCH_LAMBDA_TAGS is not set to true"
442-
)
443-
return []
444-
log_group_tags = get_log_group_tags(log_group) or []
445-
self.tags_by_id[log_group] = log_group_tags
446-
447-
return log_group_tags
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import boto3
2+
3+
from base_tags_cache import (
4+
BaseTagsCache,
5+
logger,
6+
sanitize_aws_tag_string,
7+
send_forwarder_internal_metrics,
8+
should_fetch_log_group_tags,
9+
)
10+
from settings import (
11+
DD_S3_LOG_GROUP_CACHE_FILENAME,
12+
DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME,
13+
)
14+
15+
16+
class CloudwatchLogGroupTagsCache(BaseTagsCache):
17+
CACHE_FILENAME = DD_S3_LOG_GROUP_CACHE_FILENAME
18+
CACHE_LOCK_FILENAME = DD_S3_LOG_GROUP_CACHE_LOCK_FILENAME
19+
20+
def should_fetch_tags(self):
21+
return should_fetch_log_group_tags()
22+
23+
def build_tags_cache(self):
24+
"""Makes API calls to GetResources to get the live tags of the account's Lambda functions
25+
26+
Returns an empty dict instead of fetching custom tags if the tag fetch env variable is not set to true
27+
28+
Returns:
29+
tags_by_arn_cache (dict<str, str[]>): each Lambda's tags in a dict keyed by ARN
30+
"""
31+
new_tags = {}
32+
for log_group in self.tags_by_id.keys():
33+
log_group_tags = get_log_group_tags(log_group)
34+
# If we didn't get back log group tags we'll use the locally cached ones if they exist
35+
# This avoids losing tags on a failed api call
36+
if log_group_tags is None:
37+
log_group_tags = self.tags_by_id.get(log_group, [])
38+
new_tags[log_group] = log_group_tags
39+
40+
logger.debug("All tags in Cloudwatch Log Groups refresh: {}".format(new_tags))
41+
return True, new_tags
42+
43+
def get(self, log_group):
44+
"""Get the tags for the Cloudwatch Log Group from the cache
45+
46+
Will refetch the tags if they are out of date, or a log group is encountered
47+
which isn't in the tag list
48+
49+
Args:
50+
key (str): the key we're getting tags from the cache for
51+
52+
Returns:
53+
log_group_tags (str[]): the list of "key:value" Datadog tag strings
54+
"""
55+
if self._is_expired():
56+
send_forwarder_internal_metrics("local_cache_expired")
57+
logger.debug("Local cache expired, fetching cache from S3")
58+
self._refresh()
59+
60+
log_group_tags = self.tags_by_id.get(log_group, None)
61+
if log_group_tags is None:
62+
# If the custom tag fetch env var is not set to true do not fetch
63+
if not self.should_fetch_tags():
64+
logger.debug(
65+
"Not fetching custom tags because the env variable DD_FETCH_LOG_GROUP_TAGS is "
66+
"not set to true"
67+
)
68+
return []
69+
log_group_tags = get_log_group_tags(log_group) or []
70+
self.tags_by_id[log_group] = log_group_tags
71+
72+
return log_group_tags
73+
74+
75+
def get_log_group_tags(log_group):
76+
response = None
77+
try:
78+
send_forwarder_internal_metrics("list_tags_log_group_api_call")
79+
response = cloudwatch_logs_client.list_tags_log_group(logGroupName=log_group)
80+
except Exception as e:
81+
logger.exception(f"Failed to get log group tags due to {e}")
82+
formatted_tags = None
83+
if response is not None:
84+
formatted_tags = [
85+
"{key}:{value}".format(
86+
key=sanitize_aws_tag_string(k, remove_colons=True),
87+
value=sanitize_aws_tag_string(v, remove_leading_digits=False),
88+
)
89+
if v
90+
else sanitize_aws_tag_string(k, remove_colons=True)
91+
for k, v in response["tags"].items()
92+
]
93+
return formatted_tags
94+
95+
96+
cloudwatch_logs_client = boto3.client("logs")

aws/logs_monitoring/enhanced_lambda_metrics.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from time import time
1111

12-
from cache import LambdaCustomTagsCache
12+
from lambda_cache import LambdaTagsCache
1313

1414
ENHANCED_METRICS_NAMESPACE_PREFIX = "aws.lambda.enhanced"
1515

@@ -86,7 +86,7 @@
8686

8787
# Store the cache in the global scope so that it will be reused as long as
8888
# the log forwarder Lambda container is running
89-
account_lambda_custom_tags_cache = LambdaCustomTagsCache()
89+
account_lambda_custom_tags_cache = LambdaTagsCache()
9090

9191

9292
class DatadogMetricPoint(object):

0 commit comments

Comments
 (0)