Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
*.py[cod]

# Layer zips
.layers

# C extensions
*.so

Expand Down Expand Up @@ -28,6 +31,7 @@ pip-log.txt
nosetests.xml

#Misc
.cache/
.DS_Store
.eggs/
.env/
Expand Down
18 changes: 18 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Builds the contents of an AWS Lambda Layer under /build/python.
#
# Build arguments:
#   image   - base image to build from (consumed by FROM below)
#   runtime - name of the directory under python/lib/ that holds
#             site-packages (presumably e.g. "python3.7" - confirm against
#             the build script that invokes this file)
ARG image
FROM $image

# Declared after FROM so that it is visible to the build steps below.
ARG runtime

# Create the directory structure required for AWS Lambda Layer
RUN mkdir -p /build/python/lib/$runtime/site-packages
WORKDIR /build

# Install dependencies
COPY requirements.txt requirements.txt
RUN pip install -r requirements.txt -t ./python/lib/$runtime/site-packages

# Install datadog_lambda
# NOTE(review): COPY with a directory source copies the directory's
# *contents*, not the directory itself, so the modules land directly in
# site-packages. Confirm the package is still importable as
# `datadog_lambda` (the `datadog_lambda/datadog_lambda` entry added in the
# same change may exist for this reason).
COPY datadog_lambda ./python/lib/$runtime/site-packages

# Remove *.pyc files
RUN find ./python/lib/$runtime/site-packages -name \*.pyc -delete
1 change: 1 addition & 0 deletions datadog_lambda/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__version__ = "1"
20 changes: 20 additions & 0 deletions datadog_lambda/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Datadog trace sampling priority
class SamplingPriority(object):
    """
    Integer values carried by the Datadog sampling-priority trace header.

    Negative/zero values reject a trace and positive values keep it; the
    USER_* values are the ones this package emits when converting an X-Ray
    sampling decision.
    """

    # Reject decisions
    USER_REJECT = -1
    AUTO_REJECT = 0

    # Keep decisions
    AUTO_KEEP = 1
    USER_KEEP = 2


# Datadog trace headers
class TraceHeader(object):
    """
    Names of the HTTP headers that carry the Datadog trace context.

    All names are lowercase; they are used both to read the context from an
    incoming event and as keys of the headers injected into outgoing
    requests.
    """

    TRACE_ID = "x-datadog-trace-id"
    PARENT_ID = "x-datadog-parent-id"
    SAMPLING_PRIORITY = "x-datadog-sampling-priority"


# X-Ray subsegment to save Datadog trace metadata
class XraySubsegment(object):
    """
    Identifiers of the X-Ray subsegment used to store the Datadog trace
    context: the subsegment name, and the metadata key and namespace the
    context is saved under.
    """

    NAME = "datadog-metadata"
    KEY = "trace"
    NAMESPACE = "datadog"
1 change: 1 addition & 0 deletions datadog_lambda/datadog_lambda
60 changes: 60 additions & 0 deletions datadog_lambda/metric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import os
import sys

from datadog import api
from datadog.threadstats import ThreadStats
from datadog_lambda import __version__

# Shared ThreadStats instance for the whole module: lambda_metric() records
# distribution points on it, and datadog_lambda.wrapper flushes it at the end
# of every invocation.
lambda_stats = ThreadStats()
# NOTE(review): start() runs at import time with its default arguments;
# confirm the default flush mode suits the Lambda execution model.
lambda_stats.start()


def _format_dd_lambda_layer_tag():
    """
    Build the dd_lambda_layer tag for the running interpreter and package
    version, e.g., 'dd_lambda_layer:datadog-python27_1'
    """
    major, minor = sys.version_info[0], sys.version_info[1]
    return "dd_lambda_layer:datadog-python{}{}_{}".format(
        major, minor, __version__)


def _tag_dd_lambda_layer(args, kwargs):
    """
    Used by lambda_metric to insert the dd_lambda_layer tag.

    `args` and `kwargs` are the positional and keyword arguments that
    lambda_metric is about to forward to `lambda_stats.distribution`. The
    tags are looked up under the `tags` keyword first, then at positional
    index 3; when neither is present a `tags` keyword is added.

    - Keyword tags are replaced in `kwargs` by a new list, so the caller's
      own list is never mutated and `tags=None` (or a tuple) is accepted.
    - Positional tags can only be extended in place, because `args` is an
      immutable tuple. The tag is appended at most once, so a list that the
      caller reuses across calls does not accumulate duplicates.

    Returns None; `kwargs` (and possibly `args[3]`) are updated in place.
    """
    dd_lambda_layer_tag = _format_dd_lambda_layer_tag()
    if 'tags' in kwargs:
        # Copy instead of append: the caller may share this list between
        # calls, and `tags=None` must not crash.
        kwargs['tags'] = list(kwargs['tags'] or []) + [dd_lambda_layer_tag]
    elif len(args) >= 4:
        positional_tags = args[3]
        # A positional `tags=None` cannot be replaced from here (tuple), so
        # the metric is simply submitted without the layer tag.
        if (positional_tags is not None
                and dd_lambda_layer_tag not in positional_tags):
            positional_tags.append(dd_lambda_layer_tag)
    else:
        kwargs['tags'] = [dd_lambda_layer_tag]


def lambda_metric(*args, **kwargs):
    """
    Record one data point of a Datadog distribution metric.
    https://docs.datadoghq.com/graphing/metrics/distributions/

    The arguments are forwarded unchanged to `lambda_stats.distribution`,
    after the dd_lambda_layer tag has been added to the tags.
    """
    # Tagging happens first so the forwarded call already carries the tag.
    _tag_dd_lambda_layer(args, kwargs)
    lambda_stats.distribution(*args, **kwargs)


def init_api_client():
    """
    Warm up the HTTP connection to the Datadog API with a no-op GET.

    The Requests session is kept alive, so the connection opened here can
    be re-used by the final metric flush. The handshake, which can take
    hundreds of ms, is therefore paid at the beginning of an invocation
    instead of at the end.

    The API key and host are read from the `DATADOG_API_KEY` and
    `DATADOG_HOST` environment variables. Callers are expected to run this
    function asynchronously so that it does not add to the execution time.
    """
    configured_host = os.environ.get(
        'DATADOG_HOST', 'https://api.datadoghq.com')
    api._api_key = os.environ.get('DATADOG_API_KEY')
    api._api_host = configured_host
    try:
        api.api_client.APIClient.submit('GET', 'validate')
    except Exception:
        # Best-effort warm-up only: a failure here must never surface.
        pass
105 changes: 105 additions & 0 deletions datadog_lambda/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import sys

from wrapt import wrap_function_wrapper as wrap

from datadog_lambda.tracing import get_dd_trace_context

# Name of the built-in HTTP client module for the running interpreter:
# `http.client` on Python 3, `httplib` on Python 2.
httplib_module = 'http.client' if sys.version_info >= (3, 0, 0) else 'httplib'

# One flag per patched client, so that each patch is applied at most once
# even though patch_all() runs on every invocation.
_httplib_patched = _requests_patched = _botocore_requests_patched = False


def patch_all():
    """
    Apply every HTTP client patch, so that outgoing requests made through
    `httplib`/`http.client`, `requests` and botocore's vendored `requests`
    automatically carry the Datadog trace context.
    """
    patchers = (
        _patch_httplib,
        _patch_requests,
        _patch_botocore_requests,
    )
    for apply_patch in patchers:
        apply_patch()


def _patch_httplib():
    """
    Wrap `HTTPConnection.request` of the built-in `httplib` (Python 2) or
    `http.client` (Python 3) module. Does nothing after the first call.
    """
    global _httplib_patched
    if _httplib_patched:
        return
    # The flag is raised before wrapping, so a failed attempt is not retried.
    _httplib_patched = True
    wrap(httplib_module, 'HTTPConnection.request', _wrap_httplib_request)


def _patch_requests():
    """
    Wrap `Session.request` of the high-level HTTP client module `requests`,
    when that module can be imported. Does nothing after the first call.
    """
    global _requests_patched
    if _requests_patched:
        return
    # The flag is raised before wrapping, so a failed attempt is not retried.
    _requests_patched = True
    try:
        wrap('requests', 'Session.request', _wrap_requests_request)
    except ImportError:
        # `requests` is not installed: nothing to patch.
        pass


def _patch_botocore_requests():
    """
    Wrap `Session.request` of the `requests` copy that is packaged into
    `botocore`, when it can be imported. Does nothing after the first call.
    https://stackoverflow.com/questions/40741282/cannot-use-requests-module-on-aws-lambda
    """
    global _botocore_requests_patched
    if _botocore_requests_patched:
        return
    # The flag is raised before wrapping, so a failed attempt is not retried.
    _botocore_requests_patched = True
    try:
        wrap(
            'botocore.vendored.requests',
            'Session.request',
            _wrap_requests_request,
        )
    except ImportError:
        # The vendored module is not available: nothing to patch.
        pass


def _wrap_requests_request(func, instance, args, kwargs):
    """
    Wrap `requests.Session.request` to inject the Datadog trace headers
    into the outgoing requests.

    `func` is the wrapped method, `instance` the session it is bound to, and
    `args`/`kwargs` the arguments of the intercepted call. Headers are looked
    up under the `headers` keyword first, then at positional index 4.

    The trace headers are merged into a *copy* of the caller's headers, so
    the caller's own mapping is never modified, and `headers=None` (keyword
    or positional) is treated as "no headers" instead of raising
    AttributeError.

    Returns whatever the wrapped `func` returns.
    """
    context = get_dd_trace_context()
    if 'headers' in kwargs:
        merged_headers = dict(kwargs['headers'] or {})
        merged_headers.update(context)
        kwargs['headers'] = merged_headers
    elif len(args) >= 5:
        merged_headers = dict(args[4] or {})
        merged_headers.update(context)
        # `args` is a tuple: rebuild it with the merged headers in place.
        args = tuple(args[:4]) + (merged_headers,) + tuple(args[5:])
    else:
        kwargs['headers'] = context
    return func(*args, **kwargs)


def _wrap_httplib_request(func, instance, args, kwargs):
    """
    Wrap `HTTPConnection.request` of `httplib` (python2) or `http.client`
    (python3) to inject the Datadog trace headers into the outgoing
    requests.

    `func` is the wrapped method, `instance` the connection it is bound to,
    and `args`/`kwargs` the arguments of the intercepted call. Headers are
    looked up under the `headers` keyword first, then at positional index 3.

    The trace headers are merged into a *copy* of the caller's headers, so
    the caller's own mapping is never modified, and `headers=None` (keyword
    or positional) is treated as "no headers" instead of raising
    AttributeError.

    Returns whatever the wrapped `func` returns.
    """
    context = get_dd_trace_context()
    if 'headers' in kwargs:
        merged_headers = dict(kwargs['headers'] or {})
        merged_headers.update(context)
        kwargs['headers'] = merged_headers
    elif len(args) >= 4:
        merged_headers = dict(args[3] or {})
        merged_headers.update(context)
        # `args` is a tuple: rebuild it with the merged headers in place.
        args = tuple(args[:3]) + (merged_headers,) + tuple(args[4:])
    else:
        kwargs['headers'] = context
    return func(*args, **kwargs)
102 changes: 102 additions & 0 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from aws_xray_sdk.core import xray_recorder

from datadog_lambda.constants import (
SamplingPriority,
TraceHeader,
XraySubsegment,
)

dd_trace_context = {}


def extract_dd_trace_context(event):
    """
    Extract Datadog trace context from the Lambda `event` object.

    Write the context to a global `dd_trace_context`, so the trace
    can be continued on the outgoing requests with the context injected.

    Save the context to an X-Ray subsegment's metadata field, so the X-Ray
    trace can be converted to a Datadog trace in the Datadog backend with
    the correct context.

    The context is only accepted when the trace id, parent id and sampling
    priority headers are all present; header names are matched
    case-insensitively. An `event` that is not a dict, or whose `headers`
    is missing or not a dict (e.g. null), is treated as carrying no
    context. In every such case the global context is reset.
    """
    global dd_trace_context

    headers = event.get('headers') if isinstance(event, dict) else None
    if not isinstance(headers, dict):
        headers = {}

    # HTTP header names are case-insensitive, while the TraceHeader
    # constants are lowercase: normalize before looking them up.
    lowercase_headers = {
        name.lower(): value
        for name, value in headers.items()
        if hasattr(name, 'lower')
    }
    trace_id = lowercase_headers.get(TraceHeader.TRACE_ID)
    parent_id = lowercase_headers.get(TraceHeader.PARENT_ID)
    sampling_priority = lowercase_headers.get(TraceHeader.SAMPLING_PRIORITY)

    if trace_id and parent_id and sampling_priority:
        dd_trace_context = {
            'trace-id': trace_id,
            'parent-id': parent_id,
            'sampling-priority': sampling_priority,
        }
        xray_recorder.begin_subsegment(XraySubsegment.NAME)
        subsegment = xray_recorder.current_subsegment()
        subsegment.put_metadata(
            XraySubsegment.KEY,
            dd_trace_context,
            XraySubsegment.NAMESPACE
        )
        xray_recorder.end_subsegment()
    else:
        # AWS Lambda runtime caches global variables between invocations,
        # reset to avoid using the context from the last invocation.
        dd_trace_context = {}


def get_dd_trace_context():
    """
    Return the Datadog trace context to be propagated on the outgoing
    requests, as a dict of trace header name to value.

    If the Lambda function was invoked by a Datadog-traced service, the
    trace id and sampling priority extracted from the incoming event are
    used. Otherwise they are derived from the current X-Ray trace entity.
    In both cases the parent id is the id of the current X-Ray entity.

    Most of widely-used HTTP clients are patched to inject the context
    automatically, but this function can be used to manually inject the
    trace context to an outgoing request.
    """
    entity = xray_recorder.get_trace_entity()  # xray (sub)segment
    if dd_trace_context:
        trace_id = dd_trace_context['trace-id']
        sampling_priority = dd_trace_context['sampling-priority']
    else:
        trace_id = _convert_xray_trace_id(entity.trace_id)
        sampling_priority = _convert_xray_sampling(entity.sampled)
    return {
        TraceHeader.TRACE_ID: trace_id,
        TraceHeader.PARENT_ID: _convert_xray_entity_id(entity.id),
        TraceHeader.SAMPLING_PRIORITY: sampling_priority,
    }


def _convert_xray_trace_id(xray_trace_id):
    """
    Convert an X-Ray trace id (hex) to a Datadog trace id: the last 63 bits
    of the X-Ray id, returned as a decimal string.
    """
    # The last 16 hex digits are 64 bits; the mask drops the top bit.
    last_64_bits = int(xray_trace_id[-16:], 16)
    return "{}".format(last_64_bits & 0x7FFFFFFFFFFFFFFF)


def _convert_xray_entity_id(xray_entity_id):
    """
    Convert an X-Ray (sub)segment id (hex) to a Datadog span id, returned
    as a decimal string.
    """
    span_id = int(xray_entity_id, 16)
    return "{}".format(span_id)


def _convert_xray_sampling(xray_sampled):
    """
    Convert the X-Ray sampled flag (True/False) to its Datadog sampling
    priority counterpart, returned as a string.
    """
    if xray_sampled:
        priority = SamplingPriority.USER_KEEP
    else:
        priority = SamplingPriority.USER_REJECT
    return str(priority)
60 changes: 60 additions & 0 deletions datadog_lambda/wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import traceback
from threading import Thread

from datadog_lambda.metric import init_api_client, lambda_stats
from datadog_lambda.tracing import extract_dd_trace_context
from datadog_lambda.patch import patch_all


"""
Usage:

import requests
from datadog_lambda.wrapper import datadog_lambda_wrapper
from datadog_lambda.metric import lambda_metric

@datadog_lambda_wrapper
def my_lambda_handle(event, context):
    lambda_metric("my_metric", 10)
    requests.get("https://www.datadoghq.com")
"""


class _LambdaDecorator(object):
    """
    Decorator to automatically initialize Datadog API client, flush metrics,
    and extracts/injects trace context.

    An instance wraps one Lambda handler `func(event, context)` and is
    itself callable with the same signature. The handler's metadata
    (`__name__`, `__doc__`, ...) is copied onto the instance, so the
    decorated handler still identifies itself as the original function.
    """

    def __init__(self, func):
        """Wrap the Lambda handler `func`."""
        import functools

        try:
            functools.update_wrapper(self, func)
        except AttributeError:
            # On Python 2, callables lacking e.g. `__name__` make
            # update_wrapper raise; the metadata is only a convenience.
            pass
        # Assigned last so the handler's own attributes cannot shadow it.
        self.func = func

    def _before(self, event, context):
        """
        Run the pre-invocation steps. Any failure is printed and swallowed
        so that instrumentation never prevents the handler from running.
        """
        try:
            # Async initialization of the TLS connection with Datadog API,
            # and reduces the overhead to the final metric flush at the end.
            Thread(target=init_api_client).start()

            # Extract Datadog trace context from incoming requests
            extract_dd_trace_context(event)

            # Patch HTTP clients to propagate Datadog trace context
            patch_all()
        except Exception:
            traceback.print_exc()

    def _after(self, event, context):
        """
        Flush the collected metrics. Any failure is printed and swallowed
        so that it cannot mask the handler's result or exception.
        """
        try:
            lambda_stats.flush(float("inf"))
        except Exception:
            traceback.print_exc()

    def __call__(self, event, context):
        """
        Invoke the wrapped handler and return its result. Metrics are
        flushed afterwards whether the handler returns or raises.
        """
        self._before(event, context)
        try:
            return self.func(event, context)
        finally:
            self._after(event, context)


# Public name of the decorator; apply it to a Lambda handler as shown in the
# usage example earlier in this module.
datadog_lambda_wrapper = _LambdaDecorator
3 changes: 3 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
nose2==0.9.1
flake8==3.7.7
requests==2.21.0
Loading