Skip to content

Commit

Permalink
[xray] Add X-Ray client
Browse files Browse the repository at this point in the history
  • Loading branch information
evalphobia committed Aug 7, 2017
1 parent 38ad474 commit 7a0c8d0
Show file tree
Hide file tree
Showing 7 changed files with 690 additions and 0 deletions.
139 changes: 139 additions & 0 deletions xray/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package xray

import (
"net/http"
"time"

SDK "github.com/aws/aws-sdk-go/service/xray"

"github.com/evalphobia/aws-sdk-go-wrapper/config"
"github.com/evalphobia/aws-sdk-go-wrapper/log"
"github.com/evalphobia/aws-sdk-go-wrapper/private/errors"
"github.com/evalphobia/aws-sdk-go-wrapper/private/pointers"
)

const (
serviceName = "X-Ray"
)

// sampling all request and set sampling limit to 1000 req/s.
var defaultSamplingPolicy, _ = NewLimitedSampler(1, 1000)

// XRay has XRay client.
type XRay struct {
client *SDK.XRay
daemon *Daemon
sampling SamplingPolicy

logger log.Logger
prefix string
}

// New returns initialized *Kinesis.
func New(conf config.Config) (*XRay, error) {
sess, err := conf.Session()
if err != nil {
return nil, err
}

svc := &XRay{
client: SDK.New(sess),
logger: log.DefaultLogger,
prefix: conf.DefaultPrefix,
sampling: defaultSamplingPolicy,
}
return svc, nil
}

// SetLogger sets logger.
func (svc *XRay) SetLogger(logger log.Logger) {
svc.logger = logger
}

// SetSamplingPolicy sets sampling policy.
func (svc *XRay) SetSamplingPolicy(fraction, qps float64) error {
s, err := NewLimitedSampler(fraction, qps)
if err != nil {
svc.Errorf("error on SetSamplingPolicy; fraction=%f; qps=%f; error=%s;", fraction, qps, err.Error())
return err
}
svc.sampling = s
return nil
}

// AddSegment adds the segment dat into background daemon.
func (svc *XRay) AddSegment(segments ...*Segment) {
svc.daemon.Add(segments...)
}

// RunDaemon creates and runs background daemon.
func (svc *XRay) RunDaemon(size int, interval time.Duration) {
svc.daemon = NewDaemon(size, interval, svc.PutTraceSegments)
svc.daemon.Run()
}

// PutTraceSegments executes PutTraceSegments operation.
func (svc *XRay) PutTraceSegments(segments []*Segment) error {
if len(segments) == 0 {
return nil
}

list := make([]*string, len(segments))
for i, s := range segments {
if !s.Trace {
continue
}

byt, err := s.ToJSON()
if err != nil {
svc.Errorf("error on segment.ToJSON(); segment=%+v; error=%s;", s, err.Error())
continue
}
list[i] = pointers.String(string(byt))
}

notProcessed, err := svc.client.PutTraceSegments(&SDK.PutTraceSegmentsInput{
TraceSegmentDocuments: list,
})
if err != nil {
_list := make([]string, len(list))
for i, s := range list {
_list[i] = *s
}
svc.Errorf("error on `PutTraceSegments` operation; segments=%v; error=%s;", _list, err.Error())
}
_ = notProcessed // TODO
return err
}

// NewSegment creates new Segment data with given name.
func (svc *XRay) NewSegment(name string) *Segment {
s := NewSegment(name)
s.service = svc
return s
}

// NewSegmentFromRequest creates new Segment data from *http.Request.
func (svc *XRay) NewSegmentFromRequest(r *http.Request) *Segment {
if !svc.sampling.CanSample() {
return NewEmptySegment()
}

s := NewSegmentFromRequest(r)
s.service = svc
return s
}

// Infof logging information.
func (svc *XRay) Infof(format string, v ...interface{}) {
svc.logger.Infof(serviceName, format, v...)
}

// Errorf logging error information.
func (svc *XRay) Errorf(format string, v ...interface{}) {
svc.logger.Errorf(serviceName, format, v...)
}

func newErrors() *errors.Errors {
return errors.NewErrors(serviceName)
}
57 changes: 57 additions & 0 deletions xray/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package xray

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/evalphobia/aws-sdk-go-wrapper/config"
"github.com/evalphobia/aws-sdk-go-wrapper/log"
)

const defaultEndpoint = "http://localhost:9999"

func getTestConfig() config.Config {
return config.Config{
AccessKey: "access",
SecretKey: "secret",
Endpoint: defaultEndpoint,
}
}

func getTestClient(t *testing.T) *XRay {
svc, err := New(getTestConfig())
if err != nil {
t.Errorf("error on create client; error=%s;", err.Error())
t.FailNow()
}
return svc
}

func TestNew(t *testing.T) {
assert := assert.New(t)

svc, err := New(getTestConfig())
assert.NoError(err)
assert.NotNil(svc.client)
assert.Equal("xray", svc.client.ServiceName)
assert.Equal(defaultEndpoint, svc.client.Endpoint)

region := "us-west-1"
svc, err = New(config.Config{
Region: region,
})
assert.NoError(err)
expectedEndpoint := "https://xray." + region + ".amazonaws.com"
assert.Equal(expectedEndpoint, svc.client.Endpoint)
}

func TestSetLogger(t *testing.T) {
assert := assert.New(t)
svc := getTestClient(t)
assert.Equal(log.DefaultLogger, svc.logger)

stdLogger := &log.StdLogger{}
svc.SetLogger(stdLogger)
assert.Equal(stdLogger, svc.logger)
}
85 changes: 85 additions & 0 deletions xray/daemon.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package xray

import (
"sync"
"time"
)

// Daemon is background daemon for sending segments.
// This struct stores segments and sends segment to AWS X-Ray in each checkpoint timing.
type Daemon struct {
flushSegments func([]*Segment) error

spoolMu sync.Mutex
spool []*Segment
checkpointSize int
checkpointInterval time.Duration
stopSignal chan struct{}
}

// NewDaemon creates new Daemon.
// size is number of segments to send AWS API in single checkpoint.
// interval is the time of checkpoint interval.
// fn is function called in each checkpoint, to sends segments to AWS API.
func NewDaemon(size int, interval time.Duration, fn func([]*Segment) error) *Daemon {
if size < 1 {
size = 10
}
if interval == 0 {
interval = 1 * time.Second
}

return &Daemon{
spool: make([]*Segment, 0, 4096),
checkpointSize: size,
checkpointInterval: interval,
stopSignal: make(chan struct{}),
flushSegments: fn,
}
}

// Add adds segment data into daemon.
func (d *Daemon) Add(segments ...*Segment) {
d.spoolMu.Lock()
d.spool = append(d.spool, segments...)
d.spoolMu.Unlock()
}

// Flush gets segments from the internal spool and execute flushSegments function.
func (d *Daemon) Flush() {
d.spoolMu.Lock()
var segments []*Segment
segments, d.spool = shiftSegment(d.spool, d.checkpointSize)
d.spoolMu.Unlock()
d.flushSegments(segments)
}

// shiftSegment retrieves segments.
func shiftSegment(slice []*Segment, size int) (part []*Segment, all []*Segment) {
l := len(slice)
if l <= size {
return slice, slice[:0]
}
return slice[:size], slice[size:]
}

// Run sets timer to flush data in each checkpoint as a background daemon.
func (d *Daemon) Run() {
ticker := time.NewTicker(d.checkpointInterval)
go func() {
for {
select {
case <-ticker.C:
d.Flush()
case <-d.stopSignal:
ticker.Stop()
return
}
}
}()
}

// Stop stops daemon.
func (d *Daemon) Stop() {
d.stopSignal <- struct{}{}
}
105 changes: 105 additions & 0 deletions xray/sampling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package xray

import (
crand "crypto/rand"
"encoding/binary"
"fmt"
"math/rand"
"sync"
"time"

"golang.org/x/time/rate"
)

// Most code of sampling are copied from GCP's stackdirver trace implements.
// https://github.com/GoogleCloudPlatform/google-cloud-go/blob/master/trace/sampling.go

// SamplingPolicy is policy to determine which data can send to AWS API or not, with rate limit function.
type SamplingPolicy interface {
// Sample returns a Decision.
// If Trace is false in the returned Decision, then the Decision should be
// the zero value.
Sample() Decision
CanSample() bool
}

// Decision is the value returned by a call to a SamplingPolicy's Sample method.
type Decision struct {
Trace bool // Whether to trace the request.
Policy string // Name of the sampling policy.
Weight float64 // Sample weight to be used in statistical calculations.
}

type sampler struct {
fraction float64
skipped float64
*rate.Limiter
*rand.Rand
sync.Mutex
}

func (s *sampler) Sample() Decision {
s.Lock()
x := s.Float64()
d := s.sample(time.Now(), x)
s.Unlock()
return d
}

func (s *sampler) CanSample() bool {
return s.Sample().Trace
}

// sample contains the a deterministic, time-independent logic of Sample.
func (s *sampler) sample(now time.Time, x float64) (d Decision) {
d.Trace = x < s.fraction
if !d.Trace {
// We have no reason to trace this request.
return Decision{}
}
// We test separately that the rate limit is not tiny before calling AllowN,
// because of overflow problems in x/time/rate.
if s.Limit() < 1e-9 || !s.AllowN(now, 1) {
// Rejected by the rate limit.
if d.Trace {
s.skipped++
}
return Decision{}
}
if d.Trace {
d.Policy, d.Weight = "default", (1.0+s.skipped)/s.fraction
s.skipped = 0.0
}
return
}

// NewLimitedSampler returns a sampling policy that randomly samples a given
// fraction of requests. It also enforces a limit on the number of traces per
// second. It tries to trace every request with a trace header, but will not
// exceed the qps limit to do it.
func NewLimitedSampler(fraction, maxqps float64) (SamplingPolicy, error) {
if !(fraction >= 0) {
return nil, fmt.Errorf("invalid fraction %f", fraction)
}
if !(maxqps >= 0) {
return nil, fmt.Errorf("invalid maxqps %f", maxqps)
}
// Set a limit on the number of accumulated "tokens", to limit bursts of
// traced requests. Use one more than a second's worth of tokens, or 100,
// whichever is smaller.
// See https://godoc.org/golang.org/x/time/rate#NewLimiter.
maxTokens := 100
if maxqps < 99.0 {
maxTokens = 1 + int(maxqps)
}
var seed int64
if err := binary.Read(crand.Reader, binary.LittleEndian, &seed); err != nil {
seed = time.Now().UnixNano()
}
s := sampler{
fraction: fraction,
Limiter: rate.NewLimiter(rate.Limit(maxqps), maxTokens),
Rand: rand.New(rand.NewSource(seed)),
}
return &s, nil
}
Loading

0 comments on commit 7a0c8d0

Please sign in to comment.