Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@

## 1.21.0 in progress

* [BUGFIX] KV store: Fix false-positive `status_code="500"` metrics for HA tracker CAS operations when using memberlist. #7408
* [BUGFIX] Querier: Fix nil result returned with nil error when `ingester_query_max_attempts` > 1 and the context is cancelled before the first retry attempt. #7369
* [BUGFIX] Alertmanager: Fix disappearing user config and state when ring is temporarily unreachable. #7372
* [BUGFIX] Fix memory leak in `ReuseWriteRequestV2` by explicitly clearing the `Symbols` backing array string pointers before returning the object to `sync.Pool`. #7373
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7375

* [CHANGE] Ruler: Graduate Ruler API from experimental. #7312
* Flag: Renamed `-experimental.ruler.enable-api` to `-ruler.enable-api`. The old flag is kept as deprecated.
* Ruler API is no longer marked as experimental.
Expand Down
21 changes: 14 additions & 7 deletions pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,10 @@ func (am *MultitenantAlertmanager) userIndexUpdateLoop(ctx context.Context) {
level.Error(am.logger).Log("msg", "context timeout, exit user index update loop", "err", ctx.Err())
return
case <-ticker.C:
owned := am.isUserOwned(userID)
owned, err := am.isUserOwned(userID)
if err != nil {
continue
}
if !owned {
continue
}
Expand Down Expand Up @@ -804,7 +807,11 @@ func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context)

// Filter out users not owned by this shard.
for _, userID := range allUserIDs {
if am.isUserOwned(userID) {
owned, err := am.isUserOwned(userID)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to check if user %s is owned", userID)
}
if owned {
ownedUserIDs = append(ownedUserIDs, userID)
}
}
Expand All @@ -821,24 +828,24 @@ func (am *MultitenantAlertmanager) loadAlertmanagerConfigs(ctx context.Context)
return allUserIDs, configs, nil
}

func (am *MultitenantAlertmanager) isUserOwned(userID string) bool {
func (am *MultitenantAlertmanager) isUserOwned(userID string) (bool, error) {
if !am.allowedTenants.IsAllowed(userID) {
return false
return false, nil
}

// If sharding is disabled, any alertmanager instance owns all users.
if !am.cfg.ShardingEnabled {
return true
return true, nil
}

alertmanagers, err := am.ring.Get(users.ShardByUser(userID), getSyncRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension), nil, nil, nil)
if err != nil {
am.ringCheckErrors.Inc()
level.Error(am.logger).Log("msg", "failed to load alertmanager configuration", "user", userID, "err", err)
return false
return false, err
}

return alertmanagers.Includes(am.ringLifecycler.GetInstanceAddr())
return alertmanagers.Includes(am.ringLifecycler.GetInstanceAddr()), nil
}

func (am *MultitenantAlertmanager) syncConfigs(cfgs map[string]alertspb.AlertConfigDesc) {
Expand Down
125 changes: 125 additions & 0 deletions pkg/alertmanager/multitenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2501,3 +2501,128 @@ func (m *mockAlertManagerLimits) AlertmanagerMaxSilencesCount(_ string) int {
func (m *mockAlertManagerLimits) AlertmanagerMaxSilenceSizeBytes(_ string) int {
return m.maxSilencesSizeBytes
}

func TestMultitenantAlertmanager_isUserOwned(t *testing.T) {
ctx := context.Background()
_ = ctx
amConfig := mockAlertmanagerConfig(t)
amConfig.ShardingEnabled = true

ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

alertStore, err := prepareInMemoryAlertStore()
require.NoError(t, err)

am, err := createMultitenantAlertmanager(amConfig, nil, nil, alertStore, ringStore, nil, log.NewNopLogger(), nil)
require.NoError(t, err)

// We don't start the AM, so the ring is empty.
// When sharding is enabled, isUserOwned will call am.ring.Get, which should fail because the ring is empty.
owned, err := am.isUserOwned("user-1")
require.Error(t, err)
require.Equal(t, ring.ErrEmptyRing, err)
require.False(t, owned)

// If sharding is disabled, it should return true, nil
am.cfg.ShardingEnabled = false
owned, err = am.isUserOwned("user-1")
require.NoError(t, err)
require.True(t, owned)
}

// TestMultitenantAlertmanager_loadAndSyncConfigs_deletesUserFromStore checks that
// a periodic sync prunes remote full state belonging to a user that has state but
// no alertmanager configuration.
func TestMultitenantAlertmanager_loadAndSyncConfigs_deletesUserFromStore(t *testing.T) {
	ctx := context.Background()

	amConfig := mockAlertmanagerConfig(t)
	amConfig.ShardingEnabled = true

	ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
	t.Cleanup(func() { assert.NoError(t, closer.Close()) })

	alertStore, err := prepareInMemoryAlertStore()
	require.NoError(t, err)

	am, err := createMultitenantAlertmanager(amConfig, nil, nil, alertStore, ringStore, nil, log.NewNopLogger(), nil)
	require.NoError(t, err)

	// The AM is never started: we seed store state manually and then invoke
	// loadAndSyncConfigs directly.
	const userID = "user-1"

	// 1. Write a FullState (remote state) for the user, but no alert config.
	state := alertspb.FullStateDesc{
		State: &clusterpb.FullState{},
	}
	require.NoError(t, alertStore.SetFullState(ctx, userID, state))

	// With no alert config written, the store reports no users at all.
	allUsers, err := alertStore.ListAllUsers(ctx)
	require.NoError(t, err)
	require.Empty(t, allUsers)

	// Sanity check: the full state exists before the sync runs.
	_, err = alertStore.GetFullState(ctx, userID)
	require.NoError(t, err)

	// 2. Run the sync. It lists all users (empty here), and with sharding enabled
	// it cleans up remote state for any user not in that list — so the orphaned
	// state written above should be pruned.
	require.NoError(t, am.loadAndSyncConfigs(ctx, reasonPeriodic))

	// 3. The user's full state must now be gone from the store.
	_, err = alertStore.GetFullState(ctx, userID)
	require.Error(t, err)
	require.Equal(t, alertspb.ErrNotFound, err)
}

// TestMultitenantAlertmanager_loadAndSyncConfigs_DoesNotDeleteUserFromStoreWhenRingUnreachable
// verifies that a sync aborts with an error — and prunes nothing — when user
// ownership cannot be determined because the ring is empty/unreachable.
func TestMultitenantAlertmanager_loadAndSyncConfigs_DoesNotDeleteUserFromStoreWhenRingUnreachable(t *testing.T) {
	ctx := context.Background()

	amConfig := mockAlertmanagerConfig(t)
	amConfig.ShardingEnabled = true

	ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
	t.Cleanup(func() { assert.NoError(t, closer.Close()) })

	alertStore, err := prepareInMemoryAlertStore()
	require.NoError(t, err)

	am, err := createMultitenantAlertmanager(amConfig, nil, nil, alertStore, ringStore, nil, log.NewNopLogger(), nil)
	require.NoError(t, err)

	const userID = "user-1"

	// 1. Seed a FullState (remote state) for the user.
	state := alertspb.FullStateDesc{
		State: &clusterpb.FullState{},
	}
	require.NoError(t, alertStore.SetFullState(ctx, userID, state))

	// Also write an alert config so ListAllUsers reports the user.
	require.NoError(t, alertStore.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
		User:      userID,
		RawConfig: simpleConfigOne,
		Templates: []*alertspb.TemplateDesc{},
	}))

	allUsers, err := alertStore.ListAllUsers(ctx)
	require.NoError(t, err)
	require.Len(t, allUsers, 1)

	// Sanity check: the full state exists before the sync runs.
	_, err = alertStore.GetFullState(ctx, userID)
	require.NoError(t, err)

	// 2. With sharding enabled but the ring empty (the AM was never started),
	// isUserOwned fails, so loadAndSyncConfigs must return the error early —
	// before it gets a chance to prune any state.
	err = am.loadAndSyncConfigs(ctx, reasonPeriodic)
	require.Error(t, err)
	require.Contains(t, err.Error(), ring.ErrEmptyRing.Error())

	// 3. The user's full state must still be present in the store.
	_, err = alertStore.GetFullState(ctx, userID)
	require.NoError(t, err)
}
4 changes: 4 additions & 0 deletions pkg/cortexpb/timeseriesv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func ReuseWriteRequestV2(req *PreallocWriteRequestV2) {
req.data = nil
}
req.Source = 0

for i := range req.Symbols {
req.Symbols[i] = ""
}
req.Symbols = req.Symbols[:0]
if req.Timeseries != nil {
ReuseSliceV2(req.Timeseries)
Expand Down
32 changes: 32 additions & 0 deletions pkg/cortexpb/timeseriesv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,38 @@ func TestTimeseriesV2FromPool(t *testing.T) {
})
}

// TestReuseWriteRequestV2 ensures that returning a request to the pool clears
// the string pointers in the Symbols backing array (so pooled objects don't
// keep them alive), resets Source to its default, and releases Timeseries.
func TestReuseWriteRequestV2(t *testing.T) {
	req := PreallocWriteRequestV2FromPool()

	// Fill in a few fields so there is something to clear.
	req.Source = RULE
	req.Symbols = append(req.Symbols, "", "__name__", "test")

	series := PreallocTimeseriesV2SliceFromPool()
	series = append(series, PreallocTimeseriesV2{TimeSeriesV2: TimeseriesV2FromPool()})
	req.Timeseries = series

	// Keep a view over the full backing array so we can inspect it after reuse.
	backing := req.Symbols[:cap(req.Symbols)]
	require.Equal(t, "__name__", backing[1])
	require.Equal(t, "test", backing[2])

	// Return the request to the pool.
	ReuseWriteRequestV2(req)

	// Every previously used slot must have been zeroed in place.
	for i, s := range backing[:3] {
		assert.Equalf(t, "", s, "symbol at index %d not cleared", i)
	}

	// Source is back to its default value.
	assert.Equal(t, API, req.Source)
	// Symbols has been truncated to zero length.
	assert.Len(t, req.Symbols, 0)
	// The timeseries slice has been released.
	assert.Nil(t, req.Timeseries)
}

func BenchmarkMarshallWriteRequestV2(b *testing.B) {
ts := PreallocTimeseriesV2SliceFromPool()

Expand Down
16 changes: 15 additions & 1 deletion pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,13 @@ func (q *distributorQuerier) queryWithRetry(ctx context.Context, queryFunc func(
retries.Wait()
}

// If the loop never executed (e.g. context cancelled before the first
// attempt), result and err are both nil. Return the context error so
// callers don't receive a nil result with no error.
if err == nil {
err = ctx.Err()
}

return result, err
}

Expand Down Expand Up @@ -289,7 +296,7 @@ func (q *distributorQuerier) LabelNames(ctx context.Context, hints *storage.Labe
}

func (q *distributorQuerier) labelsWithRetry(ctx context.Context, labelsFunc func() ([]string, error)) ([]string, error) {
if q.ingesterQueryMaxAttempts == 1 {
if q.ingesterQueryMaxAttempts <= 1 {
return labelsFunc()
}

Expand All @@ -312,6 +319,13 @@ func (q *distributorQuerier) labelsWithRetry(ctx context.Context, labelsFunc fun
retries.Wait()
}

// If the loop never executed (e.g. context cancelled before the first
// attempt), result and err are both nil. Return the context error so
// callers don't receive a nil result with no error.
if err == nil {
err = ctx.Err()
}

return result, err
}

Expand Down
86 changes: 86 additions & 0 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,92 @@ func TestDistributorQuerier_Retry(t *testing.T) {
}
}

// TestDistributorQuerier_Select_CancelledContext_NoRetry verifies that with
// ingesterQueryMaxAttempts=1 a cancelled context does not panic, because the
// direct (no retry loop) code path is taken.
func TestDistributorQuerier_Select_CancelledContext_NoRetry(t *testing.T) {
	t.Parallel()

	// Build a context that is cancelled before Select is ever called.
	cancelled, cancel := context.WithCancel(user.InjectOrgID(context.Background(), "0"))
	cancel()

	distributor := &MockDistributor{}
	distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, context.Canceled)

	const maxAttempts = 1
	queryable := newDistributorQueryable(distributor, true, true, batch.NewChunkMergeIterator, 0, func(string) bool {
		return true
	}, maxAttempts)
	querier, err := queryable.Querier(mint, maxt)
	require.NoError(t, err)

	require.NotPanics(t, func() {
		set := querier.Select(cancelled, true, &storage.SelectHints{Start: mint, End: maxt})
		_ = set.Err()
	})
}

// TestDistributorQuerier_Select_CancelledContext reproduces the panic described
// in https://github.com/cortexproject/cortex/issues/7364.
//
// When ingesterQueryMaxAttempts > 1 and the context is cancelled before the
// retry loop starts (e.g. query timeout or another querier goroutine failing),
// backoff.Ongoing() returns false immediately. queryWithRetry now propagates
// ctx.Err() so callers always see a non-nil error.
func TestDistributorQuerier_Select_CancelledContext(t *testing.T) {
	t.Parallel()

	// Start from an already-cancelled context.
	cancelled, cancel := context.WithCancel(user.InjectOrgID(context.Background(), "0"))
	cancel()

	// No expectations are set on the mock: QueryStream must never be reached,
	// since the context is cancelled up front.
	distributor := &MockDistributor{}

	const maxAttempts = 2
	queryable := newDistributorQueryable(distributor, true, true, batch.NewChunkMergeIterator, 0, func(string) bool {
		return true
	}, maxAttempts)
	querier, err := queryable.Querier(mint, maxt)
	require.NoError(t, err)

	set := querier.Select(cancelled, true, &storage.SelectHints{Start: mint, End: maxt})
	require.ErrorIs(t, set.Err(), context.Canceled)
}

// TestDistributorQuerier_Labels_CancelledContext verifies that labelsWithRetry
// propagates ctx.Err() when the context is cancelled before the retry loop
// executes.
func TestDistributorQuerier_Labels_CancelledContext(t *testing.T) {
	t.Parallel()

	// Build a context that is cancelled before any label request is issued.
	cancelled, cancel := context.WithCancel(user.InjectOrgID(context.Background(), "0"))
	cancel()

	distributor := &MockDistributor{}

	const maxAttempts = 2
	queryable := newDistributorQueryable(distributor, true, true, batch.NewChunkMergeIterator, 0, func(string) bool {
		return true
	}, maxAttempts)
	querier, err := queryable.Querier(mint, maxt)
	require.NoError(t, err)

	t.Run("LabelNames", func(t *testing.T) {
		_, _, err := querier.LabelNames(cancelled, nil)
		require.ErrorIs(t, err, context.Canceled)
	})

	t.Run("LabelValues", func(t *testing.T) {
		_, _, err := querier.LabelValues(cancelled, "foo", nil)
		require.ErrorIs(t, err, context.Canceled)
	})
}

func TestDistributorQuerier_LabelNames(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 3 additions & 1 deletion pkg/ring/kv/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kv

import (
"context"
"errors"
"strconv"
"time"

Expand Down Expand Up @@ -33,7 +34,8 @@ func getCasErrorCode(err error) string {

// If the error has been returned to abort the CAS operation, then we shouldn't
// consider it an error when tracking metrics.
if casErr, ok := err.(interface{ IsOperationAborted() bool }); ok && casErr.IsOperationAborted() {
var casAborted interface{ IsOperationAborted() bool }
if errors.As(err, &casAborted) && casAborted.IsOperationAborted() {
return "200"
}

Expand Down
Loading
Loading