From 5dbf8d1103456e3b300c1a83527011e8159ae198 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Thu, 2 Jul 2020 11:18:48 -0400 Subject: [PATCH 01/23] X-ray --- aws/logs_monitoring/lambda_function.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/aws/logs_monitoring/lambda_function.py b/aws/logs_monitoring/lambda_function.py index 90218c0e5..4527ce8f5 100644 --- a/aws/logs_monitoring/lambda_function.py +++ b/aws/logs_monitoring/lambda_function.py @@ -24,6 +24,10 @@ from datadog import api from trace_forwarder.connection import TraceConnection +from aws_xray_sdk.core import xray_recorder +from aws_xray_sdk.core import patch_all +patch_all() + log = logging.getLogger() log.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) @@ -556,19 +560,29 @@ def datadog_forwarder(event, context): if log.isEnabledFor(logging.DEBUG): log.debug(f"Received Event:{json.dumps(event)}") + xray_recorder.begin_subsegment('initial parsing') metrics, logs, traces = split(enrich(parse(event, context))) + xray_recorder.end_subsegment() if DD_FORWARD_LOG: + xray_recorder.begin_subsegment('forward logs') forward_logs(filter_logs(map(json.dumps, logs))) + xray_recorder.end_subsegment() if DD_FORWARD_METRIC: + xray_recorder.begin_subsegment('forward metrics') forward_metrics(metrics) + xray_recorder.end_subsegment() if DD_FORWARD_TRACES and len(traces) > 0: + xray_recorder.begin_subsegment('forward traces') forward_traces(traces) + xray_recorder.end_subsegment() if IS_ENHANCED_METRICS_FILE_PRESENT and DD_FORWARD_METRIC: + xray_recorder.begin_subsegment('enhanced metrics') parse_and_submit_enhanced_metrics(logs) + xray_recorder.end_subsegment() if DD_FORWARD_METRIC or DD_FORWARD_TRACES: From e13d1af49ac2e9373383ad5c28f62ad93041e718 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Thu, 2 Jul 2020 17:57:59 -0400 Subject: [PATCH 02/23] Started --- aws/logs_monitoring/lambda_function.py | 140 +++++++++--------- .../tests/test_lambda_function.py | 37 +++++ .../trace_forwarder/cmd/trace/main.go | 2 +- .../trace_forwarder/connection.py | 5 +- 4 files changed, 115 insertions(+), 69 deletions(-) create mode 100644 aws/logs_monitoring/tests/test_lambda_function.py diff --git a/aws/logs_monitoring/lambda_function.py b/aws/logs_monitoring/lambda_function.py index 4527ce8f5..b6f2dcafa 100644 --- a/aws/logs_monitoring/lambda_function.py +++ b/aws/logs_monitoring/lambda_function.py @@ -9,6 +9,7 @@ import gzip import json import os +from collections import defaultdict import boto3 import itertools @@ -26,6 +27,7 @@ from aws_xray_sdk.core import xray_recorder from aws_xray_sdk.core import patch_all + patch_all() log = logging.getLogger() @@ -46,23 +48,6 @@ # of requests is removed in botocore 1.13.x. from botocore.vendored import requests -try: - from enhanced_lambda_metrics import ( - get_enriched_lambda_log_tags, - parse_and_submit_enhanced_metrics, - ) - - IS_ENHANCED_METRICS_FILE_PRESENT = True -except ImportError: - IS_ENHANCED_METRICS_FILE_PRESENT = False - log.warn( - "Could not import from enhanced_lambda_metrics so enhanced metrics " - "will not be submitted. Ensure you've included the enhanced_lambda_metrics " - "file in your Lambda project." - ) -finally: - log.debug(f"IS_ENHANCED_METRICS_FILE_PRESENT: {IS_ENHANCED_METRICS_FILE_PRESENT}") - def get_env_var(envvar, default, boolean=False): """ @@ -478,39 +463,39 @@ def __exit__(self, ex_type, ex_value, traceback): class DatadogBatcher(object): - def __init__(self, max_log_size_bytes, max_size_bytes, max_size_count): - self._max_log_size_bytes = max_log_size_bytes - self._max_size_bytes = max_size_bytes - self._max_size_count = max_size_count + def __init__(self, max_item_size_bytes, max_batch_size_bytes, max_items_count): + self._max_item_size_bytes = max_item_size_bytes + self._max_batch_size_bytes = max_batch_size_bytes + self._max_items_count = max_items_count - def _sizeof_bytes(self, log): - return len(log.encode("UTF-8")) + def _sizeof_bytes(self, item): + return len(item.encode("UTF-8")) - def batch(self, logs): + def batch(self, items): """ Returns an array of batches. - Each batch contains at most max_size_count logs and - is not strictly greater than max_size_bytes. - All logs strictly greater than max_log_size_bytes are dropped. + Each batch contains at most max_items_count items and + is not strictly greater than max_batch_size_bytes. + All items strictly greater than max_item_size_bytes are dropped. """ batches = [] batch = [] size_bytes = 0 size_count = 0 - for log in logs: - log_size_bytes = self._sizeof_bytes(log) + for item in items: + item_size_bytes = self._sizeof_bytes(item) if size_count > 0 and ( - size_count >= self._max_size_count - or size_bytes + log_size_bytes > self._max_size_bytes + size_count >= self._max_items_count + or size_bytes + item_size_bytes > self._max_batch_size_bytes ): batches.append(batch) batch = [] size_bytes = 0 size_count = 0 - # all logs exceeding max_log_size_bytes are dropped here - if log_size_bytes <= self._max_log_size_bytes: - batch.append(log) - size_bytes += log_size_bytes + # all items exceeding max_item_size_bytes are dropped here + if item_size_bytes <= self._max_item_size_bytes: + batch.append(item) + size_bytes += item_size_bytes size_count += 1 if size_count > 0: batches.append(batch) @@ -560,27 +545,27 @@ def datadog_forwarder(event, context): if log.isEnabledFor(logging.DEBUG): log.debug(f"Received Event:{json.dumps(event)}") - xray_recorder.begin_subsegment('initial parsing') - metrics, logs, traces = split(enrich(parse(event, context))) + xray_recorder.begin_subsegment("initial parsing") + metrics, logs, trace_payloads = split(enrich(parse(event, context))) xray_recorder.end_subsegment() if DD_FORWARD_LOG: - xray_recorder.begin_subsegment('forward logs') + xray_recorder.begin_subsegment("forward logs") forward_logs(filter_logs(map(json.dumps, logs))) xray_recorder.end_subsegment() if DD_FORWARD_METRIC: - xray_recorder.begin_subsegment('forward metrics') + xray_recorder.begin_subsegment("forward metrics") forward_metrics(metrics) xray_recorder.end_subsegment() if DD_FORWARD_TRACES and len(traces) > 0: - xray_recorder.begin_subsegment('forward traces') - forward_traces(traces) + xray_recorder.begin_subsegment("forward traces") + forward_traces(trace_payloads) xray_recorder.end_subsegment() - if IS_ENHANCED_METRICS_FILE_PRESENT and DD_FORWARD_METRIC: - xray_recorder.begin_subsegment('enhanced metrics') + if DD_FORWARD_METRIC: + xray_recorder.begin_subsegment("enhanced metrics") parse_and_submit_enhanced_metrics(logs) xray_recorder.end_subsegment() @@ -681,18 +666,17 @@ def add_metadata_to_lambda_log(event): tags = ["functionname:{}".format(function_name)] # Add any enhanced tags from metadata - if IS_ENHANCED_METRICS_FILE_PRESENT: - custom_lambda_tags = get_enriched_lambda_log_tags(event) + custom_lambda_tags = get_enriched_lambda_log_tags(event) - # Check if one of the Lambda's custom tags is env - # If an env tag exists, remove the env:none placeholder - custom_env_tag = next( - (tag for tag in custom_lambda_tags if tag.startswith("env:")), None - ) - if custom_env_tag is not None: - event[DD_CUSTOM_TAGS] = event[DD_CUSTOM_TAGS].replace("env:none", "") + # Check if one of the Lambda's custom tags is env + # If an env tag exists, remove the env:none placeholder + custom_env_tag = next( + (tag for tag in custom_lambda_tags if tag.startswith("env:")), None + ) + if custom_env_tag is not None: + event[DD_CUSTOM_TAGS] = event[DD_CUSTOM_TAGS].replace("env:none", "") - tags += custom_lambda_tags + tags += custom_lambda_tags # Dedup tags, so we don't end up with functionname twice tags = list(set(tags)) @@ -730,8 +714,8 @@ def generate_metadata(context): return metadata -def extract_trace(event): - """Extract traces from an event if possible""" +def extract_trace_payload(event): + """Extract trace payload from an event if possible""" try: message = event["message"] obj = json.loads(event["message"]) @@ -759,19 +743,19 @@ def extract_metric(event): def split(events): - """Split events into metrics, logs, and traces + """Split events into metrics, logs, and trace payloads """ - metrics, logs, traces = [], [], [] + metrics, logs, trace_payloads = [], [], [] for event in events: metric = extract_metric(event) - trace = extract_trace(event) + trace_payload = extract_trace_payload(event) if metric and DD_FORWARD_METRIC: metrics.append(metric) - elif trace: - traces.append(trace) + elif trace_payload: + trace_payloads.append(trace_payload) else: logs.append(event) - return metrics, logs, traces + return metrics, logs, trace_payloads # should only be called when INCLUDE_AT_MATCH and/or EXCLUDE_AT_MATCH exist @@ -819,15 +803,39 @@ def forward_metrics(metrics): log.debug(f"Forwarded metric: {json.dumps(metric)}") -def forward_traces(traces): - for trace in traces: +def forward_traces(trace_payloads): + batched_payloads = batch_trace_payloads(trace_payloads) + + for payload in batched_payloads: try: - trace_connection.send_trace(trace["message"], trace["tags"]) + trace_connection.send_traces(payload["message"], payload["tags"]) except Exception: - log.exception(f"Exception while forwarding trace {json.dumps(trace)}") + log.exception(f"Exception while forwarding traces {json.dumps(payload)}") else: if log.isEnabledFor(logging.DEBUG): - log.debug(f"Forwarded trace: {json.dumps(trace)}") + log.debug(f"Forwarded traces: {json.dumps(payload)}") + + +def batch_trace_payloads(trace_payloads): + """ + To reduce the number of API calls, batch traces that have the same tags + """ + traces_grouped_by_tags = defaultdict(List) + for trace_payload in trace_payloads: + tags = trace_payload["tags"] + traces = json.parse(trace_payload["message"])["traces"] + traces_grouped_by_tags[tags] += traces + + batched_trace_payloads = [] + batcher = DatadogBatcher(256 * 1000, 2 * 1000 * 1000, 200) + for tags, traces in traces_grouped_by_tags.items(): + batches = batcher.batch(traces) + for batch in batches: + batched_trace_payloads.append( + {tags: tags, message: json.dumps({"traces": batch})} + ) + + return batched_trace_payloads # Utility functions diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py new file mode 100644 index 000000000..f0a35646f --- /dev/null +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -0,0 +1,37 @@ +import unittest + +from lambda_function import batch_trace_payloads + + +class TestBatchTracePayloads(unittest.TestCase): + def test_batch_trace_payloads(self): + trace_payloads = [ + {"tags": "tag1:value", "message": '{"traces":[[{"trace_id":"1"}]]}\n',}, + { + "tags": "tag1:value", + "message": '{"traces":[[{"trace_id":"2"}, {"trace_id":"3"}]]}\n', + }, + { + "tags": "tag2:value", + "message": '{"traces":[[{"trace_id":"4"}], [{"trace_id":"5"}]]}\n', + }, + ] + + batched_payloads = batch_trace_payloads(trace_payloads) + + expected_batched_payloads = [ + { + "tags": "tag1:value", + "message": '{"traces":[[{"trace_id":"1"}], [[{"trace_id":"2"}, {"trace_id":"3"}]]}\n', + }, + { + "tags": "tag2:value", + "message": '{"traces":[[{"trace_id":"4"}], [{"trace_id":"5"}]]}\n', + }, + ] + + self.assertEqual(batched_payloads, expected_batched_payloads) + + +if __name__ == "__main__": + unittest.main() diff --git a/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go index 5de455ee0..9a39fc614 100644 --- a/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go +++ b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go @@ -48,7 +48,7 @@ func Configure(rootURL, apiKey string) { // ForwardTrace will perform filtering and log forwarding to the trace intake // returns 0 on success, 1 on error //export ForwardTrace -func ForwardTrace(content string, tags string) int { +func ForwardTraces(content string, tags string) int { tracePayloads, err := apm.ProcessTrace(content, obfuscator, tags) if err != nil { fmt.Printf("Couldn't forward trace: %v", err) diff --git a/aws/logs_monitoring/trace_forwarder/connection.py b/aws/logs_monitoring/trace_forwarder/connection.py index d40afffaa..f907695fa 100644 --- a/aws/logs_monitoring/trace_forwarder/connection.py +++ b/aws/logs_monitoring/trace_forwarder/connection.py @@ -23,9 +23,10 @@ def __init__(self, root_url, api_key): self.lib = cdll.LoadLibrary("{}/bin/trace-intake.so".format(dir)) self.lib.Configure(make_go_string(root_url), make_go_string(api_key)) - def send_trace(self, trace_str, tags=""): + def send_traces(self, traces_str, tags=""): had_error = ( - self.lib.ForwardTrace(make_go_string(trace_str), make_go_string(tags)) != 0 + self.lib.ForwardTraces(make_go_string(traces_str), make_go_string(tags)) + != 0 ) if had_error: raise Exception("Failed to send to trace intake") From b54d242f7c61ec73978d95405aa385dbdaff0ffa Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Thu, 2 Jul 2020 18:15:37 -0400 Subject: [PATCH 03/23] Fixes --- aws/logs_monitoring/lambda_function.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/aws/logs_monitoring/lambda_function.py b/aws/logs_monitoring/lambda_function.py index b6f2dcafa..2ae8ddd19 100644 --- a/aws/logs_monitoring/lambda_function.py +++ b/aws/logs_monitoring/lambda_function.py @@ -24,6 +24,10 @@ from datadog_lambda.metric import lambda_stats from datadog import api from trace_forwarder.connection import TraceConnection +from enhanced_lambda_metrics import ( + get_enriched_lambda_log_tags, + parse_and_submit_enhanced_metrics, +) from aws_xray_sdk.core import xray_recorder from aws_xray_sdk.core import patch_all @@ -559,7 +563,7 @@ def datadog_forwarder(event, context): forward_metrics(metrics) xray_recorder.end_subsegment() - if DD_FORWARD_TRACES and len(traces) > 0: + if DD_FORWARD_TRACES and len(trace_payloads > 0): xray_recorder.begin_subsegment("forward traces") forward_traces(trace_payloads) xray_recorder.end_subsegment() @@ -820,7 +824,7 @@ def batch_trace_payloads(trace_payloads): """ To reduce the number of API calls, batch traces that have the same tags """ - traces_grouped_by_tags = defaultdict(List) + traces_grouped_by_tags = defaultdict(list) for trace_payload in trace_payloads: tags = trace_payload["tags"] traces = json.parse(trace_payload["message"])["traces"] From a98c16f252e3e9777cc5fb82e84adcebe0b18011 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Thu, 2 Jul 2020 18:22:17 -0400 Subject: [PATCH 04/23] Fix dict --- aws/logs_monitoring/lambda_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/logs_monitoring/lambda_function.py b/aws/logs_monitoring/lambda_function.py index 2ae8ddd19..5bbc694cd 100644 --- a/aws/logs_monitoring/lambda_function.py +++ b/aws/logs_monitoring/lambda_function.py @@ -836,7 +836,7 @@ def batch_trace_payloads(trace_payloads): batches = batcher.batch(traces) for batch in batches: batched_trace_payloads.append( - {tags: tags, message: json.dumps({"traces": batch})} + {"tags": tags, "message": json.dumps({"traces": batch})} ) return batched_trace_payloads From 5a2a6a79833d3bdc27f0fa341d7a38b8d228a122 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Thu, 2 Jul 2020 18:32:38 -0400 Subject: [PATCH 05/23] More fixes --- .github/workflows/lambdachecks.yml | 2 +- aws/logs_monitoring/trace_forwarder/cmd/trace/main.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/lambdachecks.yml b/.github/workflows/lambdachecks.yml index 4f38905e4..ad595360e 100644 --- a/.github/workflows/lambdachecks.yml +++ b/.github/workflows/lambdachecks.yml @@ -32,7 +32,7 @@ jobs: - name: Run trace forwarder integration tests run: | ./aws/logs_monitoring/trace_forwarder/scripts/run_tests.sh - - name: Run enhanced metric unittest + - name: Run unit tests env: AWS_DEFAULT_REGION: us-east-1 run: | diff --git a/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go index 9a39fc614..1a23c2132 100644 --- a/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go +++ b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go @@ -45,13 +45,13 @@ func Configure(rootURL, apiKey string) { edgeConnection = apm.CreateTraceEdgeConnection(localRootURL, localAPIKey) } -// ForwardTrace will perform filtering and log forwarding to the trace intake +// ForwardTraces will perform filtering and log forwarding to the trace intake // returns 0 on success, 1 on error -//export ForwardTrace +//export ForwardTraces func ForwardTraces(content string, tags string) int { tracePayloads, err := apm.ProcessTrace(content, obfuscator, tags) if err != nil { - fmt.Printf("Couldn't forward trace: %v", err) + fmt.Printf("Couldn't forward traces: %v", err) return 1 } hadErr := false From ea9a3d072cb5dbda2afc6f8ffa6811f2350f537e Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Thu, 2 Jul 2020 19:03:43 -0400 Subject: [PATCH 06/23] More fixes --- aws/logs_monitoring/lambda_function.py | 4 ++-- aws/logs_monitoring/tests/test_lambda_function.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/aws/logs_monitoring/lambda_function.py b/aws/logs_monitoring/lambda_function.py index 5bbc694cd..0d0cea093 100644 --- a/aws/logs_monitoring/lambda_function.py +++ b/aws/logs_monitoring/lambda_function.py @@ -473,7 +473,7 @@ def __init__(self, max_item_size_bytes, max_batch_size_bytes, max_items_count): self._max_items_count = max_items_count def _sizeof_bytes(self, item): - return len(item.encode("UTF-8")) + return len(str(item).encode("UTF-8")) def batch(self, items): """ @@ -827,7 +827,7 @@ def batch_trace_payloads(trace_payloads): traces_grouped_by_tags = defaultdict(list) for trace_payload in trace_payloads: tags = trace_payload["tags"] - traces = json.parse(trace_payload["message"])["traces"] + traces = json.loads(trace_payload["message"])["traces"] traces_grouped_by_tags[tags] += traces batched_trace_payloads = [] diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index f0a35646f..7eb198e72 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -2,7 +2,6 @@ from lambda_function import batch_trace_payloads - class TestBatchTracePayloads(unittest.TestCase): def test_batch_trace_payloads(self): trace_payloads = [ @@ -22,11 +21,11 @@ def test_batch_trace_payloads(self): expected_batched_payloads = [ { "tags": "tag1:value", - "message": '{"traces":[[{"trace_id":"1"}], [[{"trace_id":"2"}, {"trace_id":"3"}]]}\n', + "message": '{"traces": [[{"trace_id": "1"}], [{"trace_id": "2"}, {"trace_id": "3"}]]}', }, { "tags": "tag2:value", - "message": '{"traces":[[{"trace_id":"4"}], [{"trace_id":"5"}]]}\n', + "message": '{"traces": [[{"trace_id": "4"}], [{"trace_id": "5"}]]}', }, ] From 340dbf6558ecb86823c5593bb0af12c37e5e2737 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Thu, 2 Jul 2020 19:04:28 -0400 Subject: [PATCH 07/23] Black --- aws/logs_monitoring/tests/test_lambda_function.py | 1 + 1 file changed, 1 insertion(+) diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index 7eb198e72..fee578da2 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -2,6 +2,7 @@ from lambda_function import batch_trace_payloads + class TestBatchTracePayloads(unittest.TestCase): def test_batch_trace_payloads(self): trace_payloads = [ From 0e28d99815c43bd0742f241a5c0d93a9c788c750 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Thu, 2 Jul 2020 19:59:49 -0400 Subject: [PATCH 08/23] Fix paren --- aws/logs_monitoring/lambda_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/logs_monitoring/lambda_function.py b/aws/logs_monitoring/lambda_function.py index 0d0cea093..fc676e236 100644 --- a/aws/logs_monitoring/lambda_function.py +++ b/aws/logs_monitoring/lambda_function.py @@ -563,7 +563,7 @@ def datadog_forwarder(event, context): forward_metrics(metrics) xray_recorder.end_subsegment() - if DD_FORWARD_TRACES and len(trace_payloads > 0): + if DD_FORWARD_TRACES and len(trace_payloads) > 0: xray_recorder.begin_subsegment("forward traces") forward_traces(trace_payloads) xray_recorder.end_subsegment() From d50d9ebd2335bf4398e1f890737e2707ad9eb23d Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 02:28:05 -0400 Subject: [PATCH 09/23] Combine traces in Go --- aws/logs_monitoring/lambda_function.py | 14 ------ .../trace_forwarder/cmd/trace/main.go | 45 ++++++++++++------- 2 files changed, 30 insertions(+), 29 deletions(-) diff --git a/aws/logs_monitoring/lambda_function.py b/aws/logs_monitoring/lambda_function.py index fc676e236..11e814f60 100644 --- a/aws/logs_monitoring/lambda_function.py +++ b/aws/logs_monitoring/lambda_function.py @@ -29,10 +29,6 @@ parse_and_submit_enhanced_metrics, ) -from aws_xray_sdk.core import xray_recorder -from aws_xray_sdk.core import patch_all - -patch_all() log = logging.getLogger() log.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper())) @@ -549,29 +545,19 @@ def datadog_forwarder(event, context): if log.isEnabledFor(logging.DEBUG): log.debug(f"Received Event:{json.dumps(event)}") - xray_recorder.begin_subsegment("initial parsing") metrics, logs, trace_payloads = split(enrich(parse(event, context))) - xray_recorder.end_subsegment() if DD_FORWARD_LOG: - xray_recorder.begin_subsegment("forward logs") forward_logs(filter_logs(map(json.dumps, logs))) - xray_recorder.end_subsegment() if DD_FORWARD_METRIC: - xray_recorder.begin_subsegment("forward metrics") forward_metrics(metrics) - xray_recorder.end_subsegment() if DD_FORWARD_TRACES and len(trace_payloads) > 0: - xray_recorder.begin_subsegment("forward traces") forward_traces(trace_payloads) - xray_recorder.end_subsegment() if DD_FORWARD_METRIC: - xray_recorder.begin_subsegment("enhanced metrics") parse_and_submit_enhanced_metrics(logs) - xray_recorder.end_subsegment() if DD_FORWARD_METRIC or DD_FORWARD_TRACES: diff --git a/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go index 1a23c2132..42143e9f3 100644 --- a/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go +++ b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go @@ -14,7 +14,10 @@ import ( "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring/trace_forwarder/internal/apm" ) -import "github.com/DataDog/datadog-agent/pkg/trace/obfuscate" +import ( + "github.com/DataDog/datadog-agent/pkg/trace/obfuscate" + "github.com/DataDog/datadog-agent/pkg/trace/pb" +) var ( obfuscator *obfuscate.Obfuscator @@ -54,26 +57,38 @@ func ForwardTraces(content string, tags string) int { fmt.Printf("Couldn't forward traces: %v", err) return 1 } - hadErr := false - for _, tracePayload := range tracePayloads { + combinedPayload := combinePayloads(tracePayloads) - err = edgeConnection.SendTraces(context.Background(), tracePayload, 3) - if err != nil { - fmt.Printf("Failed to send traces with error %v\n", err) - hadErr = true - } - stats := apm.ComputeAPMStats(tracePayload) - err = edgeConnection.SendStats(context.Background(), stats, 3) - if err != nil { - fmt.Printf("Failed to send trace stats with error %v\n", err) - hadErr = true - } + err = edgeConnection.SendTraces(context.Background(), combinedPayload, 3) + if err != nil { + fmt.Printf("Failed to send traces with error %v\n", err) + return 1 } - if hadErr { + + stats := apm.ComputeAPMStats(combinedPayload) + err = edgeConnection.SendStats(context.Background(), stats, 3) + if err != nil { + fmt.Printf("Failed to send trace stats with error %v\n", err) return 1 } + return 0 } +// Combine payloads into one +// Assumes that all payloads have the same HostName and Env +func combinePayloads(tracePayloads []*pb.TracePayload) *pb.TracePayload { + combinedPayload := &pb.TracePayload{ + HostName: tracePayloads[0].HostName, + Env: tracePayloads[0].Env, + Traces: make([]*pb.APITrace, 0), + } + for _, tracePayload := range tracePayloads { + combinedPayload.Traces = append(combinedPayload.Traces, tracePayload.Traces...) + } + fmt.Sprintf("aggregated %d traces into single payload", len(combinedPayload.Traces)) + return combinedPayload +} + func main() {} From dc7c6f4fe53d5832a7b9b557c6a051a9dc72dd61 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 02:51:33 -0400 Subject: [PATCH 10/23] Mock datadog_lambda --- aws/logs_monitoring/tests/test_lambda_function.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index fee578da2..e860f3ebe 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -1,4 +1,8 @@ import unittest +from unittest.mock import MagicMock +import sys + +sys.modules["datadog_lambda"] = MagicMock() from lambda_function import batch_trace_payloads From 9cc07c8ae8a5901a9218054e53dd21f85db16e7c Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 03:21:10 -0400 Subject: [PATCH 11/23] Adjust mocking --- .github/workflows/lambdachecks.yml | 2 +- aws/logs_monitoring/tests/test_lambda_function.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/lambdachecks.yml b/.github/workflows/lambdachecks.yml index ad595360e..d66c7cd3e 100644 --- a/.github/workflows/lambdachecks.yml +++ b/.github/workflows/lambdachecks.yml @@ -36,5 +36,5 @@ jobs: env: AWS_DEFAULT_REGION: us-east-1 run: | - pip install boto3 + pip install boto3 mock python -m unittest discover ./aws/logs_monitoring/ diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index e860f3ebe..52235a705 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -1,8 +1,9 @@ import unittest -from unittest.mock import MagicMock +from mock import MagicMock import sys -sys.modules["datadog_lambda"] = MagicMock() +sys.modules["datadog_lambda.wrapper"] = MagicMock() +sys.modules["datadog_lambda.metric"] = MagicMock() from lambda_function import batch_trace_payloads From 129dcd1ead4f1497af769538c1588ee12faba67d Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 10:11:50 -0400 Subject: [PATCH 12/23] Mock datadog --- aws/logs_monitoring/tests/test_lambda_function.py | 1 + 1 file changed, 1 insertion(+) diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index 52235a705..bdc1c5c8f 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -4,6 +4,7 @@ sys.modules["datadog_lambda.wrapper"] = MagicMock() sys.modules["datadog_lambda.metric"] = MagicMock() +sys.modules["datadog"] = MagicMock() from lambda_function import batch_trace_payloads From 7766140bd1c425d61996ba3121182d44acd8cd42 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 10:19:50 -0400 Subject: [PATCH 13/23] Mock requests --- aws/logs_monitoring/tests/test_lambda_function.py | 1 + 1 file changed, 1 insertion(+) diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index bdc1c5c8f..f2822aa9d 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -5,6 +5,7 @@ sys.modules["datadog_lambda.wrapper"] = MagicMock() sys.modules["datadog_lambda.metric"] = MagicMock() sys.modules["datadog"] = MagicMock() +sys.modules["requests"] = MagicMock() from lambda_function import batch_trace_payloads From cc449e61f5ec444c16a699b52de52e400deeecde Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 10:34:23 -0400 Subject: [PATCH 14/23] Add mock DD_API_KEY --- aws/logs_monitoring/tests/test_lambda_function.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index f2822aa9d..322b6047c 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -1,6 +1,7 @@ -import unittest -from mock import MagicMock +from mock import MagicMock, patch +import os import sys +import unittest sys.modules["datadog_lambda.wrapper"] = MagicMock() sys.modules["datadog_lambda.metric"] = MagicMock() @@ -10,6 +11,7 @@ from lambda_function import batch_trace_payloads +@patch.dict('os.environ', {'DD_API_KEY': '11111111111111111111111111111111'}): class TestBatchTracePayloads(unittest.TestCase): def test_batch_trace_payloads(self): trace_payloads = [ From f601e960d3003fe0ef137e6648a0204e1245f2c5 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 10:41:54 -0400 Subject: [PATCH 15/23] Fix syntax error --- aws/logs_monitoring/tests/test_lambda_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index 322b6047c..8ce385b4d 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -11,7 +11,7 @@ from lambda_function import batch_trace_payloads -@patch.dict('os.environ', {'DD_API_KEY': '11111111111111111111111111111111'}): +@patch.dict('os.environ', {'DD_API_KEY': '11111111111111111111111111111111'}) class TestBatchTracePayloads(unittest.TestCase): def test_batch_trace_payloads(self): trace_payloads = [ From d78fb131a899c043a7ddf7a5662383afc45c5d32 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 10:45:30 -0400 Subject: [PATCH 16/23] Black --- aws/logs_monitoring/tests/test_lambda_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index 8ce385b4d..acbcac3f1 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -11,7 +11,7 @@ from lambda_function import batch_trace_payloads -@patch.dict('os.environ', {'DD_API_KEY': '11111111111111111111111111111111'}) +@patch.dict("os.environ", {"DD_API_KEY": "11111111111111111111111111111111"}) class TestBatchTracePayloads(unittest.TestCase): def test_batch_trace_payloads(self): trace_payloads = [ From dd7f5af3378534ff1dda5006f9370f0b8b47ae3b Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 10:53:51 -0400 Subject: [PATCH 17/23] Fix env patch --- aws/logs_monitoring/tests/test_lambda_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index acbcac3f1..ecabee0fd 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -11,7 +11,7 @@ from lambda_function import batch_trace_payloads -@patch.dict("os.environ", {"DD_API_KEY": "11111111111111111111111111111111"}) +@patch.dict(os.environ, {"DD_API_KEY": "11111111111111111111111111111111"}) class TestBatchTracePayloads(unittest.TestCase): def test_batch_trace_payloads(self): trace_payloads = [ From 41d3b24adcf8da930ee0d161fe5e9cb1b0b4cb78 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 11:08:54 -0400 Subject: [PATCH 18/23] Mock API key while importing file --- aws/logs_monitoring/tests/test_lambda_function.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index ecabee0fd..6357127c3 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -8,10 +8,12 @@ sys.modules["datadog"] = MagicMock() sys.modules["requests"] = MagicMock() +env_patch = mock.patch.dict(os.environ, {"DD_API_KEY": "11111111111111111111111111111111"}) +env_patch.start() from lambda_function import batch_trace_payloads +env_patch.stop() -@patch.dict(os.environ, {"DD_API_KEY": "11111111111111111111111111111111"}) class TestBatchTracePayloads(unittest.TestCase): def test_batch_trace_payloads(self): trace_payloads = [ From 23bd1330282ad27724e8992b260b1a08159da3db Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 11:09:07 -0400 Subject: [PATCH 19/23] Black --- aws/logs_monitoring/tests/test_lambda_function.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index 6357127c3..129128527 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -8,9 +8,12 @@ sys.modules["datadog"] = MagicMock() sys.modules["requests"] = MagicMock() -env_patch = mock.patch.dict(os.environ, {"DD_API_KEY": "11111111111111111111111111111111"}) +env_patch = mock.patch.dict( + os.environ, {"DD_API_KEY": "11111111111111111111111111111111"} +) env_patch.start() from lambda_function import batch_trace_payloads + env_patch.stop() From fda01965878cffd272e4a2ef6bc334711b25e33c Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 11:12:31 -0400 Subject: [PATCH 20/23] Fix --- aws/logs_monitoring/tests/test_lambda_function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index 129128527..fe94846db 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -8,7 +8,7 @@ sys.modules["datadog"] = MagicMock() sys.modules["requests"] = MagicMock() -env_patch = mock.patch.dict( +env_patch = patch.dict( os.environ, {"DD_API_KEY": "11111111111111111111111111111111"} ) env_patch.start() From 4f111ba8d10db90276fd8460a0ae1b85e9cd8063 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 11:13:45 -0400 Subject: [PATCH 21/23] Black --- aws/logs_monitoring/tests/test_lambda_function.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/aws/logs_monitoring/tests/test_lambda_function.py b/aws/logs_monitoring/tests/test_lambda_function.py index fe94846db..f9032390d 100644 --- a/aws/logs_monitoring/tests/test_lambda_function.py +++ b/aws/logs_monitoring/tests/test_lambda_function.py @@ -8,9 +8,7 @@ sys.modules["datadog"] = MagicMock() sys.modules["requests"] = MagicMock() -env_patch = patch.dict( - os.environ, {"DD_API_KEY": "11111111111111111111111111111111"} -) +env_patch = patch.dict(os.environ, {"DD_API_KEY": "11111111111111111111111111111111"}) env_patch.start() from lambda_function import batch_trace_payloads From 59513d50a4948ea1fbae2d5679814162ad47f932 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 13:17:08 -0400 Subject: [PATCH 22/23] PR Feedback --- aws/logs_monitoring/trace_forwarder/cmd/trace/main.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go index 42143e9f3..9f6c7432e 100644 --- a/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go +++ b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go @@ -82,12 +82,11 @@ func combinePayloads(tracePayloads []*pb.TracePayload) *pb.TracePayload { combinedPayload := &pb.TracePayload{ HostName: tracePayloads[0].HostName, Env: tracePayloads[0].Env, - Traces: make([]*pb.APITrace, 0), + Traces: make([]*pb.APITrace, len(tracePayloads)), } for _, tracePayload := range tracePayloads { combinedPayload.Traces = append(combinedPayload.Traces, tracePayload.Traces...) } - fmt.Sprintf("aggregated %d traces into single payload", len(combinedPayload.Traces)) return combinedPayload } From e46cac51af2dc099fdb367e8555872060c3801b1 Mon Sep 17 00:00:00 2001 From: Nick Hinsch Date: Tue, 7 Jul 2020 14:28:07 -0400 Subject: [PATCH 23/23] Revert --- aws/logs_monitoring/trace_forwarder/cmd/trace/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go index 9f6c7432e..6be679a10 100644 --- a/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go +++ b/aws/logs_monitoring/trace_forwarder/cmd/trace/main.go @@ -82,7 +82,7 @@ func combinePayloads(tracePayloads []*pb.TracePayload) *pb.TracePayload { combinedPayload := &pb.TracePayload{ HostName: tracePayloads[0].HostName, Env: tracePayloads[0].Env, - Traces: make([]*pb.APITrace, len(tracePayloads)), + Traces: make([]*pb.APITrace, 0), } for _, tracePayload := range tracePayloads { combinedPayload.Traces = append(combinedPayload.Traces, tracePayload.Traces...)