Skip to content

Commit

Permalink
Receive: Allow specifying tenant-specific external labels in RouterIn…
Browse files Browse the repository at this point in the history
…gestor (thanos-io#5777)

Signed-off-by: haanhvu <haanh6594@gmail.com>
  • Loading branch information
haanhvu authored and pedro-stanaka committed Jun 27, 2023
1 parent 3d339b7 commit e1fb4ee
Show file tree
Hide file tree
Showing 11 changed files with 1,067 additions and 88 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -17,6 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6167](https://github.com/thanos-io/thanos/pull/6195) Receive: add flag `tsdb.too-far-in-future.time-window` to prevent clock skewed samples to pollute TSDB head and block all valid incoming samples.
- [#6273](https://github.com/thanos-io/thanos/pull/6273) Mixin: Allow specifying an instance name filter in dashboards
- [#6163](https://github.com/thanos-io/thanos/pull/6163) Receiver: Add hidden flag `--receive-forward-max-backoff` to configure the max backoff for forwarding requests.
- [#5777](https://github.com/thanos-io/thanos/pull/5777) Receive: Allow specifying tenant-specific external labels in Router Ingestor.

### Fixed

Expand Down
42 changes: 27 additions & 15 deletions cmd/thanos/receive.go
Expand Up @@ -281,7 +281,7 @@ func runReceive(

level.Debug(logger).Log("msg", "setting up hashring")
{
if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion); err != nil {
if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion, dbs); err != nil {
return err
}
}
Expand Down Expand Up @@ -454,12 +454,13 @@ func setupHashring(g *run.Group,
webHandler *receive.Handler,
statusProber prober.Probe,
enableIngestion bool,
dbs *receive.MultiTSDB,
) error {
// Note: the hashring configuration watcher
// is the sender and thus closes the chan.
// In the single-node case, which has no configuration
// watcher, we close the chan ourselves.
updates := make(chan receive.Hashring, 1)
updates := make(chan []receive.HashringConfig, 1)
algorithm := receive.HashringAlgorithm(conf.hashringsAlgorithm)

// The Hashrings config file path is given initializing config watcher.
Expand All @@ -478,33 +479,28 @@ func setupHashring(g *run.Group,

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
level.Info(logger).Log("msg", "the hashring initialized with config watcher.")
return receive.HashringFromConfigWatcher(ctx, algorithm, conf.replicationFactor, updates, cw)
return receive.ConfigFromWatcher(ctx, updates, cw)
}, func(error) {
cancel()
})
} else {
var (
ring receive.Hashring
err error
cf []receive.HashringConfig
err error
)
// The Hashrings config file content given initialize configuration from content.
if len(conf.hashringsFileContent) > 0 {
ring, err = receive.HashringFromConfig(algorithm, conf.replicationFactor, conf.hashringsFileContent)
cf, err = receive.ParseConfig([]byte(conf.hashringsFileContent))
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
return errors.Wrap(err, "failed to validate hashring configuration content")
}
level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.")
} else {
level.Info(logger).Log("msg", "the hashring file is not specified use single node hashring.")
ring = receive.SingleNodeHashring(conf.endpoint)
}

cancel := make(chan struct{})
g.Add(func() error {
defer close(updates)
updates <- ring
updates <- cf
<-cancel
return nil
}, func(error) {
Expand All @@ -521,11 +517,27 @@ func setupHashring(g *run.Group,

for {
select {
case h, ok := <-updates:
case c, ok := <-updates:
if !ok {
return nil
}
webHandler.Hashring(h)

if c == nil {
webHandler.Hashring(receive.SingleNodeHashring(conf.endpoint))
level.Info(logger).Log("msg", "Empty hashring config. Set up single node hashring.")
} else {
h, err := receive.NewMultiHashring(algorithm, conf.replicationFactor, c)
if err != nil {
return errors.Wrap(err, "unable to create new hashring from config")
}
webHandler.Hashring(h)
level.Info(logger).Log("msg", "Set up hashring for the given hashring config.")
}

if err := dbs.SetHashringConfig(c); err != nil {
return errors.Wrap(err, "failed to set hashring config in MultiTSDB")
}

// If ingestion is enabled, send a signal to TSDB to flush.
if enableIngestion {
hashringChangedChan <- struct{}{}
Expand Down
24 changes: 21 additions & 3 deletions pkg/exemplars/tsdb.go
Expand Up @@ -4,6 +4,8 @@
package exemplars

import (
"sync"

"github.com/gogo/status"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -16,8 +18,10 @@ import (

// TSDB allows fetching exemplars from a TSDB instance.
type TSDB struct {
db storage.ExemplarQueryable
db storage.ExemplarQueryable

extLabels labels.Labels
mtx sync.RWMutex
}

// NewTSDB creates new exemplars.TSDB.
Expand All @@ -28,9 +32,23 @@ func NewTSDB(db storage.ExemplarQueryable, extLabels labels.Labels) *TSDB {
}
}

func (t *TSDB) SetExtLabels(extLabels labels.Labels) {
t.mtx.Lock()
defer t.mtx.Unlock()

t.extLabels = extLabels
}

func (t *TSDB) getExtLabels() labels.Labels {
t.mtx.RLock()
defer t.mtx.RUnlock()

return t.extLabels
}

// Exemplars returns all specified exemplars from a TSDB instance.
func (t *TSDB) Exemplars(matchers [][]*labels.Matcher, start, end int64, s exemplarspb.Exemplars_ExemplarsServer) error {
match, selectors := selectorsMatchesExternalLabels(matchers, t.extLabels)
match, selectors := selectorsMatchesExternalLabels(matchers, t.getExtLabels())

if !match {
return nil
Expand All @@ -53,7 +71,7 @@ func (t *TSDB) Exemplars(matchers [][]*labels.Matcher, start, end int64, s exemp
for _, e := range exemplars {
exd := exemplarspb.ExemplarData{
SeriesLabels: labelpb.ZLabelSet{
Labels: labelpb.ZLabelsFromPromLabels(labelpb.ExtendSortedLabels(e.SeriesLabels, t.extLabels)),
Labels: labelpb.ZLabelsFromPromLabels(labelpb.ExtendSortedLabels(e.SeriesLabels, t.getExtLabels())),
},
Exemplars: exemplarspb.ExemplarsFromPromExemplars(e.Exemplars),
}
Expand Down
42 changes: 30 additions & 12 deletions pkg/receive/config.go
Expand Up @@ -40,10 +40,11 @@ const (
// HashringConfig represents the configuration for a hashring
// a receive node knows about.
type HashringConfig struct {
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
Endpoints []string `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
Hashring string `json:"hashring,omitempty"`
Tenants []string `json:"tenants,omitempty"`
Endpoints []string `json:"endpoints"`
Algorithm HashringAlgorithm `json:"algorithm,omitempty"`
ExternalLabels map[string]string `json:"external_labels,omitempty"`
}

// ConfigWatcher is able to watch a file containing a hashring configuration
Expand Down Expand Up @@ -255,14 +256,38 @@ func (cw *ConfigWatcher) refresh(ctx context.Context) {
}
}

func ConfigFromWatcher(ctx context.Context, updates chan<- []HashringConfig, cw *ConfigWatcher) error {
defer close(updates)
go cw.Run(ctx)

for {
select {
case cfg, ok := <-cw.C():
if !ok {
return errors.New("hashring config watcher stopped unexpectedly")
}
updates <- cfg
case <-ctx.Done():
return ctx.Err()
}
}
}

// ParseConfig parses the raw configuration content and returns a HashringConfig.
func ParseConfig(content []byte) ([]HashringConfig, error) {
var config []HashringConfig
err := json.Unmarshal(content, &config)
return config, err
}

// loadConfig loads raw configuration content and returns a configuration.
func loadConfig(logger log.Logger, path string) ([]HashringConfig, float64, error) {
cfgContent, err := readFile(logger, path)
if err != nil {
return nil, 0, errors.Wrap(err, "failed to read configuration file")
}

config, err := parseConfig(cfgContent)
config, err := ParseConfig(cfgContent)
if err != nil {
return nil, 0, errors.Wrapf(errParseConfigurationFile, "failed to parse configuration file: %v", err)
}
Expand Down Expand Up @@ -290,13 +315,6 @@ func readFile(logger log.Logger, path string) ([]byte, error) {
return io.ReadAll(fd)
}

// parseConfig parses the raw configuration content and returns a HashringConfig.
func parseConfig(content []byte) ([]HashringConfig, error) {
var config []HashringConfig
err := json.Unmarshal(content, &config)
return config, err
}

// hashAsMetricValue generates metric value from hash of data.
func hashAsMetricValue(data []byte) float64 {
sum := md5.Sum(data)
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/handler_test.go
Expand Up @@ -205,7 +205,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
hashringAlgo = AlgorithmHashmod
}

hashring, err := newMultiHashring(hashringAlgo, replicationFactor, cfg)
hashring, err := NewMultiHashring(hashringAlgo, replicationFactor, cfg)
if err != nil {
return nil, nil, err
}
Expand Down
46 changes: 1 addition & 45 deletions pkg/receive/hashring.go
Expand Up @@ -4,7 +4,6 @@
package receive

import (
"context"
"fmt"
"sort"
"strconv"
Expand Down Expand Up @@ -239,7 +238,7 @@ func (m *multiHashring) GetN(tenant string, ts *prompb.TimeSeries, n uint64) (st
// groups.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) (Hashring, error) {
func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg []HashringConfig) (Hashring, error) {
m := &multiHashring{
cache: make(map[string]Hashring),
}
Expand Down Expand Up @@ -268,49 +267,6 @@ func newMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
return m, nil
}

// HashringFromConfigWatcher creates multi-tenant hashrings from a
// hashring configuration file watcher.
// The configuration file is watched for updates.
// Hashrings are returned on the updates channel.
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
// The updates chan is closed before exiting.
func HashringFromConfigWatcher(ctx context.Context, algorithm HashringAlgorithm, replicationFactor uint64, updates chan<- Hashring, cw *ConfigWatcher) error {
defer close(updates)
go cw.Run(ctx)

for {
select {
case cfg, ok := <-cw.C():
if !ok {
return errors.New("hashring config watcher stopped unexpectedly")
}
h, err := newMultiHashring(algorithm, replicationFactor, cfg)
if err != nil {
return errors.Wrap(err, "unable to create new hashring from config")
}
updates <- h
case <-ctx.Done():
return ctx.Err()
}
}
}

// HashringFromConfig loads raw configuration content and returns a Hashring if the given configuration is not valid.
func HashringFromConfig(algorithm HashringAlgorithm, replicationFactor uint64, content string) (Hashring, error) {
config, err := parseConfig([]byte(content))
if err != nil {
return nil, errors.Wrapf(err, "failed to parse configuration")
}

// If hashring is empty, return an error.
if len(config) == 0 {
return nil, errors.Wrapf(err, "failed to load configuration")
}

return newMultiHashring(algorithm, replicationFactor, config)
}

func newHashring(algorithm HashringAlgorithm, endpoints []string, replicationFactor uint64, hashring string, tenants []string) (Hashring, error) {
switch algorithm {
case AlgorithmHashmod:
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/hashring_test.go
Expand Up @@ -136,7 +136,7 @@ func TestHashringGet(t *testing.T) {
},
},
} {
hs, err := newMultiHashring(AlgorithmHashmod, 3, tc.cfg)
hs, err := NewMultiHashring(AlgorithmHashmod, 3, tc.cfg)
require.NoError(t, err)

h, err := hs.Get(tc.tenant, ts)
Expand Down

0 comments on commit e1fb4ee

Please sign in to comment.