20 changes: 10 additions & 10 deletions aws/logs_monitoring/caching/base_tags_cache.py
@@ -60,9 +60,9 @@ def write_cache_to_s3(self, data):
                 DD_S3_BUCKET_NAME, self.get_cache_name_with_prefix()
             )
             s3_object.put(Body=(bytes(json.dumps(data).encode("UTF-8"))))
-        except ClientError:
+        except ClientError as e:
             send_forwarder_internal_metrics("s3_cache_write_failure")
-            self.logger.debug("Unable to write new cache to S3", exc_info=True)
+            self.logger.debug(f"Unable to write new cache to S3: {e}", exc_info=True)
 
     def acquire_s3_cache_lock(self):
         """Acquire cache lock"""
@@ -76,16 +76,16 @@ def acquire_s3_cache_lock(self):
             last_modified_unix_time = get_last_modified_time(file_content)
             if last_modified_unix_time + DD_S3_CACHE_LOCK_TTL_SECONDS >= time():
                 return False
-        except Exception:
-            self.logger.debug("Unable to get cache lock file")
+        except Exception as e:
+            self.logger.debug(f"Unable to get cache lock file: {e}")
 
         # lock file doesn't exist, create file to acquire lock
         try:
             cache_lock_object.put(Body=(bytes("lock".encode("UTF-8"))))
             send_forwarder_internal_metrics("s3_cache_lock_acquired")
             self.logger.debug("S3 cache lock acquired")
-        except ClientError:
-            self.logger.debug("Unable to write S3 cache lock file", exc_info=True)
+        except ClientError as e:
+            self.logger.debug(f"Unable to write S3 cache lock file: {e}", exc_info=True)
             return False
 
         return True
@@ -99,9 +99,9 @@ def release_s3_cache_lock(self):
             cache_lock_object.delete()
             send_forwarder_internal_metrics("s3_cache_lock_released")
             self.logger.debug("S3 cache lock released")
-        except ClientError:
+        except ClientError as e:
             send_forwarder_internal_metrics("s3_cache_lock_release_failure")
-            self.logger.debug("Unable to release S3 cache lock", exc_info=True)
+            self.logger.debug(f"Unable to release S3 cache lock: {e}", exc_info=True)
 
     def get_cache_from_s3(self):
         """Retrieves tags cache from s3 and returns the body along with
@@ -113,9 +113,9 @@
             file_content = cache_object.get()
             tags_cache = json.loads(file_content["Body"].read().decode("utf-8"))
             last_modified_unix_time = get_last_modified_time(file_content)
-        except:
+        except Exception as e:
             send_forwarder_internal_metrics("s3_cache_fetch_failure")
-            self.logger.debug("Unable to fetch cache from S3", exc_info=True)
+            self.logger.debug(f"Unable to fetch cache from S3: {e}", exc_info=True)
             return {}, -1
 
         return tags_cache, last_modified_unix_time
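
The lock in `acquire_s3_cache_lock` is advisory: a lock object in S3 whose age is compared against a TTL, so a crashed writer can never wedge the cache. Below is a minimal standalone sketch of the same pattern; the bucket name, key, and TTL are hypothetical stand-ins, not the forwarder's settings.

```python
from time import time

import boto3
from botocore.exceptions import ClientError

# Hypothetical bucket/key; the forwarder derives these from its settings.
LOCK_TTL_SECONDS = 60
s3 = boto3.resource("s3")
lock_object = s3.Object("my-cache-bucket", "cache/lock")


def acquire_lock() -> bool:
    try:
        # A lock file younger than the TTL means another writer holds it.
        file_content = lock_object.get()
        if file_content["LastModified"].timestamp() + LOCK_TTL_SECONDS >= time():
            return False
    except ClientError:
        pass  # no lock file yet; fall through and create one

    try:
        lock_object.put(Body=b"lock")
    except ClientError:
        return False
    return True


def release_lock() -> None:
    try:
        lock_object.delete()
    except ClientError:
        pass  # best-effort release, mirroring the forwarder
```

Note the acquisition is not atomic: two writers can both pass the GET check before either PUT lands. That is tolerable here because the cache write itself is idempotent.
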
16 changes: 8 additions & 8 deletions aws/logs_monitoring/caching/cloudwatch_log_group_cache.py
@@ -103,10 +103,10 @@ def _get_log_group_tags_from_cache(self, cache_file_name):
             )
             tags_cache = json.loads(response.get("Body").read().decode("utf-8"))
             last_modified_unix_time = int(response.get("LastModified").timestamp())
-        except Exception:
+        except Exception as e:
             send_forwarder_internal_metrics("loggroup_cache_fetch_failure")
-            self.logger.exception(
-                "Failed to get log group tags from cache", exc_info=True
+            self.logger.error(
+                f"Failed to get log group tags from cache: {e}", exc_info=True
             )
             return None, -1
 
@@ -120,10 +120,10 @@ def _update_log_group_tags_cache(self, log_group, tags):
                 Key=cache_file_name,
                 Body=(bytes(json.dumps(tags).encode("UTF-8"))),
             )
-        except Exception:
+        except Exception as e:
             send_forwarder_internal_metrics("loggroup_cache_write_failure")
-            self.logger.exception(
-                "Failed to update log group tags cache", exc_info=True
+            self.logger.error(
+                f"Failed to update log group tags cache: {e}", exc_info=True
             )
 
     def _is_expired(self, last_modified):
@@ -150,8 +150,8 @@ def _get_log_group_tags(self, log_group_arn):
             response = self.cloudwatch_logs_client.list_tags_for_resource(
                 resourceArn=log_group_arn
             )
-        except Exception:
-            self.logger.exception("Failed to get log group tags", exc_info=True)
+        except Exception as e:
+            self.logger.error(f"Failed to get log group tags: {e}", exc_info=True)
         formatted_tags = None
         if response is not None:
             formatted_tags = [
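
A note on the logging change in this file: `logger.exception(...)` is exactly `logger.error(..., exc_info=True)`, so the old calls that paired `logger.exception` with `exc_info=True` were redundant. The new calls keep the traceback via `exc_info=True` and add the exception text to the message. A quick demonstration of the equivalence:

```python
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

try:
    raise ValueError("boom")
except ValueError as e:
    # These two calls emit equivalent records: ERROR level,
    # the formatted message, then the traceback.
    logger.exception("Failed to get log group tags: %s", e)
    logger.error("Failed to get log group tags: %s", e, exc_info=True)
```
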
6 changes: 3 additions & 3 deletions aws/logs_monitoring/caching/lambda_cache.py
@@ -42,9 +42,9 @@ def build_tags_cache(self):
             tags_fetch_success = True
 
         except ClientError as e:
-            self.logger.exception(
-                "Encountered a ClientError when trying to fetch tags. You may need to give "
-                "this Lambda's role the 'tag:GetResources' permission"
+            self.logger.error(
+                f"Failed to fetch Lambda tags: {e}. "
+                "Add 'tag:GetResources' permission to the Forwarder's IAM role."
             )
             additional_tags = [
                 f"http_status_code:{e.response['ResponseMetadata']['HTTPStatusCode']}"
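
The `http_status_code` tag is read straight out of botocore's parsed error payload on `ClientError`. A small sketch of where those fields live; the client and call here are illustrative, assuming standard botocore behavior:

```python
import boto3
from botocore.exceptions import ClientError

# Illustrative client; any AWS call that raises ClientError works the same way.
client = boto3.client("resourcegroupstaggingapi")

try:
    client.get_resources(ResourceTypeFilters=["lambda"])
except ClientError as e:
    # e.response is the parsed AWS error payload.
    status = e.response["ResponseMetadata"]["HTTPStatusCode"]  # e.g. 400
    error_code = e.response["Error"]["Code"]  # e.g. "AccessDeniedException"
    additional_tags = [f"http_status_code:{status}"]
```
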
4 changes: 2 additions & 2 deletions aws/logs_monitoring/caching/s3_tags_cache.py
@@ -40,9 +40,9 @@ def build_tags_cache(self):
             tags_by_arn_cache.update(page_tags_by_arn)
             tags_fetch_success = True
         except ClientError as e:
-            self.logger.exception(
+            self.logger.error(
                 "Encountered a ClientError when trying to fetch tags. You may need to give "
-                "this Lambda's role the 'tag:GetResources' permission"
+                f"this Lambda's role the 'tag:GetResources' permission: {e}"
             )
             additional_tags = [
                 f"http_status_code:{e.response['ResponseMetadata']['HTTPStatusCode']}"
6 changes: 3 additions & 3 deletions aws/logs_monitoring/caching/step_functions_cache.py
@@ -49,9 +49,9 @@ def build_tags_cache(self):
             tags_fetch_success = True
 
         except ClientError as e:
-            self.logger.exception(
+            self.logger.error(
                 "Encountered a ClientError when trying to fetch tags. You may need to give "
-                "this Lambda's role the 'tag:GetResources' permission"
+                f"this Lambda's role the 'tag:GetResources' permission: {e}"
             )
             additional_tags = [
                 f"http_status_code:{e.response['ResponseMetadata']['HTTPStatusCode']}"
@@ -131,7 +131,7 @@ def _get_state_machine_tags(self, state_machine_arn: str):
                 ResourceARNList=[state_machine_arn]
             )
         except Exception as e:
-            self.logger.exception(f"Failed to get Step Functions tags due to {e}")
+            self.logger.error(f"Failed to get Step Functions tags due to {e}")
 
         if response and len(response.get("ResourceTagMappingList", {})) > 0:
             resource_dict = response.get("ResourceTagMappingList")[0]
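
The state machine tags come from the Resource Groups Tagging API, which accepts a list of ARNs and returns one tag mapping per resource. A minimal sketch with a hypothetical ARN:

```python
import boto3

tag_client = boto3.client("resourcegroupstaggingapi")

# Hypothetical ARN, for illustration only.
state_machine_arn = "arn:aws:states:us-east-1:123456789012:stateMachine:example"

response = tag_client.get_resources(ResourceARNList=[state_machine_arn])
mappings = response.get("ResourceTagMappingList", [])
if mappings:
    # Tags arrive as [{"Key": ..., "Value": ...}, ...]
    tags = {t["Key"]: t["Value"] for t in mappings[0].get("Tags", [])}
```
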
17 changes: 9 additions & 8 deletions aws/logs_monitoring/enhanced_lambda_metrics.py
@@ -2,11 +2,11 @@
 # under the Apache License Version 2.0.
 # This product includes software developed at Datadog (https://www.datadoghq.com/).
 # Copyright 2021 Datadog, Inc.
+import datetime
 import json
-import os
 import logging
+import os
 import re
-import datetime
 from time import time
 
 ENHANCED_METRICS_NAMESPACE_PREFIX = "aws.lambda.enhanced"
@@ -168,10 +168,11 @@ def parse_and_submit_enhanced_metrics(logs, cache_layer):
             )
             for enhanced_metric in enhanced_metrics:
                 enhanced_metric.submit_to_dd()
-        except Exception:
-            logger.exception(
-                "Encountered an error while trying to parse and submit enhanced metrics for log %s",
+        except Exception as e:
+            logger.error(
+                "Encountered an error while trying to parse and submit enhanced metrics for log %s: %s",
                 log,
+                str(e),
             )
 
 
@@ -347,9 +348,9 @@ def parse_metrics_from_report_log(report_log_line):

     Args:
         report_log_line (str): The REPORT log generated by Lambda
-        EX: "REPORT RequestId: 814ba7cb-071e-4181-9a09-fa41db5bccad Duration: 1711.87 ms \
-        Billed Duration: 1800 ms Memory Size: 128 MB Max Memory Used: 98 MB \
-        XRAY TraceId: 1-5d83c0ad-b8eb33a0b1de97d804fac890 SegmentId: 31255c3b19bd3637 Sampled: true"
+            EX: "REPORT RequestId: 814ba7cb-071e-4181-9a09-fa41db5bccad Duration: 1711.87 ms \
+            Billed Duration: 1800 ms Memory Size: 128 MB Max Memory Used: 98 MB \
+            XRAY TraceId: 1-5d83c0ad-b8eb33a0b1de97d804fac890 SegmentId: 31255c3b19bd3637 Sampled: true"
 
     Returns:
         metrics - DatadogMetricPoint[]
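
`parse_metrics_from_report_log` pulls metric values out of the REPORT line shown in the docstring. The sketch below shows the general shape of that extraction with an illustrative regex; the forwarder's real patterns are more precise than this one.

```python
import re

REPORT_LINE = (
    "REPORT RequestId: 814ba7cb-071e-4181-9a09-fa41db5bccad\t"
    "Duration: 1711.87 ms\tBilled Duration: 1800 ms\t"
    "Memory Size: 128 MB\tMax Memory Used: 98 MB"
)

# Illustrative pattern only: a label, a numeric value, and an ms/MB unit.
METRIC_PATTERN = re.compile(
    r"(?P<name>[A-Za-z ]+): (?P<value>[\d.]+) (?P<unit>ms|MB)"
)

metrics = {
    m.group("name").strip(): float(m.group("value"))
    for m in METRIC_PATTERN.finditer(REPORT_LINE)
}
# {'Duration': 1711.87, 'Billed Duration': 1800.0,
#  'Memory Size': 128.0, 'Max Memory Used': 98.0}
```
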
18 changes: 9 additions & 9 deletions aws/logs_monitoring/forwarder.py
@@ -99,7 +99,7 @@ def _forward_logs(self, logs, key=None):
log["message"] = scrubber.scrub(log["message"])
evaluated_log = log["message"]
except Exception as e:
logger.exception(
logger.error(
f"Exception while scrubbing log message {log['message']}: {e}"
)

@@ -116,8 +116,8 @@ def _forward_logs(self, logs, key=None):
         for batch in batcher.batch(logs_to_forward):
             try:
                 client.send(batch)
-            except Exception:
-                logger.exception(f"Exception while forwarding log batch {batch}")
+            except Exception as e:
+                logger.error(f"Exception while forwarding log batch {batch}: {e}")
                 failed_logs.extend(batch)
             else:
                 if logger.isEnabledFor(logging.DEBUG):
@@ -142,9 +142,9 @@ def _forward_metrics(self, metrics, key=None):
         for metric in metrics:
             try:
                 send_log_metric(metric)
-            except Exception:
-                logger.exception(
-                    f"Exception while forwarding metric {json.dumps(metric)}"
+            except Exception as e:
+                logger.error(
+                    f"Exception while forwarding metric {json.dumps(metric)}: {e}"
                 )
                 failed_metrics.append(metric)
             else:
@@ -168,9 +168,9 @@ def _forward_traces(self, traces, key=None):
         try:
             serialized_trace_paylods = json.dumps(traces)
             self.trace_connection.send_traces(serialized_trace_paylods)
-        except Exception:
-            logger.exception(
-                f"Exception while forwarding traces {serialized_trace_paylods}"
+        except Exception as e:
+            logger.error(
+                f"Exception while forwarding traces {serialized_trace_paylods}: {e}"
             )
             if DD_STORE_FAILED_EVENTS and not key:
                 self.storage.store_data(RetryPrefix.TRACES, traces)
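
All three `_forward_*` methods share one failure pattern: try each unit, log on error, and collect the failed unit for retry storage instead of aborting the run. A generic sketch of that pattern, with a hypothetical `send` callable standing in for the transport:

```python
import logging
from typing import Callable, Iterable, List

logger = logging.getLogger(__name__)


def forward_batches(
    batches: Iterable[List[str]],
    send: Callable[[List[str]], None],  # hypothetical transport callable
) -> List[str]:
    """Send each batch; collect the contents of failed batches for retry."""
    failed: List[str] = []
    for batch in batches:
        try:
            send(batch)
        except Exception as e:
            logger.error(f"Exception while forwarding log batch {batch}: {e}")
            failed.extend(batch)  # keep going; retry storage handles these
    return failed
```
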
43 changes: 26 additions & 17 deletions aws/logs_monitoring/lambda_function.py
@@ -4,41 +4,46 @@
 # Copyright 2021 Datadog, Inc.
 
 import json
-import os
-import boto3
 import logging
-import requests
+import os
 from hashlib import sha1
 
-from datadog_lambda.wrapper import datadog_lambda_wrapper
+import boto3
+import requests
 from datadog import api
-from enhanced_lambda_metrics import parse_and_submit_enhanced_metrics
-from steps.parsing import parse
-from steps.enrichment import enrich
-from steps.transformation import transform
-from steps.splitting import split
+from datadog_lambda.wrapper import datadog_lambda_wrapper
 
 from caching.cache_layer import CacheLayer
+from enhanced_lambda_metrics import parse_and_submit_enhanced_metrics
 from forwarder import Forwarder
 from settings import (
+    DD_ADDITIONAL_TARGET_LAMBDAS,
     DD_API_KEY,
-    DD_SKIP_SSL_VALIDATION,
     DD_API_URL,
     DD_FORWARDER_VERSION,
-    DD_ADDITIONAL_TARGET_LAMBDAS,
     DD_RETRY_KEYWORD,
+    DD_SITE,
+    DD_SKIP_SSL_VALIDATION,
 )
+from steps.enrichment import enrich
+from steps.parsing import parse
+from steps.splitting import split
+from steps.transformation import transform
 
 logger = logging.getLogger()
 logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
 
 # DD_API_KEY must be set
 if DD_API_KEY == "<YOUR_DATADOG_API_KEY>" or DD_API_KEY == "":
-    raise Exception("Missing Datadog API key")
+    raise Exception(
+        "Missing Datadog API key. Set DD_API_KEY environment variable. "
+        "See: https://docs.datadoghq.com/serverless/forwarder/"
+    )
 # Check if the API key is the correct number of characters
 if len(DD_API_KEY) != 32:
     raise Exception(
-        "The API key is not the expected length. "
-        "Please confirm that your API key is correct"
+        f"Invalid Datadog API key format. Expected 32 characters, received {len(DD_API_KEY)}. "
+        f"Verify your API key at https://app.{DD_SITE}/organization-settings/api-keys"
     )
 # Validate the API key
 logger.debug("Validating the Datadog API key")
@@ -57,7 +62,11 @@
     timeout=10,
 )
 if not validation_res.ok:
-    raise Exception("The API key is not valid.")
+    raise Exception(
+        f"Datadog API key validation failed (HTTP {validation_res.status_code}). "
+        f"Verify your API key is correct and DD_SITE matches your Datadog account region (current: {DD_SITE}). "
+        "See: https://docs.datadoghq.com/getting_started/site/"
+    )
 
 # Force the layer to use the exact same API key and host as the forwarder
 api._api_key = DD_API_KEY
@@ -106,7 +115,7 @@ def init_cache_layer(function_prefix):
         if cache_layer is None:
             cache_layer = CacheLayer(function_prefix)
     except Exception as e:
-        logger.exception(f"Failed to create cache layer due to {e}")
+        logger.error(f"Failed to create cache layer due to {e}")
         raise


@@ -135,7 +144,7 @@ def invoke_additional_target_lambdas(event):
                 Payload=lambda_payload,
             )
         except Exception as e:
-            logger.exception(
+            logger.error(
                 f"Failed to invoke additional target lambda {lambda_arn} due to {e}"
             )

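
The validation block above calls Datadog's `/api/v1/validate` endpoint at import time. A trimmed-down sketch of that request follows; the real module builds the URL from DD_API_URL and honors DD_SKIP_SSL_VALIDATION, which this omits.

```python
import os

import requests

# Assumes these are set in the environment; names mirror the forwarder's settings.
DD_API_KEY = os.environ["DD_API_KEY"]
DD_SITE = os.environ.get("DD_SITE", "datadoghq.com")

validation_res = requests.get(
    f"https://api.{DD_SITE}/api/v1/validate",
    headers={"DD-API-KEY": DD_API_KEY},
    timeout=10,
)
if not validation_res.ok:
    raise Exception(
        f"Datadog API key validation failed (HTTP {validation_res.status_code})."
    )
```

Failing fast here, before any event is processed, turns a misconfigured key into an immediate, visible import error instead of silently dropped telemetry.
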
8 changes: 4 additions & 4 deletions aws/logs_monitoring/logs/datadog_http_client.py
@@ -84,8 +84,8 @@ def _close(self):
         for future in as_completed(self._futures):
             try:
                 future.result()
-            except Exception:
-                logger.exception("Exception while forwarding logs")
+            except Exception as e:
+                logger.error(f"Exception while forwarding logs: {e}")
 
         self._session.close()

@@ -95,8 +95,8 @@ def send(self, logs):
"""
try:
data = self._scrubber.scrub("[{}]".format(",".join(logs)))
except ScrubbingException:
raise Exception("could not scrub the payload")
except ScrubbingException as e:
raise Exception(f"could not scrub the payload: {e}")
if DD_USE_COMPRESSION:
data = compress_logs(data, DD_COMPRESSION_LEVEL)

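
`_close` drains the thread pool with `as_completed` so that an exception raised inside any worker surfaces at `future.result()` rather than disappearing into the pool. A self-contained sketch of the drain, with a deliberately flaky worker:

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def flaky_send(i: int) -> int:
    # Hypothetical worker: batch 2 fails, the rest succeed.
    if i == 2:
        raise RuntimeError(f"batch {i} failed")
    return i


with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(flaky_send, i) for i in range(4)]
    # as_completed yields futures as they finish; .result() re-raises
    # any exception from inside the worker thread.
    for future in as_completed(futures):
        try:
            future.result()
        except Exception as e:
            logger.error(f"Exception while forwarding logs: {e}")
```
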
7 changes: 5 additions & 2 deletions aws/logs_monitoring/logs/datadog_matcher.py
@@ -44,5 +44,8 @@ def match(self, log):

             return True
 
-        except ScrubbingException:
-            raise Exception("could not filter the payload")
+        except ScrubbingException as e:
+            raise Exception(f"Failed to filter log: {e}")
+
+        except Exception as e:
+            raise Exception(f"Failed to filter log: {e}")
10 changes: 5 additions & 5 deletions aws/logs_monitoring/logs/helpers.py
@@ -34,16 +34,16 @@ def compileRegex(rule, pattern):
if pattern == "":
# If pattern is an empty string, raise exception
raise Exception(
"No pattern provided:\nAdd pattern or remove {} environment variable".format(
rule
)
f"Empty pattern for {rule}. Set a valid regex pattern or remove the {rule} environment variable."
)
try:
return re.compile(pattern)
except Exception:
except re.error as e:
raise Exception(
"could not compile {} regex with pattern: {}".format(rule, pattern)
f"Invalid regex pattern for {rule}: '{pattern}'. Regex error: {e}"
)
except Exception as e:
raise Exception(f"Failed to compile {rule} regex pattern '{pattern}': {e}")


def add_retry_tag(log):
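
The new clause ordering in `compileRegex` matters: `re.error` subclasses `Exception`, and Python matches `except` clauses top to bottom, so the specific clause must come first or it would never fire. A two-line check:

```python
import re

try:
    re.compile("(unclosed")
except re.error as e:
    print(f"caught re.error first: {e}")  # missing ), unterminated subpattern
except Exception:
    print("unreachable for regex syntax errors")
```
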