Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ddtrace/compat.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
import sys

# Flag telling whether the running interpreter is Python 2; used to select
# version-specific imports below.
PY2 = sys.version_info[0] == 2

# `urlencode` lives in `urllib.parse` on Python 3 and in `urllib` on Python 2.
if not PY2:
    from urllib.parse import urlencode
else:
    from urllib import urlencode

try:
from queue import Queue
Expand Down
3 changes: 3 additions & 0 deletions ddtrace/contrib/elasticsearch/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .transport import get_traced_transport

# Names exported by `from <package> import *`: only the traced-transport factory.
__all__ = ['get_traced_transport']
5 changes: 5 additions & 0 deletions ddtrace/contrib/elasticsearch/metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Span tag names used by the elasticsearch integration.
# Request URL as passed to the transport's perform_request().
URL = 'elasticsearch.url'
# HTTP method of the request (e.g. "GET", "PUT").
METHOD = 'elasticsearch.method'
# Value of the "took" field of the response, when present.
TOOK = 'elasticsearch.took'
# URL-encoded query-string parameters of the request.
PARAMS = 'elasticsearch.params'
# JSON-serialized request body (the transport records it for GET requests).
BODY = 'elasticsearch.body'
35 changes: 35 additions & 0 deletions ddtrace/contrib/elasticsearch/quantize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import re
from . import metadata

# Replace any purely numeric path segment (an ID) with a placeholder.
# The segment terminator (`/`, `?` or end of string) is matched with a
# lookahead instead of being consumed: a consumed `/` would no longer be
# available as the leading slash of the next segment, so only every other
# segment of a path such as `/1/2/3` would be replaced.
ID_REGEXP = re.compile(r'/([0-9]+)(?=[/\?]|$)')
ID_PLACEHOLDER = r'/?'

# Remove digits from potential timestamped indexes (should be an option).
# For now, any run of 2+ digits is replaced.
INDEX_REGEXP = re.compile(r'[0-9]{2,}')
INDEX_PLACEHOLDER = r'?'

def quantize(span):
    """Set a low-cardinality `resource` on an elasticsearch span.

    The resource is built as "<method> <url>" from the span's METHOD and URL
    tags, after the URL has been cleaned up in two passes:

    1. numeric path segments (IDs) are replaced via ID_REGEXP;
    2. runs of digits, typical of timestamped index names, are replaced via
       INDEX_REGEXP (this second rule should probably be configurable).

    All of this should probably be done in the Agent. Later.

    The span is modified in place and also returned.
    """
    raw_url = span.get_tag(metadata.URL)
    http_method = span.get_tag(metadata.METHOD)

    # IDs first, then any remaining digit runs.
    cleaned_url = INDEX_REGEXP.sub(
        INDEX_PLACEHOLDER,
        ID_REGEXP.sub(ID_PLACEHOLDER, raw_url),
    )

    span.resource = '{method} {url}'.format(method=http_method, url=cleaned_url)
    return span
95 changes: 95 additions & 0 deletions ddtrace/contrib/elasticsearch/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import unittest
from nose.tools import eq_

# We should probably be smarter than that
try:
import elasticsearch
except ImportError:
elasticsearch = None

from . import metadata
from .transport import get_traced_transport
from ...tracer import Tracer
from ...test_tracer import DummyWriter



class ElasticsearchTest(unittest.TestCase):
    """Elasticsearch integration test suite

    Need a running ES on localhost:9200
    """

    ES_INDEX = 'ddtrace_index'
    ES_TYPE = 'ddtrace_type'

    TEST_SERVICE = 'test'

    def setUp(self):
        """Prepare ES: skip if the client is missing, else drop the test index"""
        if not elasticsearch:
            # SkipTest is an exception; merely instantiating it does nothing,
            # it has to be raised for the test to be reported as skipped.
            raise unittest.SkipTest("elasticsearch module isn't available")

        es = elasticsearch.Elasticsearch()
        es.indices.delete(index=self.ES_INDEX, ignore=[400, 404])

    def tearDown(self):
        """Clean ES"""
        es = elasticsearch.Elasticsearch()
        es.indices.delete(index=self.ES_INDEX, ignore=[400, 404])

    def test_elasticsearch(self):
        """Test the elasticsearch integration

        All in this for now. Will split it later.
        """
        writer = DummyWriter()
        tracer = Tracer(writer=writer)
        transport_class = get_traced_transport(
            datadog_tracer=tracer,
            datadog_service=self.TEST_SERVICE,
        )

        es = elasticsearch.Elasticsearch(transport_class=transport_class)

        # Test index creation
        es.indices.create(index=self.ES_INDEX, ignore=400)

        spans = writer.pop()
        assert spans
        eq_(len(spans), 1)
        span = spans[0]
        eq_(span.service, self.TEST_SERVICE)
        eq_(span.name, "elasticsearch.query")
        eq_(span.span_type, "elasticsearch")
        eq_(span.error, 0)
        eq_(span.get_tag(metadata.METHOD), "PUT")
        eq_(span.get_tag(metadata.URL), "/%s" % self.ES_INDEX)
        eq_(span.resource, "PUT /%s" % self.ES_INDEX)

        # Put data
        es.index(index=self.ES_INDEX, doc_type=self.ES_TYPE, id=10, body={'name': 'ten'})
        es.index(index=self.ES_INDEX, doc_type=self.ES_TYPE, id=11, body={'name': 'eleven'})
        es.index(index=self.ES_INDEX, doc_type=self.ES_TYPE, id=12, body={'name': 'twelve'})

        spans = writer.pop()
        assert spans
        eq_(len(spans), 3)
        span = spans[0]
        eq_(span.error, 0)
        eq_(span.get_tag(metadata.METHOD), "PUT")
        eq_(span.get_tag(metadata.URL), "/%s/%s/%s" % (self.ES_INDEX, self.ES_TYPE, 10))
        # The quantizer replaces numeric IDs with "?" in the resource.
        eq_(span.resource, "PUT /%s/%s/?" % (self.ES_INDEX, self.ES_TYPE))

        # Search data
        es.search(
            index=self.ES_INDEX,
            doc_type=self.ES_TYPE,
            sort=['name:desc'],
            size=100,
            body={"query": {"match_all": {}}},
        )

        spans = writer.pop()
        assert spans
        eq_(len(spans), 1)
        span = spans[0]
        eq_(span.resource, "GET /%s/%s/_search" % (self.ES_INDEX, self.ES_TYPE))
        eq_(span.get_tag(metadata.METHOD), "GET")
        eq_(span.get_tag(metadata.URL), "/%s/%s/_search" % (self.ES_INDEX, self.ES_TYPE))
        # The params come from a dict, so compare them regardless of order.
        eq_(
            sorted(span.get_tag(metadata.PARAMS).split('&')),
            ['size=100', 'sort=name%3Adesc'],
        )
        # json.dumps() puts spaces after separators by default; ignore them.
        eq_(span.get_tag(metadata.BODY).replace(" ", ""), '{"query":{"match_all":{}}}')

        self.assertTrue(int(span.get_tag(metadata.TOOK)) > 0)

49 changes: 49 additions & 0 deletions ddtrace/contrib/elasticsearch/transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
try:
from elasticsearch import Transport
except ImportError:
Transport = object

from .quantize import quantize
from . import metadata
from ...compat import json, urlencode

# Service name used when the caller does not pass `datadog_service`.
DEFAULT_SERVICE = 'elasticsearch'
# Value assigned to `span_type` on every span created by the traced transport.
SPAN_TYPE = 'elasticsearch'


def get_traced_transport(datadog_tracer, datadog_service=DEFAULT_SERVICE):
    """Build a `Transport` subclass that wraps every request in a span.

    Args:
        datadog_tracer: tracer whose `trace()` context manager creates the
            span for each request.
        datadog_service: service name set on every created span.

    Returns:
        The `TracedTransport` class itself (not an instance), to be passed
        as `transport_class` when building the elasticsearch client.
    """

    class TracedTransport(Transport):
        """Extend elasticsearch transport layer to allow Datadog tracer to catch any performed request"""

        _datadog_tracer = datadog_tracer
        _datadog_service = datadog_service

        def perform_request(self, method, url, params=None, body=None):
            """Wrap any request with a span

            We need to parse the URL to extract index/type/endpoints, but this catches all requests.
            This is ConnectionClass-agnostic.

            Any exception raised by the underlying transport propagates
            unchanged; the result of the underlying call is returned as is.
            """
            with self._datadog_tracer.trace("elasticsearch.query") as s:
                s.service = self._datadog_service
                s.span_type = SPAN_TYPE
                s.set_tag(metadata.METHOD, method)
                s.set_tag(metadata.URL, url)
                # `params` defaults to None, which urlencode() rejects.
                s.set_tag(metadata.PARAMS, urlencode(params or {}))
                if method == "GET":
                    s.set_tag(metadata.BODY, json.dumps(body))

                s = quantize(s)

                # No try/finally here: if the request fails there is no
                # result to inspect, and touching it would mask the real
                # error with an UnboundLocalError.
                result = super(TracedTransport, self).perform_request(method, url, params=params, body=body)

                # The "took" field is only available when the response is a
                # (status, data) pair whose data is a mapping; other result
                # shapes are passed through without being inspected.
                data = None
                if isinstance(result, tuple) and len(result) == 2:
                    data = result[1]
                took = data.get("took") if isinstance(data, dict) else None
                if took:
                    # TODO: move that to a metric instead
                    s.set_tag(metadata.TOOK, took)

                return result

    return TracedTransport