45 changes: 25 additions & 20 deletions aws/logs_monitoring/forwarder.py
@@ -4,34 +4,35 @@
# Copyright 2021 Datadog, Inc.


import logging
import json
import logging
import os

from telemetry import send_event_metric, send_log_metric
from trace_forwarder.connection import TraceConnection
from logs.datadog_http_client import DatadogHTTPClient
from logs.datadog_batcher import DatadogBatcher
from logs.datadog_client import DatadogClient
from logs.datadog_tcp_client import DatadogTCPClient
from logs.datadog_http_client import DatadogHTTPClient
from logs.datadog_matcher import DatadogMatcher
from logs.datadog_scrubber import DatadogScrubber
from logs.helpers import filter_logs, add_retry_tag
from retry.storage import Storage
from logs.datadog_tcp_client import DatadogTCPClient
from logs.helpers import add_retry_tag
from retry.enums import RetryPrefix
from retry.storage import Storage
from settings import (
DD_API_KEY,
DD_USE_TCP,
DD_FORWARD_LOG,
DD_NO_SSL,
DD_SKIP_SSL_VALIDATION,
DD_URL,
DD_PORT,
DD_TRACE_INTAKE_URL,
DD_FORWARD_LOG,
DD_SKIP_SSL_VALIDATION,
DD_STORE_FAILED_EVENTS,
SCRUBBING_RULE_CONFIGS,
INCLUDE_AT_MATCH,
DD_TRACE_INTAKE_URL,
DD_URL,
DD_USE_TCP,
EXCLUDE_AT_MATCH,
INCLUDE_AT_MATCH,
SCRUBBING_RULE_CONFIGS,
)
from telemetry import send_event_metric, send_log_metric
from trace_forwarder.connection import TraceConnection

logger = logging.getLogger()
logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
@@ -83,25 +84,29 @@ def _forward_logs(self, logs, key=None):
logger.debug(f"Forwarding {len(logs)} logs")

scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS)
matcher = DatadogMatcher(
include_pattern=INCLUDE_AT_MATCH, exclude_pattern=EXCLUDE_AT_MATCH
)

logs_to_forward = []
for log in logs:
if key:
log = add_retry_tag(log)

# apply scrubbing rules to inner log message if exists
evaluated_log = log

# apply scrubbing rules to inner log message
if isinstance(log, dict) and log.get("message"):
try:
log["message"] = scrubber.scrub(log["message"])
evaluated_log = log["message"]
except Exception as e:
logger.exception(
f"Exception while scrubbing log message {log['message']}: {e}"
)

logs_to_forward.append(json.dumps(log, ensure_ascii=False))

logs_to_forward = filter_logs(
logs_to_forward, INCLUDE_AT_MATCH, EXCLUDE_AT_MATCH
)
if matcher.match(evaluated_log):
Contributor: We should properly catch the exception raised by the matcher here and maybe log a warning?

Contributor Author: The previous behavior was to catch the exception and raise a new one. I think it's better to crash in this case; otherwise the Lambda will "succeed" despite bad filtering, which hides the problem.
logs_to_forward.append(json.dumps(log, ensure_ascii=False))

if DD_USE_TCP:
batcher = DatadogBatcher(256 * 1000, 256 * 1000, 1)
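Condensed, the new per-log flow in `_forward_logs` is: scrub the inner message if the log is a dict, match on the scrubbed value (or on the raw log otherwise), then serialize. A minimal sketch of the loop body, mirroring the diff above (`scrubber` and `matcher` stand in for the objects built at the top of the method); note that, per the thread above, a matcher failure is left to propagate rather than being swallowed:

    import json

    def _process_one(log, scrubber, matcher, logs_to_forward):
        # For dict logs with a "message" field, scrub and match on the inner
        # message; otherwise match on the log as-is.
        evaluated_log = log
        if isinstance(log, dict) and log.get("message"):
            # The real code wraps this scrub in a try/except and logs failures.
            log["message"] = scrubber.scrub(log["message"])
            evaluated_log = log["message"]

        # No try/except around the matcher: an error here should crash the
        # invocation instead of letting the Lambda "succeed" with bad filtering.
        if matcher.match(evaluated_log):
            logs_to_forward.append(json.dumps(log, ensure_ascii=False))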
48 changes: 48 additions & 0 deletions aws/logs_monitoring/logs/datadog_matcher.py
@@ -0,0 +1,48 @@
# Unless explicitly stated otherwise all files in this repository are licensed
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2021 Datadog, Inc.


import logging
import os
import re

from logs.exceptions import ScrubbingException
from logs.helpers import compileRegex

logger = logging.getLogger()
logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))


class DatadogMatcher(object):
def __init__(self, include_pattern=None, exclude_pattern=None):
self._include_regex = None
self._exclude_regex = None

if include_pattern is not None:
logger.debug(f"Applying include pattern: {include_pattern}")
self._include_regex = compileRegex("INCLUDE_AT_MATCH", include_pattern)

if exclude_pattern is not None:
logger.debug(f"Applying exclude pattern: {exclude_pattern}")
self._exclude_regex = compileRegex("EXCLUDE_AT_MATCH", exclude_pattern)

def match(self, log):
try:
if self._exclude_regex is not None and re.search(
self._exclude_regex, str(log)
):
logger.debug("Exclude pattern matched, excluding log event")
return False

if self._include_regex is not None and not re.search(
self._include_regex, str(log)
):
logger.debug("Include pattern did not match, excluding log event")
return False

return True

except ScrubbingException:
raise Exception("could not filter the payload")
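For reference, a small usage sketch of the new matcher, with patterns borrowed from the tests below; the exclude pattern takes precedence over the include pattern:

    matcher = DatadogMatcher(include_pattern="^(START|END)", exclude_pattern="^END")

    matcher.match("START RequestId: ...")   # True: include matches, exclude does not
    matcher.match("END RequestId: ...")     # False: exclude wins over include
    matcher.match("REPORT RequestId: ...")  # False: include does not match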
37 changes: 0 additions & 37 deletions aws/logs_monitoring/logs/helpers.py
@@ -12,47 +12,10 @@

from settings import DD_CUSTOM_TAGS, DD_RETRY_KEYWORD

from logs.exceptions import ScrubbingException

logger = logging.getLogger()
logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))


def filter_logs(logs, include_pattern=None, exclude_pattern=None):
"""
Applies log filtering rules.
If no filtering rules exist, return all the logs.
"""
if include_pattern is None and exclude_pattern is None:
return logs

logger.debug(f"Applying exclude pattern: {exclude_pattern}")
exclude_regex = compileRegex("EXCLUDE_AT_MATCH", exclude_pattern)

logger.debug(f"Applying include pattern: {include_pattern}")
include_regex = compileRegex("INCLUDE_AT_MATCH", include_pattern)

# Add logs that should be sent to logs_to_send
logs_to_send = []

for log in logs:
try:
if exclude_regex is not None and re.search(exclude_regex, log):
logger.debug("Exclude pattern matched, excluding log event")
continue

if include_regex is not None and not re.search(include_regex, log):
logger.debug("Include pattern did not match, excluding log event")
continue

logs_to_send.append(log)

except ScrubbingException:
raise Exception("could not filter the payload")

return logs_to_send


def compress_logs(batch, level):
if level < 0:
compression_level = 0
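The net effect of removing this helper is that the regexes are now compiled once, in the matcher's constructor, rather than on every call. A rough before/after sketch of a call site:

    # Before: patterns recompiled on every filter_logs call
    logs_to_send = filter_logs(logs, INCLUDE_AT_MATCH, EXCLUDE_AT_MATCH)

    # After: compiled once, reused for every log
    matcher = DatadogMatcher(
        include_pattern=INCLUDE_AT_MATCH, exclude_pattern=EXCLUDE_AT_MATCH
    )
    logs_to_send = [log for log in logs if matcher.match(log)]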
33 changes: 25 additions & 8 deletions aws/logs_monitoring/tests/test_logs.py
@@ -1,12 +1,12 @@
import unittest
import os
import sys
from importlib import reload
import unittest
import unittest.mock
from importlib import reload

from logs.datadog_scrubber import DatadogScrubber
from logs.datadog_batcher import DatadogBatcher
from logs.helpers import filter_logs
from logs.datadog_matcher import DatadogMatcher
from logs.datadog_scrubber import DatadogScrubber


class TestScrubLogs(unittest.TestCase):
@@ -65,10 +65,13 @@ class TestFilterLogs(unittest.TestCase):
"This is not a REPORT log",
"END RequestId: ...",
"REPORT RequestId: ...",
{"message": "It should work"},
]

def test_include_at_match(self):
filtered_logs = filter_logs(self.example_logs, include_pattern=r"^(START|END)")
filtered_logs = filter_logs(
DatadogMatcher(include_pattern="^(START|END)"), self.example_logs
)

self.assertEqual(
filtered_logs,
@@ -79,19 +82,23 @@ def test_include_at_match(self):
)

def test_exclude_at_match(self):
filtered_logs = filter_logs(self.example_logs, exclude_pattern=r"^(START|END)")
filtered_logs = filter_logs(
DatadogMatcher(exclude_pattern="^(START|END)"), self.example_logs
)

self.assertEqual(
filtered_logs,
[
"This is not a REPORT log",
"REPORT RequestId: ...",
{"message": "It should work"},
],
)

def test_exclude_overrides_include(self):
filtered_logs = filter_logs(
self.example_logs, include_pattern=r"^(START|END)", exclude_pattern=r"^END"
DatadogMatcher(include_pattern="^(START|END)", exclude_pattern="^END"),
self.example_logs,
)

self.assertEqual(
@@ -102,9 +109,19 @@ def test_exclude_overrides_include(self):
)

def test_no_filtering_rules(self):
filtered_logs = filter_logs(self.example_logs)
filtered_logs = filter_logs(DatadogMatcher(), self.example_logs)
self.assertEqual(filtered_logs, self.example_logs)


def filter_logs(matcher, logs):
filtered = []

for log in logs:
if matcher.match(log):
filtered.append(log)

return filtered


if __name__ == "__main__":
unittest.main()
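One detail the new dict fixture exercises: DatadogMatcher.match calls str() on its argument, so dict payloads are matched against their string form. A quick sketch:

    m = DatadogMatcher(include_pattern="work")
    m.match({"message": "It should work"})  # True: "work" is found in str(log)
    m.match("nothing to see here")          # False: include pattern not found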