From 46e3566a7787850b59d9d07b5e59832d0c6d830e Mon Sep 17 00:00:00 2001 From: Roman Date: Sun, 25 Nov 2018 18:21:33 +0800 Subject: [PATCH] improved statsd --- go.mod | 2 +- internal/broker/service.go | 2 +- internal/provider/monitor/statsd.go | 58 ++++++++++++++++-------- internal/provider/monitor/statsd_test.go | 14 +++--- 4 files changed, 50 insertions(+), 26 deletions(-) diff --git a/go.mod b/go.mod index c6d80415..ed902308 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/emitter-io/emitter require ( github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7 // indirect - github.com/aws/aws-sdk-go v0.0.0-20181019205654-b2427922671b // indirect + github.com/aws/aws-sdk-go v0.0.0-20181019205654-b2427922671b github.com/axiomhq/hyperloglog v0.0.0-20180317131949-fe9507de0228 github.com/dgraph-io/badger v0.0.0-20181020042726-fbb27786246d github.com/dgryski/go-farm v0.0.0-20180109070241-2de33835d102 // indirect diff --git a/internal/broker/service.go b/internal/broker/service.go index ecfe586c..751741bf 100644 --- a/internal/broker/service.go +++ b/internal/broker/service.go @@ -149,7 +149,7 @@ func NewService(ctx context.Context, cfg *config.Config) (s *Service, err error) monitor.NewSelf(sampler, s.selfPublish), monitor.NewNoop(), monitor.NewHTTP(sampler), - monitor.NewStatsd(sampler), + monitor.NewStatsd(sampler, cfg.Addr().String()), ).(monitor.Storage) logging.LogTarget("service", "configured monitoring sink", s.monitor.Name()) diff --git a/internal/provider/monitor/statsd.go b/internal/provider/monitor/statsd.go index 0ad7681a..7480c1c1 100644 --- a/internal/provider/monitor/statsd.go +++ b/internal/provider/monitor/statsd.go @@ -16,6 +16,7 @@ package monitor import ( "context" + "strings" "time" "github.com/emitter-io/emitter/internal/async" @@ -31,12 +32,14 @@ type Statsd struct { reader stats.Snapshotter // The reader which reads the snapshot of stats. client *statsd.Client // The statsd client to use. cancel context.CancelFunc // The cancellation function. + nodeID string // The ID of the node for tagging. } // NewStatsd creates a new statsd sink. -func NewStatsd(snapshotter stats.Snapshotter) *Statsd { +func NewStatsd(snapshotter stats.Snapshotter, nodeID string) *Statsd { return &Statsd{ reader: snapshotter, + nodeID: nodeID, } } @@ -65,10 +68,13 @@ func (s *Statsd) Configure(config map[string]interface{}) (err error) { } // Create statsd client - if s.client, err = statsd.New(statsd.Address(url), statsd.Prefix("emitter")); err == nil { + if s.client, err = statsd.New( + statsd.Address(url), + statsd.Prefix("emitter"), + statsd.Tags("node", s.nodeID), + ); err == nil { s.cancel = async.Repeat(context.Background(), interval, s.write) } - return } @@ -77,32 +83,48 @@ func (s *Statsd) write() { // Create a snapshot and restore it straight away snapshot := s.reader.Snapshot() - metrics, err := stats.Restore(snapshot) + m, err := stats.Restore(snapshot) if err != nil { return } + // Send the node and process-level metrics through + metrics := m.ToMap() + s.gauge(metrics, "node.peers") + s.gauge(metrics, "node.conns") + s.gauge(metrics, "node.subs") + // Send everything to statsd - for _, v := range metrics { - q := v.Quantile(25, 50, 75, 90, 95, 99) - s.client.Gauge(v.Name()+".p25", q[0]) - s.client.Gauge(v.Name()+".p50", q[1]) - s.client.Gauge(v.Name()+".p75", q[2]) - s.client.Gauge(v.Name()+".p90", q[3]) - s.client.Gauge(v.Name()+".p95", q[4]) - s.client.Gauge(v.Name()+".p99", q[5]) - s.client.Gauge(v.Name()+".min", v.Min()) - s.client.Gauge(v.Name()+".max", v.Max()) - s.client.Gauge(v.Name()+".avg", v.Mean()) - s.client.Gauge(v.Name()+".var", v.Variance()) - s.client.Gauge(v.Name()+".stddev", v.StdDev()) - s.client.Count(v.Name()+".count", v.Count()) + for name := range metrics { + prefix := strings.Split(name, ".")[0] + switch prefix { + case "proc", "heap", " mcache", "mspan", "stack", "gc", "go": + s.gauge(metrics, name) + case "rcv", "send": + s.histogram(metrics, name) + } } // Flush the client as well s.client.Flush() } +// Gauge sends the metric as a gauge +func (s *Statsd) gauge(source map[string]stats.Snapshot, metric string) { + if v, ok := source[metric]; ok { + s.client.Gauge(metric, v.Max()) + } +} + +// Gauge sends the metric as a gauge +func (s *Statsd) histogram(source map[string]stats.Snapshot, metric string) { + if v, ok := source[metric]; ok { + for _, sample := range v.Sample { + s.client.Histogram(metric, sample) + } + } +} + // Close gracefully terminates the storage and ensures that every related // resource is properly disposed. func (s *Statsd) Close() error { diff --git a/internal/provider/monitor/statsd_test.go b/internal/provider/monitor/statsd_test.go index 60c0f62a..c27308af 100644 --- a/internal/provider/monitor/statsd_test.go +++ b/internal/provider/monitor/statsd_test.go @@ -24,10 +24,12 @@ import ( func TestStatsd_HappyPath(t *testing.T) { m := stats.New() for i := int32(0); i < 100; i++ { - m.Measure("test.metric", i) + m.Measure("proc.test", i) + m.Measure("node.test", i) + m.Measure("rcv.test", i) } - s := NewStatsd(m) + s := NewStatsd(m, "") defer s.Close() err := s.Configure(map[string]interface{}{ @@ -42,7 +44,7 @@ func TestStatsd_HappyPath(t *testing.T) { func TestStatsd_BadSnapshot(t *testing.T) { r := snapshot("test") - s := NewStatsd(r) + s := NewStatsd(r, "") defer s.Close() err := s.Configure(map[string]interface{}{ @@ -57,7 +59,7 @@ func TestStatsd_BadSnapshot(t *testing.T) { func TestStatsd_Configure(t *testing.T) { { - s := NewStatsd(nil) + s := NewStatsd(nil, "") defer s.Close() assert.Equal(t, "statsd", s.Name()) @@ -66,7 +68,7 @@ func TestStatsd_Configure(t *testing.T) { } { - s := NewStatsd(nil) + s := NewStatsd(nil, "") defer s.Close() assert.Equal(t, "statsd", s.Name()) @@ -75,7 +77,7 @@ func TestStatsd_Configure(t *testing.T) { } { - s := NewStatsd(nil) + s := NewStatsd(nil, "") defer s.Close() assert.Equal(t, "statsd", s.Name())