Store: store responses should always be sorted (thanos-io#6706)
* Store: always sort, just compare labelset in proxy heap

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
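
With every StoreAPI returning Series responses in sorted order, the proxy's merge step only has to compare the label sets at the head of each response stream. The following is a rough, self-contained Go sketch of such a k-way merge heap; `respSet`, its fields, and the sample data are made up for illustration and this is not the actual Thanos proxy code:

```go
package main

import (
	"container/heap"
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// respSet is a hypothetical stand-in for one store's (already sorted) response stream.
type respSet struct {
	series []labels.Labels // sorted ascending by label set
	idx    int
}

func (r *respSet) at() labels.Labels { return r.series[r.idx] }
func (r *respSet) next() bool        { r.idx++; return r.idx < len(r.series) }

// seriesHeap orders response sets by the label set currently at their head.
// Because every input is sorted, comparing label sets is all the merge needs.
type seriesHeap []*respSet

func (h seriesHeap) Len() int      { return len(h) }
func (h seriesHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h seriesHeap) Less(i, j int) bool {
	return labels.Compare(h[i].at(), h[j].at()) < 0
}
func (h *seriesHeap) Push(x any) { *h = append(*h, x.(*respSet)) }
func (h *seriesHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	a := &respSet{series: []labels.Labels{
		labels.FromStrings("__name__", "up", "job", "a"),
		labels.FromStrings("__name__", "up", "job", "c"),
	}}
	b := &respSet{series: []labels.Labels{
		labels.FromStrings("__name__", "up", "job", "b"),
	}}

	h := &seriesHeap{a, b}
	heap.Init(h)
	for h.Len() > 0 {
		top := (*h)[0]
		fmt.Println(top.at()) // emits series in globally sorted order: job=a, b, c
		if top.next() {
			heap.Fix(h, 0)
		} else {
			heap.Pop(h)
		}
	}
}
```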

* Store: add escape hatch to skip store resorting

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>

* Store: remove stringset

This is the wrong approach to detect whether we need to resort. It cannot
detect whether we might end up with an unsorted series set once we add
extLabels.

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
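
A set of known label names (the stringset / cuckoo-filter approach removed here) cannot answer "will the merged result still be sorted?", because sort order depends on label values and on the external labels a store attaches. A minimal, self-contained Go illustration with made-up label sets:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// Two series as a store sees them internally; in this order they are sorted.
	s1 := labels.FromStrings("a", "1", "z", "1")
	s2 := labels.FromStrings("b", "1")
	fmt.Println(labels.Compare(s1, s2) < 0) // true: s1 sorts before s2

	// The store attaches its external label a="ext". External labels override
	// internal ones, so s1 loses a="1" and s2 gains a="ext".
	s1Ext := labels.FromStrings("a", "ext", "z", "1")
	s2Ext := labels.FromStrings("a", "ext", "b", "1")
	fmt.Println(labels.Compare(s1Ext, s2Ext) < 0) // false: the order has flipped
}
```

Because the flip depends on concrete label values, the reliable fix is to always return store responses sorted after external labels are applied, which is what this change does.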

* Docs: drop paragraph about deduplication on inner labels

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>

---------

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
Co-authored-by: Michael Hoffmann <michael.hoffmann@aiven.io>
2 people authored and coleenquadros committed Sep 18, 2023
1 parent aa094e2 commit 90fa395
Showing 20 changed files with 117 additions and 500 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#6692](https://github.com/thanos-io/thanos/pull/6692) Store: Fix matching bug when using empty alternative in regex matcher, for example (a||b).
- [#6679](https://github.com/thanos-io/thanos/pull/6697) Store: fix block deduplication
- [#6706](https://github.com/thanos-io/thanos/pull/6706) Store: Series responses should always be sorted

### Added

15 changes: 0 additions & 15 deletions cmd/thanos/receive.go
@@ -365,21 +365,6 @@ func runReceive(
grpcserver.WithTLSConfig(tlsCfg),
)

ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

dbs.UpdateLabelNames(ctx)

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})

g.Add(
func() error {
level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", conf.grpcConfig.bindAddress)
15 changes: 0 additions & 15 deletions cmd/thanos/rule.go
@@ -654,21 +654,6 @@ func runRule(
)
storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits)
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger)))

ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

tsdbStore.UpdateLabelNames(ctx)

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})
}

options = append(options, grpcserver.WithServer(
47 changes: 3 additions & 44 deletions cmd/thanos/sidecar.go
@@ -48,7 +48,6 @@ import (
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/stringset"
"github.com/thanos-io/thanos/pkg/targets"
"github.com/thanos-io/thanos/pkg/tls"
)
@@ -114,9 +113,8 @@ func runSidecar(
mint: conf.limitMinTime.PrometheusTimestamp(),
maxt: math.MaxInt64,

-limitMinTime: conf.limitMinTime,
-client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"),
-labelNamesSet: stringset.AllStrings(),
+limitMinTime: conf.limitMinTime,
+client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"),
}

confContentYaml, err := conf.objStore.Content()
@@ -249,19 +247,6 @@ func runSidecar(
}, func(error) {
cancel()
})

g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

m.UpdateLabelNames(context.Background())

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})
}
{
ctx, cancel := context.WithCancel(context.Background())
@@ -275,7 +260,7 @@
{
c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent)

-promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesSet, m.Version)
+promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
@@ -477,8 +462,6 @@ type promMetadata struct {
limitMinTime thanosmodel.TimeOrDurationValue

client *promclient.Client

labelNamesSet stringset.Set
}

func (s *promMetadata) UpdateLabels(ctx context.Context) error {
@@ -506,30 +489,6 @@ func (s *promMetadata) UpdateTimestamps(mint, maxt int64) {
s.maxt = maxt
}

func (s *promMetadata) UpdateLabelNames(ctx context.Context) {
mint, _ := s.Timestamps()
labelNames, err := s.client.LabelNamesInGRPC(ctx, s.promURL, nil, mint, time.Now().UnixMilli())
if err != nil {
s.mtx.Lock()
defer s.mtx.Unlock()

s.labelNamesSet = stringset.AllStrings()
return
}

filter := stringset.NewFromStrings(labelNames...)
s.mtx.Lock()
s.labelNamesSet = filter
s.mtx.Unlock()
}

func (s *promMetadata) LabelNamesSet() stringset.Set {
s.mtx.Lock()
defer s.mtx.Unlock()

return s.labelNamesSet
}

func (s *promMetadata) Labels() labels.Labels {
s.mtx.Lock()
defer s.mtx.Unlock()
17 changes: 0 additions & 17 deletions cmd/thanos/store.go
@@ -498,23 +498,6 @@ func runStore(
})
}

{
ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names update")

bs.UpdateLabelNames()

level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
cancel()
})

}
// Add bucket UI for loaded blocks.
{
ins := extpromhttp.NewInstrumentationMiddleware(reg, nil)
10 changes: 0 additions & 10 deletions docs/components/query.md
@@ -103,16 +103,6 @@ thanos query \

This logic can also be controlled via parameter on QueryAPI. More details below.

### Deduplication on non-external labels

In `v0.31.0` we have implemented an [optimization](../proposals-accepted/20221129-avoid-global-sort.md) which broke deduplication on non-external labels. We think that it was just a coincidence that deduplication worked at all on non-external labels in previous versions.

External labels always override any labels a series might have and this makes it so that it is possible to remove replica labels on series returned by a StoreAPI as an optimization. If deduplication happens on internal labels then that might lead to unsorted series from a StoreAPI and that breaks deduplication.

To fix this use-case, in 0.32.0 we've implemented a cuckoo filter on label names that is updated every 10 seconds. Using it we can detect whether deduplication was requested on internal labels. If that is the case then the series set is resorted before being sent off to the querier. It is strongly recommended to set replica labels which are external labels because otherwise the optimization cannot be applied and your queries will be slower by 20-30%.

In the future we have plans to expose this cuckoo filter through the InfoAPI. This will allow better scoping queries to StoreAPIs.

## Experimental PromQL Engine

By default, Thanos querier comes with standard Prometheus PromQL engine. However, when `--query.promql-engine=thanos` is specified, Thanos will use [experimental Thanos PromQL engine](http://github.com/thanos-community/promql-engine) which is a drop-in, efficient implementation of PromQL engine with query planner and optimizers.
2 changes: 0 additions & 2 deletions go.mod
@@ -118,14 +118,12 @@ require (

require (
github.com/onsi/gomega v1.27.10
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb
go.opentelemetry.io/contrib/propagators/autoprop v0.38.0
go4.org/intern v0.0.0-20230525184215-6c62f75575cb
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
)

require (
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect
4 changes: 0 additions & 4 deletions go.sum
@@ -223,8 +223,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/digitalocean/godo v1.99.0 h1:gUHO7n9bDaZFWvbzOum4bXE0/09ZuYA9yA8idQHX57E=
github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI=
github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8=
@@ -851,8 +849,6 @@ github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHi
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.20 h1:a9hSJdJcd16e0HoMsnFvaHvxB3pxSD+SC7+CISp7xY0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U=
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8=
github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw=
13 changes: 0 additions & 13 deletions pkg/receive/multitsdb.go
@@ -867,19 +867,6 @@ func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Lab
return initialLset, nil
}

func (t *MultiTSDB) UpdateLabelNames(ctx context.Context) {
t.mtx.RLock()
defer t.mtx.RUnlock()

for _, tenant := range t.tenants {
db := tenant.storeTSDB
if db == nil {
continue
}
db.UpdateLabelNames(ctx)
}
}

// extendLabels extends external labels of the initial label set.
// If an external label shares same name with a label in the initial label set,
// use the label in the initial label set and inform user about it.
