Skip to content

Commit

Permalink
Remove encoder pool
Browse files Browse the repository at this point in the history
  • Loading branch information
gabsn committed Oct 4, 2017
1 parent 4bca965 commit 9cb2eca
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 127 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ List of integrations: https://godoc.org/github.com/DataDog/dd-trace-go/tracer#pk
- [tracer](https://github.com/DataDog/dd-trace-go/tree/master/tracer): contains the low level API used to trace the different libraries.

- [libs](https://github.com/DataDog/dd-trace-go/tree/master/libs): contains the different libraries supported by our APM solution.

88 changes: 28 additions & 60 deletions tracer/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ type Encoder interface {
EncodeServices(services map[string]Service) error
Read(p []byte) (int, error)
ContentType() string
Reset()
}

var mh codec.MsgpackHandle
Expand All @@ -33,7 +32,7 @@ func newMsgpackEncoder() *msgpackEncoder {
return &msgpackEncoder{
buffer: buffer,
encoder: encoder,
contentType: "application/msgpack",
contentType: contentType(msgpackType),
}
}

Expand All @@ -58,12 +57,6 @@ func (e *msgpackEncoder) ContentType() string {
return e.contentType
}

// Reset redirects the output of the encoder to a newly allocated buffer
func (e *msgpackEncoder) Reset() {
var b bytes.Buffer
e.encoder.Reset(&b)
}

// jsonEncoder encodes a list of traces in JSON format
type jsonEncoder struct {
buffer *bytes.Buffer
Expand All @@ -79,7 +72,7 @@ func newJSONEncoder() *jsonEncoder {
return &jsonEncoder{
buffer: buffer,
encoder: encoder,
contentType: "application/json",
contentType: contentType(jsonType),
}
}

Expand All @@ -104,66 +97,41 @@ func (e *jsonEncoder) ContentType() string {
return e.contentType
}

// Reset will allocate a new JSON encoder and a new Buffer
func (e *jsonEncoder) Reset() {
var b bytes.Buffer
e.buffer = &b
e.encoder = json.NewEncoder(&b)
}

const (
JSON_ENCODER = iota
MSGPACK_ENCODER
jsonType = iota
msgpackType
)

// EncoderPool is a pool meant to share the buffers required to encode traces.
// It naively tries to cap the number of active encoders, but doesn't enforce
// the limit. To use a pool, you should Borrow() for an encoder and then
// Return() that encoder to the pool. Encoders in that pool should honor
// the Encoder interface.
type encoderPool struct {
encoderType int
pool chan Encoder
}

func newEncoderPool(encoderType, size int) (*encoderPool, string) {
pool := &encoderPool{
encoderType: encoderType,
pool: make(chan Encoder, size),
func contentType(encoderType int) string {
switch encoderType {
case jsonType:
return "application/json"
case msgpackType:
return "application/msgpack"
default:
return ""
}

// Borrow an encoder to retrieve the default ContentType
encoder := pool.Borrow()
pool.Return(encoder)

contentType := encoder.ContentType()
return pool, contentType
}

func (p *encoderPool) Borrow() Encoder {
var encoder Encoder
// encoderFactory will provide a new encoder each time we want to flush traces or services.
type encoderFactory struct {
EncoderType int
}

select {
case encoder = <-p.pool:
// When we send the encoder as the request body, the persistConn.writeLoop() goroutine
// can theoretically read the underlying buffer whereas the encoder has been returned to the pool.
// This can lead to a race condition and make the app panicking.
// That's why we reset the encoder here to use a newly allocated buffer.
encoder.Reset()
default:
switch p.encoderType {
case JSON_ENCODER:
encoder = newJSONEncoder()
case MSGPACK_ENCODER:
encoder = newMsgpackEncoder()
}
}
return encoder
func newEncoderFactory(encoderType int) (*encoderFactory, string) {
return &encoderFactory{
EncoderType: encoderType,
}, contentType(encoderType)
}

func (p *encoderPool) Return(e Encoder) {
select {
case p.pool <- e:
// Get allocates and returns a new encoder
func (f *encoderFactory) Get() Encoder {
switch f.EncoderType {
case jsonType:
return newJSONEncoder()
case msgpackType:
return newMsgpackEncoder()
default:
return nil
}
}
43 changes: 3 additions & 40 deletions tracer/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,47 +113,10 @@ func TestMsgpackEncoding(t *testing.T) {
}
}

func TestPoolBorrowCreate(t *testing.T) {
func TestEncoderFactoryGet(t *testing.T) {
assert := assert.New(t)

// borrow an encoder from the pool
pool, _ := newEncoderPool(MSGPACK_ENCODER, 1)
encoder := pool.Borrow()
factory, _ := newEncoderFactory(msgpackType)
encoder := factory.Get()
assert.NotNil(encoder)
}

func TestPoolReuseEncoder(t *testing.T) {
assert := assert.New(t)

// borrow, return and borrow again an encoder from the pool
pool, _ := newEncoderPool(MSGPACK_ENCODER, 1)
encoder := pool.Borrow()
pool.Return(encoder)
anotherEncoder := pool.Borrow()
assert.Equal(anotherEncoder, encoder)
}

func TestPoolSize(t *testing.T) {
pool, _ := newEncoderPool(MSGPACK_ENCODER, 1)
encoder := newMsgpackEncoder()
anotherEncoder := newMsgpackEncoder()

// put two encoders in the pool with a maximum size of 1
// doesn't hang the caller
pool.Return(encoder)
pool.Return(anotherEncoder)
}

func TestPoolReturn(t *testing.T) {
assert := assert.New(t)

// an encoder can return in the pool
pool, _ := newEncoderPool(MSGPACK_ENCODER, 5)
encoder := newMsgpackEncoder()
pool.pool <- encoder
pool.Return(encoder)

// the encoder is the one we get before
returnedEncoder := <-pool.pool
assert.Equal(returnedEncoder, encoder)
}
16 changes: 7 additions & 9 deletions tracer/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,17 +604,17 @@ func BenchmarkTracerAddSpans(b *testing.B) {

// getTestTracer returns a Tracer with a DummyTransport
func getTestTracer() (*Tracer, *dummyTransport) {
pool, _ := newEncoderPool(MSGPACK_ENCODER, encoderPoolSize)
transport := &dummyTransport{pool: pool}
factory, _ := newEncoderFactory(msgpackType)
transport := &dummyTransport{encoderFactory: factory}
tracer := NewTracerTransport(transport)
return tracer, transport
}

// Mock Transport with a real Encoder
type dummyTransport struct {
pool *encoderPool
traces [][]*Span
services map[string]Service
encoderFactory *encoderFactory
traces [][]*Span
services map[string]Service

sync.RWMutex // required because of some poll-testing (eg: worker)
}
Expand All @@ -624,8 +624,7 @@ func (t *dummyTransport) SendTraces(traces [][]*Span) (*http.Response, error) {
t.traces = append(t.traces, traces...)
t.Unlock()

encoder := t.pool.Borrow()
defer t.pool.Return(encoder)
encoder := t.encoderFactory.Get()
return nil, encoder.EncodeTraces(traces)
}

Expand All @@ -634,8 +633,7 @@ func (t *dummyTransport) SendServices(services map[string]Service) (*http.Respon
t.services = services
t.Unlock()

encoder := t.pool.Borrow()
defer t.pool.Return(encoder)
encoder := t.encoderFactory.Get()
return nil, encoder.EncodeServices(services)
}

Expand Down
31 changes: 16 additions & 15 deletions tracer/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ import (
const (
defaultHostname = "localhost"
defaultPort = "8126"
defaultEncoder = MSGPACK_ENCODER // defines the default encoder used when the Transport is initialized
legacyEncoder = JSON_ENCODER // defines the legacy encoder used with earlier agent versions
defaultEncoder = msgpackType // defines the default encoder used when the Transport is initialized
legacyEncoder = jsonType // defines the legacy encoder used with earlier agent versions
defaultHTTPTimeout = time.Second // defines the current timeout before giving up with the send process
encoderPoolSize = 5 // how many encoders are available
traceCountHeader = "X-Datadog-Trace-Count" // header containing the number of traces in the payload
)

Expand Down Expand Up @@ -55,16 +54,23 @@ type httpTransport struct {
legacyTraceURL string // the legacy delivery URL for traces
serviceURL string // the delivery URL for services
legacyServiceURL string // the legacy delivery URL for services
pool *encoderPool // encoding allocates lot of buffers (which might then be resized) so we use a pool so they can be re-used
client *http.Client // the HTTP client used in the POST
headers map[string]string // the Transport headers
compatibilityMode bool // the Agent targets a legacy API for compatibility reasons

// [WARNING] We tried to reuse encoders thanks to a pool, but that led us to having race conditions.
// Indeed, when we send the encoder as the request body, the persistConn.writeLoop() goroutine
// can theoretically read the underlying buffer whereas the encoder has been returned to the pool.
// Since the underlying bytes.Buffer is not thread safe, this can make the app panicking.
// since this method will later on spawn a goroutine referencing this buffer.
// That's why we prefer the less performant yet SAFE implementation of allocating a new encoder every time we flush.
encoderFactory *encoderFactory
}

// newHTTPTransport returns an httpTransport for the given endpoint
func newHTTPTransport(hostname, port string) *httpTransport {
// initialize the default EncoderPool with Encoder headers
pool, contentType := newEncoderPool(defaultEncoder, encoderPoolSize)
encoderFactory, contentType := newEncoderFactory(defaultEncoder)
defaultHeaders := map[string]string{
"Content-Type": contentType,
"Datadog-Meta-Lang": ext.Lang,
Expand All @@ -78,7 +84,7 @@ func newHTTPTransport(hostname, port string) *httpTransport {
legacyTraceURL: fmt.Sprintf("http://%s:%s/v0.2/traces", hostname, port),
serviceURL: fmt.Sprintf("http://%s:%s/v0.3/services", hostname, port),
legacyServiceURL: fmt.Sprintf("http://%s:%s/v0.2/services", hostname, port),
pool: pool,
encoderFactory: encoderFactory,
client: &http.Client{
Timeout: defaultHTTPTimeout,
},
Expand All @@ -92,9 +98,7 @@ func (t *httpTransport) SendTraces(traces [][]*Span) (*http.Response, error) {
return nil, errors.New("provided an empty URL, giving up")
}

// borrow an encoder
encoder := t.pool.Borrow()
defer t.pool.Return(encoder)
encoder := t.encoderFactory.Get()

// encode the spans and return the error if any
err := encoder.EncodeTraces(traces)
Expand Down Expand Up @@ -135,9 +139,7 @@ func (t *httpTransport) SendServices(services map[string]Service) (*http.Respons
return nil, errors.New("provided an empty URL, giving up")
}

// Encode the service table
encoder := t.pool.Borrow()
defer t.pool.Return(encoder)
encoder := t.encoderFactory.Get()

if err := encoder.EncodeServices(services); err != nil {
return nil, err
Expand Down Expand Up @@ -180,9 +182,8 @@ func (t *httpTransport) SetHeader(key, value string) {
// changeEncoder switches the internal encoders pool so that a different API with different
// format can be targeted, preventing failures because of outdated agents
func (t *httpTransport) changeEncoder(encoderType int) {
pool, contentType := newEncoderPool(encoderType, encoderPoolSize)
t.pool = pool
t.headers["Content-Type"] = contentType
t.encoderFactory.EncoderType = encoderType
t.headers["Content-Type"] = contentType(encoderType)
}

// apiDowngrade downgrades the used encoder and API level. This method must fallback to a safe
Expand Down
6 changes: 3 additions & 3 deletions tracer/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,17 @@ func TestTransportEncoderPool(t *testing.T) {
transport := newHTTPTransport(defaultHostname, defaultPort)

// MsgpackEncoder is the default encoder of the pool
encoder := transport.pool.Borrow()
encoder := transport.encoderFactory.Get()
assert.Equal("application/msgpack", encoder.ContentType())
}

func TestTransportSwitchEncoder(t *testing.T) {
assert := assert.New(t)
transport := newHTTPTransport(defaultHostname, defaultPort)
transport.changeEncoder(JSON_ENCODER)
transport.changeEncoder(jsonType)

// MsgpackEncoder is the default encoder of the pool
encoder := transport.pool.Borrow()
encoder := transport.encoderFactory.Get()
contentType := transport.headers["Content-Type"]
assert.Equal("application/json", encoder.ContentType())
assert.Equal("application/json", contentType)
Expand Down

0 comments on commit 9cb2eca

Please sign in to comment.