Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Commit

Permalink
Add a way to export gauge metrics and other changes. (#604)
Browse files Browse the repository at this point in the history
* Add a way to export gauge metrics and other changes.

= Introduce the ability to transform metrics before exporting. Currently implementing two types of transformations:
  - Add "failure" metric to EventMetrics.
  - Convert cumulative metrics to gauge metrics.

= To calculate gauge metrics from cumulative metrics, we keep a "last-value" cache of EventMetrics, keyed by a string built from the metric names and the label keys and values. We subtract the last EventMetrics from the new EventMetrics and export the result.

= This requires implementing subtract logic throughout the metrics stack: from EventMetrics down to the individual value types. We also need to take into account that metrics may be reset.

Ref: #494
PiperOrigin-RevId: 377179096
  • Loading branch information
manugarg committed Jun 3, 2021
1 parent ebb3de5 commit 6a737ea
Show file tree
Hide file tree
Showing 16 changed files with 515 additions and 76 deletions.
39 changes: 33 additions & 6 deletions metrics/dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,25 +123,52 @@ func (d *Distribution) AddFloat64(f float64) {
// Add merges the distribution carried by val into the receiver distribution.
// An error is returned if val is not a compatible distribution, e.g. when the
// two distributions don't have the same buckets.
func (d *Distribution) Add(val Value) error {
	// The "was reset" result is only meaningful when subtracting, so it is
	// discarded here.
	if _, err := d.addOrSubtract(val, false); err != nil {
		return err
	}
	return nil
}

// SubtractCounter subtracts lastVal from the receiver, treating the receiver
// as a counter and lastVal as its previous value. If the receiver is smaller
// than lastVal, the counter is assumed to have been reset: nothing is
// subtracted and the returned bool is true.
func (d *Distribution) SubtractCounter(lastVal Value) (bool, error) {
	const subtract = true
	return d.addOrSubtract(lastVal, subtract)
}

// addOrSubtract adds val to the receiver distribution or, when subtract is
// true, subtracts val from it. val must be a non-nil *Distribution with the
// same bucket lower bounds as the receiver; otherwise an error is returned and
// the receiver is left untouched.
//
// In subtract mode the receiver is treated as a cumulative counter and val as
// an earlier snapshot of it. If the receiver's total count, or the count of any
// individual bucket, is smaller than the snapshot's, the counter is assumed to
// have been reset: the receiver is left unmodified and (true, nil) is
// returned. In every other case the returned bool is false.
func (d *Distribution) addOrSubtract(val Value, subtract bool) (bool, error) {
	delta, ok := val.(*Distribution)
	if !ok || delta == nil {
		return false, errors.New("dist: incompatible value to add or subtract")
	}

	if !reflect.DeepEqual(d.lowerBounds, delta.lowerBounds) {
		return false, fmt.Errorf("incompatible delta value, Bucket lower bounds in receiver distribution: %v, and in delta distribution: %v", d.lowerBounds, delta.lowerBounds)
	}

	d.mu.Lock()
	defer d.mu.Unlock()
	// When delta is the receiver itself, the write lock held above already
	// protects it; taking the read lock on the same mutex would deadlock.
	if delta != d {
		delta.mu.RLock()
		defer delta.mu.RUnlock()
	}

	if !subtract {
		d.count += delta.count
		d.sum += delta.sum
		for i := range d.bucketCounts {
			d.bucketCounts[i] += delta.bucketCounts[i]
		}
		return false, nil
	}

	// Detect a reset before mutating anything, so that a reset never leaves
	// the receiver partially subtracted. A cumulative distribution can only
	// grow, so a smaller total count or a smaller count in any single bucket
	// means the counter restarted.
	if d.count < delta.count {
		return true, nil
	}
	for i := range d.bucketCounts {
		if d.bucketCounts[i] < delta.bucketCounts[i] {
			return true, nil
		}
	}

	d.count -= delta.count
	d.sum -= delta.sum
	for i := range d.bucketCounts {
		d.bucketCounts[i] -= delta.bucketCounts[i]
	}

	return false, nil
}

// String returns a string representation of the distribution:
Expand Down
19 changes: 19 additions & 0 deletions metrics/dist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,25 @@ func TestDistAdd(t *testing.T) {
verifyBucketCount(t, d, []int{0, 1, 2, 3, 4, 5}, []int64{1, 2, 0, 2, 0, 1})
}

// TestDistSubtractCounter checks that subtracting an earlier snapshot from a
// distribution leaves only the samples that were added after the snapshot, and
// that no reset is reported.
func TestDistSubtractCounter(t *testing.T) {
	last := NewDistribution([]float64{1, 5, 15, 30, 45})
	for _, sample := range []float64{0.5, 4, 17} {
		last.AddSample(sample)
	}

	// cur starts as a copy of last and then receives three more samples.
	cur := last.Clone().(*Distribution)
	for _, sample := range []float64{3.5, 21, 300} {
		cur.AddSample(sample)
	}

	wasReset, err := cur.SubtractCounter(last)
	if err != nil || wasReset {
		t.Errorf("SubtractCounter error: %v, wasReset: %v", err, wasReset)
	}
	verifyBucketCount(t, cur, []int{0, 1, 2, 3, 4, 5}, []int64{0, 1, 0, 1, 0, 1})
}

func TestDistData(t *testing.T) {
lb := []float64{1, 5, 15, 30, 45}
d := NewDistribution(lb)
Expand Down
45 changes: 45 additions & 0 deletions metrics/eventmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,38 @@ func (em *EventMetrics) Update(in *EventMetrics) error {
}
}

// SubtractLast subtracts the provided (last) EventMetrics from the receiver
// EventMetrics and returns the result as a GAUGE EventMetrics. The subtraction
// is carried out on a clone of the receiver.
//
// Both the receiver and lastEM must be CUMULATIVE, and every metric present in
// lastEM must also be present in the receiver; otherwise an error is returned.
// Metrics that exist only in the receiver are carried over as they are.
//
// If any metric reports that it was reset (its current value is smaller than
// its last value), the whole EventMetrics is considered reset and an
// unsubtracted clone of the receiver, marked GAUGE, is returned.
func (em *EventMetrics) SubtractLast(lastEM *EventMetrics) (*EventMetrics, error) {
	if lastEM == nil {
		return nil, fmt.Errorf("last EventMetrics is nil, nothing to subtract")
	}
	if em.Kind != CUMULATIVE || lastEM.Kind != CUMULATIVE {
		return nil, fmt.Errorf("incorrect eventmetrics kind (current: %v, last: %v), SubtractLast works only for CUMULATIVE metrics", em.Kind, lastEM.Kind)
	}

	gaugeEM := em.Clone()
	gaugeEM.Kind = GAUGE

	for name, lastVal := range lastEM.metrics {
		val, ok := gaugeEM.metrics[name]
		if !ok {
			return nil, fmt.Errorf("receiver EventMetrics doesn't have %s metric", name)
		}
		wasReset, err := val.SubtractCounter(lastVal)
		if err != nil {
			return nil, fmt.Errorf("subtracting metric %s: %w", name, err)
		}

		// If any metric is reset, consider it a full reset of EventMetrics.
		// Metrics visited earlier in this loop may already have been
		// subtracted in gaugeEM, so start over from a fresh clone.
		// TODO(manugarg): See if we can track this event somehow.
		if wasReset {
			resetEM := em.Clone()
			resetEM.Kind = GAUGE
			return resetEM, nil
		}
	}

	return gaugeEM, nil
}

// String returns the string representation of the EventMetrics.
// Note that this is compatible with what vmwatcher understands.
// Example output string:
Expand Down Expand Up @@ -217,3 +249,16 @@ func (em *EventMetrics) String() string {
}
return b.String()
}

// Key returns a string key that identifies an eventmetrics. The key is the
// comma-separated list of metric names followed by one "key=value" entry per
// label, in the order reported by LabelsKeys.
func (em *EventMetrics) Key() string {
	em.mu.RLock()
	defer em.mu.RUnlock()

	labelKeys := em.LabelsKeys()
	parts := make([]string, 0, len(em.metricsKeys)+len(labelKeys))
	parts = append(parts, em.metricsKeys...)
	for _, lk := range labelKeys {
		parts = append(parts, lk+"="+em.labels[lk])
	}
	return strings.Join(parts, ",")
}
62 changes: 62 additions & 0 deletions metrics/eventmetrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,68 @@ func TestEventMetricsUpdate(t *testing.T) {
}
}

// TestEventMetricsSubtractCounters feeds a sequence of cumulative EventMetrics
// through SubtractLast and verifies the resulting gauge values, including the
// case where the counters go backwards and a reset is expected.
func TestEventMetricsSubtractCounters(t *testing.T) {
	// NOTE(review): this Int and its formatter are set up but not attached to
	// any of the EventMetrics created in this test.
	rtt := NewInt(0)
	rtt.Str = func(v int64) string {
		return fmt.Sprintf("%.3f", float64(v)/1000)
	}

	base := newEventMetrics(10, 10, 1000, make(map[string]int64))
	base.AddLabel("ptype", "http")

	// Run 1: counters grew since the baseline.
	first := newEventMetrics(32, 22, 220100, map[string]int64{"200": 22})
	gauge, err := first.SubtractLast(base)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	verifyEventMetrics(t, gauge, 22, 12, 219100, map[string]int64{"200": 22})

	// Run 2: counters grew again and a new response code showed up.
	second := newEventMetrics(42, 31, 300100, map[string]int64{
		"200": 24,
		"204": 8,
	})
	gauge, err = second.SubtractLast(first)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	verifyEventMetrics(t, gauge, 10, 9, 80000, map[string]int64{
		"200": 2,
		"204": 8,
	})

	// Run 3: counters are lower than in the previous run, so a reset is
	// expected and the values should come back unsubtracted.
	third := newEventMetrics(10, 8, 1100, map[string]int64{"200": 8})
	gauge, err = third.SubtractLast(second)
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}
	verifyEventMetrics(t, gauge, 10, 8, 1100, map[string]int64{"200": 8})
}

// TestKey verifies that Key lists the metric names first and then the labels
// as key=value pairs, all comma-separated.
func TestKey(t *testing.T) {
	const wantKey = "sent,rcvd,rtt,resp-code,probe=google-homepage"

	em := newEventMetrics(42, 31, 300100, map[string]int64{
		"200": 24,
		"204": 8,
	}).AddLabel("probe", "google-homepage")

	if key := em.Key(); key != wantKey {
		t.Errorf("Got key: %s, wanted: %s", key, wantKey)
	}
}

func BenchmarkEventMetricsStringer(b *testing.B) {
em := newEventMetrics(32, 22, 220100, map[string]int64{
"200": 22,
Expand Down
16 changes: 16 additions & 0 deletions metrics/float.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,22 @@ func (f *Float) Add(val Value) error {
return nil
}

// SubtractCounter subtracts the provided "lastVal" from the receiver, assuming
// that the receiver represents a counter and lastVal is its previous value.
// If the receiver is less than lastVal, we assume that the counter has been
// reset: nothing is subtracted and true is returned. An error is returned if
// lastVal is not a *Float.
func (f *Float) SubtractCounter(lastVal Value) (bool, error) {
	lv, ok := lastVal.(*Float)
	if !ok {
		return false, errors.New("incompatible value to subtract")
	}
	if f.f < lv.f {
		return true, nil
	}

	f.f -= lv.f
	return false, nil
}

// AddInt64 adds an int64 to the receiver Float.
func (f *Float) AddInt64(i int64) {
f.f += float64(i)
Expand Down
44 changes: 42 additions & 2 deletions metrics/int.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,23 @@ func (i *Int) Add(val Value) error {
return nil
}

// SubtractCounter subtracts lastVal from the receiver, treating the receiver
// as a counter and lastVal as its previous value. When the receiver is smaller
// than lastVal the counter is assumed to have been reset: the receiver is left
// as is and true is returned. lastVal must be an *Int.
func (i *Int) SubtractCounter(lastVal Value) (bool, error) {
	last, isInt := lastVal.(*Int)
	switch {
	case !isInt:
		return false, errors.New("incompatible value to subtract")
	case i.i < last.i:
		// Counter went backwards: report a reset and don't subtract.
		return true, nil
	default:
		i.i -= last.i
		return false, nil
	}
}
// AddInt64 adds an int64 to the receiver Int.
func (i *Int) AddInt64(ii int64) {
i.i += ii
Expand All @@ -93,8 +110,9 @@ func (i *Int) String() string {
return strconv.FormatInt(i.Int64(), 10)
}

// AtomicInt implements NumValue with int64 storage and atomic operations. If concurrency-safety
// is not a requirement, e.g. for use in already mutex protected map, you could use Int.
// AtomicInt implements NumValue with int64 storage and atomic operations. If
// concurrency-safety is not a requirement, e.g. for use in already mutex
// protected map, you could use Int.
type AtomicInt struct {
i int64
// If Str is defined, this is method used to convert AtomicInt into a string.
Expand Down Expand Up @@ -147,6 +165,28 @@ func (i *AtomicInt) Add(val Value) error {
return nil
}

// SubtractCounter subtracts the provided "lastVal" from the receiver, assuming
// that the receiver represents a counter and lastVal is its previous value.
// If the receiver is less than lastVal, we assume that the counter has been
// reset: nothing is subtracted and true is returned. An error is returned if
// lastVal is not a NumValue.
//
// lastVal is read once, up front. The receiver is then updated with a
// compare-and-swap loop, so an update made to the receiver concurrently (e.g.
// through AddInt64) is not overwritten: the comparison and the subtraction
// are simply retried against the new value.
func (i *AtomicInt) SubtractCounter(lastVal Value) (bool, error) {
	lv, ok := lastVal.(NumValue)
	if !ok {
		return false, errors.New("incompatible value to subtract")
	}

	lvS := lv.Int64()
	for {
		valS := atomic.LoadInt64(&i.i)
		if valS < lvS {
			return true, nil
		}
		// Only commit if the receiver still holds the value we compared
		// against; otherwise somebody changed it in between, so try again.
		if atomic.CompareAndSwapInt64(&i.i, valS, valS-lvS) {
			return false, nil
		}
	}
}

// AddInt64 adds an int64 to the receiver AtomicInt.
func (i *AtomicInt) AddInt64(ii int64) {
atomic.AddInt64(&i.i, ii)
Expand Down
Loading

0 comments on commit 6a737ea

Please sign in to comment.