diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 961f5bd24..d51face9a 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -954,6 +954,7 @@ func main() {
 
         queryEngine = promql.NewEngine(opts)
 
+        ruleQueryOffset := time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
         ruleManager = rules.NewManager(&rules.ManagerOptions{
             Appendable: adapter, // PP_CHANGES.md: rebuild on cpp
             Queryable:  adapter, // PP_CHANGES.md: rebuild on cpp
@@ -969,7 +970,7 @@ func main() {
             MaxConcurrentEvals:     cfg.maxConcurrentEvals,
             ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval,
             DefaultRuleQueryOffset: func() time.Duration {
-                return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset)
+                return ruleQueryOffset
             },
         })
     }
diff --git a/pp-pkg/storage/appender.go b/pp-pkg/storage/appender.go
index 2db4f3b73..d46c5bc7b 100644
--- a/pp-pkg/storage/appender.go
+++ b/pp-pkg/storage/appender.go
@@ -34,6 +34,7 @@ type TimeSeriesAppender struct {
     adapter *Adapter
     state   *cppbridge.StateV2
     batch   *timeSeriesBatch
+    lsb     *model.LabelSetBuilder
 }
 
 func newTimeSeriesAppender(
@@ -45,7 +46,8 @@ func newTimeSeriesAppender(
         ctx:     ctx,
         adapter: adapter,
         state:   state,
-        batch:   &timeSeriesBatch{},
+        batch:   &timeSeriesBatch{timeSeries: make([]model.TimeSeries, 0, 10)},
+        lsb:     model.NewLabelSetBuilderSize(10),
     }
 }
 
@@ -56,13 +58,13 @@ func (a *TimeSeriesAppender) Append(
     t int64,
     v float64,
 ) (storage.SeriesRef, error) {
-    lsb := model.NewLabelSetBuilder()
+    a.lsb.Reset()
     l.Range(func(label labels.Label) {
-        lsb.Add(label.Name, label.Value)
+        a.lsb.Add(label.Name, label.Value)
     })
 
     a.batch.timeSeries = append(a.batch.timeSeries, model.TimeSeries{
-        LabelSet:  lsb.Build(),
+        LabelSet:  a.lsb.Build(),
         Timestamp: uint64(t), // #nosec G115 // no overflow
         Value:     v,
     })
diff --git a/pp/go/cppbridge/metrics.go b/pp/go/cppbridge/metrics.go
index d4c42479b..2bec33ea6 100644
--- a/pp/go/cppbridge/metrics.go
+++ b/pp/go/cppbridge/metrics.go
@@ -1,8 +1,8 @@
 package cppbridge
 
 import (
-    "github.com/golang/protobuf/proto"
     dto "github.com/prometheus/client_model/go"
+    "google.golang.org/protobuf/proto"
 )
 
 func CppMetrics(f func(metric *dto.Metric) bool) {
diff --git a/pp/go/model/labelset.go b/pp/go/model/labelset.go
index 54b73a0db..1f498f817 100644
--- a/pp/go/model/labelset.go
+++ b/pp/go/model/labelset.go
@@ -454,6 +454,13 @@ func NewLabelSetBuilder() *LabelSetBuilder {
     }
 }
 
+// NewLabelSetBuilderSize is a constructor that preallocates the underlying container to the given size.
+func NewLabelSetBuilderSize(size int) *LabelSetBuilder {
+    return &LabelSetBuilder{
+        pairs: make(map[string]string, size),
+    }
+}
+
 // Build label set
 func (builder *LabelSetBuilder) Build() LabelSet {
     return LabelSetFromMap(builder.pairs)
diff --git a/promql/engine.go b/promql/engine.go
index 6cc4c69cf..400fd739f 100644
--- a/promql/engine.go
+++ b/promql/engine.go
@@ -722,7 +722,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval
     setOffsetForAtModifier(timeMilliseconds(s.Start), s.Expr)
     evalSpanTimer, ctxInnerEval := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval)
     // Instant evaluation. This is executed as a range evaluation with one step.
-    if s.Start == s.End && s.Interval == 0 {
+    if s.Start.Equal(s.End) && s.Interval == 0 {
         start := timeMilliseconds(s.Start)
         evaluator := &evaluator{
             startTimestamp: start,
diff --git a/rules/group.go b/rules/group.go
index 201d3a67d..7057f89db 100644
--- a/rules/group.go
+++ b/rules/group.go
@@ -476,166 +476,21 @@ func (g *Group) CopyState(from *Group) {
 
 // Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
 // Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled.
 func (g *Group) Eval(ctx context.Context, ts time.Time) {
-    var (
-        samplesTotal atomic.Float64
-        wg           sync.WaitGroup
-    )
-
-    ruleQueryOffset := g.QueryOffset()
-
-    for i, rule := range g.rules {
-        select {
-        case <-g.done:
-            return
-        default:
-        }
-
-        eval := func(i int, rule Rule, cleanup func()) {
-            if cleanup != nil {
-                defer cleanup()
-            }
+    var samplesTotal float64
 
-            logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i)
-            ctx, sp := otel.Tracer("").Start(ctx, "rule")
-            sp.SetAttributes(attribute.String("name", rule.Name()))
-            defer func(t time.Time) {
-                sp.End()
-
-                since := time.Since(t)
-                g.metrics.EvalDuration.Observe(since.Seconds())
-                rule.SetEvaluationDuration(since)
-                rule.SetEvaluationTimestamp(t)
-            }(time.Now())
-
-            if sp.SpanContext().IsSampled() && sp.SpanContext().HasTraceID() {
-                logger = log.WithPrefix(logger, "trace_id", sp.SpanContext().TraceID())
-            }
-
-            g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
-
-            vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
-            if err != nil {
-                rule.SetHealth(HealthBad)
-                rule.SetLastError(err)
-                sp.SetStatus(codes.Error, err.Error())
-                g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
-
-                // Canceled queries are intentional termination of queries. This normally
-                // happens on shutdown and thus we skip logging of any errors here.
-                var eqc promql.ErrQueryCanceled
-                if !errors.As(err, &eqc) {
-                    level.Warn(logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
-                }
-                return
-            }
-            rule.SetHealth(HealthGood)
-            rule.SetLastError(nil)
-            samplesTotal.Add(float64(len(vector)))
-
-            if ar, ok := rule.(*AlertingRule); ok {
-                ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
-            }
-            var (
-                numOutOfOrder = 0
-                numTooOld     = 0
-                numDuplicates = 0
-            )
-
-            app := g.opts.Appendable.Appender(ctx)
-            seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
-            defer func() {
-                if err := app.Commit(); err != nil {
-                    rule.SetHealth(HealthBad)
-                    rule.SetLastError(err)
-                    sp.SetStatus(codes.Error, err.Error())
-                    g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
-
-                    level.Warn(logger).Log("msg", "Rule sample appending failed", "err", err)
-                    return
-                }
-                g.seriesInPreviousEval[i] = seriesReturned
-            }()
-
-            for _, s := range vector {
-                if s.H != nil {
-                    _, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H)
-                } else {
-                    _, err = app.Append(0, s.Metric, s.T, s.F)
-                }
-
-                if err != nil {
-                    rule.SetHealth(HealthBad)
-                    rule.SetLastError(err)
-                    sp.SetStatus(codes.Error, err.Error())
-                    unwrappedErr := errors.Unwrap(err)
-                    if unwrappedErr == nil {
-                        unwrappedErr = err
-                    }
-                    switch {
-                    case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample):
-                        numOutOfOrder++
-                        level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
-                    case errors.Is(unwrappedErr, storage.ErrTooOldSample):
-                        numTooOld++
-                        level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
-                    case errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp):
-                        numDuplicates++
-                        level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
-                    default:
-                        level.Warn(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
-                    }
-                } else {
-                    buf := [1024]byte{}
-                    seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric
-                }
-            }
-            if numOutOfOrder > 0 {
-                level.Warn(logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "num_dropped", numOutOfOrder)
-            }
-            if numTooOld > 0 {
-                level.Warn(logger).Log("msg", "Error on ingesting too old result from rule evaluation", "num_dropped", numTooOld)
-            }
-            if numDuplicates > 0 {
-                level.Warn(logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "num_dropped", numDuplicates)
-            }
-
-            for metric, lset := range g.seriesInPreviousEval[i] {
-                if _, ok := seriesReturned[metric]; !ok {
-                    // Series no longer exposed, mark it stale.
-                    _, err = app.Append(0, lset, timestamp.FromTime(ts.Add(-ruleQueryOffset)), math.Float64frombits(value.StaleNaN))
-                    unwrappedErr := errors.Unwrap(err)
-                    if unwrappedErr == nil {
-                        unwrappedErr = err
-                    }
-                    switch {
-                    case unwrappedErr == nil:
-                    case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample),
-                        errors.Is(unwrappedErr, storage.ErrTooOldSample),
-                        errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp):
-                        // Do not count these in logging, as this is expected if series
-                        // is exposed from a different rule.
-                    default:
-                        level.Warn(logger).Log("msg", "Adding stale sample failed", "sample", lset.String(), "err", err)
-                    }
-                }
-            }
-        }
-
-        if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) {
-            wg.Add(1)
-
-            go eval(i, rule, func() {
-                wg.Done()
-                ctrl.Done(ctx)
-            })
-        } else {
-            eval(i, rule, nil)
-        }
+    if g.concurrencyController.IsConcurrent() {
+        samplesTotal = g.concurrencyEval(ctx, ts)
+    } else {
+        samplesTotal = g.sequentiallyEval(ctx, ts, g.rules)
     }
 
-    wg.Wait()
+    select {
+    case <-g.done:
+        return
+    default:
+    }
 
-    g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load())
+    g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
     g.cleanupStaleSeries(ctx, ts)
 }
@@ -1092,3 +947,380 @@ func buildDependencyMap(rules []Rule) dependencyMap {
 
     return dependencies
 }
+
+// concurrencyEval evaluates the rules concurrently.
+func (g *Group) concurrencyEval(ctx context.Context, ts time.Time) float64 {
+    var (
+        samplesTotal   atomic.Float64
+        wg             sync.WaitGroup
+        mtx            sync.Mutex
+        concurrencyApp = g.opts.Appendable.Appender(ctx)
+    )
+
+    ruleQueryOffset := g.QueryOffset()
+    seriesInPreviousEval := make([]map[string]labels.Labels, len(g.rules))
+    concurrencyEval := func(i int, rule Rule, cleanup func()) {
+        if cleanup != nil {
+            defer cleanup()
+        }
+
+        logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i)
+        ctx, sp := otel.Tracer("").Start(ctx, "rule")
+        sp.SetAttributes(attribute.String("name", rule.Name()))
+        defer func(t time.Time) {
+            sp.End()
+
+            since := time.Since(t)
+            g.metrics.EvalDuration.Observe(since.Seconds())
+            rule.SetEvaluationDuration(since)
+            rule.SetEvaluationTimestamp(t)
+        }(time.Now())
+
+        if sp.SpanContext().IsSampled() && sp.SpanContext().HasTraceID() {
+            logger = log.WithPrefix(logger, "trace_id", sp.SpanContext().TraceID())
+        }
+
+        g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
+
+        vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
+        if err != nil {
+            rule.SetHealth(HealthBad)
+            rule.SetLastError(err)
+            sp.SetStatus(codes.Error, err.Error())
+            g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
+
+            // Canceled queries are intentional termination of queries. This normally
+            // happens on shutdown and thus we skip logging of any errors here.
+            var eqc promql.ErrQueryCanceled
+            if !errors.As(err, &eqc) {
+                level.Warn(logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
+            }
+            return
+        }
+        rule.SetHealth(HealthGood)
+        rule.SetLastError(nil)
+        samplesTotal.Add(float64(len(vector)))
+
+        if ar, ok := rule.(*AlertingRule); ok {
+            ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
+        }
+        var (
+            numOutOfOrder = 0
+            numTooOld     = 0
+            numDuplicates = 0
+        )
+
+        seriesInPreviousEval[i] = make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
+        buf := [1024]byte{}
+        mtx.Lock()
+        defer mtx.Unlock()
+        for _, s := range vector {
+            if s.H != nil {
+                _, err = concurrencyApp.AppendHistogram(0, s.Metric, s.T, nil, s.H)
+            } else {
+                _, err = concurrencyApp.Append(0, s.Metric, s.T, s.F)
+            }
+
+            if err != nil {
+                rule.SetHealth(HealthBad)
+                rule.SetLastError(err)
+                sp.SetStatus(codes.Error, err.Error())
+                unwrappedErr := errors.Unwrap(err)
+                if unwrappedErr == nil {
+                    unwrappedErr = err
+                }
+                switch {
+                case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample):
+                    numOutOfOrder++
+                    level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
+                case errors.Is(unwrappedErr, storage.ErrTooOldSample):
+                    numTooOld++
+                    level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
+                case errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp):
+                    numDuplicates++
+                    level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
+                default:
+                    level.Warn(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
+                }
+
+                continue
+            }
+
+            seriesInPreviousEval[i][string(s.Metric.Bytes(buf[:]))] = s.Metric
+        }
+        if numOutOfOrder > 0 {
+            level.Warn(logger).Log(
+                "msg", "Error on ingesting out-of-order result from rule evaluation",
+                "num_dropped", numOutOfOrder,
+            )
+        }
+        if numTooOld > 0 {
+            level.Warn(logger).Log(
+                "msg", "Error on ingesting too old result from rule evaluation",
+                "num_dropped", numTooOld,
+            )
+        }
+        if numDuplicates > 0 {
+            level.Warn(logger).Log(
+                "msg", "Error on ingesting results from rule evaluation with different value but same timestamp",
+                "num_dropped", numDuplicates,
+            )
+        }
+
+        for metric, lset := range g.seriesInPreviousEval[i] {
+            if _, ok := seriesInPreviousEval[i][metric]; !ok {
+                // Series no longer exposed, mark it stale.
+                _, err = concurrencyApp.Append(
+                    0,
+                    lset,
+                    timestamp.FromTime(ts.Add(-ruleQueryOffset)),
+                    math.Float64frombits(value.StaleNaN),
+                )
+                unwrappedErr := errors.Unwrap(err)
+                if unwrappedErr == nil {
+                    unwrappedErr = err
+                }
+                switch {
+                case unwrappedErr == nil:
+                case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample),
+                    errors.Is(unwrappedErr, storage.ErrTooOldSample),
+                    errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp):
+                    // Do not count these in logging, as this is expected if series
+                    // is exposed from a different rule.
+                default:
+                    level.Warn(logger).Log("msg", "Adding stale sample failed", "sample", lset.String(), "err", err)
+                }
+            }
+        }
+    }
+
+    sequentiallyRules := make([]Rule, len(g.rules))
+    for i, rule := range g.rules {
+        select {
+        case <-g.done:
+            return samplesTotal.Load()
+        default:
+        }
+
+        if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) {
+            wg.Add(1)
+
+            go concurrencyEval(i, rule, func() {
+                wg.Done()
+                ctrl.Done(ctx)
+            })
+            sequentiallyRules[i] = nil // evaluated concurrently; skipped in the sequential pass
+        } else {
+            sequentiallyRules[i] = rule
+        }
+    }
+
+    wg.Wait()
+
+    if err := concurrencyApp.Commit(); err != nil {
+        groupKey := GroupKey(g.File(), g.Name())
+        for i := range g.rules {
+            if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, g.rules[i]) {
+                g.rules[i].SetHealth(HealthBad)
+                g.rules[i].SetLastError(err)
+                g.metrics.EvalFailures.WithLabelValues(groupKey).Inc()
+                ctrl.Done(ctx)
+            }
+        }
+
+        level.Warn(g.logger).Log(
+            "msg", "Rule sample committing failed",
+            "group_key", groupKey,
+            "err", err,
+        )
+
+        return samplesTotal.Add(g.sequentiallyEval(ctx, ts, sequentiallyRules))
+    }
+
+    for i, series := range seriesInPreviousEval {
+        if series == nil {
+            continue
+        }
+
+        g.seriesInPreviousEval[i] = series
+    }
+
+    return samplesTotal.Add(g.sequentiallyEval(ctx, ts, sequentiallyRules))
+}
+
+// sequentiallyEval evaluates the rules sequentially.
+func (g *Group) sequentiallyEval(
+    ctx context.Context,
+    ts time.Time,
+    sequentiallyRules []Rule,
+) float64 {
+    if len(sequentiallyRules) == 0 {
+        return 0
+    }
+
+    var samplesTotal float64
+
+    ruleQueryOffset := g.QueryOffset()
+    eval := func(i int, rule Rule, cleanup func()) {
+        if cleanup != nil {
+            defer cleanup()
+        }
+
+        logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i)
+        ctx, sp := otel.Tracer("").Start(ctx, "rule")
+        sp.SetAttributes(attribute.String("name", rule.Name()))
+        defer func(t time.Time) {
+            sp.End()
+
+            since := time.Since(t)
+            g.metrics.EvalDuration.Observe(since.Seconds())
+            rule.SetEvaluationDuration(since)
+            rule.SetEvaluationTimestamp(t)
+        }(time.Now())
+
+        if sp.SpanContext().IsSampled() && sp.SpanContext().HasTraceID() {
+            logger = log.WithPrefix(logger, "trace_id", sp.SpanContext().TraceID())
+        }
+
+        g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
+
+        vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
+        if err != nil {
+            rule.SetHealth(HealthBad)
+            rule.SetLastError(err)
+            sp.SetStatus(codes.Error, err.Error())
+            g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
+
+            // Canceled queries are intentional termination of queries. This normally
+            // happens on shutdown and thus we skip logging of any errors here.
+            var eqc promql.ErrQueryCanceled
+            if !errors.As(err, &eqc) {
+                level.Warn(logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
+            }
+            return
+        }
+        rule.SetHealth(HealthGood)
+        rule.SetLastError(nil)
+        samplesTotal += float64(len(vector))
+
+        if ar, ok := rule.(*AlertingRule); ok {
+            ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
+        }
+        var (
+            numOutOfOrder = 0
+            numTooOld     = 0
+            numDuplicates = 0
+        )
+
+        app := g.opts.Appendable.Appender(ctx)
+        seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
+        defer func() {
+            if err := app.Commit(); err != nil {
+                rule.SetHealth(HealthBad)
+                rule.SetLastError(err)
+                sp.SetStatus(codes.Error, err.Error())
+                g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
+
+                level.Warn(logger).Log("msg", "Rule sample appending failed", "err", err)
+                return
+            }
+            g.seriesInPreviousEval[i] = seriesReturned
+        }()
+
+        buf := [1024]byte{}
+        for _, s := range vector {
+            if s.H != nil {
+                _, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H)
+            } else {
+                _, err = app.Append(0, s.Metric, s.T, s.F)
+            }
+
+            if err != nil {
+                rule.SetHealth(HealthBad)
+                rule.SetLastError(err)
+                sp.SetStatus(codes.Error, err.Error())
+                unwrappedErr := errors.Unwrap(err)
+                if unwrappedErr == nil {
+                    unwrappedErr = err
+                }
+                switch {
+                case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample):
+                    numOutOfOrder++
+                    level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
+                case errors.Is(unwrappedErr, storage.ErrTooOldSample):
+                    numTooOld++
+                    level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
+                case errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp):
+                    numDuplicates++
+                    level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
+                default:
+                    level.Warn(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
+                }
+
+                continue
+            }
+
+            seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric
+        }
+        if numOutOfOrder > 0 {
+            level.Warn(logger).Log(
+                "msg", "Error on ingesting out-of-order result from rule evaluation",
+                "num_dropped", numOutOfOrder,
+            )
+        }
+        if numTooOld > 0 {
+            level.Warn(logger).Log(
+                "msg", "Error on ingesting too old result from rule evaluation",
+                "num_dropped", numTooOld,
+            )
+        }
+        if numDuplicates > 0 {
+            level.Warn(logger).Log(
+                "msg", "Error on ingesting results from rule evaluation with different value but same timestamp",
+                "num_dropped", numDuplicates,
+            )
+        }
+
+        for metric, lset := range g.seriesInPreviousEval[i] {
+            if _, ok := seriesReturned[metric]; !ok {
+                // Series no longer exposed, mark it stale.
+                _, err = app.Append(
+                    0,
+                    lset,
+                    timestamp.FromTime(ts.Add(-ruleQueryOffset)),
+                    math.Float64frombits(value.StaleNaN),
+                )
+                unwrappedErr := errors.Unwrap(err)
+                if unwrappedErr == nil {
+                    unwrappedErr = err
+                }
+                switch {
+                case unwrappedErr == nil:
+                case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample),
+                    errors.Is(unwrappedErr, storage.ErrTooOldSample),
+                    errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp):
+                    // Do not count these in logging, as this is expected if series
+                    // is exposed from a different rule.
+                default:
+                    level.Warn(logger).Log("msg", "Adding stale sample failed", "sample", lset.String(), "err", err)
+                }
+            }
+        }
+    }
+
+    for i, rule := range sequentiallyRules {
+        select {
+        case <-g.done:
+            return samplesTotal
+        default:
+        }
+
+        if rule == nil {
+            continue
+        }
+
+        eval(i, rule, nil)
+    }
+
+    return samplesTotal
+}
diff --git a/rules/manager.go b/rules/manager.go
index efb017854..161c6ef7e 100644
--- a/rules/manager.go
+++ b/rules/manager.go
@@ -465,6 +465,9 @@ type RuleConcurrencyController interface {
 
     // Done releases a concurrent evaluation slot.
     Done(ctx context.Context)
+
+    // IsConcurrent reports whether this controller evaluates rules concurrently.
+    IsConcurrent() bool
 }
 
 // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
@@ -478,13 +481,13 @@ func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyControlle
     }
 }
 
-func (c *concurrentRuleEvalController) Allow(_ context.Context, _ *Group, rule Rule) bool {
+func (c *concurrentRuleEvalController) Allow(ctx context.Context, _ *Group, rule Rule) bool {
-    // To allow a rule to be executed concurrently, we need 3 conditions:
-    // 1. The rule must not have any rules that depend on it.
-    // 2. The rule itself must not depend on any other rules.
-    // 3. If 1 & 2 are true, then and only then we should try to acquire the concurrency slot.
+    // To allow a rule to be executed concurrently, we need 2 conditions:
+    // 1. The rule itself must not depend on any other rules.
+    // 2. If 1 is true, then and only then we block until a concurrency slot
+    //    can be acquired (or the context is canceled).
-    if rule.NoDependentRules() && rule.NoDependencyRules() {
-        return c.sema.TryAcquire(1)
+    if rule.NoDependencyRules() {
+        return c.sema.Acquire(ctx, 1) == nil
     }
 
     return false
@@ -494,6 +497,11 @@ func (c *concurrentRuleEvalController) Done(_ context.Context) {
     c.sema.Release(1)
 }
 
+// IsConcurrent always returns true: this controller evaluates rules concurrently.
+func (*concurrentRuleEvalController) IsConcurrent() bool {
+    return true
+}
+
 // sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially.
 type sequentialRuleEvalController struct{}
 
@@ -502,3 +510,8 @@ func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule)
 }
 
 func (c sequentialRuleEvalController) Done(_ context.Context) {}
+
+// IsConcurrent always returns false: this controller evaluates rules sequentially.
+func (c sequentialRuleEvalController) IsConcurrent() bool {
+    return false
+}
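A minimal sketch (not part of the patch) of an alternative RuleConcurrencyController built on a buffered channel, assuming only the interface extended above (Allow, Done, IsConcurrent) and the Rule.NoDependencyRules method used in the patched Allow; the channel-based type and its constructor name are hypothetical stand-ins for the semaphore-based controller in rules/manager.go:

package rules

import "context"

// channelRuleEvalController illustrates the contract Group.Eval now relies on:
// Allow blocks until an evaluation slot is free (mirroring sema.Acquire), Done
// returns the slot, and IsConcurrent steers evaluation into concurrencyEval.
type channelRuleEvalController struct {
    slots chan struct{}
}

func newChannelRuleEvalController(maxConcurrency int) *channelRuleEvalController {
    return &channelRuleEvalController{slots: make(chan struct{}, maxConcurrency)}
}

func (c *channelRuleEvalController) Allow(ctx context.Context, _ *Group, rule Rule) bool {
    // Only rules without dependencies may run concurrently, matching the patched Allow.
    if !rule.NoDependencyRules() {
        return false
    }
    select {
    case c.slots <- struct{}{}:
        return true
    case <-ctx.Done():
        return false
    }
}

func (c *channelRuleEvalController) Done(_ context.Context) {
    <-c.slots
}

func (c *channelRuleEvalController) IsConcurrent() bool { return true }

Blocking in Allow mirrors the patch's switch from sema.TryAcquire to sema.Acquire: callers wait for a slot instead of silently falling back to sequential evaluation, and a canceled context releases them with a false result.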