Skip to content

Commit

Permalink
Merge pull request #21 from palazzem/api-msgpack
Browse files Browse the repository at this point in the history
[core] add support for Msgpack encoder
  • Loading branch information
Emanuele Palazzetti committed Dec 9, 2016
2 parents 004a82f + 753cad3 commit bd99bf2
Show file tree
Hide file tree
Showing 9 changed files with 347 additions and 101 deletions.
2 changes: 2 additions & 0 deletions tracer/contrib/gin-gonic/gintrace/gintrace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,5 @@ func (t *dummyTransport) Traces() [][]*tracer.Span {
t.traces = nil
return traces
}

func (t *dummyTransport) SetHeader(key, value string) {}
2 changes: 2 additions & 0 deletions tracer/contrib/gorilla/muxtrace/muxtrace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,5 @@ func (t *dummyTransport) Traces() [][]*tracer.Span {
t.traces = nil
return traces
}

func (t *dummyTransport) SetHeader(key, value string) {}
2 changes: 2 additions & 0 deletions tracer/contrib/tracegrpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,5 @@ func (t *dummyTransport) Traces() [][]*tracer.Span {
t.traces = nil
return traces
}

func (t *dummyTransport) SetHeader(key, value string) {}
123 changes: 99 additions & 24 deletions tracer/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,63 +3,138 @@ package tracer
import (
"bytes"
"encoding/json"

"github.com/ugorji/go/codec"
)

// jsonEncoder encodes a list of spans in JSON format.
// Encoder is a generic interface that expects an Encode() method
// for the encoding process, and a Read() method that will be used
// by the http handler
type Encoder interface {
Encode(traces [][]*Span) error
Read(p []byte) (int, error)
ContentType() string
}

var mh codec.MsgpackHandle

// msgpackEncoder encodes a list of traces in Msgpack format
type msgpackEncoder struct {
buffer *bytes.Buffer
encoder *codec.Encoder
contentType string
}

func newMsgpackEncoder() *msgpackEncoder {
buffer := &bytes.Buffer{}
encoder := codec.NewEncoder(buffer, &mh)

return &msgpackEncoder{
buffer: buffer,
encoder: encoder,
contentType: "application/msgpack",
}
}

// Encode serializes the given traces list into the internal
// buffer, returning the error if any
func (e *msgpackEncoder) Encode(traces [][]*Span) error {
e.buffer.Reset()
return e.encoder.Encode(traces)
}

// Read values from the internal buffer
func (e *msgpackEncoder) Read(p []byte) (int, error) {
return e.buffer.Read(p)
}

// ContentType return the msgpackEncoder content-type
func (e *msgpackEncoder) ContentType() string {
return e.contentType
}

// jsonEncoder encodes a list of traces in JSON format
type jsonEncoder struct {
j *json.Encoder // the JSON encoder
b *bytes.Buffer // the reusable buffer
buffer *bytes.Buffer
encoder *json.Encoder
contentType string
}

// newJSONEncoder returns a new encoder for the JSON format.
func newJSONEncoder() *jsonEncoder {
b := &bytes.Buffer{}
j := json.NewEncoder(b)
buffer := &bytes.Buffer{}
encoder := json.NewEncoder(buffer)

return &jsonEncoder{
j: j,
b: b,
buffer: buffer,
encoder: encoder,
contentType: "application/json",
}
}

// Encode returns a byte array related to the marshalling
// of a list of spans. It resets the JSONEncoder internal buffer
// and proceeds with the encoding.
// Encode serializes the given traces list into the internal
// buffer, returning the error if any
func (e *jsonEncoder) Encode(traces [][]*Span) error {
e.b.Reset()
return e.j.Encode(traces)
e.buffer.Reset()
return e.encoder.Encode(traces)
}

// Read values from the internal buffer
func (e *jsonEncoder) Read(p []byte) (int, error) {
return e.b.Read(p)
return e.buffer.Read(p)
}

// ContentType return the jsonEncoder content-type
func (e *jsonEncoder) ContentType() string {
return e.contentType
}

// encoderPool is a pool meant to share the buffers required to encode traces.
const (
JSON_ENCODER = iota
MSGPACK_ENCODER
)

// EncoderPool is a pool meant to share the buffers required to encode traces.
// It naively tries to cap the number of active encoders, but doesn't enforce
// the limit.
// the limit. To use a pool, you should Borrow() for an encoder and then
// Return() that encoder to the pool. Encoders in that pool should honor
// the Encoder interface.
type encoderPool struct {
pool chan *jsonEncoder
encoderType int
pool chan Encoder
}

func newEncoderPool(size int) *encoderPool {
return &encoderPool{pool: make(chan *jsonEncoder, size)}
func newEncoderPool(encoderType, size int) (*encoderPool, string) {
pool := &encoderPool{
encoderType: encoderType,
pool: make(chan Encoder, size),
}

// Borrow an encoder to retrieve the default ContentType
encoder := pool.Borrow()
pool.Return(encoder)

contentType := encoder.ContentType()
return pool, contentType
}

// Borrow returns an available encoders or creates a new one
func (p *encoderPool) Borrow() *jsonEncoder {
var encoder *jsonEncoder
func (p *encoderPool) Borrow() Encoder {
var encoder Encoder

select {
case encoder = <-p.pool:
default:
encoder = newJSONEncoder()
switch p.encoderType {
case JSON_ENCODER:
encoder = newJSONEncoder()
case MSGPACK_ENCODER:
encoder = newMsgpackEncoder()
}
}
return encoder
}

// Return is called when from the code an Encoder is released in the pool.
func (p *encoderPool) Return(e *jsonEncoder) {
func (p *encoderPool) Return(e Encoder) {
select {
case p.pool <- e:
default:
Expand Down
177 changes: 118 additions & 59 deletions tracer/encoder_test.go
Original file line number Diff line number Diff line change
@@ -1,100 +1,159 @@
package tracer

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/ugorji/go/codec"
)

func TestJSONEncoder(t *testing.T) {
func TestEncoderContentType(t *testing.T) {
assert := assert.New(t)

// create a traces list with a single span
var traces [][]*Span
var spans []*Span
span := NewSpan("pylons.request", "pylons", "/", 0, 0, 0, nil)
span.Start = 0
spans = append(spans, span)
traces = append(traces, spans)

// the encoder must return a valid JSON byte array that ends with a \n
want := `[[{"name":"pylons.request","service":"pylons","resource":"/","type":"","start":0,"duration":0,"span_id":0,"trace_id":0,"parent_id":0,"error":0}]]`
want += "\n"

encoder := newJSONEncoder()
err := encoder.Encode(traces)
assert.Nil(err)
assert.Equal(want, encoder.b.String())
testCases := []struct {
encoder Encoder
contentType string
}{
{newJSONEncoder(), "application/json"},
{newMsgpackEncoder(), "application/msgpack"},
}

for _, tc := range testCases {
assert.Equal(tc.contentType, tc.encoder.ContentType())
}
}

func TestJSONRead(t *testing.T) {
func TestJSONEncoding(t *testing.T) {
assert := assert.New(t)

// create a traces list with a single span
var traces [][]*Span
var spans []*Span
span := NewSpan("pylons.request", "pylons", "/", 0, 0, 0, nil)
span.Start = 0
spans = append(spans, span)
traces = append(traces, spans)

// fill the encoder internal buffer
encoder := newJSONEncoder()
_ = encoder.Encode(traces)
expectedSize := encoder.b.Len()

// the Read function must be used to get the value of the internal buffer
buff := make([]byte, expectedSize)
_, err := encoder.Read(buff)

// it should match the encoding payload
want := `[[{"name":"pylons.request","service":"pylons","resource":"/","type":"","start":0,"duration":0,"span_id":0,"trace_id":0,"parent_id":0,"error":0}]]`
want += "\n"
assert.Nil(err)
assert.Equal(want, string(buff))
testCases := []struct {
traces int
size int
}{
{1, 1},
{3, 1},
{1, 3},
{3, 3},
}

for _, tc := range testCases {
payload := getTestTrace(tc.traces, tc.size)
encoder := newJSONEncoder()
err := encoder.Encode(payload)
assert.Nil(err)

// decode to check the right encoding
var traces [][]*Span
dec := json.NewDecoder(encoder.buffer)
err = dec.Decode(&traces)
assert.Nil(err)
assert.Len(traces, tc.traces)

for _, trace := range traces {
assert.Len(trace, tc.size)
span := trace[0]
assert.Equal(uint64(42), span.TraceID)
assert.Equal(uint64(52), span.SpanID)
assert.Equal(uint64(42), span.ParentID)
assert.Equal("web", span.Type)
assert.Equal("high.throughput", span.Service)
assert.Equal("sending.events", span.Name)
assert.Equal("SEND /data", span.Resource)
assert.Equal(int64(1481215590883401105), span.Start)
assert.Equal(int64(1000000000), span.Duration)
assert.Equal("192.168.0.1", span.Meta["http.host"])
assert.Equal(float64(41.99), span.Metrics["http.monitor"])
}
}
}

func TestPoolBorrowCreate(t *testing.T) {
func TestMsgpackEncoding(t *testing.T) {
assert := assert.New(t)

// borrow an encoder from the pool
pool := newEncoderPool(1)
encoder := pool.Borrow()
assert.NotNil(encoder)
testCases := []struct {
traces int
size int
}{
{1, 1},
{3, 1},
{1, 3},
{3, 3},
}

for _, tc := range testCases {
payload := getTestTrace(tc.traces, tc.size)
encoder := newMsgpackEncoder()
err := encoder.Encode(payload)
assert.Nil(err)

// decode to check the right encoding
var traces [][]*Span
var mh codec.MsgpackHandle
dec := codec.NewDecoder(encoder.buffer, &mh)
err = dec.Decode(&traces)
assert.Nil(err)
assert.Len(traces, tc.traces)

for _, trace := range traces {
assert.Len(trace, tc.size)
span := trace[0]
assert.Equal(uint64(42), span.TraceID)
assert.Equal(uint64(52), span.SpanID)
assert.Equal(uint64(42), span.ParentID)
assert.Equal("web", span.Type)
assert.Equal("high.throughput", span.Service)
assert.Equal("sending.events", span.Name)
assert.Equal("SEND /data", span.Resource)
assert.Equal(int64(1481215590883401105), span.Start)
assert.Equal(int64(1000000000), span.Duration)
assert.Equal("192.168.0.1", span.Meta["http.host"])
assert.Equal(float64(41.99), span.Metrics["http.monitor"])
}
}
}

func TestPoolReturn(t *testing.T) {
func TestPoolBorrowCreate(t *testing.T) {
assert := assert.New(t)

// an encoder can return in the pool
pool := newEncoderPool(1)
encoder := newJSONEncoder()
pool.pool <- encoder
pool.Return(encoder)

// the encoder is the one we get before
returnedEncoder := <-pool.pool
assert.Equal(returnedEncoder, encoder)
// borrow an encoder from the pool
pool, _ := newEncoderPool(MSGPACK_ENCODER, 1)
encoder := pool.Borrow()
assert.NotNil(encoder)
}

func TestPoolReuseEncoder(t *testing.T) {
assert := assert.New(t)

// borrow, return and borrow again an encoder from the pool
pool := newEncoderPool(1)
pool, _ := newEncoderPool(MSGPACK_ENCODER, 1)
encoder := pool.Borrow()
pool.Return(encoder)
anotherEncoder := pool.Borrow()
assert.Equal(anotherEncoder, encoder)
}

func TestPoolSize(t *testing.T) {
pool := newEncoderPool(1)
encoder := newJSONEncoder()
anotherEncoder := newJSONEncoder()
pool, _ := newEncoderPool(MSGPACK_ENCODER, 1)
encoder := newMsgpackEncoder()
anotherEncoder := newMsgpackEncoder()

// put two encoders in the pool with a maximum size of 1
// doesn't hang the caller
pool.Return(encoder)
pool.Return(anotherEncoder)
}

func TestPoolReturn(t *testing.T) {
assert := assert.New(t)

// an encoder can return in the pool
pool, _ := newEncoderPool(MSGPACK_ENCODER, 5)
encoder := newMsgpackEncoder()
pool.pool <- encoder
pool.Return(encoder)

// the encoder is the one we get before
returnedEncoder := <-pool.pool
assert.Equal(returnedEncoder, encoder)
}
Loading

0 comments on commit bd99bf2

Please sign in to comment.