Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

app/vmagent: removed deprecated -remoteWrite.multitenantURL flag support #6253

Merged
merged 2 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions app/vmagent/remotewrite/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ func loadRelabelConfigs() (*relabelConfigs, error) {
}
rcs.global = global
}
if len(*relabelConfigPaths) > (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)) {
return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url or -remoteWrite.multitenantURL args: %d",
len(*relabelConfigPaths), (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)))
if len(*relabelConfigPaths) > len(*remoteWriteURLs) {
return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url args: %d",
len(*relabelConfigPaths), (len(*remoteWriteURLs)))
}
rcs.perURL = make([]*promrelabel.ParsedConfigs, (len(*remoteWriteURLs) + len(*remoteWriteMultitenantURLs)))
rcs.perURL = make([]*promrelabel.ParsedConfigs, len(*remoteWriteURLs))
for i, path := range *relabelConfigPaths {
if len(path) == 0 {
// Skip empty relabel config.
Expand Down
101 changes: 20 additions & 81 deletions app/vmagent/remotewrite/remotewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
)
Expand All @@ -39,10 +38,6 @@ var (
"or Prometheus remote_write protocol. Example url: http://<victoriametrics-host>:8428/api/v1/write . "+
"Pass multiple -remoteWrite.url options in order to replicate the collected data to multiple remote storage systems. "+
"The data can be sharded among the configured remote storage systems if -remoteWrite.shardByURL flag is set")
remoteWriteMultitenantURLs = flagutil.NewArrayString("remoteWrite.multitenantURL", "Base path for multitenant remote storage URL to write data to. "+
"See https://docs.victoriametrics.com/vmagent/#multitenancy for details. Example url: http://<vminsert>:8480 . "+
"Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. "+
"This flag is deprecated in favor of -enableMultitenantHandlers . See https://docs.victoriametrics.com/vmagent/#multitenancy")
enableMultitenantHandlers = flag.Bool("enableMultitenantHandlers", false, "Whether to process incoming data via multitenant insert handlers according to "+
"https://docs.victoriametrics.com/cluster-victoriametrics/#url-format . By default incoming data is processed via single-node insert handlers "+
"according to https://docs.victoriametrics.com/#how-to-import-time-series-data ."+
Expand Down Expand Up @@ -118,14 +113,10 @@ var (
)

var (
// rwctxsDefault contains statically populated entries when -remoteWrite.url is specified.
rwctxsDefault []*remoteWriteCtx
// rwctxs contains statically populated entries when -remoteWrite.url is specified.
rwctxs []*remoteWriteCtx

// rwctxsMap contains dynamically populated entries when -remoteWrite.multitenantURL is specified.
rwctxsMap = make(map[tenantmetrics.TenantID][]*remoteWriteCtx)
rwctxsMapLock sync.Mutex

// Data without tenant id is written to defaultAuthToken if -remoteWrite.multitenantURL is specified.
// Data without tenant id is written to defaultAuthToken if -enableMultitenantHandlers is specified.
defaultAuthToken = &auth.Token{}

// ErrQueueFullHTTPRetry must be returned when TryPush() returns false.
Expand All @@ -140,9 +131,9 @@ var (
disableOnDiskQueueAll bool
)

// MultitenancyEnabled returns true if -enableMultitenantHandlers or -remoteWrite.multitenantURL is specified.
// MultitenancyEnabled returns true if -enableMultitenantHandlers is specified.
func MultitenancyEnabled() bool {
return *enableMultitenantHandlers || len(*remoteWriteMultitenantURLs) > 0
return *enableMultitenantHandlers
}

// Contains the current relabelConfigs.
Expand Down Expand Up @@ -173,11 +164,8 @@ var (
//
// Stop must be called for graceful shutdown.
func Init() {
if len(*remoteWriteURLs) == 0 && len(*remoteWriteMultitenantURLs) == 0 {
logger.Fatalf("at least one `-remoteWrite.url` or `-remoteWrite.multitenantURL` command-line flag must be set")
}
if len(*remoteWriteURLs) > 0 && len(*remoteWriteMultitenantURLs) > 0 {
logger.Fatalf("cannot set both `-remoteWrite.url` and `-remoteWrite.multitenantURL` command-line flags")
if len(*remoteWriteURLs) == 0 {
logger.Fatalf("at least one `-remoteWrite.url` command-line flag must be set")
}
if *maxHourlySeries > 0 {
hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour)
Expand Down Expand Up @@ -228,7 +216,7 @@ func Init() {
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())

if len(*remoteWriteURLs) > 0 {
rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
rwctxs = newRemoteWriteCtxs(nil, *remoteWriteURLs)
}

disableOnDiskQueueAll = true
Expand Down Expand Up @@ -261,21 +249,15 @@ func dropDanglingQueues() {
if *keepDanglingQueues {
return
}
if len(*remoteWriteMultitenantURLs) > 0 {
// Do not drop dangling queues for *remoteWriteMultitenantURLs, since it is impossible to determine
// unused queues for multitenant urls - they are created on demand when new sample for the given
// tenant is pushed to remote storage.
return
}
// Remove dangling persistent queues, if any.
// This is required for the case when the number of queues has been changed or URL have been changed.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014
//
// In case if there were many persistent queues with identical *remoteWriteURLs
// the queue with the last index will be dropped.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6140
existingQueues := make(map[string]struct{}, len(rwctxsDefault))
for _, rwctx := range rwctxsDefault {
existingQueues := make(map[string]struct{}, len(rwctxs))
for _, rwctx := range rwctxs {
existingQueues[rwctx.fq.Dirname()] = struct{}{}
}

Expand All @@ -292,7 +274,7 @@ func dropDanglingQueues() {
}
}
if removed > 0 {
logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxsDefault))
logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxs))
}
}

Expand Down Expand Up @@ -320,18 +302,6 @@ var (
)

func reloadStreamAggrConfigs() {
if len(*remoteWriteMultitenantURLs) > 0 {
rwctxsMapLock.Lock()
for _, rwctxs := range rwctxsMap {
reinitStreamAggr(rwctxs)
}
rwctxsMapLock.Unlock()
} else {
reinitStreamAggr(rwctxsDefault)
}
}

func reinitStreamAggr(rwctxs []*remoteWriteCtx) {
for _, rwctx := range rwctxs {
rwctx.reinitStreamAggr()
}
Expand Down Expand Up @@ -411,18 +381,10 @@ func Stop() {
close(configReloaderStopCh)
configReloaderWG.Wait()

for _, rwctx := range rwctxsDefault {
for _, rwctx := range rwctxs {
rwctx.MustStop()
}
rwctxsDefault = nil

// There is no need in locking rwctxsMapLock here, since nobody should call TryPush during the Stop call.
for _, rwctxs := range rwctxsMap {
for _, rwctx := range rwctxs {
rwctx.MustStop()
}
}
rwctxsMap = nil
rwctxs = nil

if sl := hourlySeriesLimiter; sl != nil {
sl.MustStop()
Expand All @@ -432,20 +394,14 @@ func Stop() {
}
}

// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url and -remoteWrite.multitenantURL
//
// If at is nil, then the data is pushed to the configured -remoteWrite.url.
// If at isn't nil, the data is pushed to the configured -remoteWrite.multitenantURL.
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url
//
// PushDropSamplesOnFailure can modify wr contents.
func PushDropSamplesOnFailure(at *auth.Token, wr *prompbmarshal.WriteRequest) {
_ = tryPush(at, wr, true)
}

// TryPush tries sending wr to the configured remote storage systems set via -remoteWrite.url and -remoteWrite.multitenantURL
//
// If at is nil, then the data is pushed to the configured -remoteWrite.url.
// If at isn't nil, the data is pushed to the configured -remoteWrite.multitenantURL.
// TryPush tries sending wr to the configured remote storage systems set via -remoteWrite.url
//
// TryPush can modify wr contents, so the caller must re-initialize wr before calling TryPush() after unsuccessful attempt.
// TryPush may send partial data from wr on unsuccessful attempt, so repeated call for the same wr may send the data multiple times.
Expand All @@ -464,28 +420,11 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
}

var tenantRctx *relabelCtx
var rwctxs []*remoteWriteCtx
if at == nil {
rwctxs = rwctxsDefault
} else if len(*remoteWriteMultitenantURLs) == 0 {
if at != nil {
// Convert at to (vm_account_id, vm_project_id) labels.
tenantRctx = getRelabelCtx()
defer putRelabelCtx(tenantRctx)
rwctxs = rwctxsDefault
} else {
rwctxsMapLock.Lock()
tenantID := tenantmetrics.TenantID{
AccountID: at.AccountID,
ProjectID: at.ProjectID,
}
rwctxs = rwctxsMap[tenantID]
if rwctxs == nil {
rwctxs = newRemoteWriteCtxs(at, *remoteWriteMultitenantURLs)
rwctxsMap[tenantID] = rwctxs
}
rwctxsMapLock.Unlock()
}

rowsCount := getRowsCount(tss)

// Quick check whether writes to configured remote storage systems are blocked.
Expand Down Expand Up @@ -552,14 +491,14 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
}
sortLabelsIfNeeded(tssBlock)
tssBlock = limitSeriesCardinality(tssBlock)
if !tryPushBlockToRemoteStorages(rwctxs, tssBlock, forceDropSamplesOnFailure) {
if !tryPushBlockToRemoteStorages(tssBlock, forceDropSamplesOnFailure) {
return false
}
}
return true
}

func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
func tryPushBlockToRemoteStorages(tssBlock []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
if len(tssBlock) == 0 {
// Nothing to push
return true
Expand All @@ -578,7 +517,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
if replicas <= 0 {
replicas = 1
}
return tryShardingBlockAmongRemoteStorages(rwctxs, tssBlock, replicas, forceDropSamplesOnFailure)
return tryShardingBlockAmongRemoteStorages(tssBlock, replicas, forceDropSamplesOnFailure)
}

// Replicate tssBlock samples among rwctxs.
Expand All @@ -599,7 +538,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
return !anyPushFailed.Load()
}

func tryShardingBlockAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, replicas int, forceDropSamplesOnFailure bool) bool {
func tryShardingBlockAmongRemoteStorages(tssBlock []prompbmarshal.TimeSeries, replicas int, forceDropSamplesOnFailure bool) bool {
x := getTSSShards(len(rwctxs))
defer putTSSShards(x)

Expand Down
5 changes: 5 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).

## tip

**Update note 1: the `-remoteWrite.multitenantURL` command-line flag at `vmagent` was removed starting from this release. This flag was deprecated since [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0). Use `-enableMultitenantHandlers` instead, as it is easier to use and combine with [multitenant URL at vminsert](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels). See these [docs for details](https://docs.victoriametrics.com/vmagent.html#multitenancy).**


* SECURITY: upgrade Go builder from Go1.22.2 to Go1.22.3. See [the list of issues addressed in Go1.22.3](https://github.com/golang/go/issues?q=milestone%3AGo1.22.3+label%3ACherryPickApproved).

* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): support selecting of multiple instances on the dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5869) for details.
Expand All @@ -46,6 +49,8 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): set correct suffix `<output>_prometheus` for aggregation outputs [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus). Before, outputs `total` and `total_prometheus` or `increase` and `increase_prometheus` had the same suffix.
* BUGFIX: properly estimate the needed memory for query execution if it has the format [`aggr_func`](https://docs.victoriametrics.com/metricsql/#aggregate-functions)([`rollup_func[d]`](https://docs.victoriametrics.com/metricsql/#rollup-functions) (for example, `sum(rate(request_duration_seconds_bucket[5m]))`). This should allow performing aggregations over bigger number of time series when VictoriaMetrics runs in environments with small amounts of available memory. The issue has been introduced in [this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/5138eaeea0791caa34bcfab410e0ca9cd253cd8f) in [v1.83.0](https://docs.victoriametrics.com/changelog_2022/#v1830).

* DEPRECATION: [vmagent](https://docs.victoriametrics.com/vmagent/): removed deprecated `-remoteWrite.multitenantURL` flag from vmagent. This flag was deprecated since [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0). Use `-enableMultitenantHandlers` instead, as it is easier to use and combine with [multitenant URL at vminsert](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels). See these [docs for details](https://docs.victoriametrics.com/vmagent.html#multitenancy).

## [v1.101.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.101.0)

Released at 2024-04-26
Expand Down
4 changes: 0 additions & 4 deletions docs/vmagent.md
Original file line number Diff line number Diff line change
Expand Up @@ -2099,10 +2099,6 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
The maximum number of unique series vmagent can send to remote storage systems during the last hour. Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent/#cardinality-limiter
-remoteWrite.maxRowsPerBlock int
The maximum number of samples to send in each block to remote storage. Higher number may improve performance at the cost of the increased memory usage. See also -remoteWrite.maxBlockSize (default 10000)
-remoteWrite.multitenantURL array
Base path for multitenant remote storage URL to write data to. See https://docs.victoriametrics.com/vmagent/#multitenancy for details. Example url: http://<vminsert>:8480 . Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. This flag is deprecated in favor of -enableMultitenantHandlers . See https://docs.victoriametrics.com/vmagent/#multitenancy
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-remoteWrite.oauth2.clientID array
Optional OAuth2 clientID to use for the corresponding -remoteWrite.url
Supports an array of values separated by comma or specified via multiple flags.
Expand Down
Loading