Merge edc29b1 into cc12661
tiedotguy committed Jul 10, 2020
2 parents cc12661 + edc29b1 commit 44207f6
Showing 32 changed files with 564 additions and 672 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
28.0.0
------
- Internal pipeline refactor removes two internal metrics

27.0.0
------
- Simplify internal tracking of the source address of a metric. The current rules are:
2 changes: 0 additions & 2 deletions METRICS.md
@@ -16,7 +16,6 @@ Metrics:

| Name | type | tags | description
| ------------------------------------------- | ------------------- | ---------------------------- | -----------
| aggregator.metrics_received | gauge (flush) | aggregator_id | The number of datapoints received during the flush interval
| aggregator.metricmaps_received | gauge (flush) | aggregator_id | The number of datapoint batches received during the flush interval
| aggregator.aggregation_time | gauge (time) | aggregator_id | The time taken (in ms) to aggregate all counter and timer
| | | | datapoints in this flush interval
@@ -84,7 +83,6 @@ the samples are reset.

| Channel name | Additional tags | Description
| ------------------------- | --------------- | -----------
| dispatch_aggregator_batch | aggregator_id | Channel to dispatch metrics to a specific aggregator.
| dispatch_aggregator_map | aggregator_id | Channel to dispatch metric maps to a given aggregator.
| backend_events_sem | | Semaphore limiting the number of events in flight at once. Corresponds to
| | | the `--max-concurrent-events` flag.
7 changes: 6 additions & 1 deletion events.go
@@ -92,5 +92,10 @@ type Event struct {
AlertType AlertType
}

func (e *Event) AddTagsSetSource(additionalTags Tags, newSource Source) {
e.Tags = e.Tags.Concat(additionalTags)
e.Source = newSource
}

// Events represents a list of events.
type Events []Event
type Events []*Event
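The new helper and the switch from []Event to []*Event are small, so here is a minimal sketch of how a tag-enriching step might use them. It is not code from this commit; the tag values and the "proxy-host" source are made up.

package main

import (
	"fmt"

	"github.com/atlassian/gostatsd"
)

func main() {
	e := &gostatsd.Event{
		Tags:   gostatsd.Tags{"service:api"}, // made-up tag
		Source: "10.0.0.1",                   // made-up source
	}
	// Append handler-level tags and override the source in one call.
	e.AddTagsSetSource(gostatsd.Tags{"region:us-east-1"}, "proxy-host")

	// Events now holds pointers, so the enriched event is shared rather than copied.
	events := gostatsd.Events{e}
	fmt.Println(events[0].Tags, events[0].Source)
}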
42 changes: 0 additions & 42 deletions fixtures_test.go
@@ -2,7 +2,6 @@ package gostatsd

import (
"context"
"sync"
"testing"
"time"

@@ -24,44 +23,3 @@ func testContext(t *testing.T) (context.Context, func()) {
}()
return ctxTest, completeTest
}

type capturingHandler struct {
mu sync.Mutex
m []*Metric
e []*Event
}

func (ch *capturingHandler) EstimatedTags() int {
return 0
}

func (ch *capturingHandler) DispatchMetrics(ctx context.Context, metrics []*Metric) {
ch.mu.Lock()
defer ch.mu.Unlock()
for _, m := range metrics {
m.DoneFunc = nil // Clear DoneFunc because it contains non-predictable variable data which interferes with the tests
ch.m = append(ch.m, m)
}
}

// DispatchMetricMap re-dispatches a metric map through capturingHandler.DispatchMetrics
func (ch *capturingHandler) DispatchMetricMap(ctx context.Context, mm *MetricMap) {
mm.DispatchMetrics(ctx, ch)
}

func (ch *capturingHandler) DispatchEvent(ctx context.Context, e *Event) {
ch.mu.Lock()
defer ch.mu.Unlock()
ch.e = append(ch.e, e)
}

func (ch *capturingHandler) WaitForEvents() {
}

func (ch *capturingHandler) GetMetrics() []*Metric {
ch.mu.Lock()
defer ch.mu.Unlock()
m := make([]*Metric, len(ch.m))
copy(m, ch.m)
return m
}
80 changes: 80 additions & 0 deletions internal/fixtures/metrics.go
@@ -0,0 +1,80 @@
package fixtures

import (
"fmt"

"github.com/atlassian/gostatsd"
)

type MetricOpt func(m *gostatsd.Metric)

// MakeMetric provides a way to build a metric for tests. Hopefully over
// time this will be used more, bringing more consistency to tests.
func MakeMetric(opts ...MetricOpt) *gostatsd.Metric {
m := &gostatsd.Metric{
Type: gostatsd.COUNTER,
Name: "name",
Rate: 1,
Tags: gostatsd.Tags{
"foo:bar",
"host:baz",
},
Source: "baz",
}
for _, opt := range opts {
opt(m)
}
return m
}

func Name(n string) MetricOpt {
return func(m *gostatsd.Metric) {
m.Name = n
}
}

func AddTag(t ...string) MetricOpt {
return func(m *gostatsd.Metric) {
m.Tags = append(m.Tags, t...)
}
}

func DropSource(m *gostatsd.Metric) {
m.Source = gostatsd.UnknownSource
}

func DropTag(t string) MetricOpt {
return func(m *gostatsd.Metric) {
next := 0
found := false
for _, tag := range m.Tags {
if t == tag {
found = true
m.Tags[next] = tag
next++
}
}
if !found {
panic(fmt.Sprintf("failed to find tag %s while building metric", t))
}
m.Tags = m.Tags[:next]
}
}

// SortCompare func for metrics so they can be compared with require.EqualValues
// Invoke with sort.Slice(x, SortCompare(x))
func SortCompare(ms []*gostatsd.Metric) func(i, j int) bool {
return func(i, j int) bool {
if ms[i].Name == ms[j].Name {
if len(ms[i].Tags) == len(ms[j].Tags) { // This is not exactly accurate, but close enough with our data
if ms[i].Type == gostatsd.SET {
return ms[i].StringValue < ms[j].StringValue
} else {
return ms[i].Value < ms[j].Value
}
}
return len(ms[i].Tags) < len(ms[j].Tags)
}
return ms[i].Name < ms[j].Name
}
}
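A sketch of how a test elsewhere in the repository might lean on these fixtures; the test name and the "actual" slice are placeholders, and only MakeMetric, its options, and SortCompare come from this file.

package fixtures_test

import (
	"sort"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/atlassian/gostatsd"
	"github.com/atlassian/gostatsd/internal/fixtures"
)

func TestMakeMetricSketch(t *testing.T) {
	// Build expectations from the default metric, tweaked per case.
	expected := []*gostatsd.Metric{
		fixtures.MakeMetric(),
		fixtures.MakeMetric(fixtures.Name("other"), fixtures.AddTag("extra:tag")),
		fixtures.MakeMetric(fixtures.DropTag("host:baz"), fixtures.DropSource),
	}

	// Stand-in for whatever the code under test produced, here just the same
	// metrics in a different order.
	actual := []*gostatsd.Metric{expected[2], expected[0], expected[1]}

	// Sort both slices the same way so require.EqualValues ignores ordering.
	sort.Slice(expected, fixtures.SortCompare(expected))
	sort.Slice(actual, fixtures.SortCompare(actual))
	require.EqualValues(t, expected, actual)
}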
36 changes: 26 additions & 10 deletions metric_consolidator.go
@@ -7,8 +7,9 @@ import (
"github.com/tilinna/clock"
)

// MetricConsolidator will consolidate metrics randomly in to a slice of MetricMaps, and send the slice to the provided
// channel. Run can be started in a long running goroutine to perform flushing, or Flush can be called externally.
// MetricConsolidator will consolidate metrics randomly into a slice of MetricMaps, and either send the slice to
// the provided channel, or make them available synchronously through Drain/Fill. Run can also be started in a
// long-running goroutine to perform flushing, or Flush can be called externally to trigger the channel send.
//
// Used to consolidate metrics such as:
// - counter[name=x, value=1]
@@ -30,9 +31,7 @@ type MetricConsolidator struct {
func NewMetricConsolidator(spots int, flushInterval time.Duration, sink chan<- []*MetricMap) *MetricConsolidator {
mc := &MetricConsolidator{}
mc.maps = make(chan *MetricMap, spots)
for i := 0; i < spots; i++ {
mc.maps <- NewMetricMap()
}
mc.Fill()
mc.flushInterval = flushInterval
mc.sink = sink
return mc
@@ -52,10 +51,11 @@ func (mc *MetricConsolidator) Run(ctx context.Context) {
}
}

// Flush will collect all the MetricMaps in to a slice, send them to the channel provided, then
// create new MetricMaps for new metrics to land in. Not thread-safe.
func (mc *MetricConsolidator) Flush(ctx context.Context) {
var mms []*MetricMap
// Drain will collect all the MetricMaps in the MetricConsolidator and return them. If the
// context.Context is canceled before everything can be collected, they are returned to the
// MetricConsolidator and nil is returned.
func (mc *MetricConsolidator) Drain(ctx context.Context) []*MetricMap {
mms := make([]*MetricMap, 0, cap(mc.maps))
for i := 0; i < cap(mc.maps); i++ {
select {
case mm := <-mc.maps:
@@ -66,9 +66,19 @@ func (mc *MetricConsolidator) Flush(ctx context.Context) {
for _, mm := range mms {
mc.maps <- mm
}
return
return nil
}
}
return mms
}

// Flush will collect all the MetricMaps into a slice, send them to the channel provided, then
// create new MetricMaps for new metrics to land in. Not thread-safe.
func (mc *MetricConsolidator) Flush(ctx context.Context) {
mms := mc.Drain(ctx)
if mms == nil {
return
}

// Send the collected data to the sink before putting new maps in place. This allows back-pressure
// to propagate through the system, if the sink can't keep up.
@@ -77,6 +87,12 @@ func (mc *MetricConsolidator) Flush(ctx context.Context) {
case <-ctx.Done():
}

mc.Fill()
}

// Fill re-populates the MetricConsolidator with empty MetricMaps. It is the counterpart to Drain and
// must be called after a successful Drain; it must not be called after a failed Drain.
func (mc *MetricConsolidator) Fill() {
for i := 0; i < cap(mc.maps); i++ {
mc.maps <- NewMetricMap()
}
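A sketch of the synchronous Drain/Fill pattern this refactor enables, as opposed to letting Run/Flush push to the sink channel. It is not from this commit; the spot count, interval, and nil sink are arbitrary choices for illustration.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/atlassian/gostatsd"
)

func main() {
	ctx := context.Background()

	// nil sink: this consolidator is only ever drained synchronously,
	// so Run/Flush are never used.
	mc := gostatsd.NewMetricConsolidator(4, time.Second, nil)

	// ... metrics are pushed into mc elsewhere ...

	mms := mc.Drain(ctx) // take ownership of the current MetricMaps
	if mms == nil {
		return // context canceled: the maps were put back, do not Fill
	}
	mc.Fill() // put fresh maps in place so new metrics have somewhere to land

	merged := gostatsd.MergeMaps(mms) // fold everything drained into one map
	fmt.Println(merged.String())
}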
15 changes: 11 additions & 4 deletions metric_map.go
@@ -2,7 +2,6 @@ package gostatsd

import (
"bytes"
"context"
"fmt"
"strings"

@@ -46,6 +45,14 @@ func (mm *MetricMap) Receive(m *Metric) {
m.Done()
}

func MergeMaps(mms []*MetricMap) *MetricMap {
mm := NewMetricMap()
for _, mmFrom := range mms {
mm.Merge(mmFrom)
}
return mm
}

func (mm *MetricMap) Merge(mmFrom *MetricMap) {
mmFrom.Counters.Each(func(metricName string, tagsKey string, counterFrom Counter) {
v, ok := mm.Counters[metricName]
@@ -359,8 +366,8 @@ func (mm *MetricMap) String() string {
return buf.String()
}

// DispatchMetrics will synthesize Metrics from the MetricMap and push them to the supplied PipelineHandler
func (mm *MetricMap) DispatchMetrics(ctx context.Context, handler RawMetricHandler) {
// AsMetrics will synthesize Metrics from the MetricMap and return them as a slice
func (mm *MetricMap) AsMetrics() []*Metric {
var metrics []*Metric

mm.Counters.Each(func(metricName string, tagsKey string, c Counter) {
@@ -426,5 +433,5 @@ func (mm *MetricMap) DispatchMetrics(ctx context.Context, handler RawMetricHandl
}
})

handler.DispatchMetrics(ctx, metrics)
return metrics
}
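Since MetricMap.DispatchMetrics is gone, callers that still need individual Metrics now combine the new pieces themselves. A minimal sketch, assuming it lives inside the repository; the package name and function are hypothetical.

package metricsutil

import "github.com/atlassian/gostatsd"

// flatten merges several MetricMaps (for example, the slice a MetricConsolidator
// hands to its sink) and synthesizes one Metric per aggregated value.
func flatten(mms []*gostatsd.MetricMap) []*gostatsd.Metric {
	merged := gostatsd.MergeMaps(mms)
	return merged.AsMetrics()
}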
41 changes: 18 additions & 23 deletions metric_map_test.go
@@ -143,17 +143,13 @@ func BenchmarkReceives(b *testing.B) {
}

func TestMetricMapDispatch(t *testing.T) {
ctx, done := testContext(t)
defer done()

mm := NewMetricMap()
metrics := metricsFixtures()
for _, metric := range metrics {
mm.Receive(metric)
}
ch := &capturingHandler{}

mm.DispatchMetrics(ctx, ch)
actual := mm.AsMetrics()

expected := []*Metric{
{Name: "abc.def.g", Value: 3, Rate: 1, Type: GAUGE, Timestamp: 10},
@@ -173,28 +169,27 @@ func TestMetricMapDispatch(t *testing.T) {
{Name: "uniq.usr", StringValue: "john", Rate: 1, TagsKey: "baz,foo:bar", Tags: Tags{"baz", "foo:bar"}, Type: SET, Timestamp: 10},
}

cmpSort := func(slice []*Metric) func(i, j int) bool {
return func(i, j int) bool {
if slice[i].Name == slice[j].Name {
if len(slice[i].Tags) == len(slice[j].Tags) { // This is not exactly accurate, but close enough with our data
if slice[i].Type == SET {
return slice[i].StringValue < slice[j].StringValue
} else {
return slice[i].Value < slice[j].Value
}
sort.Slice(actual, SortCompare(actual))
sort.Slice(expected, SortCompare(expected))

require.EqualValues(t, expected, actual)
}

// Copied from internal/fixtures because of dependency loops
func SortCompare(ms []*Metric) func(i, j int) bool {
return func(i, j int) bool {
if ms[i].Name == ms[j].Name {
if len(ms[i].Tags) == len(ms[j].Tags) { // This is not exactly accurate, but close enough with our data
if ms[i].Type == SET {
return ms[i].StringValue < ms[j].StringValue
} else {
return ms[i].Value < ms[j].Value
}
return len(slice[i].Tags) < len(slice[j].Tags)
}
return slice[i].Name < slice[j].Name
return len(ms[i].Tags) < len(ms[j].Tags)
}
return ms[i].Name < ms[j].Name
}

actual := ch.GetMetrics()

sort.Slice(actual, cmpSort(actual))
sort.Slice(expected, cmpSort(expected))

require.EqualValues(t, expected, actual)
}

func TestMetricMapMerge(t *testing.T) {
11 changes: 11 additions & 0 deletions metrics.go
@@ -53,6 +53,17 @@ type Metric struct {
DoneFunc func() // Returns the metric to the pool. May be nil. Call Metric.Done(), not this.
}

func (m *Metric) AddTagsSetSource(additionalTags Tags, newSource Source) {
if len(additionalTags) > 0 {
m.Tags = m.Tags.Concat(additionalTags)
m.TagsKey = ""
}
if newSource != m.Source {
m.Source = newSource
m.TagsKey = ""
}
}

// Reset is used to reset a metric to a clean state, called on re-use from the pool.
func (m *Metric) Reset() {
m.Name = ""
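The important detail in Metric.AddTagsSetSource is that it clears the cached TagsKey whenever the tags or source actually change, so the key is recomputed lazily from the new values. A small sketch of the effect; the field values and the stale key string are made up.

package main

import (
	"fmt"

	"github.com/atlassian/gostatsd"
)

func main() {
	m := &gostatsd.Metric{
		Name:    "requests",
		Value:   1,
		Rate:    1,
		Type:    gostatsd.COUNTER,
		Tags:    gostatsd.Tags{"service:api"},
		Source:  "10.0.0.1",
		TagsKey: "stale-cached-key", // pretend a key was already computed
	}

	// Tags grow and the source changes, so the cached TagsKey is reset to "".
	m.AddTagsSetSource(gostatsd.Tags{"region:us-east-1"}, "proxy-host")
	fmt.Printf("tags=%v source=%v tagsKey=%q\n", m.Tags, m.Source, m.TagsKey)

	// Same source and no new tags: nothing changes and the key is left alone.
	m.AddTagsSetSource(nil, "proxy-host")
}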
3 changes: 2 additions & 1 deletion pkg/stats/flush_notifier.go
@@ -1,6 +1,7 @@
package stats

import (
"context"
"sync"
"time"
)
@@ -35,7 +36,7 @@ func (fn *flushNotifier) RegisterFlush() (ch <-chan time.Duration, unregister fu

// NotifyFlush will notify any registered channels that a flush has completed.
// Non-blocking, thread-safe.
func (fn *flushNotifier) NotifyFlush(d time.Duration) {
func (fn *flushNotifier) NotifyFlush(ctx context.Context, d time.Duration) {
fn.lock.RLock()
defer fn.lock.RUnlock()
for _, hook := range fn.flushTargets {
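For context on what NotifyFlush feeds, here is a sketch of the consuming side, built only from the RegisterFlush signature visible in the hunk above; the flushRegistrar interface and watchFlushes function are hypothetical names, not gostatsd API.

package statswatch

import (
	"context"
	"fmt"
	"time"
)

// flushRegistrar is the minimal shape this sketch needs; in gostatsd the value
// returned by the stats package is assumed to provide RegisterFlush.
type flushRegistrar interface {
	RegisterFlush() (ch <-chan time.Duration, unregister func())
}

// watchFlushes reacts to each completed flush until the context is canceled.
func watchFlushes(ctx context.Context, fr flushRegistrar) {
	flushes, unregister := fr.RegisterFlush()
	defer unregister()
	for {
		select {
		case d := <-flushes:
			fmt.Printf("flush completed, flush interval %v\n", d)
		case <-ctx.Done():
			return
		}
	}
}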