diff --git a/aws/logs_monitoring/forwarder.py b/aws/logs_monitoring/forwarder.py index 60fe8a70b..11108c2d5 100644 --- a/aws/logs_monitoring/forwarder.py +++ b/aws/logs_monitoring/forwarder.py @@ -4,34 +4,35 @@ # Copyright 2021 Datadog, Inc. -import logging import json +import logging import os -from telemetry import send_event_metric, send_log_metric -from trace_forwarder.connection import TraceConnection -from logs.datadog_http_client import DatadogHTTPClient from logs.datadog_batcher import DatadogBatcher from logs.datadog_client import DatadogClient -from logs.datadog_tcp_client import DatadogTCPClient +from logs.datadog_http_client import DatadogHTTPClient +from logs.datadog_matcher import DatadogMatcher from logs.datadog_scrubber import DatadogScrubber -from logs.helpers import filter_logs, add_retry_tag -from retry.storage import Storage +from logs.datadog_tcp_client import DatadogTCPClient +from logs.helpers import add_retry_tag from retry.enums import RetryPrefix +from retry.storage import Storage from settings import ( DD_API_KEY, - DD_USE_TCP, + DD_FORWARD_LOG, DD_NO_SSL, - DD_SKIP_SSL_VALIDATION, - DD_URL, DD_PORT, - DD_TRACE_INTAKE_URL, - DD_FORWARD_LOG, + DD_SKIP_SSL_VALIDATION, DD_STORE_FAILED_EVENTS, - SCRUBBING_RULE_CONFIGS, - INCLUDE_AT_MATCH, + DD_TRACE_INTAKE_URL, + DD_URL, + DD_USE_TCP, EXCLUDE_AT_MATCH, + INCLUDE_AT_MATCH, + SCRUBBING_RULE_CONFIGS, ) +from telemetry import send_event_metric, send_log_metric +from trace_forwarder.connection import TraceConnection logger = logging.getLogger() logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) @@ -83,25 +84,29 @@ def _forward_logs(self, logs, key=None): logger.debug(f"Forwarding {len(logs)} logs") scrubber = DatadogScrubber(SCRUBBING_RULE_CONFIGS) + matcher = DatadogMatcher( + include_pattern=INCLUDE_AT_MATCH, exclude_pattern=EXCLUDE_AT_MATCH + ) + logs_to_forward = [] for log in logs: if key: log = add_retry_tag(log) - # apply scrubbing rules to inner log message if exists + evaluated_log = log + + # apply scrubbing rules to inner log message if isinstance(log, dict) and log.get("message"): try: log["message"] = scrubber.scrub(log["message"]) + evaluated_log = log["message"] except Exception as e: logger.exception( f"Exception while scrubbing log message {log['message']}: {e}" ) - logs_to_forward.append(json.dumps(log, ensure_ascii=False)) - - logs_to_forward = filter_logs( - logs_to_forward, INCLUDE_AT_MATCH, EXCLUDE_AT_MATCH - ) + if matcher.match(evaluated_log): + logs_to_forward.append(json.dumps(log, ensure_ascii=False)) if DD_USE_TCP: batcher = DatadogBatcher(256 * 1000, 256 * 1000, 1) diff --git a/aws/logs_monitoring/logs/datadog_matcher.py b/aws/logs_monitoring/logs/datadog_matcher.py new file mode 100644 index 000000000..fd448cbfd --- /dev/null +++ b/aws/logs_monitoring/logs/datadog_matcher.py @@ -0,0 +1,48 @@ +# Unless explicitly stated otherwise all files in this repository are licensed +# under the Apache License Version 2.0. +# This product includes software developed at Datadog (https://www.datadoghq.com/). +# Copyright 2021 Datadog, Inc. + + +import logging +import os +import re + +from logs.exceptions import ScrubbingException +from logs.helpers import compileRegex + +logger = logging.getLogger() +logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) + + +class DatadogMatcher(object): + def __init__(self, include_pattern=None, exclude_pattern=None): + self._include_regex = None + self._exclude_regex = None + + if include_pattern is not None: + logger.debug(f"Applying include pattern: {include_pattern}") + self._include_regex = compileRegex("INCLUDE_AT_MATCH", include_pattern) + + if exclude_pattern is not None: + logger.debug(f"Applying exclude pattern: {exclude_pattern}") + self._exclude_regex = compileRegex("EXCLUDE_AT_MATCH", exclude_pattern) + + def match(self, log): + try: + if self._exclude_regex is not None and re.search( + self._exclude_regex, str(log) + ): + logger.debug("Exclude pattern matched, excluding log event") + return False + + if self._include_regex is not None and not re.search( + self._include_regex, str(log) + ): + logger.debug("Include pattern did not match, excluding log event") + return False + + return True + + except ScrubbingException: + raise Exception("could not filter the payload") diff --git a/aws/logs_monitoring/logs/helpers.py b/aws/logs_monitoring/logs/helpers.py index 2dee129f6..a6ed14ede 100644 --- a/aws/logs_monitoring/logs/helpers.py +++ b/aws/logs_monitoring/logs/helpers.py @@ -12,47 +12,10 @@ from settings import DD_CUSTOM_TAGS, DD_RETRY_KEYWORD -from logs.exceptions import ScrubbingException - logger = logging.getLogger() logger.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) -def filter_logs(logs, include_pattern=None, exclude_pattern=None): - """ - Applies log filtering rules. - If no filtering rules exist, return all the logs. - """ - if include_pattern is None and exclude_pattern is None: - return logs - - logger.debug(f"Applying exclude pattern: {exclude_pattern}") - exclude_regex = compileRegex("EXCLUDE_AT_MATCH", exclude_pattern) - - logger.debug(f"Applying include pattern: {include_pattern}") - include_regex = compileRegex("INCLUDE_AT_MATCH", include_pattern) - - # Add logs that should be sent to logs_to_send - logs_to_send = [] - - for log in logs: - try: - if exclude_regex is not None and re.search(exclude_regex, log): - logger.debug("Exclude pattern matched, excluding log event") - continue - - if include_regex is not None and not re.search(include_regex, log): - logger.debug("Include pattern did not match, excluding log event") - continue - - logs_to_send.append(log) - - except ScrubbingException: - raise Exception("could not filter the payload") - - return logs_to_send - - def compress_logs(batch, level): if level < 0: compression_level = 0 diff --git a/aws/logs_monitoring/tests/test_logs.py b/aws/logs_monitoring/tests/test_logs.py index d245b4af7..09487b8e8 100644 --- a/aws/logs_monitoring/tests/test_logs.py +++ b/aws/logs_monitoring/tests/test_logs.py @@ -1,12 +1,12 @@ -import unittest import os import sys -from importlib import reload +import unittest import unittest.mock +from importlib import reload -from logs.datadog_scrubber import DatadogScrubber from logs.datadog_batcher import DatadogBatcher -from logs.helpers import filter_logs +from logs.datadog_matcher import DatadogMatcher +from logs.datadog_scrubber import DatadogScrubber class TestScrubLogs(unittest.TestCase): @@ -65,10 +65,13 @@ class TestFilterLogs(unittest.TestCase): "This is not a REPORT log", "END RequestId: ...", "REPORT RequestId: ...", + {"message": "It should work"}, ] def test_include_at_match(self): - filtered_logs = filter_logs(self.example_logs, include_pattern=r"^(START|END)") + filtered_logs = filter_logs( + DatadogMatcher(include_pattern="^(START|END)"), self.example_logs + ) self.assertEqual( filtered_logs, @@ -79,19 +82,23 @@ def test_include_at_match(self): ) def test_exclude_at_match(self): - filtered_logs = filter_logs(self.example_logs, exclude_pattern=r"^(START|END)") + filtered_logs = filter_logs( + DatadogMatcher(exclude_pattern="^(START|END)"), self.example_logs + ) self.assertEqual( filtered_logs, [ "This is not a REPORT log", "REPORT RequestId: ...", + {"message": "It should work"}, ], ) def test_exclude_overrides_include(self): filtered_logs = filter_logs( - self.example_logs, include_pattern=r"^(START|END)", exclude_pattern=r"^END" + DatadogMatcher(include_pattern="^(START|END)", exclude_pattern="^END"), + self.example_logs, ) self.assertEqual( @@ -102,9 +109,19 @@ def test_exclude_overrides_include(self): ) def test_no_filtering_rules(self): - filtered_logs = filter_logs(self.example_logs) + filtered_logs = filter_logs(DatadogMatcher(), self.example_logs) self.assertEqual(filtered_logs, self.example_logs) +def filter_logs(matcher, logs): + filtered = [] + + for log in logs: + if matcher.match(log): + filtered.append(log) + + return filtered + + if __name__ == "__main__": unittest.main()