Skip to content

Commit

Permalink
lib/streamaggr: follow-up for 7361971
Browse files Browse the repository at this point in the history
- Use a byte slice instead of a map for tracking indexes for matching series.
  This improves performance, since access by slice index is faster than access by map key.
- Re-use the byte slice for tracking indexes for matching series.
  This removes unnecessary memory allocations and improves stream aggregation performance a bit.
- Add an ability to return to the previous behvaiour by specifying -remoteWrite.streamAggr.dropInput command-line flag.
  In this case all the input samples are dropped when stream aggregation is enabled.
- Backport the new stream aggregation behaviour from vmagent to single-node VictoriaMetrics when -streamAggr.config
  option is set.
- Improve docs regarding this change at docs/CHANGELOG.md
- Document the new behavior at docs/stream-aggregation.md

Updates #4243
Updates #4575
  • Loading branch information
valyala committed Jul 25, 2023
1 parent 470afac commit c049778
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 211 deletions.
7 changes: 5 additions & 2 deletions app/vmagent/README.md
Expand Up @@ -1548,13 +1548,16 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
The number of significant figures to leave in metric values before writing them to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.streamAggr.config array
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval
Supports an array of values separated by comma or specified via multiple flags.
-remoteWrite.streamAggr.dedupInterval array
Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.streamAggr.dropInput array
Whether to drop all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.streamAggr.keepInput array
Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. By default, the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. See https://docs.victoriametrics.com/stream-aggregation.html
Whether to keep all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
Supports array of values separated by comma or specified via multiple flags.
-remoteWrite.tlsCAFile array
Optional path to TLS CA file to use for verifying connections to the corresponding -remoteWrite.url. By default, system CA is used
Expand Down
75 changes: 42 additions & 33 deletions app/vmagent/remotewrite/remotewrite.go
Expand Up @@ -67,10 +67,13 @@ var (

streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation.html . "+
"See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
"By default, the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
"See https://docs.victoriametrics.com/stream-aggregation.html")
"See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+
"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
)
Expand Down Expand Up @@ -507,6 +510,7 @@ type remoteWriteCtx struct {

sas atomic.Pointer[streamaggr.Aggregators]
streamAggrKeepInput bool
streamAggrDropInput bool

pss []*pendingSeries
pssNextIdx uint64
Expand Down Expand Up @@ -579,6 +583,7 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
}
rwctx.sas.Store(sas)
rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx)
rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(argIdx)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
}
Expand Down Expand Up @@ -630,41 +635,45 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
rowsCount := getRowsCount(tss)
rwctx.rowsPushedAfterRelabel.Add(rowsCount)

defer func() {
// Return back relabeling contexts to the pool
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
// Apply stream aggregation if any
sas := rwctx.sas.Load()
if sas != nil {
matchIdxs := matchIdxsPool.Get()
matchIdxs.B = sas.Push(tss, matchIdxs.B)
if !rwctx.streamAggrKeepInput {
if rctx == nil {
rctx = getRelabelCtx()
// Make a copy of tss before dropping aggregated series
v = tssPool.Get().(*[]prompbmarshal.TimeSeries)
tss = append(*v, tss...)
}
tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput)
}
matchIdxsPool.Put(matchIdxs)
}
rwctx.pushInternal(tss)

// Load stream aggregagation config
sas := rwctx.sas.Load()

// Fast path, no need to track used series
if sas == nil || rwctx.streamAggrKeepInput {
// Apply stream aggregation to the input samples
// it's safe to call sas.Push with sas == nil
sas.Push(tss, nil)

// Push all samples to the remote storage
rwctx.pushInternal(tss)

return
// Return back relabeling contexts to the pool
if rctx != nil {
*v = prompbmarshal.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
}
}

// Track series which were used for stream aggregation.
ut := streamaggr.NewTssUsageTracker(len(tss))
sas.Push(tss, ut.Matched)

unmatchedSeries := tssPool.Get().(*[]prompbmarshal.TimeSeries)
// Push only unmatched series to the remote storage
*unmatchedSeries = ut.GetUnmatched(tss, *unmatchedSeries)
rwctx.pushInternal(*unmatchedSeries)
var matchIdxsPool bytesutil.ByteBufferPool

*unmatchedSeries = prompbmarshal.ResetTimeSeries(*unmatchedSeries)
tssPool.Put(unmatchedSeries)
func dropAggregatedSeries(src []prompbmarshal.TimeSeries, matchIdxs []byte, dropInput bool) []prompbmarshal.TimeSeries {
dst := src[:0]
for i, match := range matchIdxs {
if match == 0 {
continue
}
dst = append(dst, src[i])
}
tail := src[len(dst):]
_ = prompbmarshal.ResetTimeSeries(tail)
return dst
}

func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
Expand Down
20 changes: 15 additions & 5 deletions docs/CHANGELOG.md
Expand Up @@ -24,9 +24,18 @@ The following `tip` changes can be tested by building VictoriaMetrics components

## tip

**Update notes:** release contains breaking change to [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) `-remoteWrite.streamAggr.keepInput` command-line flag.
Default behaviour has changed to keep metrics which were not matched by any aggregation rule when `-remoteWrite.streamAggr.keepInput` is set to false (default value).
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243) for details.
**Update note: starting from this release, [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) writes
to the configured storage the following samples by default:

- aggregated samples;
- the original input samples, which match zero `match` options from the provided [config](https://docs.victoriametrics.com/stream-aggregation.html#stream-aggregation-config).

Previously only aggregated samples were written to the storage by default.
The previous behavior can be restored in the following ways:

- by passing `-streamAggr.dropInput` command-line flag to single-node VictoriaMetrics;
- by passing `-remoteWrite.streamAggr.dropInput` command-line flag per each configured `-remoteWrite.streamAggr.config` at `vmagent`.
**

* SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html).
* SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved).
Expand All @@ -40,6 +49,9 @@ See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243)
- `WITH (w = 5m) m[w]` is automatically transformed to `m[5m]`
- `WITH (f(window, step, off) = m[window:step] offset off) f(5m, 10s, 1h)` is automatically transformed to `m[5m:10s] offset 1h`
Thanks to @lujiajing1126 for the initial idea and [implementation](https://github.com/VictoriaMetrics/metricsql/pull/13). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4025).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): added a new page with the list of currently running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4598) and [these docs](https://docs.victoriametrics.com/#active-queries).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring staleness interval in [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) config. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4667) for details.
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): preserve input samples, which match zero `match` options from the [configured aggregations](https://docs.victoriametrics.com/stream-aggregation.html#stream-aggregation-config). Previously all the input samples were dropped by default, so only the aggregated samples are written to the output storage. The previous behavior can be restored by passing `-streamAggr.dropInput` command-line flag to single-node VictoriaMetrics or by passing `-remoteWrite.streamAggr.dropInput` command-line flag to `vmagent`.
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add verbose output for docker installations or when TTY isn't available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4081).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): interrupt backoff retries when import process is cancelled. The change makes vmctl more responsive in case of errors during the import. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4442).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): update backoff policy on retries to reduce probability of overloading for `source` or `destination` databases. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4402).
Expand All @@ -56,13 +68,11 @@ See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4243)
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth.html): expose `vmauth_user_request_duration_seconds` and `vmauth_unauthorized_user_request_duration_seconds` summary metrics for measuring requests latency per user.
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): show backup progress percentage in log during backup uploading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460).
* FEATURE: [vmrestore](https://docs.victoriametrics.com/vmrestore.html): show restoring progress percentage in log during backup downloading. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4460).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring staleness interval in [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) config. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4667) for details.
* FEATURE: add ability to fine-tune Graphite API limits via the following command-line flags:
`-search.maxGraphiteTagKeys` for limiting the number of tag keys returned from [Graphite API for tags](https://docs.victoriametrics.com/#graphite-tags-api-usage)
`-search.maxGraphiteTagValues` for limiting the number of tag values returned from [Graphite API for tag values](https://docs.victoriametrics.com/#graphite-tags-api-usage)
`-search.maxGraphiteSeries` for limiting the number of series (aka paths) returned from [Graphite API for series](https://docs.victoriametrics.com/#graphite-tags-api-usage)
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4339).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): added a new page to display a list of currently running queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4598).

* BUGFIX: properly return series from [/api/v1/series](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#prometheus-querying-api-usage) if it finds more than the `limit` series (`limit` is an optional query arg passed to this API). Previously the `limit exceeded error` error was returned in this case. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2841#issuecomment-1560055631).
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix application routing issues and problems with manual URL changes. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4408) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4604).
Expand Down
6 changes: 4 additions & 2 deletions docs/README.md
Expand Up @@ -2642,11 +2642,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-storageDataPath string
Path to storage data (default "victoria-metrics-data")
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.dedupInterval duration
Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
-streamAggr.dropInput
Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
-streamAggr.keepInput
Whether to keep input samples after the aggregation with -streamAggr.config. By default, the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html
Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
-tls
Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set
-tlsCertFile string
Expand Down
6 changes: 4 additions & 2 deletions docs/Single-server-VictoriaMetrics.md
Expand Up @@ -2650,11 +2650,13 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-storageDataPath string
Path to storage data (default "victoria-metrics-data")
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.dedupInterval duration
Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
-streamAggr.dropInput
Whether to drop all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html
-streamAggr.keepInput
Whether to keep input samples after the aggregation with -streamAggr.config. By default, the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html
Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples are stored in the database. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html
-tls
Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set
-tlsCertFile string
Expand Down

0 comments on commit c049778

Please sign in to comment.