4 changes: 2 additions & 2 deletions .github/workflows/lambdachecks.yml
@@ -32,9 +32,9 @@ jobs:
- name: Run trace forwarder integration tests
run: |
./aws/logs_monitoring/trace_forwarder/scripts/run_tests.sh
- name: Run enhanced metric unittest
- name: Run unit tests
env:
AWS_DEFAULT_REGION: us-east-1
run: |
pip install boto3
pip install boto3 mock
python -m unittest discover ./aws/logs_monitoring/
136 changes: 74 additions & 62 deletions aws/logs_monitoring/lambda_function.py
@@ -9,6 +9,7 @@
import gzip
import json
import os
from collections import defaultdict

import boto3
import itertools
@@ -23,6 +24,11 @@
from datadog_lambda.metric import lambda_stats
from datadog import api
from trace_forwarder.connection import TraceConnection
from enhanced_lambda_metrics import (
get_enriched_lambda_log_tags,
parse_and_submit_enhanced_metrics,
)


log = logging.getLogger()
log.setLevel(logging.getLevelName(os.environ.get("DD_LOG_LEVEL", "INFO").upper()))
@@ -42,23 +48,6 @@
# of requests is removed in botocore 1.13.x.
from botocore.vendored import requests

try:
from enhanced_lambda_metrics import (
get_enriched_lambda_log_tags,
parse_and_submit_enhanced_metrics,
)

IS_ENHANCED_METRICS_FILE_PRESENT = True
except ImportError:
IS_ENHANCED_METRICS_FILE_PRESENT = False
log.warn(
"Could not import from enhanced_lambda_metrics so enhanced metrics "
"will not be submitted. Ensure you've included the enhanced_lambda_metrics "
"file in your Lambda project."
)
finally:
log.debug(f"IS_ENHANCED_METRICS_FILE_PRESENT: {IS_ENHANCED_METRICS_FILE_PRESENT}")


def get_env_var(envvar, default, boolean=False):
"""
@@ -474,39 +463,39 @@ def __exit__(self, ex_type, ex_value, traceback):


class DatadogBatcher(object):
def __init__(self, max_log_size_bytes, max_size_bytes, max_size_count):
self._max_log_size_bytes = max_log_size_bytes
self._max_size_bytes = max_size_bytes
self._max_size_count = max_size_count
def __init__(self, max_item_size_bytes, max_batch_size_bytes, max_items_count):
self._max_item_size_bytes = max_item_size_bytes
self._max_batch_size_bytes = max_batch_size_bytes
self._max_items_count = max_items_count

def _sizeof_bytes(self, log):
return len(log.encode("UTF-8"))
def _sizeof_bytes(self, item):
return len(str(item).encode("UTF-8"))

def batch(self, logs):
def batch(self, items):
"""
Returns an array of batches.
Each batch contains at most max_size_count logs and
is not strictly greater than max_size_bytes.
All logs strictly greater than max_log_size_bytes are dropped.
Each batch contains at most max_items_count items and
is not strictly greater than max_batch_size_bytes.
All items strictly greater than max_item_size_bytes are dropped.
"""
batches = []
batch = []
size_bytes = 0
size_count = 0
for log in logs:
log_size_bytes = self._sizeof_bytes(log)
for item in items:
item_size_bytes = self._sizeof_bytes(item)
if size_count > 0 and (
size_count >= self._max_size_count
or size_bytes + log_size_bytes > self._max_size_bytes
size_count >= self._max_items_count
or size_bytes + item_size_bytes > self._max_batch_size_bytes
):
batches.append(batch)
batch = []
size_bytes = 0
size_count = 0
# all logs exceeding max_log_size_bytes are dropped here
if log_size_bytes <= self._max_log_size_bytes:
batch.append(log)
size_bytes += log_size_bytes
# all items exceeding max_item_size_bytes are dropped here
if item_size_bytes <= self._max_item_size_bytes:
batch.append(item)
size_bytes += item_size_bytes
size_count += 1
if size_count > 0:
batches.append(batch)
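
For illustration, a minimal usage sketch of the renamed batcher (it assumes the collapsed tail of batch() simply returns batches): items larger than max_item_size_bytes are dropped, and the remaining items are grouped so that no batch exceeds the count or byte limits.

# Hypothetical limits, chosen only to make the behavior visible
batcher = DatadogBatcher(max_item_size_bytes=10, max_batch_size_bytes=20, max_items_count=2)
items = ["a" * 5, "b" * 5, "c" * 5, "d" * 50]  # the 50-byte item exceeds the item limit and is dropped
print(batcher.batch(items))                    # [["aaaaa", "bbbbb"], ["ccccc"]]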
@@ -556,18 +545,18 @@ def datadog_forwarder(event, context):
if log.isEnabledFor(logging.DEBUG):
log.debug(f"Received Event:{json.dumps(event)}")

metrics, logs, traces = split(enrich(parse(event, context)))
metrics, logs, trace_payloads = split(enrich(parse(event, context)))

if DD_FORWARD_LOG:
forward_logs(filter_logs(map(json.dumps, logs)))

if DD_FORWARD_METRIC:
forward_metrics(metrics)

if DD_FORWARD_TRACES and len(traces) > 0:
forward_traces(traces)
if DD_FORWARD_TRACES and len(trace_payloads) > 0:
forward_traces(trace_payloads)

if IS_ENHANCED_METRICS_FILE_PRESENT and DD_FORWARD_METRIC:
if DD_FORWARD_METRIC:
parse_and_submit_enhanced_metrics(logs)


@@ -667,18 +656,17 @@ def add_metadata_to_lambda_log(event):
tags = ["functionname:{}".format(function_name)]

# Add any enhanced tags from metadata
if IS_ENHANCED_METRICS_FILE_PRESENT:
custom_lambda_tags = get_enriched_lambda_log_tags(event)
custom_lambda_tags = get_enriched_lambda_log_tags(event)

# Check if one of the Lambda's custom tags is env
# If an env tag exists, remove the env:none placeholder
custom_env_tag = next(
(tag for tag in custom_lambda_tags if tag.startswith("env:")), None
)
if custom_env_tag is not None:
event[DD_CUSTOM_TAGS] = event[DD_CUSTOM_TAGS].replace("env:none", "")
# Check if one of the Lambda's custom tags is env
# If an env tag exists, remove the env:none placeholder
custom_env_tag = next(
(tag for tag in custom_lambda_tags if tag.startswith("env:")), None
)
if custom_env_tag is not None:
event[DD_CUSTOM_TAGS] = event[DD_CUSTOM_TAGS].replace("env:none", "")

tags += custom_lambda_tags
tags += custom_lambda_tags

# Dedup tags, so we don't end up with functionname twice
tags = list(set(tags))
@@ -716,8 +704,8 @@ def generate_metadata(context):
return metadata


def extract_trace(event):
"""Extract traces from an event if possible"""
def extract_trace_payload(event):
"""Extract trace payload from an event if possible"""
try:
message = event["message"]
obj = json.loads(event["message"])
@@ -745,19 +733,19 @@ def extract_metric(event):


def split(events):
"""Split events into metrics, logs, and traces
"""Split events into metrics, logs, and trace payloads
"""
metrics, logs, traces = [], [], []
metrics, logs, trace_payloads = [], [], []
for event in events:
metric = extract_metric(event)
trace = extract_trace(event)
trace_payload = extract_trace_payload(event)
if metric and DD_FORWARD_METRIC:
metrics.append(metric)
elif trace:
traces.append(trace)
elif trace_payload:
trace_payloads.append(trace_payload)
else:
logs.append(event)
return metrics, logs, traces
return metrics, logs, trace_payloads


# should only be called when INCLUDE_AT_MATCH and/or EXCLUDE_AT_MATCH exist
@@ -805,15 +793,39 @@ def forward_metrics(metrics):
log.debug(f"Forwarded metric: {json.dumps(metric)}")


def forward_traces(traces):
for trace in traces:
def forward_traces(trace_payloads):
batched_payloads = batch_trace_payloads(trace_payloads)

for payload in batched_payloads:
try:
trace_connection.send_trace(trace["message"], trace["tags"])
trace_connection.send_traces(payload["message"], payload["tags"])
except Exception:
log.exception(f"Exception while forwarding trace {json.dumps(trace)}")
log.exception(f"Exception while forwarding traces {json.dumps(payload)}")
else:
if log.isEnabledFor(logging.DEBUG):
log.debug(f"Forwarded trace: {json.dumps(trace)}")
log.debug(f"Forwarded traces: {json.dumps(payload)}")


def batch_trace_payloads(trace_payloads):
"""
To reduce the number of API calls, batch traces that have the same tags
"""
traces_grouped_by_tags = defaultdict(list)
for trace_payload in trace_payloads:
tags = trace_payload["tags"]
traces = json.loads(trace_payload["message"])["traces"]
traces_grouped_by_tags[tags] += traces

batched_trace_payloads = []
batcher = DatadogBatcher(256 * 1000, 2 * 1000 * 1000, 200)
Contributor: What are these constants?

Contributor Author: They are:

  • The max allowed item size in bytes (256 KB)
  • The max allowed batch size in bytes (2 MB)
  • The max number of items per batch (200)

I copy-pasted these specific numbers from the other usage of DatadogBatcher in this file, which is for batching logs.

Contributor: Could we refactor them into constant variables?

for tags, traces in traces_grouped_by_tags.items():
batches = batcher.batch(traces)
for batch in batches:
batched_trace_payloads.append(
{"tags": tags, "message": json.dumps({"traces": batch})}
)

return batched_trace_payloads
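
Following up on the review thread above, a sketch (not part of this PR) of the suggested refactor: the limits could be hoisted into module-level constants shared by the log and trace batching call sites. The constant names here are hypothetical.

DD_MAX_ITEM_SIZE_BYTES = 256 * 1000        # max allowed item size (256 KB)
DD_MAX_BATCH_SIZE_BYTES = 2 * 1000 * 1000  # max allowed batch size (2 MB)
DD_MAX_ITEMS_COUNT = 200                   # max number of items per batch

batcher = DatadogBatcher(
    DD_MAX_ITEM_SIZE_BYTES, DD_MAX_BATCH_SIZE_BYTES, DD_MAX_ITEMS_COUNT
)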


# Utility functions
49 changes: 49 additions & 0 deletions aws/logs_monitoring/tests/test_lambda_function.py
@@ -0,0 +1,49 @@
from mock import MagicMock, patch
import os
import sys
import unittest

sys.modules["datadog_lambda.wrapper"] = MagicMock()
sys.modules["datadog_lambda.metric"] = MagicMock()
sys.modules["datadog"] = MagicMock()
sys.modules["requests"] = MagicMock()

env_patch = patch.dict(os.environ, {"DD_API_KEY": "11111111111111111111111111111111"})
env_patch.start()
from lambda_function import batch_trace_payloads

env_patch.stop()


class TestBatchTracePayloads(unittest.TestCase):
Contributor: Nice test

def test_batch_trace_payloads(self):
trace_payloads = [
{"tags": "tag1:value", "message": '{"traces":[[{"trace_id":"1"}]]}\n',},
{
"tags": "tag1:value",
"message": '{"traces":[[{"trace_id":"2"}, {"trace_id":"3"}]]}\n',
},
{
"tags": "tag2:value",
"message": '{"traces":[[{"trace_id":"4"}], [{"trace_id":"5"}]]}\n',
},
]

batched_payloads = batch_trace_payloads(trace_payloads)

expected_batched_payloads = [
{
"tags": "tag1:value",
"message": '{"traces": [[{"trace_id": "1"}], [{"trace_id": "2"}, {"trace_id": "3"}]]}',
},
{
"tags": "tag2:value",
"message": '{"traces": [[{"trace_id": "4"}], [{"trace_id": "5"}]]}',
},
]

self.assertEqual(batched_payloads, expected_batched_payloads)


if __name__ == "__main__":
unittest.main()
52 changes: 33 additions & 19 deletions aws/logs_monitoring/trace_forwarder/cmd/trace/main.go
@@ -14,7 +14,10 @@ import (

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring/trace_forwarder/internal/apm"
)
import "github.com/DataDog/datadog-agent/pkg/trace/obfuscate"
import (
"github.com/DataDog/datadog-agent/pkg/trace/obfuscate"
"github.com/DataDog/datadog-agent/pkg/trace/pb"
)

var (
obfuscator *obfuscate.Obfuscator
@@ -45,35 +48,46 @@ func Configure(rootURL, apiKey string) {
edgeConnection = apm.CreateTraceEdgeConnection(localRootURL, localAPIKey)
}

// ForwardTrace will perform filtering and log forwarding to the trace intake
// ForwardTraces will perform filtering and log forwarding to the trace intake
// returns 0 on success, 1 on error
//export ForwardTrace
func ForwardTrace(content string, tags string) int {
//export ForwardTraces
func ForwardTraces(content string, tags string) int {
tracePayloads, err := apm.ProcessTrace(content, obfuscator, tags)
if err != nil {
fmt.Printf("Couldn't forward trace: %v", err)
fmt.Printf("Couldn't forward traces: %v", err)
return 1
}
hadErr := false

for _, tracePayload := range tracePayloads {
combinedPayload := combinePayloads(tracePayloads)

err = edgeConnection.SendTraces(context.Background(), tracePayload, 3)
if err != nil {
fmt.Printf("Failed to send traces with error %v\n", err)
hadErr = true
}
stats := apm.ComputeAPMStats(tracePayload)
err = edgeConnection.SendStats(context.Background(), stats, 3)
if err != nil {
fmt.Printf("Failed to send trace stats with error %v\n", err)
hadErr = true
}
err = edgeConnection.SendTraces(context.Background(), combinedPayload, 3)
if err != nil {
fmt.Printf("Failed to send traces with error %v\n", err)
return 1
}
if hadErr {

stats := apm.ComputeAPMStats(combinedPayload)
err = edgeConnection.SendStats(context.Background(), stats, 3)
if err != nil {
fmt.Printf("Failed to send trace stats with error %v\n", err)
return 1
}

return 0
}

// Combine payloads into one
// Assumes that all payloads have the same HostName and Env
func combinePayloads(tracePayloads []*pb.TracePayload) *pb.TracePayload {
combinedPayload := &pb.TracePayload{
HostName: tracePayloads[0].HostName,
Env: tracePayloads[0].Env,
Traces: make([]*pb.APITrace, 0),
Contributor: nit: do we already know the size this slice will be? Maybe we can pre-allocate

}
for _, tracePayload := range tracePayloads {
combinedPayload.Traces = append(combinedPayload.Traces, tracePayload.Traces...)
}
return combinedPayload
}

func main() {}
5 changes: 3 additions & 2 deletions aws/logs_monitoring/trace_forwarder/connection.py
@@ -23,9 +23,10 @@ def __init__(self, root_url, api_key):
self.lib = cdll.LoadLibrary("{}/bin/trace-intake.so".format(dir))
self.lib.Configure(make_go_string(root_url), make_go_string(api_key))

def send_trace(self, trace_str, tags=""):
def send_traces(self, traces_str, tags=""):
had_error = (
self.lib.ForwardTrace(make_go_string(trace_str), make_go_string(tags)) != 0
self.lib.ForwardTraces(make_go_string(traces_str), make_go_string(tags))
!= 0
)
if had_error:
raise Exception("Failed to send to trace intake")