diff --git a/CHANGELOG.md b/CHANGELOG.md index 2361ec8567..5963c991fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6692](https://github.com/thanos-io/thanos/pull/6692) Store: Fix matching bug when using empty alternative in regex matcher, for example (a||b). - [#6679](https://github.com/thanos-io/thanos/pull/6697) Store: fix block deduplication +- [#6706](https://github.com/thanos-io/thanos/pull/6706) Store: Series responses should always be sorted ### Added diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 6b9b00b953..ce1e24e734 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -365,21 +365,6 @@ func runReceive( grpcserver.WithTLSConfig(tlsCfg), ) - ctx, cancel := context.WithCancel(context.Background()) - level.Debug(logger).Log("msg", "setting up periodic update for label names") - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - dbs.UpdateLabelNames(ctx) - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) - g.Add( func() error { level.Info(logger).Log("msg", "listening for StoreAPI and WritableStoreAPI gRPC", "address", conf.grpcConfig.bindAddress) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 9c3761a517..56e418b154 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -654,21 +654,6 @@ func runRule( ) storeServer := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, tsdbStore), reg, conf.storeRateLimits) options = append(options, grpcserver.WithServer(store.RegisterStoreServer(storeServer, logger))) - - ctx, cancel := context.WithCancel(context.Background()) - level.Debug(logger).Log("msg", "setting up periodic update for label names") - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - tsdbStore.UpdateLabelNames(ctx) - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) } options = append(options, grpcserver.WithServer( diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 0386f13217..12fd3e01dc 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -48,7 +48,6 @@ import ( "github.com/thanos-io/thanos/pkg/shipper" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/targets" "github.com/thanos-io/thanos/pkg/tls" ) @@ -114,9 +113,8 @@ func runSidecar( mint: conf.limitMinTime.PrometheusTimestamp(), maxt: math.MaxInt64, - limitMinTime: conf.limitMinTime, - client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"), - labelNamesSet: stringset.AllStrings(), + limitMinTime: conf.limitMinTime, + client: promclient.NewWithTracingClient(logger, httpClient, "thanos-sidecar"), } confContentYaml, err := conf.objStore.Content() @@ -249,19 +247,6 @@ func runSidecar( }, func(error) { cancel() }) - - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - m.UpdateLabelNames(context.Background()) - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) } { ctx, cancel := context.WithCancel(context.Background()) @@ -275,7 +260,7 @@ func runSidecar( { c := promclient.NewWithTracingClient(logger, httpClient, httpconfig.ThanosUserAgent) - promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.LabelNamesSet, m.Version) + promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps, m.Version) if err != nil { return errors.Wrap(err, "create Prometheus store") } @@ -477,8 +462,6 @@ type promMetadata struct { limitMinTime thanosmodel.TimeOrDurationValue client *promclient.Client - - labelNamesSet stringset.Set } func (s *promMetadata) UpdateLabels(ctx context.Context) error { @@ -506,30 +489,6 @@ func (s *promMetadata) UpdateTimestamps(mint, maxt int64) { s.maxt = maxt } -func (s *promMetadata) UpdateLabelNames(ctx context.Context) { - mint, _ := s.Timestamps() - labelNames, err := s.client.LabelNamesInGRPC(ctx, s.promURL, nil, mint, time.Now().UnixMilli()) - if err != nil { - s.mtx.Lock() - defer s.mtx.Unlock() - - s.labelNamesSet = stringset.AllStrings() - return - } - - filter := stringset.NewFromStrings(labelNames...) - s.mtx.Lock() - s.labelNamesSet = filter - s.mtx.Unlock() -} - -func (s *promMetadata) LabelNamesSet() stringset.Set { - s.mtx.Lock() - defer s.mtx.Unlock() - - return s.labelNamesSet -} - func (s *promMetadata) Labels() labels.Labels { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index fb6a8e702e..64297b5699 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -498,23 +498,6 @@ func runStore( }) } - { - ctx, cancel := context.WithCancel(context.Background()) - level.Debug(logger).Log("msg", "setting up periodic update for label names") - g.Add(func() error { - return runutil.Repeat(10*time.Second, ctx.Done(), func() error { - level.Debug(logger).Log("msg", "Starting label names update") - - bs.UpdateLabelNames() - - level.Debug(logger).Log("msg", "Finished label names update") - return nil - }) - }, func(err error) { - cancel() - }) - - } // Add bucket UI for loaded blocks. { ins := extpromhttp.NewInstrumentationMiddleware(reg, nil) diff --git a/docs/components/query.md b/docs/components/query.md index 87733ff02d..237eb83794 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -103,16 +103,6 @@ thanos query \ This logic can also be controlled via parameter on QueryAPI. More details below. -### Deduplication on non-external labels - -In `v0.31.0` we have implemented an [optimization](../proposals-accepted/20221129-avoid-global-sort.md) which broke deduplication on non-external labels. We think that it was just a coincidence that deduplication worked at all on non-external labels in previous versions. - -External labels always override any labels a series might have and this makes it so that it is possible to remove replica labels on series returned by a StoreAPI as an optimization. If deduplication happens on internal labels then that might lead to unsorted series from a StoreAPI and that breaks deduplication. - -To fix this use-case, in 0.32.0 we've implemented a cuckoo filter on label names that is updated every 10 seconds. Using it we can detect whether deduplication was requested on internal labels. If that is the case then the series set is resorted before being sent off to the querier. It is strongly recommended to set replica labels which are external labels because otherwise the optimization cannot be applied and your queries will be slower by 20-30%. - -In the future we have plans to expose this cuckoo filter through the InfoAPI. This will allow better scoping queries to StoreAPIs. - ## Experimental PromQL Engine By default, Thanos querier comes with standard Prometheus PromQL engine. However, when `--query.promql-engine=thanos` is specified, Thanos will use [experimental Thanos PromQL engine](http://github.com/thanos-community/promql-engine) which is a drop-in, efficient implementation of PromQL engine with query planner and optimizers. diff --git a/go.mod b/go.mod index b9b2c27d22..404442782e 100644 --- a/go.mod +++ b/go.mod @@ -118,14 +118,12 @@ require ( require ( github.com/onsi/gomega v1.27.10 - github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb go.opentelemetry.io/contrib/propagators/autoprop v0.38.0 go4.org/intern v0.0.0-20230525184215-6c62f75575cb golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 ) require ( - github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect github.com/golang-jwt/jwt/v5 v5.0.0 // indirect github.com/google/s2a-go v0.1.4 // indirect github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect diff --git a/go.sum b/go.sum index b18e2af8d2..2f61a3076b 100644 --- a/go.sum +++ b/go.sum @@ -223,8 +223,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE= github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E= -github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/digitalocean/godo v1.99.0 h1:gUHO7n9bDaZFWvbzOum4bXE0/09ZuYA9yA8idQHX57E= github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8= @@ -851,8 +849,6 @@ github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHi github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM= github.com/scaleway/scaleway-sdk-go v1.0.0-beta.20 h1:a9hSJdJcd16e0HoMsnFvaHvxB3pxSD+SC7+CISp7xY0= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIOUX+osiMraVgIrMR27uMXnRJWGm1+GL8/63U= -github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg= github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8= github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ= github.com/shirou/gopsutil/v3 v3.21.2/go.mod h1:ghfMypLDrFSWN2c9cDYFLHyynQ+QUht0cv/18ZqVczw= diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 4dadb97343..509e9f3535 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -867,19 +867,6 @@ func (t *MultiTSDB) extractTenantsLabels(tenantID string, initialLset labels.Lab return initialLset, nil } -func (t *MultiTSDB) UpdateLabelNames(ctx context.Context) { - t.mtx.RLock() - defer t.mtx.RUnlock() - - for _, tenant := range t.tenants { - db := tenant.storeTSDB - if db == nil { - continue - } - db.UpdateLabelNames(ctx) - } -} - // extendLabels extends external labels of the initial label set. // If an external label shares same name with a label in the initial label set, // use the label in the initial label set and inform user about it. diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index f6a5ef55ec..c22c27bf3c 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -28,7 +28,6 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -238,9 +237,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "n", Value: "1"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -251,7 +250,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -270,9 +269,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "missing", Value: ""}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), labels.FromStrings("n", "2.5", "region", "eu-west"), }, @@ -295,8 +294,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: ".+"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -306,9 +305,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: ".*"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), labels.FromStrings("n", "2.5", "region", "eu-west"), }, @@ -332,8 +331,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NEQ, Name: "i", Value: ""}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -352,8 +351,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NEQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -363,9 +362,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "n", Value: "^1$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -376,7 +375,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -387,8 +386,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^a?$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), }, }, { @@ -422,9 +421,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.*$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -435,8 +434,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^.+$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -489,8 +488,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a$"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), }, }, { @@ -501,7 +500,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^a?$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -512,8 +511,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_NRE, Name: "i", Value: "^$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -545,7 +544,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_EQ, Name: "i", Value: "a"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -557,7 +556,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "^(b|a).*$"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), }, }, { @@ -567,9 +566,9 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "n", Value: "(1|2)"}, }, expectedLabels: []labels.Labels{ + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), labels.FromStrings("n", "1", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), labels.FromStrings("n", "2", "region", "eu-west"), }, }, @@ -580,8 +579,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "a|b"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -591,8 +590,8 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset {Type: storepb.LabelMatcher_RE, Name: "i", Value: "(a|b)"}, }, expectedLabels: []labels.Labels{ - labels.FromStrings("n", "1", "i", "a", "region", "eu-west"), - labels.FromStrings("n", "1", "i", "b", "region", "eu-west"), + labels.FromStrings("i", "a", "n", "1", "region", "eu-west"), + labels.FromStrings("i", "b", "n", "1", "region", "eu-west"), }, }, { @@ -706,12 +705,15 @@ func testStoreAPIsAcceptance(t *testing.T, startStore func(t *testing.T, extLset } testutil.Ok(t, err) + testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool { + return labels.Compare(x.PromLabels(), y.PromLabels()) < 0 + })) + receivedLabels := make([]labels.Labels, 0) for _, s := range srv.SeriesSet { receivedLabels = append(receivedLabels, s.PromLabels()) } - slices.SortFunc(c.expectedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 }) - slices.SortFunc(receivedLabels, func(a, b labels.Labels) bool { return labels.Compare(a, b) < 0 }) + testutil.Equals(t, c.expectedLabels, receivedLabels) }) } @@ -824,7 +826,6 @@ func TestPrometheusStore_Acceptance(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return extLset }, func() (int64, int64) { return timestamp.FromTime(minTime), timestamp.FromTime(maxTime) }, - func() stringset.Set { return stringset.AllStrings() }, func() string { return version }) testutil.Ok(tt, err) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 4a8eae4572..54150d178a 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -59,7 +59,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/strutil" "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" @@ -387,9 +386,6 @@ type BucketStore struct { enabledLazyExpandedPostings bool - bmtx sync.Mutex - labelNamesSet stringset.Set - blockEstimatedMaxSeriesFunc BlockEstimator blockEstimatedMaxChunkFunc BlockEstimator } @@ -543,7 +539,6 @@ func NewBucketStore( enableSeriesResponseHints: enableSeriesResponseHints, enableChunkHashCalculation: enableChunkHashCalculation, seriesBatchSize: SeriesBatchSize, - labelNamesSet: stringset.AllStrings(), } for _, option := range options { @@ -1334,7 +1329,8 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { - srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels) + srv := newFlushableServer(seriesSrv, sortingStrategyNone) + if s.queryGate != nil { tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { err = s.queryGate.Start(srv.Context()) @@ -1464,44 +1460,19 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store return errors.Wrapf(err, "fetch postings for block %s", blk.meta.ULID) } - // If we have inner replica labels we need to resort. - s.mtx.Lock() - needsEagerRetrival := len(req.WithoutReplicaLabels) > 0 && s.labelNamesSet.HasAny(req.WithoutReplicaLabels) - s.mtx.Unlock() - - var resp respSet - if needsEagerRetrival { - labelsToRemove := make(map[string]struct{}) - for _, replicaLabel := range req.WithoutReplicaLabels { - labelsToRemove[replicaLabel] = struct{}{} - } - resp = newEagerRespSet( - srv.Context(), - span, - 10*time.Minute, - blk.meta.ULID.String(), - []labels.Labels{blk.extLset}, - onClose, - blockClient, - shardMatcher, - false, - s.metrics.emptyPostingCount, - labelsToRemove, - ) - } else { - resp = newLazyRespSet( - srv.Context(), - span, - 10*time.Minute, - blk.meta.ULID.String(), - []labels.Labels{blk.extLset}, - onClose, - blockClient, - shardMatcher, - false, - s.metrics.emptyPostingCount, - ) - } + resp := newEagerRespSet( + srv.Context(), + span, + 10*time.Minute, + blk.meta.ULID.String(), + []labels.Labels{blk.extLset}, + onClose, + blockClient, + shardMatcher, + false, + s.metrics.emptyPostingCount, + nil, + ) mtx.Lock() respSets = append(respSets, resp) @@ -1814,38 +1785,6 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq }, nil } -func (s *BucketStore) UpdateLabelNames() { - s.mtx.RLock() - defer s.mtx.RUnlock() - - newSet := stringset.New() - for _, b := range s.blocks { - labelNames, err := b.indexHeaderReader.LabelNames() - if err != nil { - level.Warn(s.logger).Log("msg", "error getting label names", "block", b.meta.ULID, "err", err.Error()) - s.updateLabelNamesSet(stringset.AllStrings()) - return - } - for _, l := range labelNames { - newSet.Insert(l) - } - } - s.updateLabelNamesSet(newSet) -} - -func (s *BucketStore) updateLabelNamesSet(newSet stringset.Set) { - s.bmtx.Lock() - s.labelNamesSet = newSet - s.bmtx.Unlock() -} - -func (b *BucketStore) LabelNamesSet() stringset.Set { - b.bmtx.Lock() - defer b.bmtx.Unlock() - - return b.labelNamesSet -} - func (b *bucketBlock) FilterExtLabelsMatchers(matchers []*labels.Matcher) ([]*labels.Matcher, bool) { // We filter external labels from matchers so we won't try to match series on them. var result []*labels.Matcher diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 7262dee155..ebd1ffa709 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -779,30 +779,6 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) { }) } -func TestBucketStore_LabelNamesSet_e2e(t *testing.T) { - objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { - dir := t.TempDir() - - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf) - s.cache.SwapWith(noopCache{}) - - mint, maxt := s.store.TimeRange() - testutil.Equals(t, s.minTime, mint) - testutil.Equals(t, s.maxTime, maxt) - - s.store.UpdateLabelNames() - for _, b := range s.store.blocks { - waitTimeout(t, &b.pendingReaders, 5*time.Second) - } - - filter := s.store.LabelNamesSet() - for _, n := range []string{"a", "b", "c"} { - testutil.Assert(t, filter.Has(n), "expected filter to have %s", n) - } - testutil.Equals(t, 3, filter.Count()) - }) -} - func TestBucketStore_LabelNames_SeriesLimiter_e2e(t *testing.T) { cases := map[string]struct { maxSeriesLimit uint64 diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 82d42dd9d7..846fea2e98 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -61,7 +61,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -1661,7 +1660,6 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { chunksLimiterFactory: NewChunksLimiterFactory(0), seriesLimiterFactory: NewSeriesLimiterFactory(0), bytesLimiterFactory: NewBytesLimiterFactory(0), - labelNamesSet: stringset.AllStrings(), } t.Run("invoke series for one block. Fill the cache on the way.", func(t *testing.T) { @@ -3471,9 +3469,6 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { testutil.Ok(t, bucketStore.SyncBlocks(context.Background())) - // make sure to have updated inner label names - bucketStore.UpdateLabelNames() - srv := newStoreSeriesServer(context.Background()) testutil.Ok(t, bucketStore.Series(&storepb.SeriesRequest{ WithoutReplicaLabels: []string{"replica"}, @@ -3484,5 +3479,8 @@ func TestBucketStoreDedupOnBlockSeriesSet(t *testing.T) { }, }, srv)) + testutil.Equals(t, true, slices.IsSortedFunc(srv.SeriesSet, func(x, y storepb.Series) bool { + return labels.Compare(x.PromLabels(), y.PromLabels()) < 0 + })) testutil.Equals(t, 2, len(srv.SeriesSet)) } diff --git a/pkg/store/flushable.go b/pkg/store/flushable.go index c41b67d152..e6cadfbea9 100644 --- a/pkg/store/flushable.go +++ b/pkg/store/flushable.go @@ -9,24 +9,35 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" +) + +type sortingStrategy uint64 + +const ( + sortingStrategyStore sortingStrategy = iota + 1 + sortingStrategyNone ) // flushableServer is an extension of storepb.Store_SeriesServer with a Flush method. type flushableServer interface { storepb.Store_SeriesServer + Flush() error } func newFlushableServer( upstream storepb.Store_SeriesServer, - labelNames stringset.Set, - replicaLabels []string, + sortingsortingStrategy sortingStrategy, ) flushableServer { - if labelNames.HasAny(replicaLabels) { + switch sortingsortingStrategy { + case sortingStrategyStore: return &resortingServer{Store_SeriesServer: upstream} + case sortingStrategyNone: + return &passthroughServer{Store_SeriesServer: upstream} + default: + // should not happen. + panic("unexpected sorting strategy") } - return &passthroughServer{Store_SeriesServer: upstream} } // passthroughServer is a flushableServer that forwards all data to diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 244ae5592d..fd6d4c0195 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -42,7 +42,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -54,7 +53,6 @@ type PrometheusStore struct { buffers sync.Pool component component.StoreAPI externalLabelsFn func() labels.Labels - labelNamesSet func() stringset.Set promVersion func() string timestamps func() (mint int64, maxt int64) @@ -81,7 +79,6 @@ func NewPrometheusStore( component component.StoreAPI, externalLabelsFn func() labels.Labels, timestamps func() (mint int64, maxt int64), - labelNamesSet func() stringset.Set, promVersion func() string, ) (*PrometheusStore, error) { if logger == nil { @@ -95,7 +92,6 @@ func NewPrometheusStore( externalLabelsFn: externalLabelsFn, promVersion: promVersion, timestamps: timestamps, - labelNamesSet: labelNamesSet, remoteReadAcceptableResponses: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS, prompb.ReadRequest_SAMPLES}, buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) @@ -149,7 +145,8 @@ func (p *PrometheusStore) putBuffer(b *[]byte) { // Series returns all series for a requested time range and label matcher. func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - s := newFlushableServer(seriesSrv, p.labelNamesSet(), r.WithoutReplicaLabels) + s := newFlushableServer(seriesSrv, sortingStrategyStore) + extLset := p.externalLabelsFn() match, matchers, err := matchesExternalLabels(r.Matchers, extLset) diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 82965672c7..d0597b6e9c 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -26,7 +26,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" - "github.com/thanos-io/thanos/pkg/stringset" "github.com/thanos-io/thanos/pkg/testutil/custom" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) @@ -72,7 +71,6 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return limitMinT, -1 }, - func() stringset.Set { return stringset.AllStrings() }, nil, ) // MaxTime does not matter. testutil.Ok(t, err) @@ -234,7 +232,6 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }, - func() stringset.Set { return stringset.AllStrings() }, nil, ) testutil.Ok(t, err) @@ -417,7 +414,6 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -481,7 +477,6 @@ func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) srv := newStoreSeriesServer(ctx) @@ -512,7 +507,6 @@ func TestPrometheusStore_Info(t *testing.T) { proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 123, 456 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) @@ -592,7 +586,6 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }, - func() stringset.Set { return stringset.AllStrings() }, nil) testutil.Ok(t, err) diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index 7ea18b134d..51631b388a 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -164,9 +164,7 @@ func (d *dedupResponseHeap) At() *storepb.SeriesResponse { // tournament trees need n-1 auxiliary nodes so there // might not be much of a difference. type ProxyResponseHeap struct { - nodes []ProxyResponseHeapNode - iLblsScratch labels.Labels - jLblsScratch labels.Labels + nodes []ProxyResponseHeapNode } func (h *ProxyResponseHeap) Less(i, j int) bool { @@ -174,26 +172,10 @@ func (h *ProxyResponseHeap) Less(i, j int) bool { jResp := h.nodes[j].rs.At() if iResp.GetSeries() != nil && jResp.GetSeries() != nil { - // Response sets are sorted before adding external labels. - // This comparison excludes those labels to keep the same order. - iStoreLbls := h.nodes[i].rs.StoreLabels() - jStoreLbls := h.nodes[j].rs.StoreLabels() - iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels) jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels) - copyLabels(&h.iLblsScratch, iLbls) - copyLabels(&h.jLblsScratch, jLbls) - - var iExtLbls, jExtLbls labels.Labels - h.iLblsScratch, iExtLbls = dropLabels(h.iLblsScratch, iStoreLbls) - h.jLblsScratch, jExtLbls = dropLabels(h.jLblsScratch, jStoreLbls) - - c := labels.Compare(h.iLblsScratch, h.jLblsScratch) - if c != 0 { - return c < 0 - } - return labels.Compare(iExtLbls, jExtLbls) < 0 + return labels.Compare(iLbls, jLbls) < 0 } else if iResp.GetSeries() == nil && jResp.GetSeries() != nil { return true } else if iResp.GetSeries() != nil && jResp.GetSeries() == nil { @@ -774,9 +756,9 @@ func newEagerRespSet( // This should be used only for stores that does not support doing this on server side. // See docs/proposals-accepted/20221129-avoid-global-sort.md for details. - if len(l.removeLabels) > 0 { - sortWithoutLabels(l.bufferedResponses, l.removeLabels) - } + // NOTE. Client is not guaranteed to give a sorted response when extLset is added + // Generally we need to resort here. + sortWithoutLabels(l.bufferedResponses, l.removeLabels) }(ret) @@ -794,34 +776,6 @@ func rmLabels(l labels.Labels, labelsToRemove map[string]struct{}) labels.Labels return l } -// dropLabels removes labels from the given label set and returns the removed labels. -func dropLabels(l labels.Labels, labelsToDrop map[string]struct{}) (labels.Labels, labels.Labels) { - cutoff := len(l) - for i := 0; i < len(l); i++ { - if i == cutoff { - break - } - if _, ok := labelsToDrop[l[i].Name]; !ok { - continue - } - - lbl := l[i] - l = append(append(l[:i], l[i+1:]...), lbl) - cutoff-- - i-- - } - - return l[:cutoff], l[cutoff:] -} - -func copyLabels(dest *labels.Labels, src labels.Labels) { - if len(*dest) < cap(src) { - *dest = make([]labels.Label, len(src)) - } - *dest = (*dest)[:len(src)] - copy(*dest, src) -} - // sortWithoutLabels removes given labels from series and re-sorts the series responses that the same // series with different labels are coming right after each other. Other types of responses are moved to front. func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string]struct{}) { @@ -831,7 +785,9 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string] continue } - ser.Labels = labelpb.ZLabelsFromPromLabels(rmLabels(labelpb.ZLabelsToPromLabels(ser.Labels), labelsToRemove)) + if len(labelsToRemove) > 0 { + ser.Labels = labelpb.ZLabelsFromPromLabels(rmLabels(labelpb.ZLabelsToPromLabels(ser.Labels), labelsToRemove)) + } } // With the re-ordered label sets, re-sorting all series aligns the same series diff --git a/pkg/store/proxy_heap_test.go b/pkg/store/proxy_heap_test.go index fdfec178ca..50fe2d46be 100644 --- a/pkg/store/proxy_heap_test.go +++ b/pkg/store/proxy_heap_test.go @@ -82,33 +82,6 @@ func TestProxyResponseHeapSort(t *testing.T) { storeSeriesResponse(t, labelsFromStrings("g", "7", "h", "8", "i", "9")), }, }, - { - title: "merge duplicated sets that were ordered before adding external labels", - input: []respSet{ - &eagerRespSet{ - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - }, - storeLabels: map[string]struct{}{"c": {}}, - }, - &eagerRespSet{ - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - }, - storeLabels: map[string]struct{}{"c": {}}, - }, - }, - exp: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - }, - }, { title: "merge repeated series in stores with different external labels", input: []respSet{ @@ -190,6 +163,37 @@ func TestProxyResponseHeapSort(t *testing.T) { storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), }, }, + { + title: "test", + input: []respSet{ + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + storeLabels: map[string]struct{}{"receive": {}, "tenant_id": {}, "thanos_replica": {}}, + }, + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + storeLabels: map[string]struct{}{"cluster": {}, "prometheus": {}, "prometheus_replica": {}, "receive": {}, "tenant_id": {}, "thanos_replica": {}, "thanos_ruler_replica": {}}, + }, + }, + exp: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + }, } { t.Run(tcase.title, func(t *testing.T) { h := NewProxyResponseHeap(tcase.input...) diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 73604b9236..b5182f3008 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -13,7 +13,6 @@ import ( "sync" "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -26,7 +25,6 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" ) const RemoteReadFrameLimit = 1048576 @@ -46,9 +44,6 @@ type TSDBStore struct { buffers sync.Pool maxBytesPerFrame int - lmx sync.RWMutex - labelNamesSet stringset.Set - extLset labels.Labels mtx sync.RWMutex } @@ -77,7 +72,6 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI component: component, extLset: extLset, maxBytesPerFrame: RemoteReadFrameLimit, - labelNamesSet: stringset.AllStrings(), buffers: sync.Pool{New: func() interface{} { b := make([]byte, 0, initialBufSize) return &b @@ -175,7 +169,7 @@ type CloseDelegator interface { // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), r.WithoutReplicaLabels) + srv := newFlushableServer(seriesSrv, sortingStrategyStore) match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) if err != nil { @@ -376,38 +370,3 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque return &storepb.LabelValuesResponse{Values: values}, nil } - -func (s *TSDBStore) UpdateLabelNames(ctx context.Context) { - newSet := stringset.New() - q, err := s.db.ChunkQuerier(ctx, math.MinInt64, math.MaxInt64) - if err != nil { - level.Warn(s.logger).Log("msg", "error creating tsdb querier", "err", err.Error()) - s.setLabelNamesSet(stringset.AllStrings()) - return - } - defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier label names") - - res, _, err := q.LabelNames() - if err != nil { - level.Warn(s.logger).Log("msg", "error getting label names", "err", err.Error()) - s.setLabelNamesSet(stringset.AllStrings()) - return - } - for _, l := range res { - newSet.Insert(l) - } - s.setLabelNamesSet(newSet) -} - -func (s *TSDBStore) setLabelNamesSet(newSet stringset.Set) { - s.lmx.Lock() - s.labelNamesSet = newSet - s.lmx.Unlock() -} - -func (b *TSDBStore) LabelNamesSet() stringset.Set { - b.lmx.RLock() - defer b.lmx.RUnlock() - - return b.labelNamesSet -} diff --git a/pkg/stringset/set.go b/pkg/stringset/set.go deleted file mode 100644 index 080071570f..0000000000 --- a/pkg/stringset/set.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package stringset - -import ( - cuckoo "github.com/seiflotfy/cuckoofilter" -) - -type Set interface { - Has(string) bool - HasAny([]string) bool - // Count returns the number of elements in the set. - // A value of -1 indicates infinite size and can be returned by a - // set representing all possible string values. - Count() int -} - -type fixedSet struct { - cuckoo *cuckoo.Filter -} - -func (f fixedSet) HasAny(strings []string) bool { - for _, s := range strings { - if f.Has(s) { - return true - } - } - return false -} - -func NewFromStrings(items ...string) Set { - f := cuckoo.NewFilter(uint(len(items))) - for _, label := range items { - f.InsertUnique([]byte(label)) - } - - return &fixedSet{cuckoo: f} -} - -func (f fixedSet) Has(s string) bool { - return f.cuckoo.Lookup([]byte(s)) -} - -func (f fixedSet) Count() int { - return int(f.cuckoo.Count()) -} - -type mutableSet struct { - cuckoo *cuckoo.ScalableCuckooFilter -} - -type MutableSet interface { - Set - Insert(string) -} - -func New() MutableSet { - return &mutableSet{ - cuckoo: cuckoo.NewScalableCuckooFilter(), - } -} - -func (e mutableSet) Insert(s string) { - e.cuckoo.InsertUnique([]byte(s)) -} - -func (e mutableSet) Has(s string) bool { - return e.cuckoo.Lookup([]byte(s)) -} - -func (e mutableSet) HasAny(strings []string) bool { - for _, s := range strings { - if e.Has(s) { - return true - } - } - return false -} - -func (e mutableSet) Count() int { - return int(e.cuckoo.Count()) -} - -type allStringsSet struct{} - -func (e allStringsSet) HasAny(_ []string) bool { - return true -} - -func AllStrings() *allStringsSet { - return &allStringsSet{} -} - -func (e allStringsSet) Has(_ string) bool { - return true -} - -func (e allStringsSet) Count() int { - return -1 -}