diff --git a/CHANGELOG.md b/CHANGELOG.md index e9818321..4683a31f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +28.0.0 +------ +- Internal pipeline refactor removes two internal metrics + 27.0.0 ------ - Simplify internal tracking of the source address of a metric. The current rules are: diff --git a/METRICS.md b/METRICS.md index d419484d..6f271ae9 100644 --- a/METRICS.md +++ b/METRICS.md @@ -16,7 +16,6 @@ Metrics: | Name | type | tags | description | ------------------------------------------- | ------------------- | ---------------------------- | ----------- -| aggregator.metrics_received | gauge (flush) | aggregator_id | The number of datapoints received during the flush interval | aggregator.metricmaps_received | gauge (flush) | aggregator_id | The number of datapoint batches received during the flush interval | aggregator.aggregation_time | gauge (time) | aggregator_id | The time taken (in ms) to aggregate all counter and timer | | | | datapoints in this flush interval @@ -84,7 +83,6 @@ the samples are reset. | Channel name | Additional tags | Description | ------------------------- | --------------- | ----------- -| dispatch_aggregator_batch | aggregator_id | Channel to dispatch metrics to a specific aggregator. | dispatch_aggregator_map | aggregator_id | Channel to dispatch metric maps to a given aggregator. | backend_events_sem | | Semaphore limiting the number of events in flight at once. Corresponds to | | | the `--max-concurrent-events` flag. diff --git a/events.go b/events.go index a0ced8e9..5ffbbd4a 100644 --- a/events.go +++ b/events.go @@ -92,5 +92,10 @@ type Event struct { AlertType AlertType } +func (e *Event) AddTagsSetSource(additionalTags Tags, newSource Source) { + e.Tags = e.Tags.Concat(additionalTags) + e.Source = newSource +} + // Events represents a list of events. -type Events []Event +type Events []*Event diff --git a/fixtures_test.go b/fixtures_test.go index 1730d357..5cc77268 100644 --- a/fixtures_test.go +++ b/fixtures_test.go @@ -2,7 +2,6 @@ package gostatsd import ( "context" - "sync" "testing" "time" @@ -24,44 +23,3 @@ func testContext(t *testing.T) (context.Context, func()) { }() return ctxTest, completeTest } - -type capturingHandler struct { - mu sync.Mutex - m []*Metric - e []*Event -} - -func (ch *capturingHandler) EstimatedTags() int { - return 0 -} - -func (ch *capturingHandler) DispatchMetrics(ctx context.Context, metrics []*Metric) { - ch.mu.Lock() - defer ch.mu.Unlock() - for _, m := range metrics { - m.DoneFunc = nil // Clear DoneFunc because it contains non-predictable variable data which interferes with the tests - ch.m = append(ch.m, m) - } -} - -// DispatchMetricMap re-dispatches a metric map through capturingHandler.DispatchMetrics -func (ch *capturingHandler) DispatchMetricMap(ctx context.Context, mm *MetricMap) { - mm.DispatchMetrics(ctx, ch) -} - -func (ch *capturingHandler) DispatchEvent(ctx context.Context, e *Event) { - ch.mu.Lock() - defer ch.mu.Unlock() - ch.e = append(ch.e, e) -} - -func (ch *capturingHandler) WaitForEvents() { -} - -func (ch *capturingHandler) GetMetrics() []*Metric { - ch.mu.Lock() - defer ch.mu.Unlock() - m := make([]*Metric, len(ch.m)) - copy(m, ch.m) - return m -} diff --git a/internal/fixtures/metrics.go b/internal/fixtures/metrics.go new file mode 100644 index 00000000..39fcc96b --- /dev/null +++ b/internal/fixtures/metrics.go @@ -0,0 +1,80 @@ +package fixtures + +import ( + "fmt" + + "github.com/atlassian/gostatsd" +) + +type MetricOpt func(m *gostatsd.Metric) + +// MakeMetric provides a way to build a metric for tests. Hopefully over +// time this will be used more, bringing more consistency to tests. +func MakeMetric(opts ...MetricOpt) *gostatsd.Metric { + m := &gostatsd.Metric{ + Type: gostatsd.COUNTER, + Name: "name", + Rate: 1, + Tags: gostatsd.Tags{ + "foo:bar", + "host:baz", + }, + Source: "baz", + } + for _, opt := range opts { + opt(m) + } + return m +} + +func Name(n string) MetricOpt { + return func(m *gostatsd.Metric) { + m.Name = n + } +} + +func AddTag(t ...string) MetricOpt { + return func(m *gostatsd.Metric) { + m.Tags = append(m.Tags, t...) + } +} + +func DropSource(m *gostatsd.Metric) { + m.Source = gostatsd.UnknownSource +} + +func DropTag(t string) MetricOpt { + return func(m *gostatsd.Metric) { + next := 0 + found := false + for _, tag := range m.Tags { + if t == tag { + found = true + m.Tags[next] = tag + next++ + } + } + if !found { + panic(fmt.Sprintf("failed to find tag %s while building metric", t)) + } + m.Tags = m.Tags[:next] + } +} + +// SortCompare func for metrics so they can be compared with require.EqualValues +// Invoke with sort.Slice(x, SortCompare(x)) +func SortCompare(ms []*gostatsd.Metric) func(i, j int) bool { + return func(i, j int) bool { + if ms[i].Name == ms[j].Name { + if len(ms[i].Tags) == len(ms[j].Tags) { // This is not exactly accurate, but close enough with our data + if ms[i].Type == gostatsd.SET { + return ms[i].StringValue < ms[j].StringValue + } else { + return ms[i].Value < ms[j].Value + } + } + return len(ms[i].Tags) < len(ms[j].Tags) + } + return ms[i].Name < ms[j].Name + } +} diff --git a/metric_consolidator.go b/metric_consolidator.go index 792b4949..6a9ad2fa 100644 --- a/metric_consolidator.go +++ b/metric_consolidator.go @@ -7,8 +7,9 @@ import ( "github.com/tilinna/clock" ) -// MetricConsolidator will consolidate metrics randomly in to a slice of MetricMaps, and send the slice to the provided -// channel. Run can be started in a long running goroutine to perform flushing, or Flush can be called externally. +// MetricConsolidator will consolidate metrics randomly in to a slice of MetricMaps, and either send the slice to +// the provided channel, or make them available synchronously through Drain/Fill. Run can also be started in a long +// running goroutine to perform flushing, or Flush can be called externally to trigger the channel send. // // Used to consolidate metrics such as: // - counter[name=x, value=1] @@ -30,9 +31,7 @@ type MetricConsolidator struct { func NewMetricConsolidator(spots int, flushInterval time.Duration, sink chan<- []*MetricMap) *MetricConsolidator { mc := &MetricConsolidator{} mc.maps = make(chan *MetricMap, spots) - for i := 0; i < spots; i++ { - mc.maps <- NewMetricMap() - } + mc.Fill() mc.flushInterval = flushInterval mc.sink = sink return mc @@ -52,10 +51,11 @@ func (mc *MetricConsolidator) Run(ctx context.Context) { } } -// Flush will collect all the MetricMaps in to a slice, send them to the channel provided, then -// create new MetricMaps for new metrics to land in. Not thread-safe. -func (mc *MetricConsolidator) Flush(ctx context.Context) { - var mms []*MetricMap +// Drain will collect all the MetricMaps in the MetricConsolidator and return them. If the +// context.Context is canceled before everything can be collected, they are returned to the +// MetricConsolidator and nil is returned. +func (mc *MetricConsolidator) Drain(ctx context.Context) []*MetricMap { + mms := make([]*MetricMap, 0, cap(mc.maps)) for i := 0; i < cap(mc.maps); i++ { select { case mm := <-mc.maps: @@ -66,9 +66,19 @@ func (mc *MetricConsolidator) Flush(ctx context.Context) { for _, mm := range mms { mc.maps <- mm } - return + return nil } } + return mms +} + +// Flush will collect all the MetricMaps in to a slice, send them to the channel provided, then +// create new MetricMaps for new metrics to land in. Not thread-safe. +func (mc *MetricConsolidator) Flush(ctx context.Context) { + mms := mc.Drain(ctx) + if mms == nil { + return + } // Send the collected data to the sink before putting new maps in place. This allows back-pressure // to propagate through the system, if the sink can't keep up. @@ -77,6 +87,12 @@ func (mc *MetricConsolidator) Flush(ctx context.Context) { case <-ctx.Done(): } + mc.Fill() +} + +// Fill re-populates the MetricConsolidator with empty MetricMaps, it is the pair to Drain and +// must be called after a successful Drain, must not be called after a failed Drain. +func (mc *MetricConsolidator) Fill() { for i := 0; i < cap(mc.maps); i++ { mc.maps <- NewMetricMap() } diff --git a/metric_map.go b/metric_map.go index ffdf61ef..e42e04be 100644 --- a/metric_map.go +++ b/metric_map.go @@ -2,7 +2,6 @@ package gostatsd import ( "bytes" - "context" "fmt" "strings" @@ -46,6 +45,14 @@ func (mm *MetricMap) Receive(m *Metric) { m.Done() } +func MergeMaps(mms []*MetricMap) *MetricMap { + mm := NewMetricMap() + for _, mmFrom := range mms { + mm.Merge(mmFrom) + } + return mm +} + func (mm *MetricMap) Merge(mmFrom *MetricMap) { mmFrom.Counters.Each(func(metricName string, tagsKey string, counterFrom Counter) { v, ok := mm.Counters[metricName] @@ -359,8 +366,8 @@ func (mm *MetricMap) String() string { return buf.String() } -// DispatchMetrics will synthesize Metrics from the MetricMap and push them to the supplied PipelineHandler -func (mm *MetricMap) DispatchMetrics(ctx context.Context, handler RawMetricHandler) { +// AsMetrics will synthesize Metrics from the MetricMap and return them as a slice +func (mm *MetricMap) AsMetrics() []*Metric { var metrics []*Metric mm.Counters.Each(func(metricName string, tagsKey string, c Counter) { @@ -426,5 +433,5 @@ func (mm *MetricMap) DispatchMetrics(ctx context.Context, handler RawMetricHandl } }) - handler.DispatchMetrics(ctx, metrics) + return metrics } diff --git a/metric_map_test.go b/metric_map_test.go index e5c14aa8..62e632a0 100644 --- a/metric_map_test.go +++ b/metric_map_test.go @@ -143,17 +143,13 @@ func BenchmarkReceives(b *testing.B) { } func TestMetricMapDispatch(t *testing.T) { - ctx, done := testContext(t) - defer done() - mm := NewMetricMap() metrics := metricsFixtures() for _, metric := range metrics { mm.Receive(metric) } - ch := &capturingHandler{} - mm.DispatchMetrics(ctx, ch) + actual := mm.AsMetrics() expected := []*Metric{ {Name: "abc.def.g", Value: 3, Rate: 1, Type: GAUGE, Timestamp: 10}, @@ -173,28 +169,27 @@ func TestMetricMapDispatch(t *testing.T) { {Name: "uniq.usr", StringValue: "john", Rate: 1, TagsKey: "baz,foo:bar", Tags: Tags{"baz", "foo:bar"}, Type: SET, Timestamp: 10}, } - cmpSort := func(slice []*Metric) func(i, j int) bool { - return func(i, j int) bool { - if slice[i].Name == slice[j].Name { - if len(slice[i].Tags) == len(slice[j].Tags) { // This is not exactly accurate, but close enough with our data - if slice[i].Type == SET { - return slice[i].StringValue < slice[j].StringValue - } else { - return slice[i].Value < slice[j].Value - } + sort.Slice(actual, SortCompare(actual)) + sort.Slice(expected, SortCompare(expected)) + + require.EqualValues(t, expected, actual) +} + +// Copied from internal/fixtures because dependency loops +func SortCompare(ms []*Metric) func(i, j int) bool { + return func(i, j int) bool { + if ms[i].Name == ms[j].Name { + if len(ms[i].Tags) == len(ms[j].Tags) { // This is not exactly accurate, but close enough with our data + if ms[i].Type == SET { + return ms[i].StringValue < ms[j].StringValue + } else { + return ms[i].Value < ms[j].Value } - return len(slice[i].Tags) < len(slice[j].Tags) } - return slice[i].Name < slice[j].Name + return len(ms[i].Tags) < len(ms[j].Tags) } + return ms[i].Name < ms[j].Name } - - actual := ch.GetMetrics() - - sort.Slice(actual, cmpSort(actual)) - sort.Slice(expected, cmpSort(expected)) - - require.EqualValues(t, expected, actual) } func TestMetricMapMerge(t *testing.T) { diff --git a/metrics.go b/metrics.go index 374a2561..af37de6e 100644 --- a/metrics.go +++ b/metrics.go @@ -53,6 +53,17 @@ type Metric struct { DoneFunc func() // Returns the metric to the pool. May be nil. Call Metric.Done(), not this. } +func (m *Metric) AddTagsSetSource(additionalTags Tags, newSource Source) { + if len(additionalTags) > 0 { + m.Tags = m.Tags.Concat(additionalTags) + m.TagsKey = "" + } + if newSource != m.Source { + m.Source = newSource + m.TagsKey = "" + } +} + // Reset is used to reset a metric to as clean state, called on re-use from the pool. func (m *Metric) Reset() { m.Name = "" diff --git a/metrics_test.go b/metrics_test.go new file mode 100644 index 00000000..651cd509 --- /dev/null +++ b/metrics_test.go @@ -0,0 +1,55 @@ +package gostatsd + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMetricReset(t *testing.T) { + // this is deliberately constructed without using named fields, + // so that if the fields change it will cause a compiler error. + m := &Metric{ + "metric", + 10, + 1, + Tags{"tag"}, + "something", + "somethingelse", + "source", + 123, + COUNTER, + nil, + } + m.Reset() + // Tags needs to be an empty slice, not a nil slice, because half the reason + // behind having the MetricPool and Reset is to re-use the Tags slice. Therefore + // we don't nil it. + require.EqualValues(t, &Metric{Tags: Tags{}, Rate: 1}, m) +} + +func TestMetricString(t *testing.T) { + types := []MetricType{COUNTER, TIMER, SET, GAUGE, 42} + names := []string{"counter", "timer", "set", "gauge", "unknown"} + for idx, name := range names { + require.Equal(t, name, types[idx].String()) + } +} + +func TestUpdateTags(t *testing.T) { + m := &Metric{} + m.FormatTagsKey() + require.EqualValues(t, "", m.TagsKey) + m.AddTagsSetSource(Tags{"foo"}, "source") + require.EqualValues(t, Tags{"foo"}, m.Tags) + require.EqualValues(t, "source", m.Source) + require.EqualValues(t, "", m.TagsKey) + m.FormatTagsKey() + require.EqualValues(t, "foo,s:source", m.TagsKey) // It's set + m.AddTagsSetSource(Tags{"foo2"}, "source2") + require.EqualValues(t, Tags{"foo", "foo2"}, m.Tags) + require.EqualValues(t, "source2", m.Source) + require.EqualValues(t, "", m.TagsKey) // It's cleared + m.FormatTagsKey() + require.EqualValues(t, "foo,foo2,s:source2", m.TagsKey) +} diff --git a/pkg/stats/flush_notifier.go b/pkg/stats/flush_notifier.go index 09e881a8..356a2645 100644 --- a/pkg/stats/flush_notifier.go +++ b/pkg/stats/flush_notifier.go @@ -1,6 +1,7 @@ package stats import ( + "context" "sync" "time" ) @@ -35,7 +36,7 @@ func (fn *flushNotifier) RegisterFlush() (ch <-chan time.Duration, unregister fu // NotifyFlush will notify any registered channels that a flush has completed. // Non-blocking, thread-safe. -func (fn *flushNotifier) NotifyFlush(d time.Duration) { +func (fn *flushNotifier) NotifyFlush(ctx context.Context, d time.Duration) { fn.lock.RLock() defer fn.lock.RUnlock() for _, hook := range fn.flushTargets { diff --git a/pkg/stats/flush_notifier_test.go b/pkg/stats/flush_notifier_test.go index ad1271d4..f0b7c783 100644 --- a/pkg/stats/flush_notifier_test.go +++ b/pkg/stats/flush_notifier_test.go @@ -1,6 +1,7 @@ package stats import ( + "context" "testing" "time" @@ -68,7 +69,7 @@ func TestFlushNotifierFires(t *testing.T) { go func() { // Just enough to be certain that the select below is ready. time.Sleep(10 * time.Millisecond) - fn.NotifyFlush(0) + fn.NotifyFlush(context.Background(), 0) }() ticker := time.NewTicker(100 * time.Millisecond) @@ -91,7 +92,7 @@ func TestFlushNotifierDoesNotBlock(t *testing.T) { _, unregister := fn.RegisterFlush() deadline := time.Now().Add(10 * time.Millisecond) - fn.NotifyFlush(0) + fn.NotifyFlush(context.Background(), 0) assert.Truef(t, time.Now().Before(deadline), "NotifyFlush ran too long") unregister() diff --git a/pkg/stats/statser.go b/pkg/stats/statser.go index 2b94829b..f841befb 100644 --- a/pkg/stats/statser.go +++ b/pkg/stats/statser.go @@ -1,6 +1,7 @@ package stats import ( + "context" "time" "github.com/atlassian/gostatsd" @@ -9,7 +10,7 @@ import ( // Statser is the interface for sending metrics type Statser interface { // NotifyFlush is called when a flush occurs. It signals all known subscribers. - NotifyFlush(d time.Duration) + NotifyFlush(ctx context.Context, d time.Duration) // RegisterFlush returns a channel which will receive a notification after every flush, and a cleanup // function which should be called to signal the channel is no longer being monitored. If the channel // blocks, the notification will be silently dropped. diff --git a/pkg/stats/statser_internal.go b/pkg/stats/statser_internal.go index e23d61f5..627f2a00 100644 --- a/pkg/stats/statser_internal.go +++ b/pkg/stats/statser_internal.go @@ -2,7 +2,6 @@ package stats import ( "context" - "sync/atomic" "time" "github.com/atlassian/gostatsd" @@ -19,16 +18,13 @@ import ( type InternalStatser struct { flushNotifier - buffer chan *gostatsd.Metric - tags gostatsd.Tags namespace string hostname gostatsd.Source handler gostatsd.PipelineHandler - dropped uint64 -} -const bufferSize = 1000 // estimating this is difficult and tends to cause problems if too small + consolidator *gostatsd.MetricConsolidator +} // NewInternalStatser creates a new Statser which sends metrics to the // supplied InternalHandler. @@ -37,30 +33,26 @@ func NewInternalStatser(tags gostatsd.Tags, namespace string, hostname gostatsd. tags = tags.Concat(gostatsd.Tags{"host:" + string(hostname)}) } return &InternalStatser{ - buffer: make(chan *gostatsd.Metric, bufferSize), tags: tags, namespace: namespace, hostname: hostname, handler: handler, + // We can't just use a MetricMap because everything + // that writes to it is on its own goroutine. + consolidator: gostatsd.NewMetricConsolidator(10, 0, nil), } } -// Run will pull internal metrics off a small buffer, and dispatch them. It -// stops running when the context is closed. -func (is *InternalStatser) Run(ctx context.Context) { - flushed, unregister := is.RegisterFlush() - defer unregister() - - for { - select { - case <-ctx.Done(): - return - case m := <-is.buffer: - is.dispatchMetric(ctx, m) - case <-flushed: - is.Gauge("internal_dropped", float64(atomic.LoadUint64(&is.dropped)), nil) - } +func (is *InternalStatser) NotifyFlush(ctx context.Context, d time.Duration) { + mms := is.consolidator.Drain(ctx) + if mms == nil { + // context is canceled + return } + is.consolidator.Fill() + is.handler.DispatchMetricMap(ctx, gostatsd.MergeMaps(mms)) + is.flushNotifier.NotifyFlush(ctx, d) + } // Gauge sends a gauge metric @@ -73,7 +65,7 @@ func (is *InternalStatser) Gauge(name string, value float64, tags gostatsd.Tags) Rate: 1, Type: gostatsd.GAUGE, } - is.dispatchInternal(g) + is.dispatchMetric(g) } // Count sends a counter metric @@ -86,7 +78,7 @@ func (is *InternalStatser) Count(name string, amount float64, tags gostatsd.Tags Rate: 1, Type: gostatsd.COUNTER, } - is.dispatchInternal(c) + is.dispatchMetric(c) } // Increment sends a counter metric with a value of 1 @@ -104,7 +96,7 @@ func (is *InternalStatser) TimingMS(name string, ms float64, tags gostatsd.Tags) Rate: 1, Type: gostatsd.TIMER, } - is.dispatchInternal(c) + is.dispatchMetric(c) } // TimingDuration sends a timing metric from a time.Duration @@ -122,25 +114,11 @@ func (is *InternalStatser) WithTags(tags gostatsd.Tags) Statser { return NewTaggedStatser(is, tags) } -// Attempts to dispatch a metric via the internal buffer. Non-blocking. -// Failure to send will be tracked, but not propagated to the caller. -func (is *InternalStatser) dispatchInternal(metric *gostatsd.Metric) { - metric.Timestamp = gostatsd.NanoNow() - - select { - case is.buffer <- metric: - // great success - default: - // at least we tried - atomic.AddUint64(&is.dropped, 1) - } -} - -func (is *InternalStatser) dispatchMetric(ctx context.Context, metric *gostatsd.Metric) { +func (is *InternalStatser) dispatchMetric(metric *gostatsd.Metric) { // the metric is owned by this file, we can change it freely because we know its origins if is.namespace != "" { metric.Name = is.namespace + "." + metric.Name } metric.Tags = metric.Tags.Concat(is.tags) - is.handler.DispatchMetrics(ctx, []*gostatsd.Metric{metric}) + is.consolidator.ReceiveMetrics([]*gostatsd.Metric{metric}) } diff --git a/pkg/stats/statser_tagged.go b/pkg/stats/statser_tagged.go index bb49cca9..9a5ec261 100644 --- a/pkg/stats/statser_tagged.go +++ b/pkg/stats/statser_tagged.go @@ -1,6 +1,7 @@ package stats import ( + "context" "time" "github.com/atlassian/gostatsd" @@ -25,8 +26,8 @@ func NewTaggedStatser(statser Statser, tags gostatsd.Tags) Statser { } } -func (ts *TaggedStatser) NotifyFlush(d time.Duration) { - ts.statser.NotifyFlush(d) +func (ts *TaggedStatser) NotifyFlush(ctx context.Context, d time.Duration) { + ts.statser.NotifyFlush(ctx, d) } func (ts *TaggedStatser) RegisterFlush() (<-chan time.Duration, func()) { diff --git a/pkg/statsd/aggregator.go b/pkg/statsd/aggregator.go index 862cbadf..5e10372e 100644 --- a/pkg/statsd/aggregator.go +++ b/pkg/statsd/aggregator.go @@ -23,7 +23,6 @@ type percentStruct struct { // MetricAggregator aggregates metrics. type MetricAggregator struct { - metricsReceived uint64 metricMapsReceived uint64 expiryIntervalCounter time.Duration // How often to expire counters expiryIntervalGauge time.Duration // How often to expire gauges @@ -82,7 +81,6 @@ func round(v float64) float64 { // Flush prepares the contents of a MetricAggregator for sending via the Sender. func (a *MetricAggregator) Flush(flushInterval time.Duration) { - a.statser.Gauge("aggregator.metrics_received", float64(a.metricsReceived), nil) a.statser.Gauge("aggregator.metricmaps_received", float64(a.metricMapsReceived), nil) flushInSeconds := float64(flushInterval) / float64(time.Second) @@ -215,7 +213,6 @@ func deleteMetric(key, tagsKey string, metrics gostatsd.AggregatedMetrics) { // Reset clears the contents of a MetricAggregator. func (a *MetricAggregator) Reset() { - a.metricsReceived = 0 a.metricMapsReceived = 0 nowNano := gostatsd.Nanotime(a.now().UnixNano()) @@ -275,15 +272,6 @@ func (a *MetricAggregator) Reset() { }) } -// Receive takes a batched metrics and will put them on the internal aggregator -// queue to be processed -func (a *MetricAggregator) Receive(ms ...*gostatsd.Metric) { - a.metricsReceived += uint64(len(ms)) - for _, m := range ms { - a.metricMap.Receive(m) - } -} - // ReceiveMap takes a single metric map and will aggregate the values func (a *MetricAggregator) ReceiveMap(mm *gostatsd.MetricMap) { a.metricMapsReceived++ diff --git a/pkg/statsd/aggregator_test.go b/pkg/statsd/aggregator_test.go index 87c07ca4..5d59f636 100644 --- a/pkg/statsd/aggregator_test.go +++ b/pkg/statsd/aggregator_test.go @@ -1,13 +1,10 @@ package statsd import ( - "context" "math" - "runtime" "testing" "time" - "github.com/ash2k/stager" "github.com/stretchr/testify/assert" "github.com/atlassian/gostatsd" @@ -25,12 +22,6 @@ func newFakeAggregator() *MetricAggregator { ) } -type fakeAggregatorFactory struct{} - -func (faf *fakeAggregatorFactory) Create() Aggregator { - return newFakeAggregator() -} - func TestNewAggregator(t *testing.T) { t.Parallel() assrt := assert.New(t) @@ -408,7 +399,9 @@ func TestDisabledCount(t *testing.T) { t.Parallel() ma := newFakeAggregator() ma.disabledSubtypes.CountPct = true - ma.Receive(&gostatsd.Metric{Name: "x", Value: 1, Type: gostatsd.TIMER}) + mm := gostatsd.NewMetricMap() + mm.Receive(&gostatsd.Metric{Name: "x", Value: 1, Type: gostatsd.TIMER}) + ma.ReceiveMap(mm) ma.Flush(1 * time.Second) for _, pct := range ma.metricMap.Timers["x"][""].Percentiles { if pct.Str == "count_90" { @@ -421,7 +414,9 @@ func TestDisabledMean(t *testing.T) { t.Parallel() ma := newFakeAggregator() ma.disabledSubtypes.MeanPct = true - ma.Receive(&gostatsd.Metric{Name: "x", Value: 1, Type: gostatsd.TIMER}) + mm := gostatsd.NewMetricMap() + mm.Receive(&gostatsd.Metric{Name: "x", Value: 1, Type: gostatsd.TIMER}) + ma.ReceiveMap(mm) ma.Flush(1 * time.Second) for _, pct := range ma.metricMap.Timers["x"][""].Percentiles { if pct.Str == "mean_90" { @@ -434,7 +429,9 @@ func TestDisabledSum(t *testing.T) { t.Parallel() ma := newFakeAggregator() ma.disabledSubtypes.SumPct = true - ma.Receive(&gostatsd.Metric{Name: "x", Value: 1, Type: gostatsd.TIMER}) + mm := gostatsd.NewMetricMap() + mm.Receive(&gostatsd.Metric{Name: "x", Value: 1, Type: gostatsd.TIMER}) + ma.ReceiveMap(mm) ma.Flush(1 * time.Second) for _, pct := range ma.metricMap.Timers["x"][""].Percentiles { if pct.Str == "sum_90" { @@ -447,7 +444,9 @@ func TestDisabledSumSquares(t *testing.T) { t.Parallel() ma := newFakeAggregator() ma.disabledSubtypes.SumSquaresPct = true - ma.Receive(&gostatsd.Metric{Name: "x", Value: 1, Type: gostatsd.TIMER}) + mm := gostatsd.NewMetricMap() + mm.Receive(&gostatsd.Metric{Name: "x", Value: 1, Type: gostatsd.TIMER}) + ma.ReceiveMap(mm) ma.Flush(1 * time.Second) for _, pct := range ma.metricMap.Timers["x"][""].Percentiles { if pct.Str == "sum_squares_90" { @@ -460,7 +459,9 @@ func TestDisabledUpper(t *testing.T) { t.Parallel() ma := newFakeAggregator() ma.disabledSubtypes.UpperPct = true - ma.Receive(&gostatsd.Metric{Name: "x", Value: 1, Type: gostatsd.TIMER}) + mm := gostatsd.NewMetricMap() + mm.Receive(&gostatsd.Metric{Name: "x", Value: 1, Type: gostatsd.TIMER}) + ma.ReceiveMap(mm) ma.Flush(1 * time.Second) for _, pct := range ma.metricMap.Timers["x"][""].Percentiles { if pct.Str == "upper_90" { @@ -481,7 +482,9 @@ func TestDisabledLower(t *testing.T) { math.MaxUint32, ) ma.disabledSubtypes.LowerPct = true - ma.Receive(&gostatsd.Metric{Name: "x", Value: 1, Type: gostatsd.TIMER}) + mm := gostatsd.NewMetricMap() + mm.Receive(&gostatsd.Metric{Name: "x", Value: 1, Type: gostatsd.TIMER}) + ma.ReceiveMap(mm) ma.Flush(1 * time.Second) for _, pct := range ma.metricMap.Timers["x"][""].Percentiles { if pct.Str == "lower_-90" { // lower_-90? @@ -489,39 +492,3 @@ func TestDisabledLower(t *testing.T) { } } } - -func BenchmarkHotMetric(b *testing.B) { - beh := NewBackendHandler( - nil, - 1000, - runtime.NumCPU(), - 10000, - &fakeAggregatorFactory{}, - ) - - stgr := stager.New() - stage := stgr.NextStage() - stage.StartWithContext(beh.Run) - stage = stgr.NextStage() - - ctx := context.Background() - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < runtime.NumCPU(); i++ { - stage.Start(func() { - for n := 0; n < b.N; n++ { - m := &gostatsd.Metric{ - Name: "metric.name", - Value: 5, - Tags: gostatsd.Tags{"aaaa:aaaa", "aaab:aaab", "aaac:aaac", "aaad:aaad", "aaae:aaae", "aaaf:aaaf"}, - Source: "local", - Type: gostatsd.GAUGE, - } - beh.DispatchMetrics(ctx, []*gostatsd.Metric{m}) - } - }) - } - - stgr.Shutdown() -} diff --git a/pkg/statsd/flusher.go b/pkg/statsd/flusher.go index 1a0abc66..b243ed62 100644 --- a/pkg/statsd/flusher.go +++ b/pkg/statsd/flusher.go @@ -66,10 +66,10 @@ func (f *MetricFlusher) Run(ctx context.Context) { return case thisFlush := <-ch: // Time to flush to the backends flushDelta := thisFlush.Sub(lastFlush) + statser.NotifyFlush(ctx, flushDelta) if f.aggregateProcesser != AggregateProcesser(nil) { f.flushData(ctx, flushDelta, statser) } - statser.NotifyFlush(flushDelta) lastFlush = thisFlush } } diff --git a/pkg/statsd/handler_backend.go b/pkg/statsd/handler_backend.go index 32fbc4ab..ccd17137 100644 --- a/pkg/statsd/handler_backend.go +++ b/pkg/statsd/handler_backend.go @@ -45,7 +45,6 @@ func NewBackendHandler(backends []gostatsd.Backend, maxConcurrentEvents uint, nu workers[i] = &worker{ aggr: af.Create(), // TODO: Reassess the defaults - metricsQueue: make(chan []*gostatsd.Metric, perWorkerBufferSize), metricMapQueue: make(chan *gostatsd.MetricMap, perWorkerBufferSize), processChan: make(chan *processCommand), id: i, @@ -66,7 +65,6 @@ func (bh *BackendHandler) Run(ctx context.Context) { var wg wait.Group defer func() { for _, worker := range bh.workers { - close(worker.metricsQueue) // Close channel to terminate worker close(worker.metricMapQueue) // Close channel to terminate worker } wg.Wait() // Wait for all workers to finish @@ -123,26 +121,7 @@ func (bh *BackendHandler) EstimatedTags() int { return 0 } -// DispatchMetrics dispatches metric to a corresponding Aggregator. -func (bh *BackendHandler) DispatchMetrics(ctx context.Context, metrics []*gostatsd.Metric) { - metricsByAggr := make([][]*gostatsd.Metric, bh.numWorkers) - - for _, m := range metrics { - m.TagsKey = m.FormatTagsKey() // this is expensive, so do it with no aggregator affinity - bucket := m.Bucket(bh.numWorkers) - metricsByAggr[bucket] = append(metricsByAggr[bucket], m) - } - - for aggrIdx, bucketedMetrics := range metricsByAggr { - w := bh.workers[aggrIdx] - select { - case <-ctx.Done(): - case w.metricsQueue <- bucketedMetrics: - } - } -} - -// DispatchMetricMap re-dispatches a metric map through BackendHandler.DispatchMetrics +// DispatchMetricMap splits a MetricMap in to per-aggregator buckets and distributes it. func (bh *BackendHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap) { maps := mm.Split(bh.numWorkers) diff --git a/pkg/statsd/handler_backend_test.go b/pkg/statsd/handler_backend_test.go index 1ac00dd0..9fd21755 100644 --- a/pkg/statsd/handler_backend_test.go +++ b/pkg/statsd/handler_backend_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math/rand" - "runtime" "sync" "testing" "time" @@ -115,47 +114,6 @@ func TestRunShouldReturnWhenContextCancelled(t *testing.T) { h.Run(ctx) } -func TestDispatchMetricsShouldDistributeMetrics(t *testing.T) { - t.Parallel() - r := rand.New(rand.NewSource(time.Now().UnixNano())) - n := r.Intn(5) + 1 - factory := newTestFactory() - // use a sync channel (perWorkerBufferSize = 0) to force the workers to process events before the context is cancelled - h := NewBackendHandler(nil, 0, n, 0, factory) - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - var wgFinish wait.Group - wgFinish.StartWithContext(ctx, h.Run) - numMetrics := r.Intn(1000) + n*10 - var wg sync.WaitGroup - wg.Add(numMetrics) - for i := 0; i < numMetrics; i++ { - m := &gostatsd.Metric{ - Type: gostatsd.COUNTER, - Name: fmt.Sprintf("counter.metric.%d", r.Int63()), - Tags: nil, - Value: r.Float64(), - } - go func() { - defer wg.Done() - h.DispatchMetrics(ctx, []*gostatsd.Metric{m}) - }() - } - wg.Wait() // Wait for all metrics to be dispatched - cancelFunc() // After all metrics have been dispatched, we signal dispatcher to shut down - wgFinish.Wait() // Wait for dispatcher to shutdown - - receiveInvocations := getTotalInvocations(factory.receiveInvocations) - assert.Equal(t, numMetrics, receiveInvocations) - for agrNum, count := range factory.receiveInvocations { - if count == 0 { - t.Errorf("aggregator %d was never invoked", agrNum) - } else { - t.Logf("aggregator %d was invoked %d time(s)", agrNum, count) - } - } -} - func TestDispatchMetricMapShouldDistributeMetrics(t *testing.T) { t.Parallel() r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -197,36 +155,3 @@ func TestDispatchMetricMapShouldDistributeMetrics(t *testing.T) { } } } - -func getTotalInvocations(inv map[int]int) int { - var counter int - for _, i := range inv { - counter += i - } - return counter -} - -func BenchmarkBackendHandler(b *testing.B) { - rand.Seed(time.Now().UnixNano()) - factory := newTestFactory() - h := NewBackendHandler(nil, 0, runtime.NumCPU(), 10, factory) - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - var wgFinish wait.Group - wgFinish.StartWithContext(ctx, h.Run) - b.ReportAllocs() - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - m := &gostatsd.Metric{ - Type: gostatsd.COUNTER, - Name: fmt.Sprintf("counter.metric.%d", rand.Int63()), - Tags: nil, - Value: rand.Float64(), - } - h.DispatchMetrics(ctx, []*gostatsd.Metric{m}) - } - }) - cancelFunc() // After all metrics have been dispatched, we signal dispatcher to shut down - wgFinish.Wait() // Wait for dispatcher to shutdown -} diff --git a/pkg/statsd/handler_cloud.go b/pkg/statsd/handler_cloud.go index 0145cc62..468b9798 100644 --- a/pkg/statsd/handler_cloud.go +++ b/pkg/statsd/handler_cloud.go @@ -57,19 +57,19 @@ func (ch *CloudHandler) EstimatedTags() int { return ch.estimatedTags } -func (ch *CloudHandler) DispatchMetrics(ctx context.Context, metrics []*gostatsd.Metric) { - var toDispatch []*gostatsd.Metric +func (ch *CloudHandler) processMetrics(ctx context.Context, metrics []*gostatsd.Metric) { + mmToDispatch := gostatsd.NewMetricMap() var toHandle []*gostatsd.Metric for _, m := range metrics { - if ch.updateTagsAndHostname(&m.Tags, &m.Source) { - toDispatch = append(toDispatch, m) + if ch.updateTagsAndHostname(m, m.Source) { + mmToDispatch.Receive(m) } else { toHandle = append(toHandle, m) } } - if len(toDispatch) > 0 { - ch.handler.DispatchMetrics(ctx, toDispatch) + if !mmToDispatch.IsEmpty() { + ch.handler.DispatchMetricMap(ctx, mmToDispatch) } if len(toHandle) > 0 { @@ -80,16 +80,16 @@ func (ch *CloudHandler) DispatchMetrics(ctx context.Context, metrics []*gostatsd } } -// DispatchMetricMap re-dispatches a metric map through CloudHandler.DispatchMetrics +// DispatchMetricMap re-dispatches a MetricMap through CloudHandler.processMetrics // TODO: This is inefficient, and should be handled first class, however that is a major re-factor of // the CloudHandler. It is also recommended to not use a CloudHandler in an http receiver based // service, as the IP is not propagated. func (ch *CloudHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap) { - mm.DispatchMetrics(ctx, ch) + ch.processMetrics(ctx, mm.AsMetrics()) } func (ch *CloudHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) { - if ch.updateTagsAndHostname(&e.Tags, &e.Source) { + if ch.updateTagsAndHostname(e, e.Source) { ch.handler.DispatchEvent(ctx, e) return } @@ -233,10 +233,12 @@ func (ch *CloudHandler) handleIncomingEvent(e *gostatsd.Event) { } func (ch *CloudHandler) updateAndDispatchMetrics(ctx context.Context, instance *gostatsd.Instance, metrics []*gostatsd.Metric) { + mm := gostatsd.NewMetricMap() for _, m := range metrics { - updateInplace(&m.Tags, &m.Source, instance) + updateInplace(m, instance) + mm.Receive(m) } - ch.handler.DispatchMetrics(ctx, metrics) + ch.handler.DispatchMetricMap(ctx, mm) } func (ch *CloudHandler) updateAndDispatchEvents(ctx context.Context, instance *gostatsd.Instance, events []*gostatsd.Event) { @@ -245,16 +247,16 @@ func (ch *CloudHandler) updateAndDispatchEvents(ctx context.Context, instance *g ch.wg.Add(-dispatched) }() for _, e := range events { - updateInplace(&e.Tags, &e.Source, instance) + updateInplace(e, instance) dispatched++ ch.handler.DispatchEvent(ctx, e) } } -func (ch *CloudHandler) updateTagsAndHostname(tags *gostatsd.Tags, source *gostatsd.Source) bool /*is a cache hit*/ { - instance, cacheHit := ch.getInstance(*source) +func (ch *CloudHandler) updateTagsAndHostname(obj TagChanger, source gostatsd.Source) bool /*is a cache hit*/ { + instance, cacheHit := ch.getInstance(source) if cacheHit { - updateInplace(tags, source, instance) + updateInplace(obj, instance) } return cacheHit } @@ -272,11 +274,8 @@ func (ch *CloudHandler) getInstance(ip gostatsd.Source) (*gostatsd.Instance, boo return instance, true } -func updateInplace(tags *gostatsd.Tags, source *gostatsd.Source, instance *gostatsd.Instance) { +func updateInplace(obj TagChanger, instance *gostatsd.Instance) { if instance != nil { // It was a positive cache hit (successful lookup cache, not failed lookup cache) - // Update hostname inplace - *source = instance.ID - // Update tag list inplace - *tags = append(*tags, instance.Tags...) + obj.AddTagsSetSource(instance.Tags, instance.ID) } } diff --git a/pkg/statsd/handler_cloud_test.go b/pkg/statsd/handler_cloud_test.go index 758e8f59..33ad590b 100644 --- a/pkg/statsd/handler_cloud_test.go +++ b/pkg/statsd/handler_cloud_test.go @@ -9,17 +9,19 @@ import ( "github.com/ash2k/stager/wait" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/tilinna/clock" "golang.org/x/time/rate" "github.com/atlassian/gostatsd" + "github.com/atlassian/gostatsd/internal/fixtures" "github.com/atlassian/gostatsd/pkg/cachedinstances/cloudprovider" "github.com/atlassian/gostatsd/pkg/cloudproviders/fakeprovider" ) -// BenchmarkCloudHandlerDispatchMetric is a benchmark intended to (manually) test +// BenchmarkCloudHandlerDispatchMetricMap is a benchmark intended to (manually) test // the impact of the CloudHandler.statsCacheHit field. -func BenchmarkCloudHandlerDispatchMetric(b *testing.B) { +func BenchmarkCloudHandlerDispatchMetricMap(b *testing.B) { fp := &fakeprovider.IP{} nh := &nopHandler{} ci := cloudprovider.NewCachedCloudProvider(logrus.StandardLogger(), rate.NewLimiter(100, 120), fp, gostatsd.CacheOptions{ @@ -40,9 +42,11 @@ func BenchmarkCloudHandlerDispatchMetric(b *testing.B) { ctxBackground := context.Background() b.RunParallel(func(pb *testing.PB) { + mm := gostatsd.NewMetricMap() + mm.Receive(sm1()) + for pb.Next() { - m := sm1() - ch.DispatchMetrics(ctxBackground, []*gostatsd.Metric{&m}) + ch.DispatchMetricMap(ctxBackground, mm) } }) } @@ -79,7 +83,7 @@ func TestTransientInstanceFailure(t *testing.T) { wg.StartWithContext(ctx, ch.Run) wg.StartWithContext(ctx, ci.Run) m1 := sm1() - m2 := sm1() + m2 := sm2() // There's no good way to tell when the Ticker has been created, so we use a hard loop for _, d := clck.AddNext(); d == 0 && ctx.Err() == nil; _, d = clck.AddNext() { @@ -87,8 +91,10 @@ func TestTransientInstanceFailure(t *testing.T) { } // t+0: prime the cache - expecting.Expect(1, 0, 0) - ch.DispatchMetrics(ctx, []*gostatsd.Metric{&m1}) + expecting.Expect(1, 0) + mm := gostatsd.NewMetricMap() + mm.Receive(m1) + ch.DispatchMetricMap(ctx, mm) expecting.WaitAll() clck.Add(50 * time.Millisecond) @@ -96,23 +102,32 @@ func TestTransientInstanceFailure(t *testing.T) { clck.Add(50 * time.Millisecond) // t+100ms: read from cache, must still be valid - expecting.Expect(1, 0, 0) - ch.DispatchMetrics(ctx, []*gostatsd.Metric{&m2}) + expecting.Expect(1, 0) + + mm = gostatsd.NewMetricMap() + mm.Receive(m2) + ch.DispatchMetricMap(ctx, mm) expecting.WaitAll() cancelFunc() wg.Wait() - expectedMetrics := []gostatsd.Metric{ - sm1(), sm1(), - } + expectedMetrics := []*gostatsd.Metric{sm1(), sm2()} expectedMetrics[0].Tags = gostatsd.Tags{"a1", "tag:value"} expectedMetrics[0].Source = "1.2.3.4" - expectedMetrics[1].Tags = gostatsd.Tags{"a1", "tag:value"} + expectedMetrics[1].Tags = gostatsd.Tags{"a4", "tag:value"} expectedMetrics[1].Source = "1.2.3.4" - assert.Equal(t, expectedMetrics, expecting.Metrics()) + actual := gostatsd.MergeMaps(expecting.MetricMaps()).AsMetrics() + + sort.Slice(expectedMetrics, fixtures.SortCompare(expectedMetrics)) + sort.Slice(actual, fixtures.SortCompare(actual)) + for _, em := range expectedMetrics { + em.FormatTagsKey() + } + + require.Equal(t, expectedMetrics, actual) } func TestCloudHandlerDispatch(t *testing.T) { @@ -122,30 +137,32 @@ func TestCloudHandlerDispatch(t *testing.T) { } expectedIps := []gostatsd.Source{"1.2.3.4", "4.3.2.1"} - expectedMetrics := []gostatsd.Metric{ + expectedMetrics := []*gostatsd.Metric{ { Name: "t1", - Value: 42.42, + Value: 42, + Rate: 1, Tags: gostatsd.Tags{"a1", "region:us-west-3", "tag1", "tag2:234"}, Source: "i-1.2.3.4", Type: gostatsd.COUNTER, }, { Name: "t1", - Value: 45.45, + Value: 45, + Rate: 1, Tags: gostatsd.Tags{"a4", "region:us-west-3", "tag1", "tag2:234"}, Source: "i-1.2.3.4", Type: gostatsd.COUNTER, }, } expectedEvents := gostatsd.Events{ - gostatsd.Event{ + { Title: "t12", Text: "asrasdfasdr", Tags: gostatsd.Tags{"a2", "region:us-west-3", "tag1", "tag2:234"}, Source: "i-4.3.2.1", }, - gostatsd.Event{ + { Title: "t1asdas", Text: "asdr", Tags: gostatsd.Tags{"a2-35", "region:us-west-3", "tag1", "tag2:234"}, @@ -159,7 +176,7 @@ func TestCloudHandlerInstanceNotFound(t *testing.T) { t.Parallel() fp := &fakeprovider.NotFound{} expectedIps := []gostatsd.Source{"1.2.3.4", "4.3.2.1"} - expectedMetrics := []gostatsd.Metric{ + expectedMetrics := []*gostatsd.Metric{ sm1(), sm2(), } @@ -174,7 +191,7 @@ func TestCloudHandlerFailingProvider(t *testing.T) { t.Parallel() fp := &fakeprovider.Failing{} expectedIps := []gostatsd.Source{"1.2.3.4", "4.3.2.1"} - expectedMetrics := []gostatsd.Metric{ + expectedMetrics := []*gostatsd.Metric{ sm1(), sm2(), } @@ -185,7 +202,18 @@ func TestCloudHandlerFailingProvider(t *testing.T) { doCheck(t, fp, sm1(), se1(), sm2(), se2(), fp.IPs, expectedIps, expectedMetrics, expectedEvents) } -func doCheck(t *testing.T, cloud CountingProvider, m1 gostatsd.Metric, e1 gostatsd.Event, m2 gostatsd.Metric, e2 gostatsd.Event, ipsFunc func() []gostatsd.Source, expectedIps []gostatsd.Source, expectedM []gostatsd.Metric, expectedE gostatsd.Events) { +func doCheck( + t *testing.T, + cloud CountingProvider, + m1 *gostatsd.Metric, + e1 *gostatsd.Event, + m2 *gostatsd.Metric, + e2 *gostatsd.Event, + ipsFunc func() []gostatsd.Source, + expectedIps []gostatsd.Source, + expectedM []*gostatsd.Metric, + expectedE gostatsd.Events, +) { expecting := &expectingHandler{} ci := cloudprovider.NewCachedCloudProvider(logrus.StandardLogger(), rate.NewLimiter(100, 120), cloud, gostatsd.CacheOptions{ CacheRefreshPeriod: gostatsd.DefaultCacheRefreshPeriod, @@ -202,14 +230,18 @@ func doCheck(t *testing.T, cloud CountingProvider, m1 gostatsd.Metric, e1 gostat wg.StartWithContext(ctx, ch.Run) wg.StartWithContext(ctx, ci.Run) - expecting.Expect(1, 0, 1) - ch.DispatchMetrics(ctx, []*gostatsd.Metric{&m1}) - ch.DispatchEvent(ctx, &e1) + expecting.Expect(1, 1) + mm := gostatsd.NewMetricMap() + mm.Receive(m1) + ch.DispatchMetricMap(ctx, mm) + ch.DispatchEvent(ctx, e1) expecting.WaitAll() - expecting.Expect(1, 0, 1) - ch.DispatchMetrics(ctx, []*gostatsd.Metric{&m2}) - ch.DispatchEvent(ctx, &e2) + expecting.Expect(1, 1) + mm = gostatsd.NewMetricMap() + mm.Receive(m2) + ch.DispatchMetricMap(ctx, mm) + ch.DispatchEvent(ctx, e2) expecting.WaitAll() cancelFunc() @@ -220,33 +252,41 @@ func doCheck(t *testing.T, cloud CountingProvider, m1 gostatsd.Metric, e1 gostat return ips[i] < ips[j] }) assert.Equal(t, expectedIps, ips) - assert.Equal(t, expectedM, expecting.Metrics()) + actual := gostatsd.MergeMaps(expecting.MetricMaps()).AsMetrics() + sort.Slice(expectedM, fixtures.SortCompare(expectedM)) + sort.Slice(actual, fixtures.SortCompare(actual)) + for _, em := range expectedM { + em.FormatTagsKey() + } + assert.Equal(t, expectedM, actual) assert.Equal(t, expectedE, expecting.Events()) assert.LessOrEqual(t, cloud.Invocations(), uint64(2)) } -func sm1() gostatsd.Metric { - return gostatsd.Metric{ +func sm1() *gostatsd.Metric { + return &gostatsd.Metric{ Name: "t1", - Value: 42.42, + Value: 42, + Rate: 1, Tags: gostatsd.Tags{"a1"}, Source: "1.2.3.4", Type: gostatsd.COUNTER, } } -func sm2() gostatsd.Metric { - return gostatsd.Metric{ +func sm2() *gostatsd.Metric { + return &gostatsd.Metric{ Name: "t1", - Value: 45.45, + Value: 45, + Rate: 1, Tags: gostatsd.Tags{"a4"}, Source: "1.2.3.4", Type: gostatsd.COUNTER, } } -func se1() gostatsd.Event { - return gostatsd.Event{ +func se1() *gostatsd.Event { + return &gostatsd.Event{ Title: "t12", Text: "asrasdfasdr", Tags: gostatsd.Tags{"a2"}, @@ -254,8 +294,8 @@ func se1() gostatsd.Event { } } -func se2() gostatsd.Event { - return gostatsd.Event{ +func se2() *gostatsd.Event { + return &gostatsd.Event{ Title: "t1asdas", Text: "asdr", Tags: gostatsd.Tags{"a2-35"}, diff --git a/pkg/statsd/handler_fixtures_test.go b/pkg/statsd/handler_fixtures_test.go index 611d1481..ff789409 100644 --- a/pkg/statsd/handler_fixtures_test.go +++ b/pkg/statsd/handler_fixtures_test.go @@ -8,7 +8,6 @@ import ( ) type capturingHandler struct { - m []*gostatsd.Metric mm []*gostatsd.MetricMap e []*gostatsd.Event } @@ -17,10 +16,6 @@ func (tch *capturingHandler) EstimatedTags() int { return 0 } -func (tch *capturingHandler) DispatchMetrics(ctx context.Context, metrics []*gostatsd.Metric) { - tch.m = append(tch.m, metrics...) -} - func (tch *capturingHandler) DispatchMetricMap(ctx context.Context, metrics *gostatsd.MetricMap) { tch.mm = append(tch.mm, metrics) } @@ -38,9 +33,6 @@ func (nh *nopHandler) EstimatedTags() int { return 0 } -func (nh *nopHandler) DispatchMetrics(ctx context.Context, m []*gostatsd.Metric) { -} - func (nh *nopHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap) { } @@ -53,16 +45,10 @@ func (nh *nopHandler) WaitForEvents() { type expectingHandler struct { countingHandler - wgMetrics sync.WaitGroup wgMetricMaps sync.WaitGroup wgEvents sync.WaitGroup } -func (e *expectingHandler) DispatchMetrics(ctx context.Context, m []*gostatsd.Metric) { - e.countingHandler.DispatchMetrics(ctx, m) - e.wgMetrics.Add(-len(m)) -} - func (e *expectingHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap) { e.countingHandler.DispatchMetricMap(ctx, mm) e.wgMetricMaps.Done() @@ -73,32 +59,39 @@ func (e *expectingHandler) DispatchEvent(ctx context.Context, event *gostatsd.Ev e.wgEvents.Done() } -func (e *expectingHandler) Expect(ms, mms, es int) { - e.wgMetrics.Add(ms) +func (e *expectingHandler) Expect(mms, es int) { e.wgMetricMaps.Add(mms) e.wgEvents.Add(es) } func (e *expectingHandler) WaitAll() { - e.wgMetrics.Wait() e.wgMetricMaps.Wait() e.wgEvents.Wait() } type countingHandler struct { mu sync.Mutex - metrics []gostatsd.Metric + metrics []*gostatsd.Metric + maps []*gostatsd.MetricMap events gostatsd.Events } -func (ch *countingHandler) Metrics() []gostatsd.Metric { +func (ch *countingHandler) Metrics() []*gostatsd.Metric { ch.mu.Lock() defer ch.mu.Unlock() - result := make([]gostatsd.Metric, len(ch.metrics)) + result := make([]*gostatsd.Metric, len(ch.metrics)) copy(result, ch.metrics) return result } +func (ch *countingHandler) MetricMaps() []*gostatsd.MetricMap { + ch.mu.Lock() + defer ch.mu.Unlock() + result := make([]*gostatsd.MetricMap, len(ch.maps)) + copy(result, ch.maps) + return result +} + func (ch *countingHandler) Events() gostatsd.Events { ch.mu.Lock() defer ch.mu.Unlock() @@ -111,24 +104,16 @@ func (ch *countingHandler) EstimatedTags() int { return 0 } -func (ch *countingHandler) DispatchMetrics(ctx context.Context, metrics []*gostatsd.Metric) { +func (ch *countingHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap) { ch.mu.Lock() defer ch.mu.Unlock() - for _, m := range metrics { - m.DoneFunc = nil // Clear DoneFunc because it contains non-predictable variable data which interferes with the tests - ch.metrics = append(ch.metrics, *m) - } -} - -// DispatchMetricMap re-dispatches a metric map through BackendHandler.DispatchMetrics -func (ch *countingHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap) { - mm.DispatchMetrics(ctx, ch) + ch.maps = append(ch.maps, mm) } func (ch *countingHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) { ch.mu.Lock() defer ch.mu.Unlock() - ch.events = append(ch.events, *e) + ch.events = append(ch.events, e) } func (ch *countingHandler) WaitForEvents() { diff --git a/pkg/statsd/handler_http_forwarder_v2.go b/pkg/statsd/handler_http_forwarder_v2.go index 6a9419f5..59aef9bc 100644 --- a/pkg/statsd/handler_http_forwarder_v2.go +++ b/pkg/statsd/handler_http_forwarder_v2.go @@ -180,11 +180,7 @@ func (hfh *HttpForwarderHandlerV2) EstimatedTags() int { return 0 } -func (hfh *HttpForwarderHandlerV2) DispatchMetrics(ctx context.Context, metrics []*gostatsd.Metric) { - hfh.consolidator.ReceiveMetrics(metrics) -} - -// DispatchMetricMap re-dispatches a metric map through HttpForwarderHandlerV2.DispatchMetrics +// DispatchMetricMap dispatches a metric map to the MetricConsolidator func (hfh *HttpForwarderHandlerV2) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap) { hfh.consolidator.ReceiveMetricMap(mm) } diff --git a/pkg/statsd/handler_tags.go b/pkg/statsd/handler_tags.go index 18f7db5e..432ed69d 100644 --- a/pkg/statsd/handler_tags.go +++ b/pkg/statsd/handler_tags.go @@ -50,20 +50,6 @@ func (th *TagHandler) EstimatedTags() int { return th.estimatedTags } -// DispatchMetrics adds the unique tags from the TagHandler to the metric and passes it to the next stage in the pipeline -func (th *TagHandler) DispatchMetrics(ctx context.Context, metrics []*gostatsd.Metric) { - var toDispatch []*gostatsd.Metric - - for _, m := range metrics { - if th.uniqueFilterMetricAndAddTags(m) { - toDispatch = append(toDispatch, m) - } - } - if len(toDispatch) > 0 { - th.handler.DispatchMetrics(ctx, toDispatch) - } -} - // DispatchMetricMap adds the unique tags from the TagHandler to each consolidated metric in the map and passes it to // the next stage in the pipeline // @@ -205,10 +191,6 @@ func (th *TagHandler) uniqueFilterAndAddTags(mName string, mHostname *gostatsd.S return true } -func (th *TagHandler) uniqueFilterMetricAndAddTags(m *gostatsd.Metric) bool { - return th.uniqueFilterAndAddTags(m.Name, &m.Source, &m.Tags) -} - // DispatchEvent adds the unique tags from the TagHandler to the event and passes it to the next stage in the pipeline func (th *TagHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) { e.Tags = uniqueTags(e.Tags, th.tags) diff --git a/pkg/statsd/handler_tags_test.go b/pkg/statsd/handler_tags_test.go index b0453ccc..473f4124 100644 --- a/pkg/statsd/handler_tags_test.go +++ b/pkg/statsd/handler_tags_test.go @@ -12,9 +12,11 @@ import ( "github.com/stretchr/testify/require" "github.com/atlassian/gostatsd" + . "github.com/atlassian/gostatsd/internal/fixtures" ) func TestTagStripMergesCounters(t *testing.T) { + t.Parallel() tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, []Filter{ {DropTags: gostatsd.StringMatchList{gostatsd.NewStringMatch("key2:*")}}, @@ -32,6 +34,7 @@ func TestTagStripMergesCounters(t *testing.T) { } func TestTagStripMergesGauges(t *testing.T) { + t.Parallel() tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, []Filter{ {DropTags: gostatsd.StringMatchList{gostatsd.NewStringMatch("key2:*")}}, @@ -49,6 +52,7 @@ func TestTagStripMergesGauges(t *testing.T) { } func TestTagStripMergesTimers(t *testing.T) { + t.Parallel() tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, []Filter{ {DropTags: gostatsd.StringMatchList{gostatsd.NewStringMatch("key2:*")}}, @@ -69,6 +73,7 @@ func TestTagStripMergesTimers(t *testing.T) { } func TestTagStripMergesSets(t *testing.T) { + t.Parallel() tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, []Filter{ {DropTags: gostatsd.StringMatchList{gostatsd.NewStringMatch("key2:*")}}, @@ -86,57 +91,39 @@ func TestTagStripMergesSets(t *testing.T) { } func TestFilterPassesNoFilters(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, nil) - m := &gostatsd.Metric{ - Name: "name", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "baz", - } - expected := []*gostatsd.Metric{ - { - Name: "name", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "baz", - }, - } - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) - assert.Equal(t, expected, tch.m) + + mm := gostatsd.NewMetricMap() + mm.Receive(MakeMetric()) + expected := gostatsd.NewMetricMap() + expected.Receive(MakeMetric()) + th.DispatchMetricMap(context.Background(), mm) + require.Len(t, tch.mm, 1) + require.Equal(t, expected, tch.mm[0]) } func TestFilterPassesEmptyFilters(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, nil) th.filters = []Filter{} - m := &gostatsd.Metric{ - Name: "name", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "baz", - } - expected := []*gostatsd.Metric{ - { - Name: "name", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "baz", - }, - } - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) - assert.Equal(t, expected, tch.m) + + mm := gostatsd.NewMetricMap() + mm.Receive(MakeMetric()) + expected := gostatsd.NewMetricMap() + expected.Receive(MakeMetric()) + th.DispatchMetricMap(context.Background(), mm) + require.Len(t, tch.mm, 1) + require.Equal(t, expected, tch.mm[0]) } func TestFilterKeepNonMatch(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, nil) th.filters = []Filter{ @@ -145,115 +132,80 @@ func TestFilterKeepNonMatch(t *testing.T) { DropMetric: true, }, } - m := &gostatsd.Metric{ - Name: "good.name", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "baz", - } - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) - expected := []*gostatsd.Metric{ - { - Name: "good.name", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "baz", - }, - } - assert.Equal(t, expected, tch.m) + + mm := gostatsd.NewMetricMap() + mm.Receive(MakeMetric()) + expected := gostatsd.NewMetricMap() + expected.Receive(MakeMetric()) + th.DispatchMetricMap(context.Background(), mm) + require.Len(t, tch.mm, 1) + require.Equal(t, expected, tch.mm[0]) } func TestFilterDropsBadName(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, nil) th.filters = []Filter{ { - MatchMetrics: gostatsd.StringMatchList{gostatsd.NewStringMatch("bad.name")}, + MatchMetrics: gostatsd.StringMatchList{gostatsd.NewStringMatch("name")}, DropMetric: true, }, } - m := &gostatsd.Metric{ - Name: "bad.name", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "baz", - } - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) - assert.Equal(t, 0, len(tch.m)) + + mm := gostatsd.NewMetricMap() + mm.Receive(MakeMetric()) + th.DispatchMetricMap(context.Background(), mm) + require.Len(t, tch.mm, 0) // nothing is dispatched if the entire MetricMap is dropped } func TestFilterDropsBadPrefix(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, nil) th.filters = []Filter{ { - MatchMetrics: gostatsd.StringMatchList{gostatsd.NewStringMatch("bad.*")}, + MatchMetrics: gostatsd.StringMatchList{gostatsd.NewStringMatch("na*")}, DropMetric: true, }, } - m := &gostatsd.Metric{ - Name: "bad.name", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "baz", - } - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) - assert.Equal(t, 0, len(tch.m)) + + mm := gostatsd.NewMetricMap() + mm.Receive(MakeMetric()) + th.DispatchMetricMap(context.Background(), mm) + require.Len(t, tch.mm, 0) // nothing is dispatched if the entire MetricMap is dropped } func TestFilterKeepsWhitelist(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, nil) th.filters = []Filter{ { - MatchMetrics: gostatsd.StringMatchList{gostatsd.NewStringMatch("bad.*")}, - ExcludeMetrics: gostatsd.StringMatchList{gostatsd.NewStringMatch("bad.good")}, + MatchMetrics: gostatsd.StringMatchList{gostatsd.NewStringMatch("name.*")}, + ExcludeMetrics: gostatsd.StringMatchList{gostatsd.NewStringMatch("name.good")}, DropMetric: true, }, } - m := &gostatsd.Metric{ - Name: "bad.name", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "baz", - } - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) + mm := gostatsd.NewMetricMap() + mm.Receive(MakeMetric(Name("name.bad"))) + mm.Receive(MakeMetric(Name("name.good"))) - m = &gostatsd.Metric{ - Name: "bad.good", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "baz", - } - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) + expected := gostatsd.NewMetricMap() + expected.Receive(MakeMetric(Name("name.good"))) - expected := []*gostatsd.Metric{ - { - Name: "bad.good", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "baz", - }, - } - assert.Equal(t, expected, tch.m) + th.DispatchMetricMap(context.Background(), mm) + require.Len(t, tch.mm, 1) + require.Equal(t, expected, tch.mm[0]) } func TestFilterDropsTag(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, nil) th.filters = []Filter{ @@ -263,62 +215,41 @@ func TestFilterDropsTag(t *testing.T) { }, } - m := &gostatsd.Metric{ - Name: "bad.name", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "baz", - } - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) + mm := gostatsd.NewMetricMap() + mm.Receive(MakeMetric(Name("bad.name"))) + expected := gostatsd.NewMetricMap() + expected.Receive(MakeMetric(Name("bad.name"), DropTag("host:baz"))) - expected := []*gostatsd.Metric{ - { - Name: "bad.name", - Tags: gostatsd.Tags{ - "host:baz", - }, - Source: "baz", - }, - } - assert.Equal(t, expected, tch.m) + th.DispatchMetricMap(context.Background(), mm) + require.Len(t, tch.mm, 1) + require.Equal(t, expected, tch.mm[0]) } func TestFilterDropsHost(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, nil) th.filters = []Filter{ { - MatchMetrics: gostatsd.StringMatchList{gostatsd.NewStringMatch("bad.name")}, + MatchMetrics: gostatsd.StringMatchList{gostatsd.NewStringMatch("name")}, DropHost: true, }, } - m := &gostatsd.Metric{ - Name: "bad.name", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "baz", - } - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) + mm := gostatsd.NewMetricMap() + mm.Receive(MakeMetric()) + expected := gostatsd.NewMetricMap() + expected.Receive(MakeMetric(DropSource)) - expected := []*gostatsd.Metric{ - { - Name: "bad.name", - Tags: gostatsd.Tags{ - "foo:bar", - "host:baz", - }, - Source: "", - }, - } - assert.Equal(t, expected, tch.m) + th.DispatchMetricMap(context.Background(), mm) + require.Len(t, tch.mm, 1) + require.Equal(t, expected, tch.mm[0]) } func TestNewTagHandlerFromViper(t *testing.T) { + t.Parallel() + var data = []byte(` filters='drop-noisy-metric drop-noisy-metric-with-tag drop-noisy-tag drop-noisy-keep-quiet-metric drop-host' @@ -409,60 +340,75 @@ func assertHasAllTags(t *testing.T, actual gostatsd.Tags, expected ...string) { } func TestTagMetricHandlerAddsNoTags(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, nil) - m := &gostatsd.Metric{} - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) - assert.Equal(t, 1, len(tch.m)) // Metric tracked - assertHasAllTags(t, tch.m[0].Tags) - assert.Equal(t, gostatsd.UnknownSource, tch.m[0].Source) // No hostname added + + mm := gostatsd.NewMetricMap() + mm.Receive(MakeMetric(DropSource)) + expected := gostatsd.NewMetricMap() + expected.Receive(MakeMetric(DropSource)) // No hostname added + + th.DispatchMetricMap(context.Background(), mm) + require.Len(t, tch.mm, 1) + require.Equal(t, expected, tch.mm[0]) } func TestTagMetricHandlerAddsSingleTag(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{"tag1"}, nil) - m := &gostatsd.Metric{} - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) - assert.Equal(t, 1, len(tch.m)) // Metric tracked - assertHasAllTags(t, tch.m[0].Tags, "tag1") - assert.Equal(t, gostatsd.UnknownSource, tch.m[0].Source) // No hostname added + + mm := gostatsd.NewMetricMap() + mm.Receive(MakeMetric()) + expected := gostatsd.NewMetricMap() + expected.Receive(MakeMetric(AddTag("tag1"))) + + th.DispatchMetricMap(context.Background(), mm) + require.Len(t, tch.mm, 1) + require.Equal(t, expected, tch.mm[0]) } func TestTagMetricHandlerAddsMultipleTags(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{"tag1", "tag2"}, nil) - m := &gostatsd.Metric{} - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) - assert.Equal(t, 1, len(tch.m)) // Metric tracked - assertHasAllTags(t, tch.m[0].Tags, "tag1", "tag2") - assert.Equal(t, gostatsd.UnknownSource, tch.m[0].Source) // No hostname added -} -func TestTagMetricHandlerAddsHostname(t *testing.T) { - tch := &capturingHandler{} - th := NewTagHandler(tch, gostatsd.Tags{}, nil) - m := &gostatsd.Metric{ - Source: "1.2.3.4", - } - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) - assert.Equal(t, 1, len(tch.m)) // Metric tracked - assert.Equal(t, 0, len(tch.m[0].Tags)) // No tags added - assert.Equal(t, gostatsd.Source("1.2.3.4"), tch.m[0].Source) // Hostname injected + mm := gostatsd.NewMetricMap() + mm.Receive(MakeMetric()) + expected := gostatsd.NewMetricMap() + expected.Receive(MakeMetric(AddTag("tag1", "tag2"))) + + th.DispatchMetricMap(context.Background(), mm) + require.Len(t, tch.mm, 1) + require.Equal(t, expected, tch.mm[0]) } func TestTagMetricHandlerAddsDuplicateTags(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{"tag1", "tag2", "tag2", "tag3", "tag1"}, nil) - m := &gostatsd.Metric{} - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) - assert.Equal(t, 1, len(tch.m)) // Metric tracked - assertHasAllTags(t, tch.m[0].Tags, "tag1", "tag2", "tag3") - assert.Equal(t, gostatsd.UnknownSource, tch.m[0].Source) // No hostname added + + mm := gostatsd.NewMetricMap() + mm.Receive(MakeMetric()) + expected := gostatsd.NewMetricMap() + expected.Receive(MakeMetric(AddTag("tag1", "tag2", "tag3"))) + + th.DispatchMetricMap(context.Background(), mm) + require.Len(t, tch.mm, 1) + require.Equal(t, expected, tch.mm[0]) } func TestTagEventHandlerAddsNoTags(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, nil) + e := &gostatsd.Event{} th.DispatchEvent(context.Background(), e) assert.Equal(t, 1, len(tch.e)) // Metric tracked @@ -471,8 +417,11 @@ func TestTagEventHandlerAddsNoTags(t *testing.T) { } func TestTagEventHandlerAddsSingleTag(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{"tag1"}, nil) + e := &gostatsd.Event{} th.DispatchEvent(context.Background(), e) assert.Equal(t, 1, len(tch.e)) // Metric tracked @@ -481,8 +430,11 @@ func TestTagEventHandlerAddsSingleTag(t *testing.T) { } func TestTagEventHandlerAddsMultipleTags(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{"tag1", "tag2"}, nil) + e := &gostatsd.Event{} th.DispatchEvent(context.Background(), e) assert.Equal(t, 1, len(tch.e)) // Metric tracked @@ -491,8 +443,11 @@ func TestTagEventHandlerAddsMultipleTags(t *testing.T) { } func TestTagEventHandlerAddsHostname(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{}, nil) + e := &gostatsd.Event{ Source: "1.2.3.4", } @@ -503,8 +458,11 @@ func TestTagEventHandlerAddsHostname(t *testing.T) { } func TestTagEventHandlerAddsDuplicateTags(t *testing.T) { + t.Parallel() + tch := &capturingHandler{} th := NewTagHandler(tch, gostatsd.Tags{"tag1", "tag2", "tag2", "tag3", "tag1"}, nil) + e := &gostatsd.Event{} th.DispatchEvent(context.Background(), e) assert.Equal(t, 1, len(tch.e)) // Metric tracked @@ -533,9 +491,12 @@ func BenchmarkTagMetricHandlerAddsDuplicateTagsSmall(b *testing.B) { metricTags := make(gostatsd.Tags, 0, len(baseTags)+th.EstimatedTags()) metricTags = append(metricTags, baseTags...) m := &gostatsd.Metric{ + Type: gostatsd.COUNTER, Tags: metricTags, } - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) + mm := gostatsd.NewMetricMap() + mm.Receive(m) + th.DispatchMetricMap(context.Background(), mm) } } @@ -569,9 +530,12 @@ func BenchmarkTagMetricHandlerAddsDuplicateTagsLarge(b *testing.B) { metricTags := make(gostatsd.Tags, 0, len(baseTags)+th.EstimatedTags()) metricTags = append(metricTags, baseTags...) m := &gostatsd.Metric{ + Type: gostatsd.COUNTER, Tags: metricTags, } - th.DispatchMetrics(context.Background(), []*gostatsd.Metric{m}) + mm := gostatsd.NewMetricMap() + mm.Receive(m) + th.DispatchMetricMap(context.Background(), mm) } } diff --git a/pkg/statsd/parser.go b/pkg/statsd/parser.go index 01e248dc..177d39c8 100644 --- a/pkg/statsd/parser.go +++ b/pkg/statsd/parser.go @@ -110,9 +110,13 @@ func (dp *DatagramParser) Run(ctx context.Context) { accumE += eventCount accumB += badLineCount } + // TODO: Refactor this to use a MetricConsolidator + mm := gostatsd.NewMetricMap() + for _, m := range metrics { + mm.Receive(m) + } if len(metrics) > 0 { - dp.handler.DispatchMetrics(ctx, metrics) - + dp.handler.DispatchMetricMap(ctx, mm) dp.doLogRawMetric(metrics) } atomic.AddUint64(&dp.metricsReceived, uint64(len(metrics))) diff --git a/pkg/statsd/parser_test.go b/pkg/statsd/parser_test.go index 2164c104..67f0353d 100644 --- a/pkg/statsd/parser_test.go +++ b/pkg/statsd/parser_test.go @@ -2,6 +2,7 @@ package statsd import ( "context" + "sort" "strconv" "testing" @@ -10,10 +11,11 @@ import ( "golang.org/x/time/rate" "github.com/atlassian/gostatsd" + "github.com/atlassian/gostatsd/internal/fixtures" ) type metricAndEvent struct { - metrics []gostatsd.Metric + metrics []*gostatsd.Metric events gostatsd.Events } @@ -47,43 +49,43 @@ func TestParseDatagram(t *testing.T) { t.Parallel() input := map[string]metricAndEvent{ "f:2|c": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 2, Source: "127.0.0.1", Type: gostatsd.COUNTER, Rate: 1}, }, }, "f:2|c\n": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 2, Source: "127.0.0.1", Type: gostatsd.COUNTER, Rate: 1}, }, }, "f:2|c|#t": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 2, Source: "127.0.0.1", Type: gostatsd.COUNTER, Rate: 1, Tags: gostatsd.Tags{"t"}}, }, }, "f:2|c|#host:h": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 2, Source: "127.0.0.1", Type: gostatsd.COUNTER, Rate: 1, Tags: gostatsd.Tags{"host:h"}}, }, }, "f:2|c\nx:3|c": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 2, Source: "127.0.0.1", Type: gostatsd.COUNTER, Rate: 1}, {Name: "x", Value: 3, Source: "127.0.0.1", Type: gostatsd.COUNTER, Rate: 1}, }, }, "f:2|c\nx:3|c\n": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 2, Source: "127.0.0.1", Type: gostatsd.COUNTER, Rate: 1}, {Name: "x", Value: 3, Source: "127.0.0.1", Type: gostatsd.COUNTER, Rate: 1}, }, }, "_e{1,1}:a|b\nf:6|c": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 6, Source: "127.0.0.1", Type: gostatsd.COUNTER, Rate: 1}, }, events: gostatsd.Events{ - gostatsd.Event{Title: "a", Text: "b", Source: "127.0.0.1"}, + {Title: "a", Text: "b", Source: "127.0.0.1"}, }, }, } @@ -94,15 +96,27 @@ func TestParseDatagram(t *testing.T) { t.Parallel() mr, ch := newTestParser(false) metrics, _, _ := mr.handleDatagram(context.Background(), 0, fakeIP, []byte(datagram)) - ch.DispatchMetrics(context.Background(), metrics) + mm := gostatsd.NewMetricMap() + for _, m := range metrics { + mm.Receive(m) + } for i, e := range ch.events { if e.DateHappened <= 0 { t.Errorf("%q: DateHappened should be positive", e) } ch.events[i].DateHappened = 0 } + + ch.DispatchMetricMap(context.Background(), mm) + actual := gostatsd.MergeMaps(ch.MetricMaps()).AsMetrics() + for _, m := range mAndE.metrics { + m.FormatTagsKey() + } + sort.Slice(actual, fixtures.SortCompare(actual)) + sort.Slice(mAndE.metrics, fixtures.SortCompare(mAndE.metrics)) + assert.Equal(t, mAndE.events, ch.events) - assert.Equal(t, mAndE.metrics, ch.metrics) + assert.Equal(t, mAndE.metrics, actual) }) } } @@ -111,48 +125,48 @@ func TestParseDatagramIgnoreHost(t *testing.T) { t.Parallel() input := map[string]metricAndEvent{ "f:2|c": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 2, Type: gostatsd.COUNTER, Rate: 1}, }, }, "f:2|c\n": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 2, Type: gostatsd.COUNTER, Rate: 1}, }, }, "f:2|c|#t": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 2, Type: gostatsd.COUNTER, Rate: 1, Tags: gostatsd.Tags{"t"}}, }, }, "f:2|c|#host:h": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 2, Source: "h", Type: gostatsd.COUNTER, Rate: 1}, }, }, "f:2|c|#host:h1,host:h2": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 2, Source: "h1", Type: gostatsd.COUNTER, Rate: 1, Tags: gostatsd.Tags{"host:h2"}}, }, }, "f:2|c\nx:3|c": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 2, Type: gostatsd.COUNTER, Rate: 1}, {Name: "x", Value: 3, Type: gostatsd.COUNTER, Rate: 1}, }, }, "f:2|c\nx:3|c\n": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 2, Type: gostatsd.COUNTER, Rate: 1}, {Name: "x", Value: 3, Type: gostatsd.COUNTER, Rate: 1}, }, }, "_e{1,1}:a|b\nf:6|c": { - metrics: []gostatsd.Metric{ + metrics: []*gostatsd.Metric{ {Name: "f", Value: 6, Type: gostatsd.COUNTER, Rate: 1}, }, events: gostatsd.Events{ - gostatsd.Event{Title: "a", Text: "b", Source: "127.0.0.1"}, + {Title: "a", Text: "b", Source: "127.0.0.1"}, }, }, } @@ -169,9 +183,21 @@ func TestParseDatagramIgnoreHost(t *testing.T) { } ch.events[i].DateHappened = 0 } - ch.DispatchMetrics(context.Background(), metrics) + mm := gostatsd.NewMetricMap() + for _, m := range metrics { + mm.Receive(m) + } + ch.DispatchMetricMap(context.Background(), mm) + actual := gostatsd.MergeMaps(ch.MetricMaps()).AsMetrics() + for _, m := range mAndE.metrics { + m.FormatTagsKey() + } + + sort.Slice(actual, fixtures.SortCompare(actual)) + sort.Slice(mAndE.metrics, fixtures.SortCompare(mAndE.metrics)) + assert.Equal(t, mAndE.events, ch.events) - assert.Equal(t, mAndE.metrics, ch.metrics) + assert.Equal(t, mAndE.metrics, actual) }) } } diff --git a/pkg/statsd/types.go b/pkg/statsd/types.go index 33534e52..49445a7d 100644 --- a/pkg/statsd/types.go +++ b/pkg/statsd/types.go @@ -23,9 +23,8 @@ type ProcessFunc func(*gostatsd.MetricMap) // Aggregator is an object that aggregates statsd metrics. // The function NewAggregator should be used to create the objects. // -// Incoming metrics should be passed via Receive function. +// Incoming metrics should be passed via ReceiveMap function. type Aggregator interface { - Receive(metrics ...*gostatsd.Metric) ReceiveMap(mm *gostatsd.MetricMap) Flush(interval time.Duration) Process(ProcessFunc) @@ -45,3 +44,10 @@ type Datagram struct { type MetricEmitter interface { RunMetrics(ctx context.Context, statser stats.Statser) } + +// TagChanger is an interface that Metric/Event can implement to update their tags +// and source. It is so the CloudHandler can change the tags without worrying about +// the TagsKey cache. +type TagChanger interface { + AddTagsSetSource(additionalTags gostatsd.Tags, newSource gostatsd.Source) +} diff --git a/pkg/statsd/worker.go b/pkg/statsd/worker.go index b6c2dc82..90c26a00 100644 --- a/pkg/statsd/worker.go +++ b/pkg/statsd/worker.go @@ -18,7 +18,6 @@ type processCommand struct { type worker struct { aggr Aggregator - metricsQueue chan []*gostatsd.Metric // Batches metricMapQueue chan *gostatsd.MetricMap processChan chan *processCommand id int @@ -27,11 +26,6 @@ type worker struct { func (w *worker) work() { for { select { - case metrics, ok := <-w.metricsQueue: - if !ok { - return - } - w.aggr.Receive(metrics...) case mm, ok := <-w.metricMapQueue: if !ok { return @@ -50,14 +44,6 @@ func (w *worker) executeProcess(cmd *processCommand) { func (w *worker) RunMetrics(ctx context.Context, statser stats.Statser) { wg := &wait.Group{} - wg.StartWithContext(ctx, stats.NewChannelStatsWatcher( - statser, - "dispatch_aggregator_batch", - gostatsd.Tags{fmt.Sprintf("aggregator_id:%d", w.id)}, - cap(w.metricsQueue), - func() int { return len(w.metricsQueue) }, - 1000*time.Millisecond, // TODO: Make configurable - ).Run) wg.StartWithContext(ctx, stats.NewChannelStatsWatcher( statser, "dispatch_aggregator_map", diff --git a/pkg/web/fixtures_test.go b/pkg/web/fixtures_test.go index 72bba8ce..f38a8e68 100644 --- a/pkg/web/fixtures_test.go +++ b/pkg/web/fixtures_test.go @@ -12,26 +12,19 @@ import ( ) type capturingHandler struct { - mu sync.Mutex - m []*gostatsd.Metric - e []*gostatsd.Event + mu sync.Mutex + maps []*gostatsd.MetricMap + e []*gostatsd.Event } func (ch *capturingHandler) EstimatedTags() int { return 0 } -func (ch *capturingHandler) DispatchMetrics(ctx context.Context, metrics []*gostatsd.Metric) { +func (ch *capturingHandler) DispatchMetricMap(ctx context.Context, mm *gostatsd.MetricMap) { ch.mu.Lock() defer ch.mu.Unlock() - for _, m := range metrics { - m.DoneFunc = nil // Clear DoneFunc because it contains non-predictable variable data which interferes with the tests - ch.m = append(ch.m, m) - } -} - -func (ch *capturingHandler) DispatchMetricMap(ctx context.Context, metrics *gostatsd.MetricMap) { - metrics.DispatchMetrics(ctx, ch) + ch.maps = append(ch.maps, mm) } func (ch *capturingHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) { @@ -43,12 +36,12 @@ func (ch *capturingHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event func (ch *capturingHandler) WaitForEvents() { } -func (ch *capturingHandler) GetMetrics() []*gostatsd.Metric { +func (ch *capturingHandler) MetricMaps() []*gostatsd.MetricMap { ch.mu.Lock() defer ch.mu.Unlock() - m := make([]*gostatsd.Metric, len(ch.m)) - copy(m, ch.m) - return m + mms := make([]*gostatsd.MetricMap, len(ch.maps)) + copy(mms, ch.maps) + return mms } func testContext(t *testing.T) (context.Context, func()) { diff --git a/pkg/web/http_receiver_v2_test.go b/pkg/web/http_receiver_v2_test.go index b853e771..527e5124 100644 --- a/pkg/web/http_receiver_v2_test.go +++ b/pkg/web/http_receiver_v2_test.go @@ -13,6 +13,7 @@ import ( "github.com/tilinna/clock" "github.com/atlassian/gostatsd" + "github.com/atlassian/gostatsd/internal/fixtures" "github.com/atlassian/gostatsd/pkg/statsd" "github.com/atlassian/gostatsd/pkg/transport" "github.com/atlassian/gostatsd/pkg/web" @@ -111,17 +112,24 @@ func TestForwardingEndToEndV2(t *testing.T) { Rate: 0.1, } + mm := gostatsd.NewMetricMap() + for i := 0; i < 100; i++ { - hfh.DispatchMetrics(ctxTest, []*gostatsd.Metric{m1, m2, m5, m6, m7, m8}) + mm.Receive(m1) + mm.Receive(m2) + mm.Receive(m5) + mm.Receive(m6) + mm.Receive(m7) + mm.Receive(m8) } // only do timers once, because they're very noisy in the output. - hfh.DispatchMetrics(ctxTest, []*gostatsd.Metric{m3, m4}) + mm.Receive(m3) + mm.Receive(m4) + hfh.DispatchMetricMap(ctxTest, mm) + + fixtures.NextStep(ctxTest, mockClock) + mockClock.Add(1 * time.Second) // Make sure everything gets scheduled - // There's no good way to tell when the Ticker has been created, so we use a hard loop - for _, d := mockClock.AddNext(); d == 0 && ctxTest.Err() == nil; _, d = mockClock.AddNext() { - time.Sleep(time.Millisecond) // Allows the system to actually idle, runtime.Gosched() does not. - } - mockClock.Add(1 * time.Second) // Make sure everything gets scheduled time.Sleep(50 * time.Millisecond) // Give the http call time to actually occur expected := []*gostatsd.Metric{ @@ -136,29 +144,18 @@ func TestForwardingEndToEndV2(t *testing.T) { {Name: "set", Type: gostatsd.SET, StringValue: "def", Rate: 1}, } - actual := ch.GetMetrics() - for _, metric := range actual { - metric.Timestamp = 0 // This isn't propagated through v2, and is set to the time of receive + actual := gostatsd.MergeMaps(ch.MetricMaps()).AsMetrics() + + for _, m := range expected { + m.FormatTagsKey() } - cmpSort := func(slice []*gostatsd.Metric) func(i, j int) bool { - return func(i, j int) bool { - if slice[i].Name == slice[j].Name { - if len(slice[i].Tags) == len(slice[j].Tags) { // This is not exactly accurate, but close enough with our data - if slice[i].Type == gostatsd.SET { - return slice[i].StringValue < slice[j].StringValue - } else { - return slice[i].Value < slice[j].Value - } - } - return len(slice[i].Tags) < len(slice[j].Tags) - } - return slice[i].Name < slice[j].Name - } + for _, metric := range actual { + metric.Timestamp = 0 // This isn't propagated through v2, and is set to the time of receive } - sort.Slice(actual, cmpSort(actual)) - sort.Slice(expected, cmpSort(expected)) + sort.Slice(actual, fixtures.SortCompare(actual)) + sort.Slice(expected, fixtures.SortCompare(expected)) require.EqualValues(t, expected, actual) testDone() diff --git a/types.go b/types.go index 445365e1..27556d71 100644 --- a/types.go +++ b/types.go @@ -72,7 +72,6 @@ func MaybeAppendRunnable(runnables []Runnable, maybeRunner interface{}) []Runnab // RawMetricHandler is an interface that accepts a Metric for processing. Raw refers to pre-aggregation, not // pre-consolidation. type RawMetricHandler interface { - DispatchMetrics(ctx context.Context, m []*Metric) DispatchMetricMap(ctx context.Context, mm *MetricMap) }