Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove dispatch metrics #331

Merged
merged 15 commits into from
Aug 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
28.0.0
------
- Internal pipeline refactor removes two internal metrics

27.0.0
------
- Simplify internal tracking of the source address of a metric. The current rules are:
Expand Down
2 changes: 0 additions & 2 deletions METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ Metrics:

| Name | type | tags | description
| ------------------------------------------- | ------------------- | ---------------------------- | -----------
| aggregator.metrics_received | gauge (flush) | aggregator_id | The number of datapoints received during the flush interval
| aggregator.metricmaps_received | gauge (flush) | aggregator_id | The number of datapoint batches received during the flush interval
| aggregator.aggregation_time | gauge (time) | aggregator_id | The time taken (in ms) to aggregate all counter and timer
| | | | datapoints in this flush interval
Expand Down Expand Up @@ -84,7 +83,6 @@ the samples are reset.

| Channel name | Additional tags | Description
| ------------------------- | --------------- | -----------
| dispatch_aggregator_batch | aggregator_id | Channel to dispatch metrics to a specific aggregator.
| dispatch_aggregator_map | aggregator_id | Channel to dispatch metric maps to a given aggregator.
| backend_events_sem | | Semaphore limiting the number of events in flight at once. Corresponds to
| | | the `--max-concurrent-events` flag.
Expand Down
7 changes: 6 additions & 1 deletion events.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,10 @@ type Event struct {
AlertType AlertType
}

func (e *Event) AddTagsSetSource(additionalTags Tags, newSource Source) {
e.Tags = e.Tags.Concat(additionalTags)
e.Source = newSource
}

// Events represents a list of events.
type Events []Event
type Events []*Event
42 changes: 0 additions & 42 deletions fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package gostatsd

import (
"context"
"sync"
"testing"
"time"

Expand All @@ -24,44 +23,3 @@ func testContext(t *testing.T) (context.Context, func()) {
}()
return ctxTest, completeTest
}

type capturingHandler struct {
mu sync.Mutex
m []*Metric
e []*Event
}

func (ch *capturingHandler) EstimatedTags() int {
return 0
}

func (ch *capturingHandler) DispatchMetrics(ctx context.Context, metrics []*Metric) {
ch.mu.Lock()
defer ch.mu.Unlock()
for _, m := range metrics {
m.DoneFunc = nil // Clear DoneFunc because it contains non-predictable variable data which interferes with the tests
ch.m = append(ch.m, m)
}
}

// DispatchMetricMap re-dispatches a metric map through capturingHandler.DispatchMetrics
func (ch *capturingHandler) DispatchMetricMap(ctx context.Context, mm *MetricMap) {
mm.DispatchMetrics(ctx, ch)
}

func (ch *capturingHandler) DispatchEvent(ctx context.Context, e *Event) {
ch.mu.Lock()
defer ch.mu.Unlock()
ch.e = append(ch.e, e)
}

func (ch *capturingHandler) WaitForEvents() {
}

func (ch *capturingHandler) GetMetrics() []*Metric {
ch.mu.Lock()
defer ch.mu.Unlock()
m := make([]*Metric, len(ch.m))
copy(m, ch.m)
return m
}
80 changes: 80 additions & 0 deletions internal/fixtures/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package fixtures

import (
"fmt"

"github.com/atlassian/gostatsd"
)

type MetricOpt func(m *gostatsd.Metric)

// MakeMetric provides a way to build a metric for tests. Hopefully over
// time this will be used more, bringing more consistency to tests.
func MakeMetric(opts ...MetricOpt) *gostatsd.Metric {
m := &gostatsd.Metric{
Type: gostatsd.COUNTER,
Name: "name",
Rate: 1,
Tags: gostatsd.Tags{
"foo:bar",
"host:baz",
},
Source: "baz",
}
for _, opt := range opts {
opt(m)
}
return m
}

func Name(n string) MetricOpt {
return func(m *gostatsd.Metric) {
m.Name = n
}
}

func AddTag(t ...string) MetricOpt {
return func(m *gostatsd.Metric) {
m.Tags = append(m.Tags, t...)
}
}

func DropSource(m *gostatsd.Metric) {
m.Source = gostatsd.UnknownSource
}

func DropTag(t string) MetricOpt {
return func(m *gostatsd.Metric) {
next := 0
found := false
for _, tag := range m.Tags {
if t == tag {
found = true
m.Tags[next] = tag
next++
}
}
if !found {
panic(fmt.Sprintf("failed to find tag %s while building metric", t))
}
m.Tags = m.Tags[:next]
}
}

// SortCompare func for metrics so they can be compared with require.EqualValues
// Invoke with sort.Slice(x, SortCompare(x))
func SortCompare(ms []*gostatsd.Metric) func(i, j int) bool {
return func(i, j int) bool {
if ms[i].Name == ms[j].Name {
if len(ms[i].Tags) == len(ms[j].Tags) { // This is not exactly accurate, but close enough with our data
if ms[i].Type == gostatsd.SET {
return ms[i].StringValue < ms[j].StringValue
} else {
return ms[i].Value < ms[j].Value
}
}
return len(ms[i].Tags) < len(ms[j].Tags)
}
return ms[i].Name < ms[j].Name
}
}
36 changes: 26 additions & 10 deletions metric_consolidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"github.com/tilinna/clock"
)

// MetricConsolidator will consolidate metrics randomly in to a slice of MetricMaps, and send the slice to the provided
// channel. Run can be started in a long running goroutine to perform flushing, or Flush can be called externally.
// MetricConsolidator will consolidate metrics randomly in to a slice of MetricMaps, and either send the slice to
// the provided channel, or make them available synchronously through Drain/Fill. Run can also be started in a long
// running goroutine to perform flushing, or Flush can be called externally to trigger the channel send.
//
// Used to consolidate metrics such as:
// - counter[name=x, value=1]
Expand All @@ -30,9 +31,7 @@ type MetricConsolidator struct {
func NewMetricConsolidator(spots int, flushInterval time.Duration, sink chan<- []*MetricMap) *MetricConsolidator {
mc := &MetricConsolidator{}
mc.maps = make(chan *MetricMap, spots)
for i := 0; i < spots; i++ {
mc.maps <- NewMetricMap()
}
mc.Fill()
mc.flushInterval = flushInterval
mc.sink = sink
return mc
Expand All @@ -52,10 +51,11 @@ func (mc *MetricConsolidator) Run(ctx context.Context) {
}
}

// Flush will collect all the MetricMaps in to a slice, send them to the channel provided, then
// create new MetricMaps for new metrics to land in. Not thread-safe.
func (mc *MetricConsolidator) Flush(ctx context.Context) {
var mms []*MetricMap
// Drain will collect all the MetricMaps in the MetricConsolidator and return them. If the
// context.Context is canceled before everything can be collected, they are returned to the
// MetricConsolidator and nil is returned.
func (mc *MetricConsolidator) Drain(ctx context.Context) []*MetricMap {
mms := make([]*MetricMap, 0, cap(mc.maps))
for i := 0; i < cap(mc.maps); i++ {
select {
case mm := <-mc.maps:
Expand All @@ -66,9 +66,19 @@ func (mc *MetricConsolidator) Flush(ctx context.Context) {
for _, mm := range mms {
mc.maps <- mm
}
return
return nil
}
}
return mms
}

// Flush will collect all the MetricMaps in to a slice, send them to the channel provided, then
// create new MetricMaps for new metrics to land in. Not thread-safe.
func (mc *MetricConsolidator) Flush(ctx context.Context) {
mms := mc.Drain(ctx)
if mms == nil {
return
}

// Send the collected data to the sink before putting new maps in place. This allows back-pressure
// to propagate through the system, if the sink can't keep up.
Expand All @@ -77,6 +87,12 @@ func (mc *MetricConsolidator) Flush(ctx context.Context) {
case <-ctx.Done():
}

mc.Fill()
}

// Fill re-populates the MetricConsolidator with empty MetricMaps, it is the pair to Drain and
// must be called after a successful Drain, must not be called after a failed Drain.
func (mc *MetricConsolidator) Fill() {
for i := 0; i < cap(mc.maps); i++ {
mc.maps <- NewMetricMap()
}
Expand Down
15 changes: 11 additions & 4 deletions metric_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package gostatsd

import (
"bytes"
"context"
"fmt"
"strings"

Expand Down Expand Up @@ -46,6 +45,14 @@ func (mm *MetricMap) Receive(m *Metric) {
m.Done()
}

func MergeMaps(mms []*MetricMap) *MetricMap {
mm := NewMetricMap()
for _, mmFrom := range mms {
mm.Merge(mmFrom)
}
return mm
}

func (mm *MetricMap) Merge(mmFrom *MetricMap) {
mmFrom.Counters.Each(func(metricName string, tagsKey string, counterFrom Counter) {
v, ok := mm.Counters[metricName]
Expand Down Expand Up @@ -359,8 +366,8 @@ func (mm *MetricMap) String() string {
return buf.String()
}

// DispatchMetrics will synthesize Metrics from the MetricMap and push them to the supplied PipelineHandler
func (mm *MetricMap) DispatchMetrics(ctx context.Context, handler RawMetricHandler) {
// AsMetrics will synthesize Metrics from the MetricMap and return them as a slice
func (mm *MetricMap) AsMetrics() []*Metric {
var metrics []*Metric

mm.Counters.Each(func(metricName string, tagsKey string, c Counter) {
Expand Down Expand Up @@ -426,5 +433,5 @@ func (mm *MetricMap) DispatchMetrics(ctx context.Context, handler RawMetricHandl
}
})

handler.DispatchMetrics(ctx, metrics)
return metrics
}
41 changes: 18 additions & 23 deletions metric_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,13 @@ func BenchmarkReceives(b *testing.B) {
}

func TestMetricMapDispatch(t *testing.T) {
ctx, done := testContext(t)
defer done()

mm := NewMetricMap()
metrics := metricsFixtures()
for _, metric := range metrics {
mm.Receive(metric)
}
ch := &capturingHandler{}

mm.DispatchMetrics(ctx, ch)
actual := mm.AsMetrics()

expected := []*Metric{
{Name: "abc.def.g", Value: 3, Rate: 1, Type: GAUGE, Timestamp: 10},
Expand All @@ -173,28 +169,27 @@ func TestMetricMapDispatch(t *testing.T) {
{Name: "uniq.usr", StringValue: "john", Rate: 1, TagsKey: "baz,foo:bar", Tags: Tags{"baz", "foo:bar"}, Type: SET, Timestamp: 10},
}

cmpSort := func(slice []*Metric) func(i, j int) bool {
return func(i, j int) bool {
if slice[i].Name == slice[j].Name {
if len(slice[i].Tags) == len(slice[j].Tags) { // This is not exactly accurate, but close enough with our data
if slice[i].Type == SET {
return slice[i].StringValue < slice[j].StringValue
} else {
return slice[i].Value < slice[j].Value
}
sort.Slice(actual, SortCompare(actual))
sort.Slice(expected, SortCompare(expected))

require.EqualValues(t, expected, actual)
}

// Copied from internal/fixtures because dependency loops
func SortCompare(ms []*Metric) func(i, j int) bool {
return func(i, j int) bool {
if ms[i].Name == ms[j].Name {
if len(ms[i].Tags) == len(ms[j].Tags) { // This is not exactly accurate, but close enough with our data
if ms[i].Type == SET {
return ms[i].StringValue < ms[j].StringValue
} else {
return ms[i].Value < ms[j].Value
}
return len(slice[i].Tags) < len(slice[j].Tags)
}
return slice[i].Name < slice[j].Name
return len(ms[i].Tags) < len(ms[j].Tags)
}
return ms[i].Name < ms[j].Name
}

actual := ch.GetMetrics()

sort.Slice(actual, cmpSort(actual))
sort.Slice(expected, cmpSort(expected))

require.EqualValues(t, expected, actual)
}

func TestMetricMapMerge(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ type Metric struct {
DoneFunc func() // Returns the metric to the pool. May be nil. Call Metric.Done(), not this.
}

func (m *Metric) AddTagsSetSource(additionalTags Tags, newSource Source) {
if len(additionalTags) > 0 {
m.Tags = m.Tags.Concat(additionalTags)
m.TagsKey = ""
}
if newSource != m.Source {
m.Source = newSource
m.TagsKey = ""
}
}

// Reset is used to reset a metric to as clean state, called on re-use from the pool.
func (m *Metric) Reset() {
m.Name = ""
Expand Down
Loading