Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support per-tenant limits in metrics generator local blocks processor #2442

Merged
merged 3 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions modules/generator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,29 @@ func (cfg *ProcessorConfig) copyWithOverrides(o metricsGeneratorOverrides, userI
copyCfg.SpanMetrics.FilterPolicies = filterPolicies
}

if max := o.MetricsGeneratorProcessorLocalBlocksMaxLiveTraces(userID); max > 0 {
copyCfg.LocalBlocks.MaxLiveTraces = max
}

if max := o.MetricsGeneratorProcessorLocalBlocksMaxBlockDuration(userID); max > 0 {
copyCfg.LocalBlocks.MaxBlockDuration = max
}

if max := o.MetricsGeneratorProcessorLocalBlocksMaxBlockBytes(userID); max > 0 {
copyCfg.LocalBlocks.MaxBlockBytes = max
}

if period := o.MetricsGeneratorProcessorLocalBlocksFlushCheckPeriod(userID); period > 0 {
copyCfg.LocalBlocks.FlushCheckPeriod = period
}

if period := o.MetricsGeneratorProcessorLocalBlocksTraceIdlePeriod(userID); period > 0 {
copyCfg.LocalBlocks.TraceIdlePeriod = period
}

if timeout := o.MetricsGeneratorProcessorLocalBlocksCompleteBlockTimeout(userID); timeout > 0 {
copyCfg.LocalBlocks.CompleteBlockTimeout = timeout
}

return copyCfg, nil
}
8 changes: 8 additions & 0 deletions modules/generator/overrides.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package generator

import (
"time"

"github.com/grafana/tempo/modules/generator/registry"
"github.com/grafana/tempo/modules/overrides"
filterconfig "github.com/grafana/tempo/pkg/spanfilter/config"
Expand All @@ -16,6 +18,12 @@ type metricsGeneratorOverrides interface {
MetricsGeneratorProcessorSpanMetricsDimensions(userID string) []string
MetricsGeneratorProcessorSpanMetricsIntrinsicDimensions(userID string) map[string]bool
MetricsGeneratorProcessorSpanMetricsFilterPolicies(userID string) []filterconfig.FilterPolicy
MetricsGeneratorProcessorLocalBlocksMaxLiveTraces(userID string) uint64
MetricsGeneratorProcessorLocalBlocksMaxBlockDuration(userID string) time.Duration
MetricsGeneratorProcessorLocalBlocksMaxBlockBytes(userID string) uint64
MetricsGeneratorProcessorLocalBlocksTraceIdlePeriod(userID string) time.Duration
MetricsGeneratorProcessorLocalBlocksFlushCheckPeriod(userID string) time.Duration
MetricsGeneratorProcessorLocalBlocksCompleteBlockTimeout(userID string) time.Duration
}

var _ metricsGeneratorOverrides = (*overrides.Overrides)(nil)
44 changes: 37 additions & 7 deletions modules/generator/overrides_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ import (
)

type mockOverrides struct {
processors map[string]struct{}
serviceGraphsHistogramBuckets []float64
serviceGraphsDimensions []string
spanMetricsHistogramBuckets []float64
spanMetricsDimensions []string
spanMetricsIntrinsicDimensions map[string]bool
spanMetricsFilterPolicies []filterconfig.FilterPolicy
processors map[string]struct{}
serviceGraphsHistogramBuckets []float64
serviceGraphsDimensions []string
spanMetricsHistogramBuckets []float64
spanMetricsDimensions []string
spanMetricsIntrinsicDimensions map[string]bool
spanMetricsFilterPolicies []filterconfig.FilterPolicy
localBlocksMaxLiveTraces uint64
localBlocksMaxBlockDuration time.Duration
localBlocksMaxBlockBytes uint64
localBlocksFlushCheckPeriod time.Duration
localBlocksTraceIdlePeriod time.Duration
localBlocksCompleteBlockTimeout time.Duration
}

var _ metricsGeneratorOverrides = (*mockOverrides)(nil)
Expand Down Expand Up @@ -57,3 +63,27 @@ func (m *mockOverrides) MetricsGeneratorProcessorSpanMetricsIntrinsicDimensions(
func (m *mockOverrides) MetricsGeneratorProcessorSpanMetricsFilterPolicies(userID string) []filterconfig.FilterPolicy {
return m.spanMetricsFilterPolicies
}

func (m *mockOverrides) MetricsGeneratorProcessorLocalBlocksMaxLiveTraces(userID string) uint64 {
return m.localBlocksMaxLiveTraces
}

func (m *mockOverrides) MetricsGeneratorProcessorLocalBlocksMaxBlockDuration(userID string) time.Duration {
return m.localBlocksMaxBlockDuration
}

func (m *mockOverrides) MetricsGeneratorProcessorLocalBlocksMaxBlockBytes(userID string) uint64 {
return m.localBlocksMaxBlockBytes
}

func (m *mockOverrides) MetricsGeneratorProcessorLocalBlocksTraceIdlePeriod(userID string) time.Duration {
return m.localBlocksTraceIdlePeriod
}

func (m *mockOverrides) MetricsGeneratorProcessorLocalBlocksFlushCheckPeriod(userID string) time.Duration {
return m.localBlocksFlushCheckPeriod
}

func (m *mockOverrides) MetricsGeneratorProcessorLocalBlocksCompleteBlockTimeout(userID string) time.Duration {
return m.localBlocksCompleteBlockTimeout
}
1 change: 1 addition & 0 deletions modules/generator/processor/localblocks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Config struct {
MaxBlockDuration time.Duration `yaml:"max_block_duration"`
MaxBlockBytes uint64 `yaml:"max_block_bytes"`
CompleteBlockTimeout time.Duration `yaml:"complete_block_timeout"`
MaxLiveTraces uint64 `yaml:"max_live_traces"`
}

func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
Expand Down
19 changes: 17 additions & 2 deletions modules/generator/processor/localblocks/livetraces.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ import (
"time"

v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/pkg/errors"
)

var errMaxExceeded = errors.New("asdf")

type liveTrace struct {
id []byte
timestamp time.Time
Expand All @@ -32,16 +35,27 @@ func (l *liveTraces) token(traceID []byte) uint64 {
return l.hash.Sum64()
}

func (l *liveTraces) Push(batch *v1.ResourceSpans) {
func (l *liveTraces) Len() uint64 {
return uint64(len(l.traces))
}

func (l *liveTraces) Push(batch *v1.ResourceSpans, max uint64) error {
if len(batch.ScopeSpans) == 0 || len(batch.ScopeSpans[0].Spans) == 0 {
return
return nil
}

traceID := batch.ScopeSpans[0].Spans[0].TraceId
token := l.token(traceID)

tr := l.traces[token]
if tr == nil {

// Before adding this check against max
// Zero means no limit
if max > 0 && uint64(len(l.traces)) >= max {
return errMaxExceeded
}

tr = &liveTrace{
id: traceID,
}
Expand All @@ -50,6 +64,7 @@ func (l *liveTraces) Push(batch *v1.ResourceSpans) {

tr.Batches = append(tr.Batches, batch)
tr.timestamp = time.Now()
return nil
}

func (l *liveTraces) CutIdle(idleSince time.Time) []*liveTrace {
Expand Down
40 changes: 40 additions & 0 deletions modules/generator/processor/localblocks/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package localblocks

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
namespace = "tempo"
subsystem = "metrics_generator_processor_local_blocks"

reasonLiveTracesExceeded = "live_traces_exceeded"
)

var (
metricTotalTraces = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "traces_total",
Help: "Total number of traces created",
}, []string{"tenant"})
metricLiveTraces = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "live_traces",
Help: "Number of live traces",
}, []string{"tenant"})
metricDroppedTraces = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "traces_dropped_total",
Help: "Number of traces dropped",
}, []string{"tenant", "reason"})
metricBlockSize = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "bytes",
Help: "Total size of local blocks",
}, []string{"tenant"})
)
57 changes: 54 additions & 3 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ func New(cfg Config, tenant string, wal *wal.WAL) (*Processor, error) {
return nil, errors.Wrap(err, "replaying blocks")
}

p.wg.Add(3)
p.wg.Add(4)
go p.flushLoop()
go p.deleteLoop()
go p.completeLoop()
go p.metricLoop()

return p, nil
}
Expand All @@ -80,11 +81,21 @@ func (p *Processor) PushSpans(ctx context.Context, req *tempopb.PushSpansRequest
p.liveTracesMtx.Lock()
defer p.liveTracesMtx.Unlock()

before := p.liveTraces.Len()

for _, batch := range req.Batches {
if batch = filterBatch(batch); batch != nil {
p.liveTraces.Push(batch)
switch err := p.liveTraces.Push(batch, p.Cfg.MaxLiveTraces); err {
case errMaxExceeded:
metricDroppedTraces.WithLabelValues(p.tenant, reasonLiveTracesExceeded).Inc()
}
}
}

after := p.liveTraces.Len()

// Number of new traces is the delta
metricTotalTraces.WithLabelValues(p.tenant).Add(float64(after - before))
}

func (p *Processor) Shutdown(ctx context.Context) {
Expand Down Expand Up @@ -168,6 +179,24 @@ func (p *Processor) completeLoop() {
}
}

func (p *Processor) metricLoop() {
defer p.wg.Done()

ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
// Instead of reacting to every block flush/update, just run on a timer.
p.recordBlockBytes()

case <-p.closeCh:
return
}
}
}

func (p *Processor) completeBlock() error {
p.blocksMtx.Lock()
defer p.blocksMtx.Unlock()
Expand Down Expand Up @@ -235,7 +264,7 @@ func (p *Processor) deleteOldBlocks() (err error) {
if err != nil {
return err
}
delete(p.walBlocks, id)
delete(p.completeBlocks, id)
}
}

Expand All @@ -246,6 +275,9 @@ func (p *Processor) cutIdleTraces(immediate bool) error {
p.liveTracesMtx.Lock()
defer p.liveTracesMtx.Unlock()

// Record live traces before flushing so we know the high water mark
metricLiveTraces.WithLabelValues(p.tenant).Set(float64(len(p.liveTraces.traces)))

since := time.Now().Add(-p.Cfg.TraceIdlePeriod)
if immediate {
since = time.Time{}
Expand Down Expand Up @@ -420,6 +452,25 @@ func (p *Processor) reloadBlocks() error {
return nil
}

func (p *Processor) recordBlockBytes() {
p.blocksMtx.Lock()
defer p.blocksMtx.Unlock()

sum := uint64(0)

if p.headBlock != nil {
sum += p.headBlock.DataLength()
}
for _, b := range p.walBlocks {
sum += b.DataLength()
}
for _, b := range p.completeBlocks {
sum += b.BlockMeta().Size
}

metricBlockSize.WithLabelValues(p.tenant).Set(float64(sum))
}

// filterBatch to only spans with kind==server. Does not modify the input
// but returns a new struct referencing the same input pointers. Returns nil
// if there were no matching spans.
Expand Down
32 changes: 19 additions & 13 deletions modules/overrides/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,25 @@ type Limits struct {
Forwarders []string `yaml:"forwarders" json:"forwarders"`

// Metrics-generator config
MetricsGeneratorRingSize int `yaml:"metrics_generator_ring_size" json:"metrics_generator_ring_size"`
MetricsGeneratorProcessors ListToMap `yaml:"metrics_generator_processors" json:"metrics_generator_processors"`
MetricsGeneratorMaxActiveSeries uint32 `yaml:"metrics_generator_max_active_series" json:"metrics_generator_max_active_series"`
MetricsGeneratorCollectionInterval time.Duration `yaml:"metrics_generator_collection_interval" json:"metrics_generator_collection_interval"`
MetricsGeneratorDisableCollection bool `yaml:"metrics_generator_disable_collection" json:"metrics_generator_disable_collection"`
MetricsGeneratorForwarderQueueSize int `yaml:"metrics_generator_forwarder_queue_size" json:"metrics_generator_forwarder_queue_size"`
MetricsGeneratorForwarderWorkers int `yaml:"metrics_generator_forwarder_workers" json:"metrics_generator_forwarder_workers"`
MetricsGeneratorProcessorServiceGraphsHistogramBuckets []float64 `yaml:"metrics_generator_processor_service_graphs_histogram_buckets" json:"metrics_generator_processor_service_graphs_histogram_buckets"`
MetricsGeneratorProcessorServiceGraphsDimensions []string `yaml:"metrics_generator_processor_service_graphs_dimensions" json:"metrics_generator_processor_service_graphs_dimensions"`
MetricsGeneratorProcessorSpanMetricsHistogramBuckets []float64 `yaml:"metrics_generator_processor_span_metrics_histogram_buckets" json:"metrics_generator_processor_span_metrics_histogram_buckets"`
MetricsGeneratorProcessorSpanMetricsDimensions []string `yaml:"metrics_generator_processor_span_metrics_dimensions" json:"metrics_generator_processor_span_metrics_dimensions"`
MetricsGeneratorProcessorSpanMetricsIntrinsicDimensions map[string]bool `yaml:"metrics_generator_processor_span_metrics_intrinsic_dimensions" json:"metrics_generator_processor_span_metrics_intrinsic_dimensions"`
MetricsGeneratorProcessorSpanMetricsFilterPolicies []filterconfig.FilterPolicy `yaml:"metrics_generator_processor_span_metrics_filter_policies" json:"metrics_generator_processor_span_metrics_filter_policies"`
MetricsGeneratorRingSize int `yaml:"metrics_generator_ring_size" json:"metrics_generator_ring_size"`
MetricsGeneratorProcessors ListToMap `yaml:"metrics_generator_processors" json:"metrics_generator_processors"`
MetricsGeneratorMaxActiveSeries uint32 `yaml:"metrics_generator_max_active_series" json:"metrics_generator_max_active_series"`
MetricsGeneratorCollectionInterval time.Duration `yaml:"metrics_generator_collection_interval" json:"metrics_generator_collection_interval"`
MetricsGeneratorDisableCollection bool `yaml:"metrics_generator_disable_collection" json:"metrics_generator_disable_collection"`
MetricsGeneratorForwarderQueueSize int `yaml:"metrics_generator_forwarder_queue_size" json:"metrics_generator_forwarder_queue_size"`
MetricsGeneratorForwarderWorkers int `yaml:"metrics_generator_forwarder_workers" json:"metrics_generator_forwarder_workers"`
MetricsGeneratorProcessorServiceGraphsHistogramBuckets []float64 `yaml:"metrics_generator_processor_service_graphs_histogram_buckets" json:"metrics_generator_processor_service_graphs_histogram_buckets"`
MetricsGeneratorProcessorServiceGraphsDimensions []string `yaml:"metrics_generator_processor_service_graphs_dimensions" json:"metrics_generator_processor_service_graphs_dimensions"`
MetricsGeneratorProcessorSpanMetricsHistogramBuckets []float64 `yaml:"metrics_generator_processor_span_metrics_histogram_buckets" json:"metrics_generator_processor_span_metrics_histogram_buckets"`
MetricsGeneratorProcessorSpanMetricsDimensions []string `yaml:"metrics_generator_processor_span_metrics_dimensions" json:"metrics_generator_processor_span_metrics_dimensions"`
MetricsGeneratorProcessorSpanMetricsIntrinsicDimensions map[string]bool `yaml:"metrics_generator_processor_span_metrics_intrinsic_dimensions" json:"metrics_generator_processor_span_metrics_intrinsic_dimensions"`
MetricsGeneratorProcessorSpanMetricsFilterPolicies []filterconfig.FilterPolicy `yaml:"metrics_generator_processor_span_metrics_filter_policies" json:"metrics_generator_processor_span_metrics_filter_policies"`
MetricsGeneratorProcessorLocalBlocksMaxLiveTraces uint64 `yaml:"metrics_generator_processor_local_blocks_max_live_traces" json:"metrics_generator_processor_local_blocks_max_live_traces"`
MetricsGeneratorProcessorLocalBlocksMaxBlockDuration time.Duration `yaml:"metrics_generator_processor_local_blocks_max_block_duration" json:"metrics_generator_processor_local_blocks_max_block_duration"`
MetricsGeneratorProcessorLocalBlocksMaxBlockBytes uint64 `yaml:"metrics_generator_processor_local_blocks_max_block_bytes" json:"metrics_generator_processor_local_blocks_max_block_bytes"`
MetricsGeneratorProcessorLocalBlocksFlushCheckPeriod time.Duration `yaml:"metrics_generator_processor_local_blocks_flush_check_period" json:"metrics_generator_processor_local_blocks_flush_check_period"`
MetricsGeneratorProcessorLocalBlocksTraceIdlePeriod time.Duration `yaml:"metrics_generator_processor_local_blocks_trace_idle_period" json:"metrics_generator_processor_local_blocks_trace_idle_period"`
MetricsGeneratorProcessorLocalBlocksCompleteBlockTimeout time.Duration `yaml:"metrics_generator_processor_local_blocks_complete_block_timeout" json:"metrics_generator_processor_local_blocks_complete_block_timeout"`

// Compactor enforced limits.
BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"`
Expand Down
24 changes: 24 additions & 0 deletions modules/overrides/overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,30 @@ func (o *Overrides) MetricsGeneratorProcessorSpanMetricsFilterPolicies(userID st
return o.getOverridesForUser(userID).MetricsGeneratorProcessorSpanMetricsFilterPolicies
}

func (o *Overrides) MetricsGeneratorProcessorLocalBlocksMaxLiveTraces(userID string) uint64 {
return o.getOverridesForUser(userID).MetricsGeneratorProcessorLocalBlocksMaxLiveTraces
}

func (o *Overrides) MetricsGeneratorProcessorLocalBlocksMaxBlockDuration(userID string) time.Duration {
return o.getOverridesForUser(userID).MetricsGeneratorProcessorLocalBlocksMaxBlockDuration
}

func (o *Overrides) MetricsGeneratorProcessorLocalBlocksMaxBlockBytes(userID string) uint64 {
return o.getOverridesForUser(userID).MetricsGeneratorProcessorLocalBlocksMaxBlockBytes
}

func (o *Overrides) MetricsGeneratorProcessorLocalBlocksTraceIdlePeriod(userID string) time.Duration {
return o.getOverridesForUser(userID).MetricsGeneratorProcessorLocalBlocksTraceIdlePeriod
}

func (o *Overrides) MetricsGeneratorProcessorLocalBlocksFlushCheckPeriod(userID string) time.Duration {
return o.getOverridesForUser(userID).MetricsGeneratorProcessorLocalBlocksFlushCheckPeriod
}

func (o *Overrides) MetricsGeneratorProcessorLocalBlocksCompleteBlockTimeout(userID string) time.Duration {
return o.getOverridesForUser(userID).MetricsGeneratorProcessorLocalBlocksCompleteBlockTimeout
}

// BlockRetention is the duration of the block retention for this tenant.
func (o *Overrides) BlockRetention(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).BlockRetention)
Expand Down