Skip to content

Commit

Permalink
improved statsd
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Nov 25, 2018
1 parent 1f64cf1 commit 46e3566
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 26 deletions.
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -2,7 +2,7 @@ module github.com/emitter-io/emitter

require (
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7 // indirect
github.com/aws/aws-sdk-go v0.0.0-20181019205654-b2427922671b // indirect
github.com/aws/aws-sdk-go v0.0.0-20181019205654-b2427922671b
github.com/axiomhq/hyperloglog v0.0.0-20180317131949-fe9507de0228
github.com/dgraph-io/badger v0.0.0-20181020042726-fbb27786246d
github.com/dgryski/go-farm v0.0.0-20180109070241-2de33835d102 // indirect
Expand Down
2 changes: 1 addition & 1 deletion internal/broker/service.go
Expand Up @@ -149,7 +149,7 @@ func NewService(ctx context.Context, cfg *config.Config) (s *Service, err error)
monitor.NewSelf(sampler, s.selfPublish),
monitor.NewNoop(),
monitor.NewHTTP(sampler),
monitor.NewStatsd(sampler),
monitor.NewStatsd(sampler, cfg.Addr().String()),
).(monitor.Storage)
logging.LogTarget("service", "configured monitoring sink", s.monitor.Name())

Expand Down
58 changes: 40 additions & 18 deletions internal/provider/monitor/statsd.go
Expand Up @@ -16,6 +16,7 @@ package monitor

import (
"context"
"strings"
"time"

"github.com/emitter-io/emitter/internal/async"
Expand All @@ -31,12 +32,14 @@ type Statsd struct {
reader stats.Snapshotter // The reader which reads the snapshot of stats.
client *statsd.Client // The statsd client to use.
cancel context.CancelFunc // The cancellation function.
nodeID string // The ID of the node for tagging.
}

// NewStatsd creates a new statsd sink.
func NewStatsd(snapshotter stats.Snapshotter) *Statsd {
func NewStatsd(snapshotter stats.Snapshotter, nodeID string) *Statsd {
return &Statsd{
reader: snapshotter,
nodeID: nodeID,
}
}

Expand Down Expand Up @@ -65,10 +68,13 @@ func (s *Statsd) Configure(config map[string]interface{}) (err error) {
}

// Create statsd client
if s.client, err = statsd.New(statsd.Address(url), statsd.Prefix("emitter")); err == nil {
if s.client, err = statsd.New(
statsd.Address(url),
statsd.Prefix("emitter"),
statsd.Tags("node", s.nodeID),
); err == nil {
s.cancel = async.Repeat(context.Background(), interval, s.write)
}

return
}

Expand All @@ -77,32 +83,48 @@ func (s *Statsd) write() {

// Create a snapshot and restore it straight away
snapshot := s.reader.Snapshot()
metrics, err := stats.Restore(snapshot)
m, err := stats.Restore(snapshot)
if err != nil {
return
}

// Send the node and process-level metrics through
metrics := m.ToMap()
s.gauge(metrics, "node.peers")
s.gauge(metrics, "node.conns")
s.gauge(metrics, "node.subs")

// Send everything to statsd
for _, v := range metrics {
q := v.Quantile(25, 50, 75, 90, 95, 99)
s.client.Gauge(v.Name()+".p25", q[0])
s.client.Gauge(v.Name()+".p50", q[1])
s.client.Gauge(v.Name()+".p75", q[2])
s.client.Gauge(v.Name()+".p90", q[3])
s.client.Gauge(v.Name()+".p95", q[4])
s.client.Gauge(v.Name()+".p99", q[5])
s.client.Gauge(v.Name()+".min", v.Min())
s.client.Gauge(v.Name()+".max", v.Max())
s.client.Gauge(v.Name()+".avg", v.Mean())
s.client.Gauge(v.Name()+".var", v.Variance())
s.client.Gauge(v.Name()+".stddev", v.StdDev())
s.client.Count(v.Name()+".count", v.Count())
for name := range metrics {
prefix := strings.Split(name, ".")[0]
switch prefix {
case "proc", "heap", " mcache", "mspan", "stack", "gc", "go":
s.gauge(metrics, name)
case "rcv", "send":
s.histogram(metrics, name)
}
}

// Flush the client as well
s.client.Flush()
}

// Gauge sends the metric as a gauge
func (s *Statsd) gauge(source map[string]stats.Snapshot, metric string) {
if v, ok := source[metric]; ok {
s.client.Gauge(metric, v.Max())
}
}

// Gauge sends the metric as a gauge
func (s *Statsd) histogram(source map[string]stats.Snapshot, metric string) {
if v, ok := source[metric]; ok {
for _, sample := range v.Sample {
s.client.Histogram(metric, sample)
}
}
}

// Close gracefully terminates the storage and ensures that every related
// resource is properly disposed.
func (s *Statsd) Close() error {
Expand Down
14 changes: 8 additions & 6 deletions internal/provider/monitor/statsd_test.go
Expand Up @@ -24,10 +24,12 @@ import (
func TestStatsd_HappyPath(t *testing.T) {
m := stats.New()
for i := int32(0); i < 100; i++ {
m.Measure("test.metric", i)
m.Measure("proc.test", i)
m.Measure("node.test", i)
m.Measure("rcv.test", i)
}

s := NewStatsd(m)
s := NewStatsd(m, "")
defer s.Close()

err := s.Configure(map[string]interface{}{
Expand All @@ -42,7 +44,7 @@ func TestStatsd_HappyPath(t *testing.T) {

func TestStatsd_BadSnapshot(t *testing.T) {
r := snapshot("test")
s := NewStatsd(r)
s := NewStatsd(r, "")
defer s.Close()

err := s.Configure(map[string]interface{}{
Expand All @@ -57,7 +59,7 @@ func TestStatsd_BadSnapshot(t *testing.T) {

func TestStatsd_Configure(t *testing.T) {
{
s := NewStatsd(nil)
s := NewStatsd(nil, "")
defer s.Close()
assert.Equal(t, "statsd", s.Name())

Expand All @@ -66,7 +68,7 @@ func TestStatsd_Configure(t *testing.T) {
}

{
s := NewStatsd(nil)
s := NewStatsd(nil, "")
defer s.Close()
assert.Equal(t, "statsd", s.Name())

Expand All @@ -75,7 +77,7 @@ func TestStatsd_Configure(t *testing.T) {
}

{
s := NewStatsd(nil)
s := NewStatsd(nil, "")
defer s.Close()
assert.Equal(t, "statsd", s.Name())

Expand Down

0 comments on commit 46e3566

Please sign in to comment.