Skip to content

Commit

Permalink
feat: new stream count limiter (#13006)
Browse files Browse the repository at this point in the history
Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
Co-authored-by: JordanRushing <rushing.jordan@gmail.com>
  • Loading branch information
vlad-diachenko and JordanRushing committed May 23, 2024
1 parent 987e551 commit 1111595
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 55 deletions.
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -2978,6 +2978,11 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# CLI flag: -validation.discover-log-levels
[discover_log_levels: <boolean> | default = true]

# When true an ingester takes into account only the streams that it owns
# according to the ring while applying the stream limit.
# CLI flag: -ingester.use-owned-stream-count
[use_owned_stream_count: <boolean> | default = false]

# Maximum number of active streams per user, per ingester. 0 to disable.
# CLI flag: -ingester.max-streams-per-user
[max_streams_per_user: <int> | default = 0]
Expand Down
75 changes: 45 additions & 30 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ type instance struct {
tailers map[uint32]*tailer
tailerMtx sync.RWMutex

limiter *Limiter
limiter *Limiter
streamCountLimiter *streamCountLimiter
ownedStreamsSvc *ownedStreamService

configs *runtime.TenantConfigs

wal WAL
Expand Down Expand Up @@ -147,21 +150,24 @@ func newInstance(
if err != nil {
return nil, err
}

streams := newStreamsMap()
ownedStreamsSvc := newOwnedStreamService(instanceID, limiter)
c := config.SchemaConfig{Configs: periodConfigs}
i := &instance{
cfg: cfg,
streams: newStreamsMap(),
streams: streams,
buf: make([]byte, 0, 1024),
index: invertedIndex,
instanceID: instanceID,

streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),

tailers: map[uint32]*tailer{},
limiter: limiter,
configs: configs,
tailers: map[uint32]*tailer{},
limiter: limiter,
streamCountLimiter: newStreamCountLimiter(instanceID, streams.Len, limiter, ownedStreamsSvc),
ownedStreamsSvc: ownedStreamsSvc,
configs: configs,

wal: wal,
metrics: metrics,
Expand Down Expand Up @@ -286,29 +292,11 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
}

if record != nil {
err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, i.streams.Len())
err = i.streamCountLimiter.AssertNewStreamAllowed(i.instanceID)
}

if err != nil {
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
"msg", "failed to create stream, exceeded limit",
"org_id", i.instanceID,
"err", err,
"stream", pushReqStream.Labels,
)
}

validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
for _, e := range pushReqStream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
if i.customStreamsTracker != nil {
i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes))
}
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg, labels, i.instanceID)
return i.onStreamCreationError(ctx, pushReqStream, err, labels)
}

fp := i.getHashForLabels(labels)
Expand All @@ -333,21 +321,47 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
i.metrics.recoveredStreamsTotal.Inc()
}

i.onStreamCreated(s)

return s, nil
}

func (i *instance) onStreamCreationError(ctx context.Context, pushReqStream logproto.Stream, err error, labels labels.Labels) (*stream, error) {
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
"msg", "failed to create stream, exceeded limit",
"org_id", i.instanceID,
"err", err,
"stream", pushReqStream.Labels,
)
}

validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
for _, e := range pushReqStream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
if i.customStreamsTracker != nil {
i.customStreamsTracker.DiscardedBytesAdd(ctx, i.instanceID, validation.StreamLimit, labels, float64(bytes))
}
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg, labels, i.instanceID)
}

func (i *instance) onStreamCreated(s *stream) {
memoryStreams.WithLabelValues(i.instanceID).Inc()
memoryStreamsLabelsBytes.Add(float64(len(s.labels.String())))
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(s)
streamsCountStats.Add(1)

i.ownedStreamsSvc.incOwnedStreamCount()
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
"msg", "successfully created stream",
"org_id", i.instanceID,
"stream", pushReqStream.Labels,
"stream", s.labels.String(),
)
}

return s, nil
}

func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*stream, error) {
Expand Down Expand Up @@ -407,6 +421,7 @@ func (i *instance) removeStream(s *stream) {
memoryStreams.WithLabelValues(i.instanceID).Dec()
memoryStreamsLabelsBytes.Sub(float64(len(s.labels.String())))
streamsCountStats.Add(-1)
i.ownedStreamsSvc.decOwnedStreamCount()
}
}

Expand Down
81 changes: 58 additions & 23 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type RingCount interface {

type Limits interface {
UnorderedWrites(userID string) bool
UseOwnedStreamCount(userID string) bool
MaxLocalStreamsPerUser(userID string) int
MaxGlobalStreamsPerUser(userID string) int
PerStreamRateLimit(userID string) validation.RateLimit
Expand Down Expand Up @@ -76,46 +77,39 @@ func (l *Limiter) UnorderedWrites(userID string) bool {
return l.limits.UnorderedWrites(userID)
}

// AssertMaxStreamsPerUser ensures limit has not been reached compared to the current
// number of streams in input and returns an error if so.
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
// Until the limiter actually starts, all accesses are successful.
// This is used to disable limits while recovering from the WAL.
l.mtx.RLock()
defer l.mtx.RUnlock()
if l.disabled {
return nil
}

func (l *Limiter) GetStreamCountLimit(tenantID string) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int) {
// Start by setting the local limit either from override or default
localLimit := l.limits.MaxLocalStreamsPerUser(userID)
localLimit = l.limits.MaxLocalStreamsPerUser(tenantID)

// We can assume that streams are evenly distributed across ingesters
// so we do convert the global limit into a local limit
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)
adjustedGlobalLimit := l.convertGlobalToLocalLimit(globalLimit)
globalLimit = l.limits.MaxGlobalStreamsPerUser(tenantID)
adjustedGlobalLimit = l.convertGlobalToLocalLimit(globalLimit)

// Set the calculated limit to the lesser of the local limit or the new calculated global limit
calculatedLimit := l.minNonZero(localLimit, adjustedGlobalLimit)
calculatedLimit = l.minNonZero(localLimit, adjustedGlobalLimit)

// If both the local and global limits are disabled, we just
// use the largest int value
if calculatedLimit == 0 {
calculatedLimit = math.MaxInt32
}
return
}

if streams < calculatedLimit {
return nil
func (l *Limiter) minNonZero(first, second int) int {
if first == 0 || (second != 0 && first > second) {
return second
}

return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, userID, streams, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit)
return first
}

func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
if globalLimit == 0 {
return 0
}

// todo: change to healthyInstancesInZoneCount() once
// Given we don't need a super accurate count (ie. when the ingesters
// topology changes) and we prefer to always be in favor of the tenant,
// we can use a per-ingester limit equal to:
Expand All @@ -131,12 +125,53 @@ func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
return 0
}

func (l *Limiter) minNonZero(first, second int) int {
if first == 0 || (second != 0 && first > second) {
return second
type supplier[T any] func() T

type streamCountLimiter struct {
tenantID string
limiter *Limiter
defaultStreamCountSupplier supplier[int]
ownedStreamSvc *ownedStreamService
}

var noopFixedLimitSupplier = func() int {
return 0
}

func newStreamCountLimiter(tenantID string, defaultStreamCountSupplier supplier[int], limiter *Limiter, service *ownedStreamService) *streamCountLimiter {
return &streamCountLimiter{
tenantID: tenantID,
limiter: limiter,
defaultStreamCountSupplier: defaultStreamCountSupplier,
ownedStreamSvc: service,
}
}

return first
func (l *streamCountLimiter) AssertNewStreamAllowed(tenantID string) error {
streamCountSupplier, fixedLimitSupplier := l.getSuppliers(tenantID)
calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit := l.getCurrentLimit(tenantID, fixedLimitSupplier)
actualStreamsCount := streamCountSupplier()
if actualStreamsCount < calculatedLimit {
return nil
}

return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, tenantID, actualStreamsCount, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit)
}

func (l *streamCountLimiter) getCurrentLimit(tenantID string, fixedLimitSupplier supplier[int]) (calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit int) {
calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit = l.limiter.GetStreamCountLimit(tenantID)
fixedLimit := fixedLimitSupplier()
if fixedLimit > calculatedLimit {
calculatedLimit = fixedLimit
}
return
}

func (l *streamCountLimiter) getSuppliers(tenant string) (streamCountSupplier, fixedLimitSupplier supplier[int]) {
if l.limiter.limits.UseOwnedStreamCount(tenant) {
return l.ownedStreamSvc.getOwnedStreamCount, l.ownedStreamSvc.getFixedLimit
}
return l.defaultStreamCountSupplier, noopFixedLimitSupplier
}

type RateLimiterStrategy interface {
Expand Down
47 changes: 45 additions & 2 deletions pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,23 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/time/rate"

"github.com/grafana/loki/v3/pkg/validation"
)

func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {
tests := map[string]struct {
maxLocalStreamsPerUser int
maxGlobalStreamsPerUser int
ringReplicationFactor int
ringIngesterCount int
streams int
expected error
useOwnedStreamService bool
fixedLimit int32
ownedStreamCount int64
}{
"both local and global limit are disabled": {
maxLocalStreamsPerUser: 0,
Expand Down Expand Up @@ -94,6 +98,36 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
streams: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300),
},
"actual limit must be used if it's greater than fixed limit": {
maxLocalStreamsPerUser: 500,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
useOwnedStreamService: true,
fixedLimit: 20,
ownedStreamCount: 3000,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 3000, 300, 500, 1000, 300),
},
"fixed limit must be used if it's greater than actual limit": {
maxLocalStreamsPerUser: 500,
maxGlobalStreamsPerUser: 1000,
ringReplicationFactor: 3,
ringIngesterCount: 10,
useOwnedStreamService: true,
fixedLimit: 2000,
ownedStreamCount: 2001,
expected: fmt.Errorf(errMaxStreamsPerUserLimitExceeded, "test", 2001, 2000, 500, 1000, 300),
},
"fixed limit must not be used if both limits are disabled": {
maxLocalStreamsPerUser: 0,
maxGlobalStreamsPerUser: 0,
ringReplicationFactor: 3,
ringIngesterCount: 10,
useOwnedStreamService: true,
fixedLimit: 2000,
ownedStreamCount: 2001,
expected: nil,
},
}

for testName, testData := range tests {
Expand All @@ -107,11 +141,20 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{
MaxLocalStreamsPerUser: testData.maxLocalStreamsPerUser,
MaxGlobalStreamsPerUser: testData.maxGlobalStreamsPerUser,
UseOwnedStreamCount: testData.useOwnedStreamService,
}, nil)
require.NoError(t, err)

ownedStreamSvc := &ownedStreamService{
fixedLimit: atomic.NewInt32(testData.fixedLimit),
ownedStreamCount: atomic.NewInt64(testData.ownedStreamCount),
}
limiter := NewLimiter(limits, NilMetrics, ring, testData.ringReplicationFactor)
actual := limiter.AssertMaxStreamsPerUser("test", testData.streams)
defaultCountSupplier := func() int {
return testData.streams
}
streamCountLimiter := newStreamCountLimiter("test", defaultCountSupplier, limiter, ownedStreamSvc)
actual := streamCountLimiter.AssertNewStreamAllowed("test")

assert.Equal(t, testData.expected, actual)
})
Expand Down
44 changes: 44 additions & 0 deletions pkg/ingester/owned_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package ingester

import "go.uber.org/atomic"

type ownedStreamService struct {
tenantID string
limiter *Limiter
fixedLimit *atomic.Int32

//todo: implement job to recalculate it
ownedStreamCount *atomic.Int64
}

func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService {
svc := &ownedStreamService{
tenantID: tenantID,
limiter: limiter,
ownedStreamCount: atomic.NewInt64(0),
fixedLimit: atomic.NewInt32(0),
}
svc.updateFixedLimit()
return svc
}

func (s *ownedStreamService) getOwnedStreamCount() int {
return int(s.ownedStreamCount.Load())
}

func (s *ownedStreamService) updateFixedLimit() {
limit, _, _, _ := s.limiter.GetStreamCountLimit(s.tenantID)
s.fixedLimit.Store(int32(limit))
}

func (s *ownedStreamService) getFixedLimit() int {
return int(s.fixedLimit.Load())
}

func (s *ownedStreamService) incOwnedStreamCount() {
s.ownedStreamCount.Inc()
}

func (s *ownedStreamService) decOwnedStreamCount() {
s.ownedStreamCount.Dec()
}

0 comments on commit 1111595

Please sign in to comment.