Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

elastic: Add traced HTTPClient for client #68

Merged
merged 1 commit into from
May 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,45 @@
# File for develipment/ testing purposes
# File for development/ testing purposes
redis:
image: redis:3.2
ports:
- "127.0.0.1:6379:6379"
elasticsearch-v5:
image: elasticsearch:5-alpine
container_name: elasticsearch5
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.security.enabled=false
- bootstrap.ignore_system_bootstrap_checks=true
ports:
- 9200:9200
ulimits:
nofile:
soft: 65536
hard: 65536
mem_limit: 1g
cap_add:
- IPC_LOCK
elasticsearch-v2:
image: elasticsearch:2-alpine
container_name: elasticsearch2
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.security.enabled=false
- bootstrap.ignore_system_bootstrap_checks=true
command: "elasticsearch -Des.http.port=9201"
ports:
- 9201:9201
ulimits:
nofile:
soft: 65536
hard: 65536
mem_limit: 1g
cap_add:
- IPC_LOCK
ddagent:
image: datadog/docker-dd-agent
environment:
Expand Down
85 changes: 85 additions & 0 deletions tracer/contrib/elastictraced/elastictraced.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Package elastictraced provides tracing for the Elastic Elasticsearch client.
// Supports v3 (gopkg.in/olivere/elastic.v3), v5 (gopkg.in/olivere/elastic.v5)
// but with v3 you must use `DoC` on all requests to capture the request context.
package elastictraced

import (
"bytes"
"errors"
"io/ioutil"
"net/http"
"strconv"

"github.com/DataDog/dd-trace-go/tracer"
"github.com/DataDog/dd-trace-go/tracer/ext"
)

// MaxContentLength is the maximum content length for which we'll read and capture
// the contents of the request body. Anything larger will still be traced but the
// body will not be captured as trace metadata.
const MaxContentLength = 500 * 1024

// TracedTransport is a traced HTTP transport that captures Elasticsearch spans.
type TracedTransport struct {
service string
tracer *tracer.Tracer
*http.Transport
}

// RoundTrip satisfies the RoundTripper interface, wraps the sub Transport and
// captures a span of the Elasticsearch request.
func (t *TracedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
span := t.tracer.NewChildSpanFromContext("elasticsearch.query", req.Context())
span.Service = t.service
span.Type = ext.AppTypeDB
defer span.Finish()
span.SetMeta("elasticsearch.method", req.Method)
span.SetMeta("elasticsearch.url", req.URL.Path)
span.SetMeta("elasticsearch.params", req.URL.Query().Encode())

contentLength, _ := strconv.Atoi(req.Header.Get("Content-Length"))
if req.Body != nil && contentLength < MaxContentLength {
buf, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil, err
}
span.SetMeta("elasticsearch.body", string(buf))
req.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
}

// Run the request using the standard transport.
res, err := t.Transport.RoundTrip(req)

span.SetMeta(ext.HTTPCode, strconv.Itoa(res.StatusCode))
if err != nil {
span.SetError(err)
return res, err
} else if res.StatusCode < 200 || res.StatusCode > 299 {
buf, err := ioutil.ReadAll(res.Body)
if err != nil {
// Status text is best we can do if if we can't read the body.
span.SetError(errors.New(http.StatusText(res.StatusCode)))
} else {
span.SetError(errors.New(string(buf)))
}
res.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
}
Quantize(span)

return res, err
}

// NewTracedHTTPClient returns a new TracedTransport that traces HTTP requests.
func NewTracedHTTPClient(service string, tracer *tracer.Tracer) *http.Client {
return &http.Client{
Transport: &TracedTransport{service, tracer, &http.Transport{}},
}
}

// NewTracedHTTPClientWithTransport returns a new TracedTransport that traces HTTP requests
// and takes in a Transport to use something other than the default.
func NewTracedHTTPClientWithTransport(service string, tracer *tracer.Tracer, transport *http.Transport) *http.Client {
return &http.Client{
Transport: &TracedTransport{service, tracer, transport},
}
}
121 changes: 121 additions & 0 deletions tracer/contrib/elastictraced/elastictraced_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package elastictraced

import (
"context"
"github.com/DataDog/dd-trace-go/tracer"
"github.com/stretchr/testify/assert"
elasticv3 "gopkg.in/olivere/elastic.v3"
elasticv5 "gopkg.in/olivere/elastic.v5"

"testing"
)

const (
debug = false
)

func TestClientV5(t *testing.T) {
assert := assert.New(t)
testTracer, testTransport := getTestTracer()
testTracer.DebugLoggingEnabled = debug

tc := NewTracedHTTPClient("my-es-service", testTracer)
client, err := elasticv5.NewClient(
elasticv5.SetURL("http://127.0.0.1:9200"),
elasticv5.SetHttpClient(tc),
elasticv5.SetSniff(false),
elasticv5.SetHealthcheck(false),
)
assert.NoError(err)

_, err = client.Index().
Index("twitter").Id("1").
Type("tweet").
BodyString(`{"user": "test", "message": "hello"}`).
Do(context.TODO())
assert.NoError(err)

_, err = client.Get().Index("twitter").Type("tweet").
Id("1").Do(context.TODO())
assert.NoError(err)

checkOKTraces(assert, testTracer, testTransport)

_, err = client.Get().Index("not-real-index").
Id("1").Do(context.TODO())
assert.Error(err)

checkErrorTraces(assert, testTracer, testTransport)
}

func TestClientV3(t *testing.T) {
assert := assert.New(t)
testTracer, testTransport := getTestTracer()
testTracer.DebugLoggingEnabled = debug

tc := NewTracedHTTPClient("my-es-service", testTracer)
client, err := elasticv3.NewClient(
elasticv3.SetURL("http://127.0.0.1:9201"),
elasticv3.SetHttpClient(tc),
elasticv3.SetSniff(false),
elasticv3.SetHealthcheck(false),
)
assert.NoError(err)

_, err = client.Index().
Index("twitter").Id("1").
Type("tweet").
BodyString(`{"user": "test", "message": "hello"}`).
DoC(context.TODO())
assert.NoError(err)

_, err = client.Get().Index("twitter").Type("tweet").
Id("1").DoC(context.TODO())
assert.NoError(err)

checkOKTraces(assert, testTracer, testTransport)

_, err = client.Get().Index("not-real-index").
Id("1").DoC(context.TODO())
assert.Error(err)

checkErrorTraces(assert, testTracer, testTransport)
}

func checkOKTraces(assert *assert.Assertions, tracer *tracer.Tracer, transport *tracer.DummyTransport) {
tracer.FlushTraces()
traces := transport.Traces()
assert.Len(traces, 2)

spans := traces[0]
assert.Equal("my-es-service", spans[0].Service)
assert.Equal("PUT /twitter/tweet/?", spans[0].Resource)
assert.Equal("/twitter/tweet/1", spans[0].GetMeta("elasticsearch.url"))
assert.Equal("PUT", spans[0].GetMeta("elasticsearch.method"))

spans = traces[1]
assert.Equal("my-es-service", spans[0].Service)
assert.Equal("GET /twitter/tweet/?", spans[0].Resource)
assert.Equal("/twitter/tweet/1", spans[0].GetMeta("elasticsearch.url"))
assert.Equal("GET", spans[0].GetMeta("elasticsearch.method"))
}

func checkErrorTraces(assert *assert.Assertions, tracer *tracer.Tracer, transport *tracer.DummyTransport) {
tracer.FlushTraces()
traces := transport.Traces()
assert.Len(traces, 1)

spans := traces[0]
assert.Equal("my-es-service", spans[0].Service)
assert.Equal("GET /not-real-index/_all/?", spans[0].Resource)
assert.Equal("/not-real-index/_all/1", spans[0].GetMeta("elasticsearch.url"))
assert.NotEmpty(spans[0].GetMeta("error.msg"))
assert.Equal("*errors.errorString", spans[0].GetMeta("error.type"))
}

// getTestTracer returns a Tracer with a DummyTransport
func getTestTracer() (*tracer.Tracer, *tracer.DummyTransport) {
transport := &tracer.DummyTransport{}
tracer := tracer.NewTracerTransport(transport)
return tracer, transport
}
57 changes: 57 additions & 0 deletions tracer/contrib/elastictraced/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package elastictraced_test

import (
"context"
"github.com/DataDog/dd-trace-go/tracer"
"github.com/DataDog/dd-trace-go/tracer/contrib/elastictraced"
elasticv3 "gopkg.in/olivere/elastic.v3"
elasticv5 "gopkg.in/olivere/elastic.v5"
)

// To start tracing elastic.v5 requests, create a new TracedHTTPClient that you will
// use when initializing the elastic.Client.
func Example_v5() {
tc := elastictraced.NewTracedHTTPClient("my-elasticsearch-service", tracer.DefaultTracer)
client, _ := elasticv5.NewClient(
elasticv5.SetURL("http://127.0.0.1:9200"),
elasticv5.SetHttpClient(tc),
)

// Spans are emitted for all
client.Index().
Index("twitter").Type("tweet").Index("1").
BodyString(`{"user": "test", "message": "hello"}`).
Do(context.Background())

// Use a context to pass information down the call chain
root := tracer.NewRootSpan("parent.request", "web", "/tweet/1")
ctx := root.Context(context.Background())
client.Get().
Index("twitter").Type("tweet").Index("1").
Do(ctx)
root.Finish()
}

// To trace elastic.v3 you create a TracedHTTPClient in the same way but all requests must use
// the DoC() call to pass the request context.
func Example_v3() {
tc := elastictraced.NewTracedHTTPClient("my-elasticsearch-service", tracer.DefaultTracer)
client, _ := elasticv3.NewClient(
elasticv3.SetURL("http://127.0.0.1:9200"),
elasticv3.SetHttpClient(tc),
)

// Spans are emitted for all
client.Index().
Index("twitter").Type("tweet").Index("1").
BodyString(`{"user": "test", "message": "hello"}`).
DoC(context.Background())

// Use a context to pass information down the call chain
root := tracer.NewRootSpan("parent.request", "web", "/tweet/1")
ctx := root.Context(context.Background())
client.Get().
Index("twitter").Type("tweet").Index("1").
DoC(ctx)
root.Finish()
}
26 changes: 26 additions & 0 deletions tracer/contrib/elastictraced/quantize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package elastictraced

import (
"fmt"
"github.com/DataDog/dd-trace-go/tracer"
"regexp"
)

var (
IdRegexp = regexp.MustCompile("/([0-9]+)([/\\?]|$)")
IdPlaceholder = []byte("/?$2")
IndexRegexp = regexp.MustCompile("[0-9]{2,}")
IndexPlaceholder = []byte("?")
)

// Quantize quantizes an Elasticsearch to extract a meaningful resource from the request.
// We quantize based on the method+url with some cleanup applied to the URL.
// URLs with an ID will be generalized as will (potential) timestamped indices.
func Quantize(span *tracer.Span) {
url := span.GetMeta("elasticsearch.url")
method := span.GetMeta("elasticsearch.method")

quantizedURL := IdRegexp.ReplaceAll([]byte(url), IdPlaceholder)
quantizedURL = IndexRegexp.ReplaceAll(quantizedURL, IndexPlaceholder)
span.Resource = fmt.Sprintf("%s %s", method, quantizedURL)
}
43 changes: 43 additions & 0 deletions tracer/contrib/elastictraced/quantize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package elastictraced

import (
"testing"

"github.com/DataDog/dd-trace-go/tracer"
"github.com/stretchr/testify/assert"
)

func TestQuantize(t *testing.T) {
tr := tracer.NewTracer()
for _, tc := range []struct {
url, method string
expected string
}{
{
url: "/twitter/tweets",
method: "POST",
expected: "POST /twitter/tweets",
},
{
url: "/logs_2016_05/event/_search",
method: "GET",
expected: "GET /logs_?_?/event/_search",
},
{
url: "/twitter/tweets/123",
method: "GET",
expected: "GET /twitter/tweets/?",
},
{
url: "/logs_2016_05/event/123",
method: "PUT",
expected: "PUT /logs_?_?/event/?",
},
} {
span := tracer.NewSpan("name", "elasticsearch", "", 0, 0, 0, tr)
span.SetMeta("elasticsearch.url", tc.url)
span.SetMeta("elasticsearch.method", tc.method)
Quantize(span)
assert.Equal(t, tc.expected, span.Resource)
}
}