Skip to content

Commit

Permalink
streamaggr: implement delay in total aggr (VictoriaMetrics#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eugene Ma authored and GitHub Enterprise committed Apr 19, 2024
1 parent a42cdc2 commit 9537a52
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 25 deletions.
4 changes: 3 additions & 1 deletion lib/streamaggr/dedup_timing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package streamaggr

import (
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/metrics"
"sync/atomic"
"testing"
"time"
Expand All @@ -19,7 +21,7 @@ func BenchmarkDedupAggr(b *testing.B) {
}

func BenchmarkDedupAggrFlushSerial(b *testing.B) {
as := newTotalAggrState(time.Hour, true, true)
as := newTotalAggrState(time.Hour, time.Hour, 0, true, true, fasttime.UnixTimestamp, metrics.NewSet())
benchSamples := newBenchSamples(100_000)
da := newDedupAggr()
da.pushSamples(benchSamples)
Expand Down
2 changes: 1 addition & 1 deletion lib/streamaggr/deduplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",n
d.flush(pushFunc, time.Hour)
d.MustStop()

result := timeSeriessToString(tssResult)
result := timeSeriessToString(tssResult, false)
resultExpected := `asfjkldsf{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 12322
bar{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 34.54
baz_aaa_aaa_fdd{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} -2.3
Expand Down
32 changes: 28 additions & 4 deletions lib/streamaggr/streamaggr.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
Expand Down Expand Up @@ -111,6 +112,9 @@ type Options struct {
//
// This option can be overridden individually per each aggregation via ignore_old_samples option.
IgnoreOldSamples bool

// Use a custom function for getting timestamps. Just used for testing.
Now func() uint64
}

// Config is a configuration for a single stream aggregation.
Expand Down Expand Up @@ -140,6 +144,12 @@ type Config struct {
// The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus and histogram_bucket.
StalenessInterval string `yaml:"staleness_interval,omitempty"`

// If greater than zero, the number of seconds (inclusive) a sample can be delayed.
// In this mode, sample timestamps are taken into account and aggregated within intervals (defined by intervalSecs).
// Aggregated samples have timestamps aligned with the interval.
// Default: 0
Delay string `yaml:"delay,omitempty"`

// Outputs is a list of output aggregate functions to produce.
//
// The following names are allowed:
Expand Down Expand Up @@ -432,6 +442,15 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
}
}

// check cfg.Delay
var delay time.Duration = 0
if cfg.Delay != "" {
delay, err = time.ParseDuration(cfg.Delay)
if err != nil {
return nil, fmt.Errorf("cannot parse `delay: %q`: %w", cfg.Delay, err)
}
}

// Check cfg.DropInputLabels
dropInputLabels := opts.DropInputLabels
if v := cfg.DropInputLabels; v != nil {
Expand Down Expand Up @@ -479,6 +498,11 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
ignoreOldSamples = *v
}

nowFunc := opts.Now
if nowFunc == nil {
nowFunc = fasttime.UnixTimestamp
}

// initialize outputs list
if len(cfg.Outputs) == 0 {
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
Expand Down Expand Up @@ -512,13 +536,13 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
}
switch output {
case "total":
aggrStates[i] = newTotalAggrState(stalenessInterval, false, true)
aggrStates[i] = newTotalAggrState(interval, stalenessInterval, delay, false, true, nowFunc, ms)
case "total_prometheus":
aggrStates[i] = newTotalAggrState(stalenessInterval, false, false)
aggrStates[i] = newTotalAggrState(interval, stalenessInterval, delay, false, false, nowFunc, ms)
case "increase":
aggrStates[i] = newTotalAggrState(stalenessInterval, true, true)
aggrStates[i] = newTotalAggrState(interval, stalenessInterval, delay, true, true, nowFunc, ms)
case "increase_prometheus":
aggrStates[i] = newTotalAggrState(stalenessInterval, true, false)
aggrStates[i] = newTotalAggrState(interval, stalenessInterval, delay, true, false, nowFunc, ms)
case "count_series":
aggrStates[i] = newCountSeriesAggrState()
case "count_samples":
Expand Down
107 changes: 101 additions & 6 deletions lib/streamaggr/streamaggr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,98 @@ func TestAggregatorsFailure(t *testing.T) {
`)
}

// TestAggregatorWithDelay verifies delay-aware aggregation for the
// total_prometheus output: sample timestamps are honored, samples are
// aggregated into interval-aligned windows, and a late sample (see the
// "missed a bucket" / "found it" steps) arriving within the configured
// delay still lands in the window its timestamp belongs to.
func TestAggregatorWithDelay(t *testing.T) {
// Aggregation config: 15s windows; samples may arrive up to 10s late.
config := `
- interval: 15s
delay: 10s
without: [pod]
outputs: [total_prometheus]
staleness_interval: 1h
`

// now is a fake clock. opts.Now makes the aggregator read this value
// instead of the wall clock, so the test controls window boundaries.
var now uint64
opts := &Options{
FlushOnShutdown: true,
Now: func() uint64 {
return now
},
}
// Collected flushed series; the lock guards against concurrent pushFunc calls.
var tssOutput []prompbmarshal.TimeSeries
var tssOutputLock sync.Mutex
pushFunc := func(tss []prompbmarshal.TimeSeries) {
tssOutputLock.Lock()
tssOutput = appendClonedTimeseries(tssOutput, tss)
tssOutputLock.Unlock()
}
a, err := newAggregatorsFromData([]byte(config), pushFunc, opts)
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}

// push feeds Prometheus text-format samples (with explicit timestamps)
// into the aggregator.
push := func(inputMetrics string) {
tssInput := mustParsePromMetrics(inputMetrics)
_ = a.Push(tssInput, nil)
}

// flush flushes every aggregation state and compares the rendered output
// (including timestamps) against the expectation, then resets tssOutput.
// NOTE(review): the 123s flush interval looks arbitrary — presumably it is
// ignored when delay-based windowing decides what to flush; confirm.
flush := func(outputMetricsExpected string) {
for _, ag := range a.as {
ag.flush(pushFunc, time.Duration(123*float64(time.Second)), false)
}
// Verify the tssOutput contains the expected metrics
outputMetrics := timeSeriessToString(tssOutput, true)
if outputMetrics != outputMetricsExpected {
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
}
tssOutput = nil
}

// windows: 1000000005, 1000000020, 1000000035, 1000000050, 1000000065, ...

// initialize
now = 1000000025
push(`
histogram{pod="a", le="1"} 0 1000000022000
histogram{pod="a", le="5"} 0 1000000022000
histogram{pod="a", le="10"} 0 1000000022000
`)
// Nothing is expected yet: the first window is still open.
flush(``)

// normal case
now = 1000000040
push(`
histogram{pod="a", le="1"} 1 1000000037000
histogram{pod="a", le="5"} 1 1000000037000
histogram{pod="a", le="10"} 1 1000000037000
`)
flush(``)

// missed a bucket
now = 1000000055
push(`
histogram{pod="a", le="1"} 1 1000000052000
histogram{pod="a", le="10"} 4 1000000052000
`)
flush(``)

// found it
// The le="5" sample arrives late (timestamp 1000000052000 is in the past)
// but within the 10s delay, so the later flushes below must include it.
now = 1000000057
push(`
histogram{pod="a", le="5"} 4 1000000052000
`)
now = 1000000068
flush(
`histogram:15s_without_pod_total{le="1"} 1 1000000050000
histogram:15s_without_pod_total{le="10"} 1 1000000050000
histogram:15s_without_pod_total{le="5"} 1 1000000050000
`)
now = 1000000081
flush(
`histogram:15s_without_pod_total{le="1"} 1 1000000065000
histogram:15s_without_pod_total{le="10"} 4 1000000065000
histogram:15s_without_pod_total{le="5"} 4 1000000065000
`)
}

func TestAggregatorsEqual(t *testing.T) {
f := func(a, b string, expectedResult bool) {
t.Helper()
Expand Down Expand Up @@ -193,7 +285,7 @@ func TestAggregatorsEqual(t *testing.T) {
f(`
- outputs: [total]
interval: 5m
flush_on_shutdown: true
flush_on_shutdown: true
`, `
- outputs: [total]
interval: 5m
Expand Down Expand Up @@ -237,7 +329,7 @@ func TestAggregatorsSuccess(t *testing.T) {
}

// Verify the tssOutput contains the expected metrics
outputMetrics := timeSeriessToString(tssOutput)
outputMetrics := timeSeriessToString(tssOutput, false)
if outputMetrics != outputMetricsExpected {
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
}
Expand Down Expand Up @@ -909,7 +1001,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
// Verify the tssOutput contains the expected metrics
tsStrings := make([]string, len(tssOutput))
for i, ts := range tssOutput {
tsStrings[i] = timeSeriesToString(ts)
tsStrings[i] = timeSeriesToString(ts, false)
}
sort.Strings(tsStrings)
outputMetrics := strings.Join(tsStrings, "")
Expand Down Expand Up @@ -947,20 +1039,23 @@ foo:1m_sum_samples{baz="qwe"} 10
`, "11111111")
}

func timeSeriessToString(tss []prompbmarshal.TimeSeries) string {
func timeSeriessToString(tss []prompbmarshal.TimeSeries, withTimestamp bool) string {
a := make([]string, len(tss))
for i, ts := range tss {
a[i] = timeSeriesToString(ts)
a[i] = timeSeriesToString(ts, withTimestamp)
}
sort.Strings(a)
return strings.Join(a, "")
}

func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
func timeSeriesToString(ts prompbmarshal.TimeSeries, withTimestamp bool) string {
labelsString := promrelabel.LabelsToString(ts.Labels)
if len(ts.Samples) != 1 {
panic(fmt.Errorf("unexpected number of samples for %s: %d; want 1", labelsString, len(ts.Samples)))
}
if withTimestamp {
return fmt.Sprintf("%s %v %d\n", labelsString, ts.Samples[0].Value, ts.Samples[0].Timestamp)
}
return fmt.Sprintf("%s %v\n", labelsString, ts.Samples[0].Value)
}

Expand Down
Loading

0 comments on commit 9537a52

Please sign in to comment.