From 60c51645cee92b4241d574859c9fd97c57910584 Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Wed, 22 Jun 2016 17:49:55 -0400 Subject: [PATCH 1/3] Introduce the elasticsearch integration --- ddtrace/contrib/elasticsearch/__init__.py | 3 + ddtrace/contrib/elasticsearch/metadata.py | 3 + ddtrace/contrib/elasticsearch/quantize.py | 35 ++++++++ ddtrace/contrib/elasticsearch/test.py | 92 ++++++++++++++++++++++ ddtrace/contrib/elasticsearch/transport.py | 44 +++++++++++ 5 files changed, 177 insertions(+) create mode 100644 ddtrace/contrib/elasticsearch/__init__.py create mode 100644 ddtrace/contrib/elasticsearch/metadata.py create mode 100644 ddtrace/contrib/elasticsearch/quantize.py create mode 100644 ddtrace/contrib/elasticsearch/test.py create mode 100644 ddtrace/contrib/elasticsearch/transport.py diff --git a/ddtrace/contrib/elasticsearch/__init__.py b/ddtrace/contrib/elasticsearch/__init__.py new file mode 100644 index 00000000000..c20ca59237a --- /dev/null +++ b/ddtrace/contrib/elasticsearch/__init__.py @@ -0,0 +1,3 @@ +from .transport import get_traced_transport + +__all__ = ['get_traced_transport'] diff --git a/ddtrace/contrib/elasticsearch/metadata.py b/ddtrace/contrib/elasticsearch/metadata.py new file mode 100644 index 00000000000..209889dee93 --- /dev/null +++ b/ddtrace/contrib/elasticsearch/metadata.py @@ -0,0 +1,3 @@ +URL = 'elasticsearch.url' +METHOD = 'elasticsearch.method' +TOOK = 'elasticsearch.took' diff --git a/ddtrace/contrib/elasticsearch/quantize.py b/ddtrace/contrib/elasticsearch/quantize.py new file mode 100644 index 00000000000..40ab1b6e0c5 --- /dev/null +++ b/ddtrace/contrib/elasticsearch/quantize.py @@ -0,0 +1,35 @@ +import re +from . import metadata + +# Replace any ID +ID_REGEXP = re.compile(r'/([0-9]+)([/\?]|$)') +ID_PLACEHOLDER = r'/(id)\2' + +# Remove digits from potential timestamped indexes (should be an option). 
+# For now, let's say 2+ digits +INDEX_REGEXP = re.compile(r'[0-9]{2,}') +INDEX_PLACEHOLDER = r'(d)' + +def quantize(span): + """Quantize an elasticsearch span + + We want to extract a meaningful `resource` from the request. + We do it based on the method + url, with some cleanup applied to the URL. + + The URL might contain an ID, but it is also common to have timestamped indexes. + While the first is easy to catch, the second should probably be configurable. + + All of this should probably be done in the Agent. Later. + """ + url = span.get_tag(metadata.URL) + method = span.get_tag(metadata.METHOD) + + quantized_url = ID_REGEXP.sub(ID_PLACEHOLDER, url) + quantized_url = INDEX_REGEXP.sub(INDEX_PLACEHOLDER, quantized_url) + + span.resource = '{method} {url}'.format( + method=method, + url=quantized_url + ) + + return span diff --git a/ddtrace/contrib/elasticsearch/test.py b/ddtrace/contrib/elasticsearch/test.py new file mode 100644 index 00000000000..7ef5d93b21b --- /dev/null +++ b/ddtrace/contrib/elasticsearch/test.py @@ -0,0 +1,92 @@ +import unittest +from nose.tools import eq_ + +# We should probably be smarter than that +try: + import elasticsearch +except ImportError: + elasticsearch = None + +from . 
import metadata +from .transport import get_traced_transport +from ...tracer import Tracer +from ...test_tracer import DummyWriter + + + +class ElasticsearchTest(unittest.TestCase): + """Elasticsearch integration test suite + + Need a running ES on localhost:9200 + """ + + ES_INDEX = 'ddtrace_index' + ES_TYPE = 'ddtrace_type' + + TEST_SERVICE = 'test' + + def setUp(self): + """Prepare ES""" + if not elasticsearch: + unittest.SkipTest("elasticsearch module isn't available") + + es = elasticsearch.Elasticsearch() + es.indices.delete(index=self.ES_INDEX, ignore=[400, 404]) + + def tearDown(self): + """Clean ES""" + es = elasticsearch.Elasticsearch() + es.indices.delete(index=self.ES_INDEX, ignore=[400, 404]) + + def test_elasticsearch(self): + """Test the elasticsearch integration + + All in this for now. Will split it later. + """ + writer = DummyWriter() + tracer = Tracer(writer=writer) + transport_class = get_traced_transport(datadog_tracer=tracer, datadog_service=self.TEST_SERVICE) + + es = elasticsearch.Elasticsearch(transport_class=transport_class) + + # Test index creation + es.indices.create(index=self.ES_INDEX, ignore=400) + + spans = writer.pop() + assert spans + eq_(len(spans), 1) + span = spans[0] + eq_(span.service, self.TEST_SERVICE) + eq_(span.name, "elasticsearch.query") + eq_(span.span_type, "elasticsearch") + eq_(span.error, 0) + eq_(span.get_tag(metadata.METHOD), "PUT") + eq_(span.get_tag(metadata.URL), "/%s" % self.ES_INDEX) + eq_(span.resource, "PUT /%s" % self.ES_INDEX) + + # Put data + es.index(index=self.ES_INDEX, doc_type=self.ES_TYPE, id=10, body={'name': 'ten'}) + es.index(index=self.ES_INDEX, doc_type=self.ES_TYPE, id=11, body={'name': 'eleven'}) + es.index(index=self.ES_INDEX, doc_type=self.ES_TYPE, id=12, body={'name': 'twelve'}) + + spans = writer.pop() + assert spans + eq_(len(spans), 3) + span = spans[0] + eq_(span.error, 0) + eq_(span.get_tag(metadata.METHOD), "PUT") + eq_(span.get_tag(metadata.URL), "/%s/%s/%s" % (self.ES_INDEX, 
self.ES_TYPE, 10)) + eq_(span.resource, "PUT /%s/%s/(id)" % (self.ES_INDEX, self.ES_TYPE)) + + # Search data + es.search(index=self.ES_INDEX, doc_type=self.ES_TYPE, body={"query":{"match_all":{}}}) + + spans = writer.pop() + assert spans + eq_(len(spans), 1) + span = spans[0] + eq_(span.get_tag(metadata.METHOD), "GET") + eq_(span.get_tag(metadata.URL), "/%s/%s/_search" % (self.ES_INDEX, self.ES_TYPE)) + eq_(span.resource, "GET /%s/%s/_search" % (self.ES_INDEX, self.ES_TYPE)) + self.assertTrue(int(span.get_tag(metadata.TOOK)) > 0) + diff --git a/ddtrace/contrib/elasticsearch/transport.py b/ddtrace/contrib/elasticsearch/transport.py new file mode 100644 index 00000000000..0808c3c2ca4 --- /dev/null +++ b/ddtrace/contrib/elasticsearch/transport.py @@ -0,0 +1,44 @@ +try: + from elasticsearch import Transport +except ImportError: + Transport = object + +from .quantize import quantize +from . import metadata + +DEFAULT_SERVICE = 'elasticsearch' +SPAN_TYPE = 'elasticsearch' + + +def get_traced_transport(datadog_tracer, datadog_service=DEFAULT_SERVICE): + + class TracedTransport(Transport): + """Extend elasticsearch transport layer to allow Datadog tracer to catch any performed request""" + + _datadog_tracer = datadog_tracer + _datadog_service = datadog_service + + def perform_request(self, method, url, params=None, body=None): + """Wrap any request with a span + + We need to parse the URL to extract index/type/endpoints, but this catches all requests. + This is ConnectionClass-agnostic. 
+ """ + with self._datadog_tracer.trace("elasticsearch.query") as s: + s.service = self._datadog_service + s.span_type = SPAN_TYPE + s.set_tag(metadata.METHOD, method) + s.set_tag(metadata.URL, url) + s = quantize(s) + + try: + result = super(TracedTransport, self).perform_request(method, url, params=params, body=body) + return result + finally: + _, data = result + took = data.get("took") + if took: + # TODO: move that to a metric instead + s.set_tag(metadata.TOOK, took) + + return TracedTransport From 922488bcc466100d5b1a4fe57ff8748b0fac27c4 Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Wed, 22 Jun 2016 18:57:39 -0400 Subject: [PATCH 2/3] Put params and body in ES meta, extend tests --- ddtrace/compat.py | 7 +++++++ ddtrace/contrib/elasticsearch/metadata.py | 2 ++ ddtrace/contrib/elasticsearch/test.py | 7 +++++-- ddtrace/contrib/elasticsearch/transport.py | 5 +++++ 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/ddtrace/compat.py b/ddtrace/compat.py index 1155d3cfc82..36529f10fac 100644 --- a/ddtrace/compat.py +++ b/ddtrace/compat.py @@ -1,3 +1,10 @@ +import sys + +PY2 = sys.version_info[0] == 2 +if PY2: + from urllib import urlencode +else: + from urllib.parse import urlencode try: from queue import Queue diff --git a/ddtrace/contrib/elasticsearch/metadata.py b/ddtrace/contrib/elasticsearch/metadata.py index 209889dee93..49398671e01 100644 --- a/ddtrace/contrib/elasticsearch/metadata.py +++ b/ddtrace/contrib/elasticsearch/metadata.py @@ -1,3 +1,5 @@ URL = 'elasticsearch.url' METHOD = 'elasticsearch.method' TOOK = 'elasticsearch.took' +PARAMS = 'elasticsearch.params' +BODY = 'elasticsearch.body' diff --git a/ddtrace/contrib/elasticsearch/test.py b/ddtrace/contrib/elasticsearch/test.py index 7ef5d93b21b..a4d2d1d401f 100644 --- a/ddtrace/contrib/elasticsearch/test.py +++ b/ddtrace/contrib/elasticsearch/test.py @@ -79,14 +79,17 @@ def test_elasticsearch(self): eq_(span.resource, "PUT /%s/%s/(id)" % (self.ES_INDEX, self.ES_TYPE)) # Search 
data - es.search(index=self.ES_INDEX, doc_type=self.ES_TYPE, body={"query":{"match_all":{}}}) + es.search(index=self.ES_INDEX, doc_type=self.ES_TYPE, sort=['name:desc'], size=100, body={"query":{"match_all":{}}}) spans = writer.pop() assert spans eq_(len(spans), 1) span = spans[0] + eq_(span.resource, "GET /%s/%s/_search" % (self.ES_INDEX, self.ES_TYPE)) eq_(span.get_tag(metadata.METHOD), "GET") eq_(span.get_tag(metadata.URL), "/%s/%s/_search" % (self.ES_INDEX, self.ES_TYPE)) - eq_(span.resource, "GET /%s/%s/_search" % (self.ES_INDEX, self.ES_TYPE)) + eq_(span.get_tag(metadata.PARAMS), 'sort=name%3Adesc&size=100') + eq_(span.get_tag(metadata.BODY), '{"query":{"match_all":{}}}') + self.assertTrue(int(span.get_tag(metadata.TOOK)) > 0) diff --git a/ddtrace/contrib/elasticsearch/transport.py b/ddtrace/contrib/elasticsearch/transport.py index 0808c3c2ca4..72e98167132 100644 --- a/ddtrace/contrib/elasticsearch/transport.py +++ b/ddtrace/contrib/elasticsearch/transport.py @@ -5,6 +5,7 @@ from .quantize import quantize from . import metadata +from ...compat import json, urlencode DEFAULT_SERVICE = 'elasticsearch' SPAN_TYPE = 'elasticsearch' @@ -29,6 +30,10 @@ def perform_request(self, method, url, params=None, body=None): s.span_type = SPAN_TYPE s.set_tag(metadata.METHOD, method) s.set_tag(metadata.URL, url) + s.set_tag(metadata.PARAMS, urlencode(params)) + if method == "GET": + s.set_tag(metadata.BODY, json.dumps(body)) + s = quantize(s) try: From 770db6cf63bce76c5a0ca0c7b2b75a9a9c38f1ce Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Wed, 22 Jun 2016 19:08:42 -0400 Subject: [PATCH 3/3] Replace ES placeholder with ? 
--- ddtrace/contrib/elasticsearch/quantize.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddtrace/contrib/elasticsearch/quantize.py b/ddtrace/contrib/elasticsearch/quantize.py index 40ab1b6e0c5..4946c1060f0 100644 --- a/ddtrace/contrib/elasticsearch/quantize.py +++ b/ddtrace/contrib/elasticsearch/quantize.py @@ -3,12 +3,12 @@ # Replace any ID ID_REGEXP = re.compile(r'/([0-9]+)([/\?]|$)') -ID_PLACEHOLDER = r'/(id)\2' +ID_PLACEHOLDER = r'/?\2' # Remove digits from potential timestamped indexes (should be an option). # For now, let's say 2+ digits INDEX_REGEXP = re.compile(r'[0-9]{2,}') -INDEX_PLACEHOLDER = r'(d)' +INDEX_PLACEHOLDER = r'?' def quantize(span): """Quantize an elasticsearch span