diff --git a/CHANGELOG.md b/CHANGELOG.md index 89eceff1a1a..0f3f03a5974 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ ## 1.21.0 in progress +* [BUGFIX] KV store: Fix false-positive `status_code="500"` metrics for HA tracker CAS operations when using memberlist. #7408 +* [BUGFIX] Querier: Fix nil result being returned without an error when `ingester_query_max_attempts` > 1. #7369 +* [BUGFIX] Alertmanager: Fix disappearing user config and state when ring is temporarily unreachable. #7372 +* [BUGFIX] Fix memory leak in `ReuseWriteRequestV2` by explicitly clearing the `Symbols` backing array string pointers before returning the object to `sync.Pool`. #7373 +* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7375 + * [CHANGE] Ruler: Graduate Ruler API from experimental. #7312 * Flag: Renamed `-experimental.ruler.enable-api` to `-ruler.enable-api`. The old flag is kept as deprecated. * Ruler API is no longer marked as experimental. diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index f473a66c1bb..9494ce77c74 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -714,7 +714,10 @@ func (am *MultitenantAlertmanager) userIndexUpdateLoop(ctx context.Context) { level.Error(am.logger).Log("msg", "context timeout, exit user index update loop", "err", ctx.Err()) return case <-ticker.C: - owned := am.isUserOwned(userID) + owned, err := am.isUserOwned(userID) + if err != nil { + continue + } if !owned { continue } @@ -804,7 +807,11 @@ func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context) // Filter out users not owned by this shard. 
for _, userID := range allUserIDs { - if am.isUserOwned(userID) { + owned, err := am.isUserOwned(userID) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to check if user %s is owned", userID) + } + if owned { ownedUserIDs = append(ownedUserIDs, userID) } } @@ -821,24 +828,24 @@ func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context) return allUserIDs, configs, nil } -func (am *MultitenantAlertmanager) isUserOwned(userID string) bool { +func (am *MultitenantAlertmanager) isUserOwned(userID string) (bool, error) { if !am.allowedTenants.IsAllowed(userID) { - return false + return false, nil } // If sharding is disabled, any alertmanager instance owns all users. if !am.cfg.ShardingEnabled { - return true + return true, nil } alertmanagers, err := am.ring.Get(users.ShardByUser(userID), getSyncRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension), nil, nil, nil) if err != nil { am.ringCheckErrors.Inc() level.Error(am.logger).Log("msg", "failed to load alertmanager configuration", "user", userID, "err", err) - return false + return false, err } - return alertmanagers.Includes(am.ringLifecycler.GetInstanceAddr()) + return alertmanagers.Includes(am.ringLifecycler.GetInstanceAddr()), nil } func (am *MultitenantAlertmanager) syncConfigs(cfgs map[string]alertspb.AlertConfigDesc) { diff --git a/pkg/alertmanager/multitenant_test.go b/pkg/alertmanager/multitenant_test.go index c1e6a483af2..7cde6689d55 100644 --- a/pkg/alertmanager/multitenant_test.go +++ b/pkg/alertmanager/multitenant_test.go @@ -2501,3 +2501,126 @@ func (m *mockAlertManagerLimits) AlertmanagerMaxSilencesCount(_ string) int { func (m *mockAlertManagerLimits) AlertmanagerMaxSilenceSizeBytes(_ string) int { return m.maxSilencesSizeBytes } + +func TestMultitenantAlertmanager_isUserOwned(t *testing.T) { + amConfig := mockAlertmanagerConfig(t) + amConfig.ShardingEnabled = true + + ringStore, closer := 
consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + alertStore, err := prepareInMemoryAlertStore() + require.NoError(t, err) + + am, err := createMultitenantAlertmanager(amConfig, nil, nil, alertStore, ringStore, nil, log.NewNopLogger(), nil) + require.NoError(t, err) + + // We don't start the AM, so the ring is empty. + // When sharding is enabled, isUserOwned will call am.ring.Get, which should fail because the ring is empty. + owned, err := am.isUserOwned("user-1") + require.Error(t, err) + require.Equal(t, ring.ErrEmptyRing, err) + require.False(t, owned) + + // If sharding is disabled, it should return true, nil + am.cfg.ShardingEnabled = false + owned, err = am.isUserOwned("user-1") + require.NoError(t, err) + require.True(t, owned) +} + +func TestMultitenantAlertmanager_loadAndSyncConfigs_deletesUserFromStore(t *testing.T) { + ctx := context.Background() + amConfig := mockAlertmanagerConfig(t) + amConfig.ShardingEnabled = true + + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + alertStore, err := prepareInMemoryAlertStore() + require.NoError(t, err) + + am, err := createMultitenantAlertmanager(amConfig, nil, nil, alertStore, ringStore, nil, log.NewNopLogger(), nil) + require.NoError(t, err) + + // We don't start the AM. We just manually insert state, then call loadAndSyncConfigs. + user1 := "user-1" + + // 1. Create a FullState (remote state) for user1 in the store. + fullState := alertspb.FullStateDesc{ + State: &clusterpb.FullState{}, + } + err = alertStore.SetFullState(ctx, user1, fullState) + require.NoError(t, err) + + // Since we did NOT SetAlertConfig for user1, the store.ListAllUsers will return empty. + allUsers, err := alertStore.ListAllUsers(ctx) + require.NoError(t, err) + require.Empty(t, allUsers) + + // Verify user1's FullState is written to the store. 
+ _, err = alertStore.GetFullState(ctx, user1) + require.NoError(t, err) + + // 2. Call loadAndSyncConfigs. It should fetch all users (which is empty), + // and since sharding is enabled, it should clean up unused remote state for any users not in the list. + // user1 has state but no config, so its state should be pruned. + err = am.loadAndSyncConfigs(ctx, reasonPeriodic) + require.NoError(t, err) + + // 3. Verify user1's FullState is deleted from the store. + _, err = alertStore.GetFullState(ctx, user1) + require.Error(t, err) + require.Equal(t, alertspb.ErrNotFound, err) +} + +func TestMultitenantAlertmanager_loadAndSyncConfigs_DoesNotDeleteUserFromStoreWhenRingUnreachable(t *testing.T) { + ctx := context.Background() + amConfig := mockAlertmanagerConfig(t) + amConfig.ShardingEnabled = true + + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + alertStore, err := prepareInMemoryAlertStore() + require.NoError(t, err) + + am, err := createMultitenantAlertmanager(amConfig, nil, nil, alertStore, ringStore, nil, log.NewNopLogger(), nil) + require.NoError(t, err) + + user1 := "user-1" + + // 1. Create a FullState (remote state) for user1 in the store. + fullState := alertspb.FullStateDesc{ + State: &clusterpb.FullState{}, + } + err = alertStore.SetFullState(ctx, user1, fullState) + require.NoError(t, err) + + // Since we DO SetAlertConfig for user1, the store.ListAllUsers will return the user. + err = alertStore.SetAlertConfig(ctx, alertspb.AlertConfigDesc{ + User: user1, + RawConfig: simpleConfigOne, + Templates: []*alertspb.TemplateDesc{}, + }) + require.NoError(t, err) + + allUsers, err := alertStore.ListAllUsers(ctx) + require.NoError(t, err) + require.Len(t, allUsers, 1) + + // Verify user1's FullState is written to the store. + _, err = alertStore.GetFullState(ctx, user1) + require.NoError(t, err) + + // 2. Call loadAndSyncConfigs. It should fetch all users. 
+ // Since sharding is enabled but ring is empty (unreachable), isUserOwned will fail. + // loadAndSyncConfigs will return early with an error before pruning any states! + err = am.loadAndSyncConfigs(ctx, reasonPeriodic) + require.Error(t, err) + require.Contains(t, err.Error(), ring.ErrEmptyRing.Error()) + + // 3. Verify user1's FullState is STILL in the store. + _, err = alertStore.GetFullState(ctx, user1) + require.NoError(t, err) +} diff --git a/pkg/cortexpb/timeseriesv2.go b/pkg/cortexpb/timeseriesv2.go index 291ac32789e..084dde52110 100644 --- a/pkg/cortexpb/timeseriesv2.go +++ b/pkg/cortexpb/timeseriesv2.go @@ -77,6 +77,10 @@ func ReuseWriteRequestV2(req *PreallocWriteRequestV2) { req.data = nil } req.Source = 0 + + for i := range req.Symbols { + req.Symbols[i] = "" + } req.Symbols = req.Symbols[:0] if req.Timeseries != nil { ReuseSliceV2(req.Timeseries) diff --git a/pkg/cortexpb/timeseriesv2_test.go b/pkg/cortexpb/timeseriesv2_test.go index 270a8597111..bd6999406cd 100644 --- a/pkg/cortexpb/timeseriesv2_test.go +++ b/pkg/cortexpb/timeseriesv2_test.go @@ -52,6 +52,38 @@ func TestTimeseriesV2FromPool(t *testing.T) { }) } +func TestReuseWriteRequestV2(t *testing.T) { + req := PreallocWriteRequestV2FromPool() + + // Populate req with some data. 
+ req.Source = RULE + req.Symbols = append(req.Symbols, "", "__name__", "test") + + tsSlice := PreallocTimeseriesV2SliceFromPool() + tsSlice = append(tsSlice, PreallocTimeseriesV2{TimeSeriesV2: TimeseriesV2FromPool()}) + req.Timeseries = tsSlice + + // Capture backing array before reuse + symbolsBackingArray := req.Symbols[:cap(req.Symbols)] + require.Equal(t, "__name__", symbolsBackingArray[1]) + require.Equal(t, "test", symbolsBackingArray[2]) + + // Put the request back into the pool + ReuseWriteRequestV2(req) + + // Verify clearing directly on the backing array + for i, s := range symbolsBackingArray[:3] { + assert.Equalf(t, "", s, "symbol at index %d not cleared", i) + } + + // Source is reset to default + assert.Equal(t, API, req.Source) + // The symbol length is properly reset to 0. + assert.Len(t, req.Symbols, 0) + // Timeseries slice is nil + assert.Nil(t, req.Timeseries) +} + func BenchmarkMarshallWriteRequestV2(b *testing.B) { ts := PreallocTimeseriesV2SliceFromPool() diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index f57f32bda55..8d82cd78878 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -226,6 +226,13 @@ func (q *distributorQuerier) queryWithRetry(ctx context.Context, queryFunc func( retries.Wait() } + // If the loop never executed (e.g. context cancelled before the first + // attempt), result and err are both nil. Return the context error so + // callers don't receive a nil result with no error. 
+ if result == nil && err == nil { + err = ctx.Err() + } + return result, err } @@ -289,7 +296,7 @@ } func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe } func (q *distributorQuerier) labelsWithRetry(ctx context.Context, labelsFunc func() ([]string, error)) ([]string, error) { - if q.ingesterQueryMaxAttempts == 1 { + if q.ingesterQueryMaxAttempts <= 1 { return labelsFunc() } @@ -312,6 +319,13 @@ retries.Wait() } + // If the loop never executed (e.g. context cancelled before the first + // attempt), result and err are both nil. Return the context error so + // callers don't receive a nil result with no error. + if result == nil && err == nil { + err = ctx.Err() + } + return result, err } diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index 825da08860f..7ec3f583a1c 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -387,6 +387,92 @@ func TestDistributorQuerier_Retry(t *testing.T) { } } + +// TestDistributorQuerier_Select_CancelledContext_NoRetry verifies that with +// ingesterQueryMaxAttempts=1, a cancelled context does not panic because the +// direct code path (no retry loop) is used. 
+func TestDistributorQuerier_Select_CancelledContext_NoRetry(t *testing.T) { + t.Parallel() + + ctx := user.InjectOrgID(context.Background(), "0") + ctx, cancel := context.WithCancel(ctx) + cancel() + + d := &MockDistributor{} + d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, context.Canceled) + + ingesterQueryMaxAttempts := 1 + queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, 0, func(string) bool { + return true + }, ingesterQueryMaxAttempts) + querier, err := queryable.Querier(mint, maxt) + require.NoError(t, err) + + require.NotPanics(t, func() { + seriesSet := querier.Select(ctx, true, &storage.SelectHints{Start: mint, End: maxt}) + _ = seriesSet.Err() + }) +} + +// TestDistributorQuerier_Select_CancelledContext reproduces the panic described +// in https://github.com/cortexproject/cortex/issues/7364. +// +// When ingesterQueryMaxAttempts > 1 and the context is cancelled before the +// retry loop starts (e.g. query timeout or another querier goroutine failing), +// backoff.Ongoing() returns false immediately. queryWithRetry now propagates +// ctx.Err() so callers always see a non-nil error. +func TestDistributorQuerier_Select_CancelledContext(t *testing.T) { + t.Parallel() + + // Create a context that is already cancelled. + ctx := user.InjectOrgID(context.Background(), "0") + ctx, cancel := context.WithCancel(ctx) + cancel() + + d := &MockDistributor{} + // No mock expectations needed — QueryStream should never be called + // because the context is already cancelled. 
+ + ingesterQueryMaxAttempts := 2 + queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, 0, func(string) bool { + return true + }, ingesterQueryMaxAttempts) + querier, err := queryable.Querier(mint, maxt) + require.NoError(t, err) + + seriesSet := querier.Select(ctx, true, &storage.SelectHints{Start: mint, End: maxt}) + require.ErrorIs(t, seriesSet.Err(), context.Canceled) +} + +// TestDistributorQuerier_Labels_CancelledContext verifies that labelsWithRetry +// propagates ctx.Err() when the context is cancelled before the retry loop +// executes. +func TestDistributorQuerier_Labels_CancelledContext(t *testing.T) { + t.Parallel() + + ctx := user.InjectOrgID(context.Background(), "0") + ctx, cancel := context.WithCancel(ctx) + cancel() + + d := &MockDistributor{} + + ingesterQueryMaxAttempts := 2 + queryable := newDistributorQueryable(d, true, true, batch.NewChunkMergeIterator, 0, func(string) bool { + return true + }, ingesterQueryMaxAttempts) + querier, err := queryable.Querier(mint, maxt) + require.NoError(t, err) + + t.Run("LabelNames", func(t *testing.T) { + _, _, err := querier.LabelNames(ctx, nil) + require.ErrorIs(t, err, context.Canceled) + }) + + t.Run("LabelValues", func(t *testing.T) { + _, _, err := querier.LabelValues(ctx, "foo", nil) + require.ErrorIs(t, err, context.Canceled) + }) +} + func TestDistributorQuerier_LabelNames(t *testing.T) { t.Parallel() diff --git a/pkg/ring/kv/metrics.go b/pkg/ring/kv/metrics.go index 30ed8ff4aa3..3d2fa8928c8 100644 --- a/pkg/ring/kv/metrics.go +++ b/pkg/ring/kv/metrics.go @@ -2,6 +2,7 @@ package kv import ( "context" + "errors" "strconv" "time" @@ -33,7 +34,8 @@ func getCasErrorCode(err error) string { // If the error has been returned to abort the CAS operation, then we shouldn't // consider it an error when tracking metrics. 
- if casErr, ok := err.(interface{ IsOperationAborted() bool }); ok && casErr.IsOperationAborted() { + var casAborted interface{ IsOperationAborted() bool } + if errors.As(err, &casAborted) && casAborted.IsOperationAborted() { return "200" } diff --git a/pkg/ring/kv/metrics_test.go b/pkg/ring/kv/metrics_test.go new file mode 100644 index 00000000000..e7887aa3ec5 --- /dev/null +++ b/pkg/ring/kv/metrics_test.go @@ -0,0 +1,52 @@ +package kv + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +// operationAbortedError is a local stub that mimics ha.ReplicasNotMatchError, +// implementing IsOperationAborted() to avoid an import cycle. +type operationAbortedError struct{} + +func (e operationAbortedError) Error() string { return "operation aborted" } +func (e operationAbortedError) IsOperationAborted() bool { return true } + +func TestGetCasErrorCode(t *testing.T) { + abortedErr := operationAbortedError{} + + tests := map[string]struct { + err error + expected string + }{ + "nil error": { + err: nil, + expected: "200", + }, + "operation aborted error (direct)": { + err: abortedErr, + expected: "200", + }, + "operation aborted error (single-wrapped by memberlist)": { + err: fmt.Errorf("fn returned error: %w", abortedErr), + expected: "200", + }, + "operation aborted error (double-wrapped by memberlist)": { + err: fmt.Errorf("failed to CAS-update key X: %w", + fmt.Errorf("fn returned error: %w", abortedErr)), + expected: "200", + }, + "generic error": { + err: fmt.Errorf("some real error"), + expected: "500", + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + assert.Equal(t, tc.expected, getCasErrorCode(tc.err)) + }) + } +}