Skip to content

Commit

Permalink
elastic: Add traced HTTPClient, tests for v3 and v5.
Browse files Browse the repository at this point in the history
TracedHTTPClient hooks into the http.Transport to trace requests and
capture spans for Elasticsearch. It's generic enough that it could be
used for other Elasticsearch clients that allow you to override the http
client.

The tests and example show the pattern for tracing the elastic client,
specifically using elastic.v5. Older versions do not support calling
`Do` with a context so they won't apply. But in theory this should be
compatible with new version going forward.

docker-compose is updated to run ES for the integration tests.
  • Loading branch information
conorbranagan committed May 16, 2017
1 parent d42fcc5 commit 1b89da3
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 1 deletion.
39 changes: 38 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,45 @@
# File for develipment/ testing purposes
# File for development/ testing purposes
redis:
image: redis:3.2
ports:
- "127.0.0.1:6379:6379"
elasticsearch-v5:
image: elasticsearch:5-alpine
container_name: elasticsearch5
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.security.enabled=false
- bootstrap.ignore_system_bootstrap_checks=true
ports:
- 9200:9200
ulimits:
nofile:
soft: 65536
hard: 65536
mem_limit: 1g
cap_add:
- IPC_LOCK
elasticsearch-v2:
image: elasticsearch:2-alpine
container_name: elasticsearch2
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.security.enabled=false
- bootstrap.ignore_system_bootstrap_checks=true
command: "elasticsearch -Des.http.port=9201"
ports:
- 9201:9201
ulimits:
nofile:
soft: 65536
hard: 65536
mem_limit: 1g
cap_add:
- IPC_LOCK
ddagent:
image: datadog/docker-dd-agent
environment:
Expand Down
85 changes: 85 additions & 0 deletions tracer/contrib/elastictraced/elastictraced.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Package elastictraced provides tracing for the Elastic Elasticsearch client.
// Supports v3 (gopkg.in/olivere/elastic.v3), v5 (gopkg.in/olivere/elastic.v5)
// but with v3 you must use `DoC` on all requests to capture the request context.
package elastictraced

import (
"bytes"
"errors"
"io/ioutil"
"net/http"
"strconv"

"github.com/DataDog/dd-trace-go/tracer"
"github.com/DataDog/dd-trace-go/tracer/ext"
)

// MaxContentLength is the maximum content length for which we'll read and capture
// the contents of the request body. Anything larger will still be traced but the
// body will not be captured as trace metadata.
const MaxContentLength = 500 * 1024

// TracedTransport is a traced HTTP transport that captures Elasticsearch spans.
type TracedTransport struct {
service string
tracer *tracer.Tracer
*http.Transport
}

// RoundTrip satisfies the RoundTripper interface, wraps the sub Transport and
// captures a span of the Elasticsearch request.
func (t *TracedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
span := t.tracer.NewChildSpanFromContext("elasticsearch.query", req.Context())
span.Service = t.service
span.Type = ext.AppTypeDB
defer span.Finish()
span.SetMeta("elasticsearch.method", req.Method)
span.SetMeta("elasticsearch.url", req.URL.Path)
span.SetMeta("elasticsearch.params", req.URL.Query().Encode())

contentLength, _ := strconv.Atoi(req.Header.Get("Content-Length"))
if req.Body != nil && contentLength < MaxContentLength {
buf, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil, err
}
span.SetMeta("elasticsearch.body", string(buf))
req.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
}

// Run the request using the standard transport.
res, err := t.Transport.RoundTrip(req)

span.SetMeta(ext.HTTPCode, strconv.Itoa(res.StatusCode))
if err != nil {
span.SetError(err)
return res, err
} else if res.StatusCode < 200 || res.StatusCode > 299 {
buf, err := ioutil.ReadAll(res.Body)
if err != nil {
// Status text is best we can do if if we can't read the body.
span.SetError(errors.New(http.StatusText(res.StatusCode)))
} else {
span.SetError(errors.New(string(buf)))
}
res.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
}
Quantize(span)

return res, err
}

// NewTracedHTTPClient returns a new TracedTransport that traces HTTP requests.
func NewTracedHTTPClient(service string, tracer *tracer.Tracer) *http.Client {
return &http.Client{
Transport: &TracedTransport{service, tracer, &http.Transport{}},
}
}

// NewTracedHTTPClientWithTransport returns a new TracedTransport that traces HTTP requests
// and takes in a Transport to use something other than the default.
func NewTracedHTTPClientWithTransport(service string, tracer *tracer.Tracer, transport *http.Transport) *http.Client {
return &http.Client{
Transport: &TracedTransport{service, tracer, transport},
}
}
121 changes: 121 additions & 0 deletions tracer/contrib/elastictraced/elastictraced_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package elastictraced

import (
"context"
"github.com/DataDog/dd-trace-go/tracer"
"github.com/stretchr/testify/assert"
elasticv3 "gopkg.in/olivere/elastic.v3"
elasticv5 "gopkg.in/olivere/elastic.v5"

"testing"
)

const (
debug = false
)

func TestClientV5(t *testing.T) {
assert := assert.New(t)
testTracer, testTransport := getTestTracer()
testTracer.DebugLoggingEnabled = debug

tc := NewTracedHTTPClient("my-es-service", testTracer)
client, err := elasticv5.NewClient(
elasticv5.SetURL("http://127.0.0.1:9200"),
elasticv5.SetHttpClient(tc),
elasticv5.SetSniff(false),
elasticv5.SetHealthcheck(false),
)
assert.NoError(err)

_, err = client.Index().
Index("twitter").Id("1").
Type("tweet").
BodyString(`{"user": "test", "message": "hello"}`).
Do(context.TODO())
assert.NoError(err)

_, err = client.Get().Index("twitter").Type("tweet").
Id("1").Do(context.TODO())
assert.NoError(err)

checkOKTraces(assert, testTracer, testTransport)

_, err = client.Get().Index("not-real-index").
Id("1").Do(context.TODO())
assert.Error(err)

checkErrorTraces(assert, testTracer, testTransport)
}

func TestClientV3(t *testing.T) {
assert := assert.New(t)
testTracer, testTransport := getTestTracer()
testTracer.DebugLoggingEnabled = debug

tc := NewTracedHTTPClient("my-es-service", testTracer)
client, err := elasticv3.NewClient(
elasticv3.SetURL("http://127.0.0.1:9201"),
elasticv3.SetHttpClient(tc),
elasticv3.SetSniff(false),
elasticv3.SetHealthcheck(false),
)
assert.NoError(err)

_, err = client.Index().
Index("twitter").Id("1").
Type("tweet").
BodyString(`{"user": "test", "message": "hello"}`).
DoC(context.TODO())
assert.NoError(err)

_, err = client.Get().Index("twitter").Type("tweet").
Id("1").DoC(context.TODO())
assert.NoError(err)

checkOKTraces(assert, testTracer, testTransport)

_, err = client.Get().Index("not-real-index").
Id("1").DoC(context.TODO())
assert.Error(err)

checkErrorTraces(assert, testTracer, testTransport)
}

func checkOKTraces(assert *assert.Assertions, tracer *tracer.Tracer, transport *tracer.DummyTransport) {
tracer.FlushTraces()
traces := transport.Traces()
assert.Len(traces, 2)

spans := traces[0]
assert.Equal("my-es-service", spans[0].Service)
assert.Equal("PUT /twitter/tweet/?", spans[0].Resource)
assert.Equal("/twitter/tweet/1", spans[0].GetMeta("elasticsearch.url"))
assert.Equal("PUT", spans[0].GetMeta("elasticsearch.method"))

spans = traces[1]
assert.Equal("my-es-service", spans[0].Service)
assert.Equal("GET /twitter/tweet/?", spans[0].Resource)
assert.Equal("/twitter/tweet/1", spans[0].GetMeta("elasticsearch.url"))
assert.Equal("GET", spans[0].GetMeta("elasticsearch.method"))
}

func checkErrorTraces(assert *assert.Assertions, tracer *tracer.Tracer, transport *tracer.DummyTransport) {
tracer.FlushTraces()
traces := transport.Traces()
assert.Len(traces, 1)

spans := traces[0]
assert.Equal("my-es-service", spans[0].Service)
assert.Equal("GET /not-real-index/_all/?", spans[0].Resource)
assert.Equal("/not-real-index/_all/1", spans[0].GetMeta("elasticsearch.url"))
assert.NotEmpty(spans[0].GetMeta("error.msg"))
assert.Equal("*errors.errorString", spans[0].GetMeta("error.type"))
}

// getTestTracer returns a Tracer with a DummyTransport
func getTestTracer() (*tracer.Tracer, *tracer.DummyTransport) {
transport := &tracer.DummyTransport{}
tracer := tracer.NewTracerTransport(transport)
return tracer, transport
}
57 changes: 57 additions & 0 deletions tracer/contrib/elastictraced/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package elastictraced_test

import (
"context"
"github.com/DataDog/dd-trace-go/tracer"
"github.com/DataDog/dd-trace-go/tracer/contrib/elastictraced"
elasticv3 "gopkg.in/olivere/elastic.v3"
elasticv5 "gopkg.in/olivere/elastic.v5"
)

// To start tracing elastic.v5 requests, create a new TracedHTTPClient that you will
// use when initializing the elastic.Client.
func Example_v5() {
tc := elastictraced.NewTracedHTTPClient("my-elasticsearch-service", tracer.DefaultTracer)
client, _ := elasticv5.NewClient(
elasticv5.SetURL("http://127.0.0.1:9200"),
elasticv5.SetHttpClient(tc),
)

// Spans are emitted for all
client.Index().
Index("twitter").Type("tweet").Index("1").
BodyString(`{"user": "test", "message": "hello"}`).
Do(context.Background())

// Use a context to pass information down the call chain
root := tracer.NewRootSpan("parent.request", "web", "/tweet/1")
ctx := root.Context(context.Background())
client.Get().
Index("twitter").Type("tweet").Index("1").
Do(ctx)
root.Finish()
}

// To trace elastic.v3 you create a TracedHTTPClient in the same way but all requests must use
// the DoC() call to pass the request context.
func Example_v3() {
tc := elastictraced.NewTracedHTTPClient("my-elasticsearch-service", tracer.DefaultTracer)
client, _ := elasticv3.NewClient(
elasticv3.SetURL("http://127.0.0.1:9200"),
elasticv3.SetHttpClient(tc),
)

// Spans are emitted for all
client.Index().
Index("twitter").Type("tweet").Index("1").
BodyString(`{"user": "test", "message": "hello"}`).
DoC(context.Background())

// Use a context to pass information down the call chain
root := tracer.NewRootSpan("parent.request", "web", "/tweet/1")
ctx := root.Context(context.Background())
client.Get().
Index("twitter").Type("tweet").Index("1").
DoC(ctx)
root.Finish()
}
26 changes: 26 additions & 0 deletions tracer/contrib/elastictraced/quantize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package elastictraced

import (
"fmt"
"github.com/DataDog/dd-trace-go/tracer"
"regexp"
)

var (
IdRegexp = regexp.MustCompile("/([0-9]+)([/\\?]|$)")
IdPlaceholder = []byte("/?$2")
IndexRegexp = regexp.MustCompile("[0-9]{2,}")
IndexPlaceholder = []byte("?")
)

// Quantize quantizes an Elasticsearch to extract a meaningful resource from the request.
// We quantize based on the method+url with some cleanup applied to the URL.
// URLs with an ID will be generalized as will (potential) timestamped indices.
func Quantize(span *tracer.Span) {
url := span.GetMeta("elasticsearch.url")
method := span.GetMeta("elasticsearch.method")

quantizedURL := IdRegexp.ReplaceAll([]byte(url), IdPlaceholder)
quantizedURL = IndexRegexp.ReplaceAll(quantizedURL, IndexPlaceholder)
span.Resource = fmt.Sprintf("%s %s", method, quantizedURL)
}
43 changes: 43 additions & 0 deletions tracer/contrib/elastictraced/quantize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package elastictraced

import (
"testing"

"github.com/DataDog/dd-trace-go/tracer"
"github.com/stretchr/testify/assert"
)

func TestQuantize(t *testing.T) {
tr := tracer.NewTracer()
for _, tc := range []struct {
url, method string
expected string
}{
{
url: "/twitter/tweets",
method: "POST",
expected: "POST /twitter/tweets",
},
{
url: "/logs_2016_05/event/_search",
method: "GET",
expected: "GET /logs_?_?/event/_search",
},
{
url: "/twitter/tweets/123",
method: "GET",
expected: "GET /twitter/tweets/?",
},
{
url: "/logs_2016_05/event/123",
method: "PUT",
expected: "PUT /logs_?_?/event/?",
},
} {
span := tracer.NewSpan("name", "elasticsearch", "", 0, 0, 0, tr)
span.SetMeta("elasticsearch.url", tc.url)
span.SetMeta("elasticsearch.method", tc.method)
Quantize(span)
assert.Equal(t, tc.expected, span.Resource)
}
}

0 comments on commit 1b89da3

Please sign in to comment.